2016-03-27 17 views
6

Mam przepływ integracji wiosennej, który obejmuje wykonywanie asynchroniczne, zwracanie wartości z bramy do kontrolera, kontynuowanie przepływu integracji po zwróceniu wartości.Praktyki obsługi błędów w przepływie integracji wiosennej

Oto brama:

@MessagingGateway 
public interface GW { 

    @Gateway(requestChannel = "f.input") 
    Task input(Collection<MessengerIncomingRequest> messages); 

} 

I tu jest przepływ:

@Bean 
IntegrationFlow jFlow() { 
     return IntegrationFlows.from(
     MessageChannels.executor("f.input", executor())) 
     .split() 
     .channel(MessageChannels.executor(executor())) 
     .transform(transformer) 
     .channel(routerChannel()) 
     .get(); 
} 

@Bean 
ThreadPoolTaskExecutor executor() { 
     ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); 
     ... 
     return pool; 
} 

@Bean 
MessageChannel routerChannel() { 
     return MessageChannels 
     .publishSubscribe("routerChannel", executor()) 
     .get(); 
} 

@Bean 
IntegrationFlow routerChannelFlow() { 
     return IntegrationFlows 
     .from(routerChannel()) 
     .publishSubscribeChannel(s -> s 
     .subscribe(f -> f.bridge(null)) 
     .subscribe(process())) 
     .get(); 
} 

