2015-08-07 11 views
16

RxJava v1.0.13 wprowadziła nowy typ obserwowalnego: rx.Single. Pasuje on świetnie do modelu żądanie-odpowiedź, ale brakuje standardowych efektów ubocznych wprowadzających operatorów takich jak doOnNext(). W rezultacie znacznie trudniej jest dokonać wielu rzeczy.Jak utworzyć buforowanie/gorącą wersję rx.Single?

Mój pomysł polegał na zamianie doOnNext() z wieloma subskrypcjami na tę samą pojedynczą instancję. Może to jednak powodować wielokrotne wykonywanie czynności nakładania: raz na każdą subskrypcję. Realizacja

Przykład rx.Single:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> { 
    override fun call(sub: SingleSubscriber<in SomeData>) { 
     try { 
      val result = fetchSomeData() 
      sub.onSuccess(result) 
     } catch(t: Throwable) { 
      sub.onError(t) 
     } 
    } 
} 

val single = Single.create<SomeData>(WorkerSubscribe()) 

Zastosowanie:

single.subscribe({}, {}) 
single.subscribe({}, {}) // Data is fetched for the second time 

Czy to możliwe, aby utworzyć instancję pojedynczych że nie będzie fetchSomeData() wiele razy, nawet gdy single.subscribe() jest nazywane wiele razy, ale pamięć podręczna i zwracają ten sam wynik?

Odpowiedz

3

Trzeba RxJava Subject: BehaviorSubject lub AsyncSubject

+2

nie byłby to krok wstecz z użyciem Jedynka w pierwszej kolejności? – atok

+3

czy zamieścisz przykład kodu, aby rozwiązanie nie wymagało wywnioskowania? –

0

Proszę sprawdzić cache() operator. Powinien on buforować emisje z Observable i odtworzyć je w kolejnych wersjach Subscriber.

+0

To wydaje się bardziej komentarzem niż odpowiedzią, ponieważ nie ma tu definitywnej sugestii, poza tym, że "dowiedz się więcej o operatorze pamięci podręcznej" –

+0

'rx.Single' nie ma metody" cache() ". –

0

Można utworzyć obiekt BehaviorSubject/ReplaySubject/AsyncSubject - a następnie wywołać metodę SingSon.

+1

czy zamieścisz przykład kodu, aby rozwiązanie nie wymagało wywnioskowania? –

+0

Żadna z tych klas nie ma metody 'toSingle()'. Można jednak utworzyć 'Single.fromObservable()'; z którego dziedziczy "Przedmiot". – Bryan

0

zrobiłem obejście, że nie jestem z niego zadowolony, ale to działa:

public class Network { 
    private Object data; 
    private Single<Object> dataSingle; 

    public Single<Object> getData { 
     if (data == null) { 
     if (dataSingle == null) { 
      dataSingle = Single.create(...) 
      .doOnSuccess(data -> this.data = data;) 
      .sibscribeOn(..); 
     } 
     return dataSingle; 
     } else { 
     return Single.just(data); 
     } 
    } 
} 
1

Potrzebne mi było podobne zachowanie i znaleźć jakieś rozwiązanie.

Możesz przekonwertować Single na Observable, aby zastosować cache(), a następnie przekonwertować z powrotem na Single.

yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle() 

używam cacheWithInitialCapacity(1) zamiast tylko cache() jako Optimization - Single nigdy nie będzie emitować więcej niż jedną pozycję.

 

Jest to również dobry pomysł, aby zapewnić Transformer realizacji ma

public class SingleUtils { 

    public static <T> Single.Transformer<T, T> cached() { 
     return single -> single.toObservable() 
      .cacheWithInitialCapacity(1) 
      .toSingle(); 
    } 
} 

więc można użyć buforowania gdziekolwiek chcesz wywołując tylko

yourSingle.compose(SingleUtils.cached()) 

Edit: Począwszy od rxJava 1.2.2 został dodany (https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2)

Wdrożone właśnie w ten sposób (https://github.com/ReactiveX/RxJava/pull/4757)