2014-10-08 18 views
53

Potrzebuję pomocy w implementacji równoległych połączeń asynchronicznych w RxJava. Podjąłem prosty przypadek użycia, w którym PIERWSZE połączenie pobiera (raczej wyszukuje) listę produktów (Tafli) do wyświetlenia. Kolejne wywołania wychodzą i pobierają (A) RECENZJE i (B) IMAGE PRODUKTÓWRxJava Fetching Observables Równolegle

Po kilku próbach dotarłem do tego miejsca.

1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm); 
2 List<Tile> allTiles = new ArrayList<Tile>(); 
3 ClientResponse response = new ClientResponse(); 

4 searchTile.parallel(oTile -> { 
5  return oTile.flatMap(t -> { 
6  Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId()); 
7  Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId()); 

8  return Observable.zip(reviews, imageUrl, (r, u) -> { 
9   t.setReviews(r); 
10   t.setImageUrl(u); 

11   return t; 
12  }); 

13  }); 
14 }).subscribe(e -> { 
15  allTiles.add((Tile) e); 
16 }); 

Linia 1: wychodzi i pobiera produkt (dachówka), który będzie wyświetlany

Linia 4: Bierzemy listę obserwowalnych i odłamek go sprowadzić recenzje imageUrls

Lie 6 , 7: Fetch obserwowalnym przeglądu i obserwowalnych url

Linia 8: 2 obserwable Wreszcie są spakowane do powrotu zaktualizowaną obserwowalne

Linia 15: FI nally line 15 zestawia wszystkie pojedyncze produkty, które mają być wyświetlane w kolekcji, które mogą być zwrócone z powrotem do warstwy wywołującej

Podczas gdy obserwowalne zostało zignorowane iw naszych testach przebiega 4 różne wątki; Pobieranie recenzji i obrazów wydaje się być jedna po drugiej. Podejrzewam, że krok zip na linii 8 zasadniczo powoduje sekwencyjne wywoływanie dwóch obserwowalnych (recenzji i adresu URL).

enter image description here

Czy ta grupa ma jakieś sugestie, aby równolegle pobierać reiews i URL obrazu. Zasadniczo wykres wodospadu dołączony powyżej powinien wyglądać bardziej pionowo w stosie. Rozmowy na opinie i zdjęcia powinny być równoległe

dzięki Anand Raman

+0

W jaki sposób generujesz wykres osi czasu transferu? Wygląda całkiem fajnie i jest użyteczny. Chciałbym go użyć sam. –

+0

Ponieważ mój system nawiązywał połączenia zewnętrzne, po prostu wywoływałam wywołania przez skrzypka. Fiddler ma opcję generowania linii czasu w sieci. Zasadniczo widzisz ten widok. Po ustawieniu skrzypka dla żądań proxy; po prostu wybierz sesje, które Cię interesują, a następnie kliknij kartę osi czasu w prawym okienku. dzięki anand – diduknow

Odpowiedz

82

Operator równolegle okazała się być problemem dla prawie wszystkich przypadków użycia i nie robić to, co najbardziej oczekują od niego, więc został usunięty w wersja 1.0.0.rc.4: https://github.com/ReactiveX/RxJava/pull/1716

Dobry przykład tego, jak wykonać ten typ zachowania i uzyskać wykonanie równoległe, można zobaczyć here.

W twoim przykładowym kodzie nie jest jasne, czy searchServiceClient jest synchroniczna czy asynchroniczna. Wpływa na to, jak rozwiązać problem tak, jakby był już asynchroniczny, nie jest potrzebne dodatkowe planowanie. Jeśli konieczne jest synchroniczne dodatkowe planowanie.

Pierwszy oto kilka prostych przykładów pokazujących synchroniczną i asynchroniczną zachowanie:

import rx.Observable; 
import rx.Subscriber; 
import rx.schedulers.Schedulers; 

public class ParallelExecution { 

    public static void main(String[] args) { 
     System.out.println("------------ mergingAsync"); 
     mergingAsync(); 
     System.out.println("------------ mergingSync"); 
     mergingSync(); 
     System.out.println("------------ mergingSyncMadeAsync"); 
     mergingSyncMadeAsync(); 
     System.out.println("------------ flatMapExampleSync"); 
     flatMapExampleSync(); 
     System.out.println("------------ flatMapExampleAsync"); 
     flatMapExampleAsync(); 
     System.out.println("------------"); 
    } 

