2016-03-08 29 views
7

używam RxJava 1.1 komponować zauważalnego sekwencję od wewnątrz aplikacji wiosennym że wygląda następująco:wycofywania transakcji w reaktywnym aplikacji

@Transaction 
public Observable<Event> create(Event event) { 
    return Observable.just(event) 
      .flatMap(event -> { 
       //save event to db (blocking JPA operation) 
       Event event = eventRepository.save(event); 
       return Observable.just(event); 
      }) 
      //async REST call to service A 
      .flatMap(this::sendEventToServiceA) <---- may execute on different thread 
      //async REST call to service B 
      .flatMap(this::sendEventToServiceB) <---- may execute on different thread 
      .doOnError(throwable -> { 
       // ? rollback initally created transaction? 
      }) 
} 

Zdarzenie osiągnie warstwę serwisową mojego wniosku z jakiejś klasie kontrolera a to rozchodzi się przez łańcuch operacji zbudowany za pomocą funkcji flatMap() RxJavy. Zdarzenie jest najpierw przechowywane w bazie danych (dane źródłowe), a kolejne dwa asynchroniczne żądania HTTP są wykonywane jeden po drugim za pomocą biblioteki Spring AsyncRestTemplate za kulisami.

W przypadku wystąpienia błędu/wyjątku w dowolnym miejscu potoku, chciałbym móc wycofać transakcję bazy danych, aby zdarzenie NIE było przechowywane w bazie danych. Zauważyłem, że nie jest to łatwe, ponieważ na wiosnę kontekst transakcji jest powiązany z określonym wątkiem wykonania. Jeśli więc kod dojdzie do wywołania zwrotnego doOnError w innym wątku (AsyncRestTemplate używa własnego AsyncTaskExecutor), nie jest możliwe wycofanie wstępnie utworzonej transakcji.

Czy możesz poinformować o jakimkolwiek mechanizmie, aby uzyskać transakcje w wielowątkowej aplikacji składającej się z kilku asynchronicznych operacji napisanych w ten sposób?

Próbowałem również stworzyć transakcję programowo z:

TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); 

a następnie wysłać przedmiot transactionStatus wraz ze zdarzeniem w poprzek rurociągu, ale ponownie, gdy wystąpi błąd i modlę „platformTransactionManager.rollback (status); ", otrzymuję" synchronizacja transakcji nie jest aktywna ", ponieważ działa na innym wątku, jak sądzę.

p.s. Metody sendEventToServiceA/sendEventToServiceB wyglądać podobnie do tego:

public Observable<Event> sendEventToServiceA(event) { 
    .......... 
    ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
       "/serviceA/create?event_id=" + event.id, 
       HttpMethod.POST, requestEntity, String.class); 

    return ObservableUtil.toRxObservable(listenableFuture); 
} 

Odpowiedz

3

Jednym ze sposobów osiągnięcia tego celu jest zapewnienie, że błąd obserwuje się w tym samym wątku jako db zapisać:

@Transaction 
public Observable<Event> create(Event event) { 

    Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 
    return Observable.just(event) 
      .flatMap(event -> { 
       //save event to db (blocking JPA operation) 
       Event event = eventRepository.save(event); 
       return Observable.just(event); 
      }) 
      .subscribeOn(scheduler) 
      //async REST call to service A 
      .flatMap(this::sendEventToServiceA) <---- may execute on different thread 
      //async REST call to service B 
      .flatMap(this::sendEventToServiceB) <---- may execute on different thread 
      .observeOn(scheduler) 
      .doOnError(throwable -> { 
       // ? rollback initally created transaction? 
      }) 
} 
+0

Thanks Dave! Twoje rozwiązanie z harmonogramem wydaje się działać dobrze. – odybour

+0

Znalazłem jeden drobny problem polegający na tym, że metoda flatMap, która wykonuje zapis do bazy danych, będzie działała na innym wątku niż ten, który utworzył transakcję na pierwszym miejscu ze względu na adnotację. Aby obejść ten problem, utworzyłem transakcję programowo w metodzie flatMap tuż przed operacją składowania, a następnie zapisałem transakcję w obiekcie kontekstowym, który przekazuję obserwowalnym potokom, a wewnątrz doOnError robię coś takiego: 'transactionManager.rollback (context. getTransaction()); '. – odybour

Powiązane problemy