Mam przepływ integracji wiosennej, który obejmuje wykonywanie asynchroniczne, zwracanie wartości z bramy do kontrolera, kontynuowanie przepływu integracji po zwróceniu wartości.Praktyki obsługi błędów w przepływie integracji wiosennej
Oto brama:
@MessagingGateway
public interface GW {
@Gateway(requestChannel = "f.input")
Task input(Collection<MessengerIncomingRequest> messages);
}
I tu jest przepływ:
@Bean
IntegrationFlow jFlow() {
return IntegrationFlows.from(
MessageChannels.executor("f.input", executor()))
.split()
.channel(MessageChannels.executor(executor()))
.transform(transformer)
.channel(routerChannel())
.get();
}
@Bean
ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
...
return pool;
}
@Bean
MessageChannel routerChannel() {
return MessageChannels
.publishSubscribe("routerChannel", executor())
.get();
}
@Bean
IntegrationFlow routerChannelFlow() {
return IntegrationFlows
.from(routerChannel())
.publishSubscribeChannel(s -> s
.subscribe(f -> f.bridge(null))
.subscribe(process()))
.get();
}
@Bean
IntegrationFlow process() {
return f ->
f.route(p -> p.getKind().name(),
m -> m.suffix("Channel")
.channelMapping(TaskKind.CREATE.name(), "create")
....
}
@Bean
IntegrationFlow createFlow() {
return IntegrationFlows.from(
MessageChannels.direct("createChannel"))
.handle(routerService)
.get();
}
Jak mogę określić procedurę obsługi błędu dla całego strumienia? Jakie są najlepsze praktyki? Wiem, że mogę umieścić blok try/catch dla wywołania metody gateway, ale będzie wychwytywał wyjątki, które występują w przepływie jFlow
dla wszystkiego, co ma miejsce przed channel(routerChannel())
.
Jak mogę obsługiwać błędy przez resztę przepływu? Lub przez cały przepływ?
UPDATE
Dodałem obsługi błędów dla publishSubscribeChannel
@Bean
IntegrationFlow routerChannelFlow() {
return IntegrationFlows
.from(routerChannel())
.publishSubscribeChannel(s -> s
.subscribe(f -> f.bridge(null))
.subscribe(process())
.errorHandler(errorHandler))
.get();
}
ale nie wydaje się, aby pomóc, ponieważ w przypadku wyjątku pojawia się następujący błąd:
cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException:
i moja obsługa błędów nie zostanie wywołana.
UPDATE
Według odpowiedź Gary'ego Próbowałem ten kod:
@Bean
IntegrationFlow jFLow() {
return IntegrationFlows.from(
MessageChannels.executor("f.input", executor()))
.split()
.channel(MessageChannels.executor(executor()))
.transform(transformer)
.channel(routerChannel())
.get();
}
@Bean
IntegrationFlow exceptionOrErrorFlow() {
return IntegrationFlows.from(
MessageChannels.direct("exceptionChannel"))
.handle(errorHandler, "handleError")
.get();
}
@Bean
MessageChannel exceptionChannel() {
return MessageChannels.direct("exceptionChannel")
.get();
}
@Bean
IntegrationFlow process() {
return f ->
f.enrichHeaders((spec) ->
spec.header("errorChannel", "exceptionChannel", true))
f.route(p -> p.getKind().name(),
m -> m.suffix("Channel")
.channelMapping(TaskKind.CREATE.name(), "create")
....
}
@MessagingGateway(errorChannel = "exceptionChannel")
Po kolejnej edycji I dodaje exceptionChannel
do bramy, i przeniósł wzbogacając nagłówek do rewanżu (asynchroniczny) mojego przepływu. Wciąż kontroler zostaje zablokowany, jeśli wyjątek stanowi rzut synchronicznej części przepływu.
Gary, dzięki za niesamowite wyjaśnienie! Chciałbym wyjaśnić pewne rzeczy: * 1. 'Użyj .enrichHeaders do zastąpienia (pamiętaj, aby ustawić nadpisanie na wartość true) nagłówka errorChannel, który został skonfigurowany przez bramkę. * - z której nazwy kanału należy zastąpić? * 2. 'jawne ustawienie defaultErrorChannel' * - czy mógłbyś podać wskazówkę, na jaki komponent bean/klasa powinienem ustawić ten defaultErrorChannel? Nie mogę go znaleźć nigdzie. –
Myślę, że rozumiem, co miałeś na myśli, odpowiednio zaktualizowałem swoją odpowiedź, wciąż są pewne problemy, ale myślę, że widzę światło na końcu tunelu. –
Zobacz moją edycję - zbyt szybko zmienisz nagłówek. –