    private static void mergingAsync() { 
     Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); 
    } 

    private static void mergingSync() { 
     // here you'll see the delay as each is executed synchronously 
     Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); 
    } 

    private static void mergingSyncMadeAsync() { 
     // if you have something synchronous and want to make it async, you can schedule it like this 
     // so here we see both executed concurrently 
     Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); 
    } 

    private static void flatMapExampleAsync() { 
     Observable.range(0, 5).flatMap(i -> { 
      return getDataAsync(i); 
     }).toBlocking().forEach(System.out::println); 
    } 

    private static void flatMapExampleSync() { 
     Observable.range(0, 5).flatMap(i -> { 
      return getDataSync(i); 
     }).toBlocking().forEach(System.out::println); 
    } 

    // artificial representations of IO work 
    static Observable<Integer> getDataAsync(int i) { 
     return getDataSync(i).subscribeOn(Schedulers.io()); 
    } 

    static Observable<Integer> getDataSync(int i) { 
     return Observable.create((Subscriber<? super Integer> s) -> { 
      // simulate latency 
       try { 
        Thread.sleep(1000); 
       } catch (Exception e) { 
        e.printStackTrace(); 
       } 
       s.onNext(i); 
       s.onCompleted(); 
      }); 
    } 
} 

obserwuję jest próbą dostarczenia przykład, który lepiej pasuje do Twojego kodu:

import java.util.List; 

import rx.Observable; 
import rx.Subscriber; 
import rx.schedulers.Schedulers; 

public class ParallelExecutionExample { 

    public static void main(String[] args) { 
     final long startTime = System.currentTimeMillis(); 

     Observable<Tile> searchTile = getSearchResults("search term") 
       .doOnSubscribe(() -> logTime("Search started ", startTime)) 
       .doOnCompleted(() -> logTime("Search completed ", startTime)); 

     Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { 
      Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) 
        .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime)); 
      Observable<String> imageUrl = getProductImage(t.getProductId()) 
        .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime)); 

      return Observable.zip(reviews, imageUrl, (r, u) -> { 
       return new TileResponse(t, r, u); 
      }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime)); 
     }); 

     List<TileResponse> allTiles = populatedTiles.toList() 
       .doOnCompleted(() -> logTime("All Tiles Completed ", startTime)) 
       .toBlocking().single(); 
    } 

    private static Observable<Tile> getSearchResults(String string) { 
     return mockClient(new Tile(1), new Tile(2), new Tile(3)); 
    } 

    private static Observable<Reviews> getSellerReviews(int id) { 
     return mockClient(new Reviews()); 
    } 

    private static Observable<String> getProductImage(int id) { 
     return mockClient("image_" + id); 
    } 

    private static void logTime(String message, long startTime) { 
     System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); 
    } 

    private static <T> Observable<T> mockClient(T... ts) { 
     return Observable.create((Subscriber<? super T> s) -> { 
      // simulate latency 
       try { 
        Thread.sleep(1000); 
       } catch (Exception e) { 
       } 
       for (T t : ts) { 
        s.onNext(t); 
       } 
       s.onCompleted(); 
      }).subscribeOn(Schedulers.io()); 
     // note the use of subscribeOn to make an otherwise synchronous Observable async 
    } 

    public static class TileResponse { 

     public TileResponse(Tile t, Reviews r, String u) { 
      // store the values 
     } 

    } 

    public static class Tile { 

     private final int id; 

     public Tile(int i) { 
      this.id = i; 
     } 

     public int getSellerId() { 
      return id; 
     } 

     public int getProductId() { 
      return id; 
     } 

    } 

    public static class Reviews { 

    } 
} 

This wyjścia:

Search started => 65ms 
Search completed => 1094ms 
getProductImage[1] completed => 2095ms 
getSellerReviews[2] completed => 2095ms 
getProductImage[3] completed => 2095ms 
zip[1] completed => 2096ms 
zip[2] completed => 2096ms 
getProductImage[2] completed => 2096ms 
getSellerReviews[1] completed => 2096ms 
zip[3] completed => 2096ms 
All Tiles Completed => 2097ms 
getSellerReviews[3] completed => 2097ms 

Wykonałem symulację każdego wezwania IO, aby wykonać 1000ms, więc t jest oczywiste, gdzie opóźnienie jest i że dzieje się równolegle. Wydaje się, że postęp upływa w upłynięciu milisekund.

Podejście polega na tym, że flatmap łączy połączenia asynchroniczne, więc dopóki połączone obserwacje są połączone, będą one wykonywane jednocześnie.

Jeśli połączenie takie jak getProductImage(t.getProductId()) było synchroniczne, może być wykonane asynchronicznie w ten sposób: getProductImage (t.getProductId()). SubscribeOn (Schedulers.io).

Oto ważna część powyższego przykładu bez rejestrowania wszystkich typów i boilerplate:

Observable<Tile> searchTile = getSearchResults("search term");; 

    Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { 
     Observable<Reviews> reviews = getSellerReviews(t.getSellerId()); 
     Observable<String> imageUrl = getProductImage(t.getProductId()); 

     return Observable.zip(reviews, imageUrl, (r, u) -> { 
      return new TileResponse(t, r, u); 
     }); 
    }); 

    List<TileResponse> allTiles = populatedTiles.toList() 
      .toBlocking().single(); 

Mam nadzieję, że to pomaga.

+0

Dzięki @benjchristensen za wspaniałą odpowiedź. Zapewnił jasność i rozwiązał mój problem. Dzięki za wskazanie skarbnicy przykładów w [https://github.com/benjchristensen/ReactiveLab]. Będzie zaglądać w to przez weekend. – diduknow

+0

jaki jest cel metod doOnXXX()? –

+0

@Pangea, myślę, że celem tych wywołań jest drukowanie, gdy zdarzenia mają miejsce, więc widać, że działa on równolegle. – ivant

4

Ludzie, którzy wciąż są @ JDK 7, których IDE nie wykrywa automatycznie źródła JDK 8 i co wypróbować powyższą wspaniałą odpowiedź (i wyjaśnienie) przez @benjchristensen może użyć tego bezwstydnie refraktored, JDK 7, kod . Kudos do @benjchristensen za niesamowite wyjaśnienie i przykład!

import java.util.List; 

import rx.Observable; 
import rx.Subscriber; 
import rx.functions.Action0; 
import rx.functions.Func1; 
import rx.functions.Func2; 
import rx.schedulers.Schedulers; 

public class ParallelExecutionExample 
{ 

    public static void main(String[] args) 
    { 
     final long startTime = System.currentTimeMillis(); 

     Observable<Tile> searchTile = getSearchResults("search term") 
       .doOnSubscribe(new Action0() 
         { 

          @Override 
          public void call() 
          { 
           logTime("Search started ", startTime); 
          } 
       }) 
       .doOnCompleted(new Action0() 
         { 

          @Override 
          public void call() 
          { 
           logTime("Search completed ", startTime); 
          } 
       }); 
     Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>() 
     { 

      @Override 
      public Observable<TileResponse> call(final Tile t) 
      { 
       Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) 
         .doOnCompleted(new Action0() 
           { 

            @Override 
            public void call() 
            { 
             logTime("getSellerReviews[" + t.id + "] completed ", startTime); 
            } 
         }); 
       Observable<String> imageUrl = getProductImage(t.getProductId()) 
         .doOnCompleted(new Action0() 
           { 

            @Override 
            public void call() 
            { 
             logTime("getProductImage[" + t.id + "] completed ", startTime); 
            } 
         }); 
       return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>() 
       { 

        @Override 
        public TileResponse call(Reviews r, String u) 
        { 
         return new TileResponse(t, r, u); 
        } 
       }) 
         .doOnCompleted(new Action0() 
           { 

            @Override 
            public void call() 
            { 
             logTime("zip[" + t.id + "] completed ", startTime); 
            } 
         }); 
      } 
     }); 

     List<TileResponse> allTiles = populatedTiles 
       .toList() 
       .doOnCompleted(new Action0() 
         { 

          @Override 
          public void call() 
          { 
           logTime("All Tiles Completed ", startTime); 
          } 
       }) 
       .toBlocking() 
       .single(); 
    } 

    private static Observable<Tile> getSearchResults(String string) 
    { 
     return mockClient(new Tile(1), new Tile(2), new Tile(3)); 
    } 

    private static Observable<Reviews> getSellerReviews(int id) 
    { 
     return mockClient(new Reviews()); 
    } 

    private static Observable<String> getProductImage(int id) 
    { 
     return mockClient("image_" + id); 
    } 

    private static void logTime(String message, long startTime) 
    { 
     System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); 
    } 

    private static <T> Observable<T> mockClient(final T... ts) 
    { 
     return Observable.create(new Observable.OnSubscribe<T>() 
     { 

      @Override 
      public void call(Subscriber<? super T> s) 
      { 
       try 
       { 
        Thread.sleep(1000); 
       } 
       catch (Exception e) 
       { 
       } 
       for (T t : ts) 
       { 
        s.onNext(t); 
       } 
       s.onCompleted(); 
      } 
     }) 
       .subscribeOn(Schedulers.io()); 
     // note the use of subscribeOn to make an otherwise synchronous Observable async 
    } 

    public static class TileResponse 
    { 

     public TileResponse(Tile t, Reviews r, String u) 
     { 
      // store the values 
     } 

    } 

    public static class Tile 
    { 

     private final int id; 

     public Tile(int i) 
     { 
      this.id = i; 
     } 

     public int getSellerId() 
     { 
      return id; 
     } 

     public int getProductId() 
     { 
      return id; 
     } 

    } 

    public static class Reviews 
    { 

    } 
}