5

Spring 5 Reactive: failure when extending Flux/implementing Publisher and calling s.onNext() more than once

I just started playing with the new reactive support in Spring 5 and wanted to simulate some asynchronous data generation, when I noticed two faulty behaviors:

1) Calling s.onNext(String) more than once:

@GetMapping("/strings") 
public Publisher<String> getStrings(){ 

    return new Publisher<String>() { 

     @Override 
     public void subscribe(Subscriber<? super String> s) { 
      int i = 0; 
      while(++i <= 5){ 
       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       s.onNext("message"); 
      } 
      s.onComplete(); 
     } 
    }; 
} 

In this case there is a stack trace:

2016-08-03 13:35:04.986 DEBUG 5136 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /strings 
2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<java.lang.String> com.codependent.spring5.playground.reactive.web.AccountsController.getStrings()] 
2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsController' 
2016-08-03 13:35:07.120 DEBUG 5136 --- [nio-8080-exec-1] o.s.w.s.h.ExceptionHandlingWebHandler : Could not complete request 

java.lang.IllegalStateException: RECEIVED 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) [classes/:na] 
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) [spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) [spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45] 
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45] 

2016-08-03 13:35:07.121 DEBUG 5136 --- [nio-8080-exec-1] o.s.h.s.r.ServletServerHttpResponse  : Can't set the status 500 because the HTTP response has already been committed 
2016-08-03 13:35:08.127 ERROR 5136 --- [nio-8080-exec-1] a.c.c.C.[.[.0.0.0.[.[httpHandlerServlet] : Servlet.service() for servlet [httpHandlerServlet] in context with path [] threw exception 

reactor.core.Exceptions$BubblingException: java.lang.IllegalStateException: RECEIVED 
    at reactor.core.Exceptions.bubble(Exceptions.java:97) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onError(MonoThenApply.java:209) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onError(FluxResume.java:105) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.OperatorAdapter.doOnSubscriberError(OperatorAdapter.java:113) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:91) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) ~[classes/:na] 
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) ~[tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) ~[tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) ~[tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45] 
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45] 
Caused by: java.lang.IllegalStateException: RECEIVED 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    ... 57 common frames omitted 

2) Calling s.onNext(Alert.class, or any other DTO) more than once:

@GetMapping("/alerts") 
public Publisher<Alert> getAlerts(){ 

    return new Publisher<Alert>() { 

     @Override 
     public void subscribe(Subscriber<? super Alert> s) { 
      int i = 0; 
      while(++i <= 5){ 
       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       s.onNext(new Alert((long)1, "ms")); 
      } 
      s.onComplete(); 
     } 
    }; 
} 

Now no errors show up in the logs, but the caller gets a 500 response code and the body "[".

Log:

2016-08-03 13:37:11.834 DEBUG 5136 --- [nio-8080-exec-3] o.s.web.reactive.DispatcherHandler  : Processing GET request for [http://localhost:8080/alerts] 
2016-08-03 13:37:11.835 DEBUG 5136 --- [nio-8080-exec-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /alerts 
2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsController.getAlerts()] 
2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsController' 

Why can't we call onNext() multiple times? How could we do it?

NOTE: If I call onNext only once, it works fine:

@Override 
public void subscribe(Subscriber<? super String> s) { 
    s.onNext("my message" + Math.random()); 
    s.onComplete(); 
} 

or

@Override 
public void subscribe(Subscriber<? super Alert> s) { 
    s.onNext(new Alert((long)1, "ms")); 
    s.onComplete(); 
} 
+0

What happens if you return something simpler, e.g. 'Flux.just(new SensorRead(sensorId, Math.random()));' – Will

+0

In that case it works correctly. – codependent

Answers

4

My Publisher implementation did not follow the Reactive Streams specification. This is how I fixed it:

@GetMapping(value="/strings", produces="text/event-stream") 
public Publisher<String> getStrings(){ 
    return new Publisher<String>() { 

     private int loops = 5; 

     @Override 
     public void subscribe(Subscriber<? super String> s) { 

      s.onSubscribe(new Subscription() { 
       @Override 
       public void request(long n) { 
        for (int i = 0; i < n; i++) { 
         if(loops-- > 0){ 
          try { 
           Thread.sleep(1000); 
          } catch (InterruptedException e) { 
           e.printStackTrace(); 
          } 
          s.onNext("message"+Math.random());       
         }else{ 
          s.onComplete(); 
          return; // complete once, then stop serving this request 
         } 
        } 
       } 
       @Override 
       public void cancel() { 
        loops = 0; 
       } 
      }); 
     } 
    }; 
} 

If you want to learn more about this, see the issue I opened in Spring's JIRA and the helpful comment I received there.
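To illustrate what the specification expects from a Publisher, here is a minimal sketch that tracks outstanding demand and never emits more than was requested. It is an illustration under stated assumptions, not the code from the JIRA issue: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones, so it runs without extra dependencies. The names DemandDrivenPublisherDemo, CountingPublisher and collect are made up for this example, the one-second sleep is left out, and the code is single-threaded only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; all names here are assumptions for illustration.
final class DemandDrivenPublisherDemo {

    /** Emits "message 1" .. "message <total>", but only as demand arrives. */
    static final class CountingPublisher implements Flow.Publisher<String> {
        private final int total;

        CountingPublisher(int total) { this.total = total; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> s) {
            // The subscriber must receive a Subscription before any other signal.
            // All state lives in the Subscription, so it is per subscriber.
            s.onSubscribe(new Flow.Subscription() {
                private int emitted = 0;
                private long demand = 0;
                private boolean done = false;
                private boolean draining = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) { // non-positive requests are signalled as an error
                        done = true;
                        s.onError(new IllegalArgumentException("request must be > 0"));
                        return;
                    }
                    demand += n;
                    if (demand < 0) demand = Long.MAX_VALUE; // cap on overflow
                    if (draining) return; // re-entrant call from onNext: outer loop continues
                    draining = true;
                    while (!done && demand > 0 && emitted < total) {
                        demand--;
                        emitted++;
                        s.onNext("message " + emitted);
                    }
                    draining = false;
                    if (!done && emitted == total) {
                        done = true;
                        s.onComplete(); // signalled exactly once
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Subscribes, requests `batch` items at a time, and records every signal. */
    static List<String> collect(Flow.Publisher<String> p, long batch) {
        List<String> signals = new ArrayList<>();
        p.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription sub;
            private long received = 0;

            @Override public void onSubscribe(Flow.Subscription s) { sub = s; s.request(batch); }
            @Override public void onNext(String item) {
                signals.add(item);
                if (++received % batch == 0) sub.request(batch);
            }
            @Override public void onError(Throwable t) { signals.add("error: " + t.getMessage()); }
            @Override public void onComplete() { signals.add("complete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(collect(new CountingPublisher(5), 2));
    }
}
```

The subscriber in collect asks for two items at a time; the publisher emits only while demand is positive and completes once after the fifth message, regardless of how the requests are batched.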

1

I haven't tested this yet as I'm a bit busy; I'll do it later, so apologies if it doesn't work! :)

From the comments above, it looks like the problem lies in how the Flux is created.

I assume that Spring Reactive controllers can handle a Flux that emits multiple items without it being hooked up to WebSockets or SSE. Again, I'll play with it a bit later.

Flux has many static factory methods that will help you here.

How about doing it like this:

return Flux.intervalMillis(1000) 
.map(l -> new SensorRead(sensorId, Math.random())); 

But this gives an endless stream, which may not be what you want.

Another option is something like this:

return Flux.range(1, 5) //Spit out 5 values starting from 1 
.delayMillis(1000) //Delay the onNext calls to separate 1 second apart 
.map(l -> new SensorRead(sensorId, Math.random())); 

Update

OK, so the question has changed quite significantly.

In response to the question "Why can't we call onNext() multiple times? How can we do it?"

Obviously I didn't write the API, so I can't speak to the reasoning behind it, but IMO it is ambiguous and complex how multiple emissions should be handled, given the myriad ways they could be expressed:

HTTP 1.1 doesn't allow multiple responses to a single request, so the only valid options are either to collect the items into a list, or to do a low-level write to the output stream on each onNext emission. Both have complexity around the content type (e.g. XML vs JSON).

This gets even more complicated once we bring in HTTP2, WebSockets and SSE, which can do some form of multiple responses per request; again, each one needs to be handled differently.
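The "collect into a list" option can be sketched as follows. This is only an illustration, not what Spring does internally: it uses the JDK's java.util.concurrent.Flow API and SubmissionPublisher (Java 9+) as a stand-in source, and CollectToListDemo, collect and demo are hypothetical names. In Reactor, Flux.collectList() plays a similar role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names here are assumptions for illustration.
final class CollectToListDemo {

    /** Buffers every emission and yields one list when the source completes. */
    static <T> CompletableFuture<List<T>> collect(Flow.Publisher<T> source) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private final List<T> buffer = new ArrayList<>();

            @Override public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE); // unbounded demand: we keep everything
            }
            @Override public void onNext(T item) { buffer.add(item); }
            @Override public void onError(Throwable t) { result.completeExceptionally(t); }
            @Override public void onComplete() { result.complete(buffer); }
        });
        return result;
    }

    /** Pushes three items through a SubmissionPublisher and waits for the list. */
    static List<String> demo() {
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            CompletableFuture<List<String>> body = collect(source);
            source.submit("a");
            source.submit("b");
            source.submit("c");
            source.close(); // triggers onComplete once the items are delivered
            return body.orTimeout(5, TimeUnit.SECONDS).join();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this approach the single HTTP response can only be written after the source completes, which is why it does not suit an endless stream.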

If you want to be able to emit multiple items, you need to look at WebSockets or SSE.

The Spring-Reactive project has SSE classes, so it looks like it is implemented.

E.g.

@RequestMapping("/sse/event") 
    Flux<SseEvent> sse() { 
     return Flux.interval(Duration.ofMillis(100)).map(l -> { 
      SseEvent event = new SseEvent(); 
      event.setId(Long.toString(l)); 
      event.setData("foo"); 
      event.setComment("bar"); 
      return event; 
     }).take(2); 
    } 
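For context, this is roughly what one such event looks like on the wire in the text/event-stream format: optional comment and id lines, one data line per line of payload, then a blank line that ends the event. The sketch below is a hand-rolled formatter for illustration only; SseFrameDemo and frame are hypothetical names, and the exact field order Spring writes may differ.

```java
// Hypothetical demo class; not part of Spring. It only shows the wire format.
final class SseFrameDemo {

    /** Formats a single event; any argument may be null to omit that field. */
    static String frame(String id, String data, String comment) {
        StringBuilder sb = new StringBuilder();
        if (comment != null) {
            sb.append(": ").append(comment).append('\n'); // a line starting with ':' is a comment
        }
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (data != null) {
            // Multi-line payloads need one "data:" line per line of text.
            for (String line : data.split("\n", -1)) {
                sb.append("data: ").append(line).append('\n');
            }
        }
        return sb.append('\n').toString(); // the blank line dispatches the event
    }

    public static void main(String[] args) {
        System.out.print(frame("0", "foo", "bar"));
        System.out.print(frame("1", "foo", "bar"));
    }
}
```

Because each event is self-delimiting, the server can write one frame per onNext call and keep the same response open, which is what makes multiple emissions workable over a single HTTP request.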

Have a look at the link below for more examples:

https://github.com/spring-projects/spring-reactive/blob/master/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

Hope this helps

+0

Thanks for the suggestion, it certainly works that way. In any case, I'd appreciate it if someone could explain why we can't simply implement 'Publisher.subscribe(s)' and call 's.onNext()'. – codependent

+0

All my experience is with 'RxJava' and I have no experience with Reactor, sorry (although I'm just starting to learn it), so I can't comment right away, but I'll take a look. Generally, constructing via the static methods is easier, and in 'RxJava' the 'Observable' constructor is protected, so you can't construct it directly. – Will

+0

Oops, I missed the update, thanks for the answer. Your example with the '/sse/event' endpoint + Flux works perfectly. It doesn't resolve my doubts, though: Spring Reactive follows the Reactive Streams specification, so using the Publisher interface should be possible, and therefore so should calling 's.onNext()' multiple times. Following your advice, I also added 'produces="text/event-stream"' to my request mapping and it still throws the same exception :-( – codependent