@Bean 
IntegrationFlow process() { 
     return f -> 
     f.route(p -> p.getKind().name(), 
     m -> m.suffix("Channel") 
     .channelMapping(TaskKind.CREATE.name(), "create") 
     .... 
} 

@Bean 
IntegrationFlow createFlow() { 
     return IntegrationFlows.from(
     MessageChannels.direct("createChannel")) 
     .handle(routerService) 
     .get(); 
} 

Jak mogę określić procedurę obsługi błędu dla całego strumienia? Jakie są najlepsze praktyki? Wiem, że mogę umieścić blok try/catch dla wywołania metody gateway, ale będzie wychwytywał wyjątki, które występują w przepływie jFlow dla wszystkiego, co ma miejsce przed channel(routerChannel()).

Jak mogę obsługiwać błędy przez resztę przepływu? Lub przez cały przepływ?

UPDATE

Dodałem obsługi błędów dla publishSubscribeChannel

@Bean 
IntegrationFlow routerChannelFlow() { 
    return IntegrationFlows 
      .from(routerChannel()) 
      .publishSubscribeChannel(s -> s 
        .subscribe(f -> f.bridge(null)) 
        .subscribe(process()) 
        .errorHandler(errorHandler)) 
      .get(); 
} 

ale nie wydaje się, aby pomóc, ponieważ w przypadku wyjątku pojawia się następujący błąd:

cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: 

i moja obsługa błędów nie zostanie wywołana.

UPDATE

Według odpowiedź Gary'ego Próbowałem ten kod:

@Bean 
IntegrationFlow jFLow() { 
    return IntegrationFlows.from(
      MessageChannels.executor("f.input", executor())) 
      .split() 
      .channel(MessageChannels.executor(executor())) 
      .transform(transformer) 
      .channel(routerChannel()) 
      .get(); 
} 

@Bean 
IntegrationFlow exceptionOrErrorFlow() { 
    return IntegrationFlows.from(
      MessageChannels.direct("exceptionChannel")) 
      .handle(errorHandler, "handleError") 
      .get(); 
} 

    @Bean 
MessageChannel exceptionChannel() { 
    return MessageChannels.direct("exceptionChannel") 
      .get(); 
} 

@Bean 
IntegrationFlow process() { 
     return f -> 
     f.enrichHeaders((spec) -> 
        spec.header("errorChannel", "exceptionChannel", true)) 
     f.route(p -> p.getKind().name(), 
     m -> m.suffix("Channel") 
     .channelMapping(TaskKind.CREATE.name(), "create") 
     .... 
} 

@MessagingGateway(errorChannel = "exceptionChannel") 

Po kolejnej edycji I dodaje exceptionChannel do bramy, i przeniósł wzbogacając nagłówek do rewanżu (asynchroniczny) mojego przepływu. Wciąż kontroler zostaje zablokowany, jeśli wyjątek stanowi rzut synchronicznej części przepływu.

Odpowiedz

4

Po pierwsze, pozwól mi wyjaśnić, jak działa brama - pomoże to w zrozumieniu poniższego rozwiązania.

Komunikat żądania otrzymuje unikalny tymczasowy kanał odpowiedzi, który jest dodawany jako nagłówek replyChannel. Nawet jeśli brama ma jawnie replyChannel, która jest po prostu zmostkowana do żądania replyChannel - tak brama dopasowuje odpowiedź do żądania.

Teraz bramka ustawia także nagłówek żądania errorChannel na ten sam kanał odpowiedzi. W ten sposób, nawet jeśli przepływ jest asynchroniczny, wyjątek może zostać skierowany z powrotem do bramki i albo zgłoszony do wywołującego, albo skierowany do kanału błędu bramy (jeśli określono). Ten routing jest wykonywany przez MessagePublishingErrorHandler, który jest podłączony do ErrorHandlingTaskExecutor, który owija twój executor.

Ponieważ zwracasz wynik do bramy, a następnie kontynuujesz; interakcja bramy jest "zużytkowana" i nic nigdy nie otrzyma wiadomości (w tym wyjątku) wysłanej do nagłówka replyChannel. Stąd wiadomość logu, którą widzisz.

Jednym z rozwiązań jest naprawienie nagłówka errorChannel w wiadomości wysyłanej do niezależnego przepływu. Użyj nazwy, aby zastąpić (pamiętaj, aby ustawić zastąpienie na true) nagłówkiem errorChannel skonfigurowanym przez bramę. Powinno to nastąpić tak szybko, jak to możliwe w strumieniu, aby wszelkie wyjątki były kierowane na ten kanał (a następnie możesz zapisać tam swój program obsługi błędów).

Alternatywnym rozwiązaniem jest podłączenie własnego executora obsługi błędów, jawnie ustawiając defaultErrorChannel na jego MessagePublishingErrorHandler i usuwając nagłówek errorChannel.

Routing błędów asynchronicznych najpierw szuka nagłówka; jeśli jest obecny, komunikat o błędzie jest tam kierowany; jeśli nie ma nagłówka, a MPEH nie ma domyślnego kanału błędu; wiadomość zostanie przekierowana do domyślnej errorChannel, na którą (zwykle) jest subskrybowany LoggingChannelAdapter. Domyślnie errorChannel jest kanałem pub/sub, więc możesz subskrybować inne punkty końcowe.

EDIT

Zmieniasz kanał przed pub/sub.

Ważne jest, aby uzyskać co najmniej jedną odpowiedź na bramkę; powinieneś zostawić kanał błędu sam na jednej nodze pubu/sub i zaktualizować go w drugim odcinku. W ten sposób do dzwoniącego zostanie rzucony wyjątek na pierwszym etapie (możesz dodać do bramki errorChannel, jeśli chcesz wykonać jakieś działanie, na przykład routing do obsługi wyjątku). Aktualizację nagłówka musisz wykonać tylko w drugim odcinku, aby wyjątki były przesyłane bezpośrednio do programu obsługi błędów.

Jeśli ustawisz errorChannel na bramce do swojego exceptionChannel, dojdą tam wyjątki na obu nogach.

+0

Gary, dzięki za niesamowite wyjaśnienie! Chciałbym wyjaśnić pewne rzeczy: * 1. 'Użyj .enrichHeaders do zastąpienia (pamiętaj, aby ustawić nadpisanie na wartość true) nagłówka errorChannel, który został skonfigurowany przez bramkę. * - z której nazwy kanału należy zastąpić? * 2. 'jawne ustawienie defaultErrorChannel' * - czy mógłbyś podać wskazówkę, na jaki komponent bean/klasa powinienem ustawić ten defaultErrorChannel? Nie mogę go znaleźć nigdzie. –

+0

Myślę, że rozumiem, co miałeś na myśli, odpowiednio zaktualizowałem swoją odpowiedź, wciąż są pewne problemy, ale myślę, że widzę światło na końcu tunelu. –

+0

Zobacz moją edycję - zbyt szybko zmienisz nagłówek. –

Powiązane problemy