2015-05-05 12 views
6

Próbuję napisać prosty program przy użyciu RxJava do generowania nieskończonej sekwencji liczb naturalnych. Tak dalece znalazłem dwa sposoby generowania sekwencji liczb przy użyciu Observable.timer() i Observable.interval(). Nie jestem pewien, czy te funkcje są właściwym sposobem podejścia do tego problemu. Spodziewałem się prostej funkcji takiej, jaką mamy w Javie 8, aby generować nieskończone liczby naturalne.Generowanie nieskończonej sekwencji liczb naturalnych za pomocą RxJava

IntStream.iterate (1, wartość -> wartość +1) .forEach (System.out :: println);

Próbowałem używać IntStream with Observable, ale to nie działa poprawnie. Wysyła nieskończony strumień liczb tylko do pierwszego abonenta. Jak poprawnie wygenerować nieskończoną liczbę naturalną?

import rx.Observable; 
import rx.functions.Action1; 

import java.util.stream.IntStream; 

public class NaturalNumbers { 

    public static void main(String[] args) { 
     Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> { 
      IntStream stream = IntStream.iterate(1, val -> val + 1); 
      stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber)); 
     }); 

     Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber); 
     Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber); 
     Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber); 
     naturalNumbers.subscribe(first); 
     naturalNumbers.subscribe(second); 
     naturalNumbers.subscribe(third); 

    } 
} 

Odpowiedz

3

Problemem jest to, że na naturalNumbers.subscribe(first); The OnSubscribe ty realizowany jest nazywany i robisz forEach nad nieskończonym strumieniu, więc dlaczego program nie kończy.

Jednym ze sposobów radzenia sobie z tym problemem jest asynchroniczne subskrybowanie ich w innym wątku. Aby łatwo zobaczyć wyniki musiałem wprowadzić spać do przetwarzania Stream:

Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> { 
    IntStream stream = IntStream.iterate(1, i -> i + 1); 
    stream.peek(i -> { 
     try { 
      // Added to visibly see printing 
      Thread.sleep(50); 
     } catch (InterruptedException e) { 
     } 
    }).forEach(subscriber::onNext); 
}); 

final Subscription subscribe1 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(first); 
final Subscription subscribe2 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(second); 
final Subscription subscribe3 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(third); 

Thread.sleep(1000); 

System.out.println("Unsubscribing"); 
subscribe1.unsubscribe(); 
subscribe2.unsubscribe(); 
subscribe3.unsubscribe(); 
Thread.sleep(1000); 
System.out.println("Stopping"); 
+0

Dzięki Mike za odpowiedź. Czy byłoby inaczej, gdybym wywołał metodę subscribeOn podczas tworzenia Observable zamiast wywoływania go trzy razy, jak pokazano w powyższym fragmencie kodu. Testowałem to i zachowanie jest takie samo, ale wciąż chcę je potwierdzić. – Shekhar

+0

Ten problem został poprawnie zidentyfikowany, ale jest to zła rada - nigdy nie powinieneś używać 'subscribeOn' do rozwiązania tego problemu - zobacz moją odpowiedź na pytanie dlaczego. –

+1

W ten sposób wywołanie 'unsubscribe' powoduje rozłączenie subskrybenta, więc przestaje odbierać wiadomości, ale nie zatrzymuje pętli generatora, która nieprzerwanie pobiera energię procesora. Zobacz moją odpowiedź na pytanie, jak rozwiązać obie strony historii. –

2

Observable.Generate jest dokładnie operatorowi rozwiązać tę klasę problemu reaktywnie. Zakładam też, że jest to przykład pedagogiczny, ponieważ używanie iteracji do tego jest prawdopodobnie lepsze.

Twój kod generuje cały strumień w wątku subskrybenta. Ponieważ jest to strumień nieskończony, wywołanie subscribe nigdy się nie zakończy. Poza tym oczywistym problemem, rezygnacja z subskrypcji również będzie problematyczna, ponieważ nie sprawdzasz jej w swojej pętli.

Chcesz użyć harmonogramu, aby rozwiązać ten problem - z pewnością nie używaj subscribeOn, ponieważ będzie to obciążenie dla wszystkich obserwatorów. Zaplanuj dostarczenie każdego numeru do onNext - i jako ostatni krok w każdej zaplanowanej akcji zaplanuj następną.

Zasadniczo to jest to, co daje Observable.generate - każda iteracja jest zaplanowana na dostarczonym harmonogramie (domyślnie jest to taki, który wprowadza współbieżność, jeśli go nie określisz). Działania związane z harmonogramem można anulować i unikać głodzenia wątków.

Rx.NET rozwiązuje to w ten sposób (w rzeczywistości istnieje async/await model, który jest lepiej, ale nie jest dostępna w języku Java AFAIK):

static IObservable<int> Range(int start, int count, IScheduler scheduler) 
{ 
    return Observable.Create<int>(observer => 
    { 
     return scheduler.Schedule(0, (i, self) => 
     { 
      if (i < count) 
      { 
       Console.WriteLine("Iteration {0}", i); 
       observer.OnNext(start + i); 
       self(i + 1); 
      } 
      else 
      { 
       observer.OnCompleted(); 
      } 
     }); 
    }); 
} 

Dwie rzeczy do odnotowania tutaj:

  • Wezwanie do harmonogramu zwraca uchwyt subskrypcji, który jest przekazywany z powrotem do obserwatora.
  • Harmonogram jest rekurencyjny - parametr self jest odwołaniem do programu planującego używanego do wywoływania kolejnej iteracji. Umożliwia to anulowanie subskrypcji w celu anulowania operacji.

Nie wiem, jak to wygląda w RxJava, ale pomysł powinien być taki sam.Ponownie, Observable.generate będzie prawdopodobnie dla ciebie prostszy, ponieważ zaprojektowano go tak, by zajmował się tym scenariuszem.

1

Podczas tworzenia nieskończonych sequencies należy zadbać, aby:

  1. subskrybować i obserwować na różnych wątkach; w przeciwnym razie będziesz obsługiwać tylko jednego subskrybenta.
  2. przestanie generować wartości, jak tylko zakończy się subskrypcja; inaczej zbiegłych pętle zje procesor

Pierwszy problem został rozwiązany za pomocą subscribeOn(), observeOn() i różne szeregowania.

Drugi problem najlepiej rozwiązać, korzystając z metod dostarczonych w bibliotece: Observable.generate() lub Observable.fromIterable(). Robią właściwe kontrole.

Sprawdź to:

Observable<Integer> naturalNumbers = 
     Observable.<Integer, Integer>generate(() -> 1, (s, g) -> { 
      logger.info("generating {}", s); 
      g.onNext(s); 
      return s + 1; 
     }).subscribeOn(Schedulers.newThread()); 
Disposable sub1 = naturalNumbers 
     .subscribe(v -> logger.info("1 got {}", v)); 
Disposable sub2 = naturalNumbers 
     .subscribe(v -> logger.info("2 got {}", v)); 
Disposable sub3 = naturalNumbers 
     .subscribe(v -> logger.info("3 got {}", v)); 

Thread.sleep(100); 

logger.info("unsubscribing..."); 
sub1.dispose(); 
sub2.dispose(); 
sub3.dispose(); 
Thread.sleep(1000); 

logger.info("done"); 
Powiązane problemy