diff --git a/docs/modules/ROOT/pages/spring-cloud-gateway/gatewayfilter-factories/cacherequestbody-factory.adoc b/docs/modules/ROOT/pages/spring-cloud-gateway/gatewayfilter-factories/cacherequestbody-factory.adoc index 513c99d6e0..b7b44332f0 100644 --- a/docs/modules/ROOT/pages/spring-cloud-gateway/gatewayfilter-factories/cacherequestbody-factory.adoc +++ b/docs/modules/ROOT/pages/spring-cloud-gateway/gatewayfilter-factories/cacherequestbody-factory.adoc @@ -14,7 +14,7 @@ public RouteLocator routes(RouteLocatorBuilder builder) { .route("cache_request_body_route", r -> r.path("/downstream/**") .filters(f -> f.prefixPath("/httpbin") .cacheRequestBody(String.class).uri(uri)) - .build(); + .build()); } ---- @@ -36,7 +36,7 @@ spring: bodyClass: java.lang.String ---- `CacheRequestBody` extracts the request body and converts it to a body class (such as `java.lang.String`, defined in the preceding example). -`CacheRequestBody` then places it in the attributes available from `ServerWebExchange.getAttributes()`, with a key defined in `ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR`. +`CacheRequestBody` then places it in the attributes available from `ServerWebExchange.getAttributes()`, with a key defined in `ServerWebExchangeUtils.CACHE_REQUEST_BODY_OBJECT_ATTR`. NOTE: This filter works only with HTTP (including HTTPS) requests. diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactory.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactory.java index 7943494e5c..d47f77028d 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactory.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactory.java @@ -24,13 +24,9 @@ import org.springframework.cloud.gateway.filter.GatewayFilter; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.util.Assert; import org.springframework.web.reactive.function.server.HandlerStrategies; -import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; import static org.springframework.cloud.gateway.support.GatewayToStringStyler.filterToStringCreator; @@ -70,36 +66,11 @@ public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { return chain.filter(exchange); } - Object cachedBody = exchange.getAttribute(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR); - if (cachedBody != null) { - return chain.filter(exchange); - } - - return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, (serverHttpRequest) -> { - final ServerRequest serverRequest = ServerRequest - .create(exchange.mutate().request(serverHttpRequest).build(), messageReaders); - return serverRequest.bodyToMono((config.getBodyClass())).doOnNext(objectValue -> { - Object previousCachedBody = exchange.getAttributes() - .put(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, objectValue); - if (previousCachedBody != null) { - // store previous cached body - exchange.getAttributes().put(CACHED_ORIGINAL_REQUEST_BODY_BACKUP_ATTR, previousCachedBody); - } - }).then(Mono.defer(() -> { - ServerHttpRequest cachedRequest = exchange - .getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR); - Assert.notNull(cachedRequest, "cache request shouldn't be null"); + return ServerWebExchangeUtils.cacheRequestBodyObject(exchange, config.getBodyClass(), messageReaders, + (serverHttpRequest, cachedBody) -> { exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR); - return chain.filter(exchange.mutate().request(cachedRequest).build()).doFinally(s -> { - // - Object backupCachedBody = exchange.getAttributes() - .get(CACHED_ORIGINAL_REQUEST_BODY_BACKUP_ATTR); - if (backupCachedBody instanceof DataBuffer dataBuffer) { - DataBufferUtils.release(dataBuffer); - } - }); - })); - }); + return chain.filter(exchange.mutate().request(serverHttpRequest).build()); + }); } @Override diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/predicate/ReadBodyRoutePredicateFactory.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/predicate/ReadBodyRoutePredicateFactory.java index 4226ecbc2e..e598f19239 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/predicate/ReadBodyRoutePredicateFactory.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/handler/predicate/ReadBodyRoutePredicateFactory.java @@ -29,7 +29,6 @@ import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.http.codec.HttpMessageReader; import org.springframework.web.reactive.function.server.HandlerStrategies; -import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; /** @@ -43,8 +42,6 @@ public class ReadBodyRoutePredicateFactory extends AbstractRoutePredicateFactory private static final String TEST_ATTRIBUTE = "read_body_predicate_test_attribute"; - private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject"; - private final List> messageReaders; public ReadBodyRoutePredicateFactory() { @@ -63,10 +60,7 @@ public AsyncPredicate applyAsync(Config config) { return new AsyncPredicate() { @Override public Publisher apply(ServerWebExchange exchange) { - Class inClass = config.getInClass(); - - Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY); - Mono modifiedBody; + Object cachedBody = exchange.getAttribute(ServerWebExchangeUtils.CACHE_REQUEST_BODY_OBJECT_ATTR); // We can only read the body from the request once, once that happens if // we try to read the body again an exception will be thrown. The below // if/else caches the body object as a request attribute in the @@ -87,15 +81,14 @@ public Publisher apply(ServerWebExchange exchange) { } return Mono.just(false); } - else { - return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, - (serverHttpRequest) -> ServerRequest - .create(exchange.mutate().request(serverHttpRequest).build(), messageReaders) - .bodyToMono(inClass) - .doOnNext(objectValue -> exchange.getAttributes() - .put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue)) - .map(objectValue -> config.getPredicate().test(objectValue))); - } + + return ServerWebExchangeUtils.cacheRequestBodyObject(exchange, config.getInClass(), messageReaders, + (serverHttpRequest, bodyObject) -> { + if (bodyObject == null) { + return Mono.just(false); + } + return Mono.just(config.predicate.test(bodyObject)); + }); } @Override diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/support/ServerWebExchangeUtils.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/support/ServerWebExchangeUtils.java index 7585a18204..3cb7017d21 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/support/ServerWebExchangeUtils.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/support/ServerWebExchangeUtils.java @@ -20,9 +20,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -43,12 +45,14 @@ import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpStatus; +import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.server.reactive.AbstractServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.DispatcherHandler; +import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.util.UriComponentsBuilder; @@ -174,6 +178,12 @@ public final class ServerWebExchangeUtils { */ public static final String CACHED_REQUEST_BODY_ATTR = "cachedRequestBody"; + /** + * Cached request decoded body object key. Used when + * {@link #cacheRequestBodyObject(ServerWebExchange, Class, List, BiFunction)} + */ + public static final String CACHE_REQUEST_BODY_OBJECT_ATTR = "cachedRequestBodyObject"; + /** * Gateway LoadBalancer {@link Response} attribute name. */ @@ -316,6 +326,29 @@ public static Map getUriTemplateVariables(ServerWebExchange exch return exchange.getAttributeOrDefault(URI_TEMPLATE_VARIABLES_ATTRIBUTE, new HashMap<>()); } + /** + * Caches the request body, the decoded body object and the created {@link ServerHttpRequestDecorator} in + * ServerWebExchange attributes. Those attributes are + * {@link #CACHED_REQUEST_BODY_ATTR} and + * {@link #CACHE_REQUEST_BODY_OBJECT_ATTR} and + * {@link #CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR} respectively. + * @param exchange the available ServerWebExchange. + * @param bodyClass the class of the body to be decoded + * @param messageReaders the list of message readers for decoding the body. + * @param function a function to apply on the decoded body and request. + * @param the class type of the decoded body. + * @param generic type for the return {@link Mono}. + * @return Mono of type T created by the function parameter. + */ + public static Mono cacheRequestBodyObject(ServerWebExchange exchange, Class bodyClass, + List> messageReaders, BiFunction> function) { + return cacheRequestBodyAndRequest(exchange, (serverHttpRequest) -> ServerRequest + .create(exchange.mutate().request(serverHttpRequest).build(), messageReaders).bodyToMono(bodyClass) + .doOnNext(objectValue -> exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_ATTR, objectValue)) + .flatMap(cachedBody -> function.apply(serverHttpRequest, cachedBody)) + .switchIfEmpty(function.apply(serverHttpRequest, null))); + } + /** * Caches the request body and the created {@link ServerHttpRequestDecorator} in * ServerWebExchange attributes. Those attributes are diff --git a/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactoryTests.java b/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactoryTests.java index cb4c508c8d..c71ab86f5c 100644 --- a/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactoryTests.java +++ b/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/factory/CacheRequestBodyGatewayFilterFactoryTests.java @@ -16,6 +16,7 @@ package org.springframework.cloud.gateway.filter.factory; +import java.util.Collections; import java.util.Map; import org.junit.jupiter.api.Test; @@ -118,6 +119,19 @@ public void cacheRequestBodyExists() { .isOk(); } + @Test + public void cacheRequestBodyWithCircuitBreaker() { + testClient.post().uri("/post").header("Host", "www.cacherequestbodywithcircuitbreaker.org") + .bodyValue(BODY_VALUE).exchange().expectStatus().isOk().expectBody(Map.class) + .consumeWith(result -> { + Map response = result.getResponseBody(); + assertThat(response).isNotNull(); + + String responseBody = (String) response.get("data"); + assertThat(responseBody).isEqualTo(BODY_VALUE); + }); + } + @Test public void toStringFormat() { CacheRequestBodyGatewayFilterFactory.Config config = new CacheRequestBodyGatewayFilterFactory.Config(); @@ -163,6 +177,18 @@ public RouteLocator testRouteLocator(RouteLocatorBuilder builder) { .cacheRequestBody(String.class) .filter(new AssertCachedRequestBodyGatewayFilter(BODY_CACHED_EXISTS))) .uri(uri)) + .route("cache_request_body_with_circuitbreaker_test", + r -> r.path("/post") + .and() + .host("**.cacherequestbodywithcircuitbreaker.org") + .filters(f -> f.setHostHeader("www.cacherequestbody.org") + .prefixPath("/httpbin") + .cacheRequestBody(String.class) + .filter(new AssertCachedRequestBodyGatewayFilter(BODY_VALUE)) + .filter(new CheckCachedRequestBodyReleasedGatewayFilter()) + .circuitBreaker(config -> config.setStatusCodes(Collections.singleton("200")) + .setFallbackUri("/post"))) + .uri(uri)) .build(); } @@ -181,7 +207,7 @@ private static class AssertCachedRequestBodyGatewayFilter implements GatewayFilt @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { - String body = exchange.getAttribute(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR); + String body = exchange.getAttribute(ServerWebExchangeUtils.CACHE_REQUEST_BODY_OBJECT_ATTR); if (exceptNullBody) { assertThat(body).isNull(); } @@ -203,7 +229,7 @@ private static class SetExchangeCachedRequestBodyGatewayFilter implements Gatewa @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { - exchange.getAttributes().put(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, bodyToSetCache); + exchange.getAttributes().put(ServerWebExchangeUtils.CACHE_REQUEST_BODY_OBJECT_ATTR, bodyToSetCache); return chain.filter(exchange); }