spring-cloud-gateway: Reading request body in filter produces an IllegalStateException

Using master branch:

@Override
  public GatewayFilter apply(Tuple args) {
   // @formatter:off
   return (exchange, chain) -> 
    exchange.getFormData()
      .flatMap(formData -> {
        String username = formData.getFirst("username");
        String password = formData.getFirst("password");

       //...
				
        ServerHttpRequest modifiedRequest = exchange.getRequest().mutate()
          .uri(newUri)
          .build();
				
        return chain.filter(
          exchange
            .mutate()
            .request(modifiedRequest)
        .build()
       );
   });
   // @formatter:on
}

First call emmits IllegalStateException

017-12-16 21:39:37.809  INFO 18409 --- [erListUpdater-0] c.netflix.config.ChainedDynamicProperty  : Flipping property: admin.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2017-12-16 21:39:39.222  INFO 18409 --- [ctor-http-nio-4] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@494e7642: startup date [Sat Dec 16 21:39:39 MSK 2017]; parent: org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext@4fe89c24
2017-12-16 21:39:39.288  INFO 18409 --- [ctor-http-nio-4] f.a.AutowiredAnnotationBeanPostProcessor : JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
2017-12-16 21:39:39.388  INFO 18409 --- [ctor-http-nio-4] c.netflix.config.ChainedDynamicProperty  : Flipping property: uaa.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2017-12-16 21:39:39.405  INFO 18409 --- [ctor-http-nio-4] c.n.u.concurrent.ShutdownEnabledTimer    : Shutdown hook installed for: NFLoadBalancer-PingTimer-uaa
2017-12-16 21:39:39.406  INFO 18409 --- [ctor-http-nio-4] c.netflix.loadbalancer.BaseLoadBalancer  : Client: uaa instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=uaa,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2017-12-16 21:39:39.408  INFO 18409 --- [ctor-http-nio-4] c.n.l.DynamicServerListLoadBalancer      : Using serverListUpdater PollingServerListUpdater
2017-12-16 21:39:39.411  INFO 18409 --- [ctor-http-nio-4] c.netflix.config.ChainedDynamicProperty  : Flipping property: uaa.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2017-12-16 21:39:39.412  INFO 18409 --- [ctor-http-nio-4] c.n.l.DynamicServerListLoadBalancer      : DynamicServerListLoadBalancer for client uaa initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=uaa,current list of Servers=[192.168.1.10:25050],Load balancer stats=Zone stats: {defaultzone=[Zone:defaultzone;	Instance count:1;	Active connections count: 0;	Circuit breaker tripped count: 0;	Active connections per server: 0.0;]
},Server stats: [[Server:192.168.1.10:25050;	Zone:defaultZone;	Total Requests:0;	Successive connection failure:0;	Total blackout seconds:0;	Last connection made:Thu Jan 01 03:00:00 MSK 1970;	First connection made: Thu Jan 01 03:00:00 MSK 1970;	Active Connections:0;	total failure count in last (1000) msecs:0;	average resp time:0.0;	90 percentile resp time:0.0;	95 percentile resp time:0.0;	min resp time:0.0;	max resp time:0.0;	stddev resp time:0.0]
]}ServerList:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList@67c89f69
2017-12-16 21:39:39.540 ERROR 18409 --- [ctor-http-nio-3] .a.w.r.e.DefaultErrorWebExceptionHandler : Failed to handle request [POST http://ant.den:25080/uaa/oauth/token?grant_type=password]

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
	at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:276) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:127) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]

2017-12-16 21:39:40.409  INFO 18409 --- [erListUpdater-0] c.netflix.config.ChainedDynamicProperty  : Flipping property: uaa.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647

Second call produces io.netty.handler.codec.EncoderException:

2017-12-16 21:45:12.062 ERROR 18409 --- [ctor-http-nio-3] .a.w.r.e.DefaultErrorWebExceptionHandler : Failed to handle request [POST http://ant.den:25080/uaa/oauth/token?grant_type=password]

io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106) ~[netty-codec-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at reactor.ipc.netty.channel.ChannelOperationsHandler.doWrite(ChannelOperationsHandler.java:283) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:463) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:183) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at reactor.ipc.netty.http.HttpOperations.lambda$sendHeaders$0(HttpOperations.java:130) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:106) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
	at reactor.ipc.netty.NettyOutbound.subscribe(NettyOutbound.java:356) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:51) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
	at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:383) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:479) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
	at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:86) ~[netty-codec-http-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.handler.codec.http.HttpClientCodec$Encoder.encode(HttpClientCodec.java:167) ~[netty-codec-http-4.1.17.Final.jar:4.1.17.Final]
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88) ~[netty-codec-4.1.17.Final.jar:4.1.17.Final]
	... 30 common frames omitted

When using M4, the same code works perfect.

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 38 (23 by maintainers)

Most upvoted comments

@spencergibb @rstoyanchev I wrote two filters to change request and response body with some support classes. Now form-url-encoded body can be changed successfully to any object in json to be passed to server behind gateway, and visa-versa. You can see them at branch rewrite-body here. Some tests has been written too.

But in list here I don’t see anything about request/response body changing. Before I’d continue to spend time, possible, on unneeded functionality, or in a wrong way, please, let me know, do you need this? Should I continue this work being sure it may be usefull for community, or I should stop and use only what I really need at this point in my own project?

It is not ready yet, some codecs/readers/writers are not implemented, but the main concept should be clear:

  • Rewrite request body example usage:
@Bean
public RouteLocator rewriteRequestRouteLocator(RouteLocatorBuilder builder) {
	
	return builder.routes()
			
		.route("RequestBodyRewrite_StringToString_Route", r ->
			r.path("/rewrite-request-body/post-string-string")
			.filters(f ->
				f.rewriteRequestBody(rw ->
					rw.classesOfBody(String.class, String.class)
					.rewrite((request, attrs, originalBody, mediaType) -> {
						System.out.println("originalBody:" + originalBody);
						return "bar";
					})
					.withResultMediaType(MediaType.TEXT_PLAIN)
					.build()
				)
			).uri("http://localhost:"+this.port)
		)
		
		.route("RequestBodyRewrite_StringToPerson_Route", r ->
			r.path("/rewrite-request-body/post-string-person")
			.filters(f ->
				f.rewriteRequestBody(rw ->
					rw.classesOfBody(String.class, Person.class)
					.rewrite((request, attrs, originalBody, mediaType) -> {
						System.out.println("originalBody:" + originalBody);
						return new Person(originalBody, "Smith");
					})
					.withResultMediaType(MediaType.APPLICATION_JSON)
					.build()
				)
			).uri("http://localhost:"+this.port)
		)
		
		.route("RequestBodyRewrite_formDataToJson_Route", r ->
			r.path("/rewrite-request-body/post-formdata-to-json")
			.filters(f ->
				f.rewriteRequestBody(rw ->
					rw.classesOfBody(MultiValueMap.class, Login.class)
					.rewrite((request, attrs, originalBody, mediaType) -> {
						Map<String, String> map = originalBody.toSingleValueMap();
						System.out.println("originalBody - mapped to <String,String>:" + map.toString());
						
						String user = map.get("user");
						String password = map.get("password");
						
						return new Login(user, password);
					})
					.withResultMediaType(MediaType.APPLICATION_JSON)
					.build()
				)
			).uri("http://localhost:"+this.port)
		)
		
		.route("RequestBodyRewrite_jsonToJson_Route", r ->
			r.path("/rewrite-request-body/post-json-to-json")
			.filters(f ->
				f.rewriteRequestBody(rw ->
					rw.classesOfBody(Login.class, Person.class)
					.rewrite((request, attrs, originalBody, mediaType) -> {
						System.out.println("originalBody: " + originalBody.toString());
						
						return new Person(originalBody.getUser(), "Smith");
					})
					.withResultMediaType(MediaType.APPLICATION_JSON)
					.build()
				)
			).uri("http://localhost:"+this.port)
		)
		
		.route("RequestBodyRewrite_jsonToFormData_Route", r ->
			r.path("/rewrite-request-body/post-json-to-formdata")
			.filters(f ->
				f.rewriteRequestBody(rw ->
					rw.classesOfBody(Person.class, MultiValueMap.class)
					.rewrite((request, attrs, originalBody, mediaType) -> {
						MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
						map.add("first", "John");
						map.add("last", "Doe");
						return map;
					})
					.withResultMediaType(MediaType.APPLICATION_FORM_URLENCODED)
					.build()
				)
			).uri("http://localhost:"+this.port)
		)
		
		.route("RequestBodyRewrite_formDataToFormData_Route", r ->
			r.path("/rewrite-request-body/post-formdata-to-formdata")
			.filters(f ->
				f.rewriteRequestBody(rw ->
					rw.classesOfBody(MultiValueMap.class, MultiValueMap.class)
					.rewrite((request, attrs, originalBody, mediaType) -> {
						MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
						map.add("firstName", "John");
						map.add("lastName", "Doe");
						return map;
					})
					.withResultMediaType(MediaType.APPLICATION_FORM_URLENCODED)
					.build()
				)
			).uri("http://localhost:"+this.port)
		)

	.build();
}
  • Rewrite response body example usage:
@Bean
public RouteLocator rewriteResponseRouteLocator(RouteLocatorBuilder builder) {
	
	RewriteResponseBody<ServerHttpResponse, Map<String, Object>, Person, MediaType, Person> personToPersonRewriteFn = (response, attrs, originalBody, mediaType) -> {
		
		return new Person(originalBody.getFirstName(), "Smith");
	};
	
	return builder.routes()
			
		.route("ResponseBodyRewrite_PersonToPerson_Route", r ->
			r.path("/body-response/person")
			.filters(f ->
				f.rewriteResponseBody(rw ->
					rw.classesOfBody(Person.class, Person.class)
					.withResultMediaType(MediaType.APPLICATION_JSON)
					.rewrite(personToPersonRewriteFn)
					.build()
				)
			)
			.uri("http://localhost:"+this.port)
		)
		
		.route("ResponseBodyRewrite_StringToPerson_Route", r ->
			r.path("/body-response/string2person")
			.filters(f ->
				f.rewriteResponseBody(rw ->
					rw.classesOfBody(String.class, Person.class)
					.withResultMediaType(MediaType.APPLICATION_JSON)
					.rewrite((response, attrs, originalBody, mediaType) -> {
						String[] nameParts = originalBody.split("\\s+");
						
						return new Person(nameParts[0], nameParts[1]);
					})
					.build()
				)
			)
			.uri("http://localhost:"+this.port)
		)
		
	.build();
}

Please let me know what is your decision.

PS: see possible usecase (not only my own). The main think is integration between ready services, and possibility to create custom security schemas with thin client.

Hi @re6exp

I tried to read the request body following your guidelines to @pravinkumarb84 but I could not achieve that. Could you share a sample code that reads request body. It will be a great help to me.

Thank you.