2017-02-17 13 views
8

Wiosna 5 wprowadza reaktywny styl programowania dla reszta API z webflux. Sam jestem całkiem nowy i zastanawiałem się, czy zawijanie synchronicznych wywołań do bazy danych pod kątem Flux lub Mono ma sens w sensie preformencji? Jeśli tak, czy tak to jest:Wiosenna webflux i czytanie z bazy danych

@RestController 
public class HomeController { 

    private MeasurementRepository repository; 

    public HomeController(MeasurementRepository repository){ 
     this.repository = repository; 
    } 

    @GetMapping(value = "/v1/measurements") 
    public Flux<Measurement> getMeasurements() { 
     return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))); 
    } 

} 

Czy istnieje coś takiego jak asynchroniczny CrudRepository? Nie mogłem go znaleźć.

+0

kod JDBC natury synchronicznym nie ma żadnych reaktywnych JDBC kierowcy tam (i wątpliwości nie zawsze będzie). Tak więc dostęp do takiej bazy danych nie ma większego sensu. –

+0

Nie jestem zaznajomiony z flux, ale wiem, że możesz użyć Java 8 Stream jako typ zwracania w Spring Data JPA. Możesz zwrócić 'Stream '. Nie jestem pewien, czy ten komentarz pomaga, czy nie :) – burcakulug

+0

To dobry początek, ale nie jest tak asynchroniczny, więc wywołujący blokuje się podczas trwania operacji JDBC, która przerywa paraligmat nieblokowania Webflux. – NeilS

Odpowiedz

8

Jedną opcją byłoby użycie alternatywnych klientów SQL, którzy nie są całkowicie blokowani. Niektóre przykłady obejmują: https://github.com/mauricio/postgresql-async lub https://github.com/finagle/roc. Oczywiście żaden z tych sterowników nie jest oficjalnie wspierany przez dostawców baz danych. Ponadto funkcjonalność jest znacznie mniej atrakcyjna w porównaniu do dojrzałych abstrakcji opartych na JDBC, takich jak Hibernate lub jOOQ.

Alternatywny pomysł przyszedł mi ze świata Scala. Chodzi o to, aby wywoływanie blokujących wywołań w izolowanym wątku ThreadPool nie mieszać blokowania i nie blokowania połączeń. Pozwoli nam to kontrolować ogólną liczbę wątków i pozwoli CPU obsługiwać zadania nieblokujące w kontekście wykonania głównego z pewnymi potencjalnymi optymalizacjami. Zakładając, że mamy implementację opartą na JDBC, taką jak Spring Data JPA, która faktycznie blokuje, możemy sprawić, aby jej wykonanie było asynchroniczne i wysyłane do dedykowanej puli wątków.

@RestController 
public class HomeController { 

    private final MeasurementRepository repository; 
    private final Scheduler scheduler; 

    public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) { 
     this.repository = repository; 
     this.scheduler = scheduler; 
    } 

    @GetMapping(value = "/v1/measurements") 
    public Flux<Measurement> getMeasurements() { 
     return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L))).publishOn(scheduler); 
    } 

} 

Nasz harmonogram dla JDBC powinien zostać skonfigurowany przy użyciu dedykowanej puli wątków z liczbą elementów równą liczbie połączeń.

@Configuration 
public class SchedulerConfiguration { 
    private final Integer connectionPoolSize; 

    public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) { 
     this.connectionPoolSize = connectionPoolSize; 
    } 

    @Bean 
    public Scheduler jdbcScheduler() { 
     return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize)); 
    } 

} 

Istnieją jednak trudności z tym podejściem. Głównym z nich jest zarządzanie transakcjami. W JDBC transakcje są możliwe tylko w ramach pojedynczego java.sql.Connection. Aby wykonać kilka operacji w jednej transakcji, muszą one udostępnić połączenie. Jeśli chcemy dokonać pewnych obliczeń między nimi, musimy zachować połączenie. Nie jest to zbyt skuteczne, ponieważ ograniczamy liczbę połączeń bezczynności podczas wykonywania obliczeń pośrednich.

Ta koncepcja asynchronicznego opakowania JDBC nie jest nowa i jest już zaimplementowana w bibliotece Scala Slick 3. Wreszcie, nieblokująca JDBC może pojawić się na mapie drogowej Java. Jak ogłoszono na JavaOne we wrześniu 2016 r., Możliwe, że zobaczymy to w Javie 10.

+0

Czy jest zalecana opcja lub projekt WIP, który implementuje CrudRepository?Szkoda powiedzieć, że projekty SQL nie mogą używać reaktora/WebFlux do najwcześniejszej wersji Java 10. – NeilS

+0

@NeilS z wielu źródeł Widzę, że ta praca nie jest jeszcze zaplanowana, przynajmniej przez Pivotal: https://jira.spring.io/browse/DATAJPA-701 https://spring.io/blog/2017/02/23/spring-framework-5-0-m5-update # comment-3174616521 –

+0

Wywołanie publishOn (scheduler) wygląda dziwnie w moim przykładzie kodu. Shoud you us subscribeNa które uruchamiamy request() do tego wydawcy na danym harmonogramie? – etiennepeiniau

0

Wiosenne dane obsługują interfejs reaktywnego repozytorium dla Mongo i Cassandry.

Spring data MongoDb Reactive Interface

Wiosna danych MongoDB zapewnia wsparcie reaktywny z repozytorium projektu reaktora i RxJava 1 reaktywnych typów. Reaktywny interfejs API obsługuje reaktywną konwersję typów między typami reaktywnymi.

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> { 

Flux<Person> findByLastname(String lastname); 

@Query("{ 'firstname': ?0, 'lastname': ?1}") 
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname); 

// Accept parameter inside a reactive type for deferred execution 
Flux<Person> findByLastname(Mono<String> lastname); 

Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname); 

@InfiniteStream // Use a tailable cursor 
Flux<Person> findWithTailableCursorBy(); 
} 

public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> { 

Observable<Person> findByLastname(String lastname); 

@Query("{ 'firstname': ?0, 'lastname': ?1}") 
Single<Person> findByFirstnameAndLastname(String firstname, String lastname); 

// Accept parameter inside a reactive type for deferred execution 
Observable<Person> findByLastname(Single<String> lastname); 

Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname); 

@InfiniteStream // Use a tailable cursor 
Observable<Person> findWithTailableCursorBy(); 

}

Powiązane problemy