7

Mam problemy z rekursywnym łańcuchem obserwowalnych obiektów.RxJS: Rekurencyjna lista obserwowalnych i pojedynczy obserwator

Pracuję z RxJS, który jest obecnie w wersji 1.0.10621, i zawiera najbardziej podstawową funkcjonalność Rx, w połączeniu z Rx dla jQuery.

Pozwolę sobie przedstawić przykładowy scenariusz mojego problemu: Przepytam Twitter search API (odpowiedź JSON) dla tweetów/aktualizacji zawierających określone słowo kluczowe. Odpowiedź zawiera również "refresh_url", którego należy użyć do wygenerowania żądania kontynuacji. Odpowiedź na to kolejne żądanie ponownie będzie zawierała nowe refresh_url, itp.

Rx.jQuery pozwala mi uczynić API wyszukiwania Twittera możliwym do zaobserwowania zdarzeniem, które produkuje jeden onNext, a następnie kończy. Próbowałem do tej pory, aby program obsługi onNext zapamiętał parametr refresh_url i użył go w module obsługi onCompleted, aby wygenerować zarówno nowego obserwowalnego, jak i odpowiedniego obserwatora dla następnego żądania. W ten sposób jedna para obserwowalnych + obserwatorów podąża za drugą w nieskończoność.

Problem z tym podejściem jest:

  1. Następca obserwowalne/obserwator już żył, kiedy ich poprzednicy nie zostały jeszcze wyrzucać.

  2. Muszę wykonać wiele nieprzyjemnych zapisów księgowych, aby zachować ważne odniesienie do obecnego żywego obserwatora, którego faktycznie mogą być dwa. (Jeden w OnCompleted, a drugi gdzie indziej w cyklu życia) Odniesienie to jest oczywiście potrzebne do rezygnacji z subskrypcji/usunięcia obserwatora. Alternatywą dla księgowości byłoby wprowadzenie efektu ubocznego za pomocą "nadal działającego?" - boolowskiego, tak jak zrobiłem to w moim przykładzie.

Przykładowy kod:

  running = true; 
      twitterUrl = "http://search.twitter.com/search.json"; 
      twitterQuery = "?rpp=10&q=" + encodeURIComponent(text); 
      twitterMaxId = 0; //actually twitter ignores its since_id parameter 

      newTweetObserver = function() { 
       return Rx.Observer.create(
         function (tweet) { 
          if (tweet.id > twitterMaxId) { 
           twitterMaxId = tweet.id; 
           displayTweet(tweet); 
          } 
         } 
        ); 
      } 

      createTwitterObserver = function() { 
       twitterObserver = Rx.Observer.create(
         function (response) { 
          if (response.textStatus == "success") { 
           var data = response.data; 
           if (data.error == undefined) { 
            twitterQuery = data.refresh_url; 
            var tweetObservable; 
            tweetObservable = Rx.Observable.fromArray(data.results.reverse()); 
            tweetObservable.subscribe(newTweetObserver()); 
           } 
          } 
         }, 
         function(error) { alert(error); }, 
         function() { 
          //create and listen to new observer that includes a delay 
          if (running) { 
           twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000); 
           twitterObservable.subscribe(createTwitterObserver()); 
          } 
         } 
        ); 
       return twitterObserver; 
      } 
      twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery); 
      twitterObservable.subscribe(createTwitterObserver()); 

Nie daj się nabrać na podwójną warstwą obserwable/obserwatorów z wnioskami do tweetów. Mój przykład dotyczy głównie pierwszej warstwy: żądania danych z Twittera. Jeśli w rozwiązaniu tego problemu druga warstwa (przekształcając odpowiedzi na tweety) może stać się jednym z pierwszych, byłoby fantastycznie; Ale myślę, że to zupełnie inna sprawa. Na razie.

Erik Meijer wskazał mi operator Expand (patrz przykład poniżej) i zasugerował Join patterns jako alternatywę.

var ys = Observable.Expand 
(new[]{0}.ToObservable() // initial sequence 
        , i => (i == 10 ? Observable.Empty<int>() // terminate 
     : new[]{i+1}.ToObservable() // recurse 
) 
); 

ys.ToArray().Select(a => string.Join(",", a)).DumpLive(); 

Powinno to być możliwe do skopiowania do LINQPad. Zakłada pojedyncze obserwacje i produkuje jednego końcowego obserwatora.

Moje pytanie brzmi: w jaki sposób mogę zrobić najładniejszą sztuczkę w RxJS?

EDYTOWANIE:
Operator rozszerzenia może zostać prawdopodobnie zaimplementowany, jak pokazano w this thread. Ale trzeba by było generators (a ja mam tylko JS < 1.6).
Niestety RxJS 2.0.20304-beta nie implementuje metody Extend.

+2

I doszli do wniosku, że rozwiązaniem tego problemu jest rzeczywiście [Expand operator] (http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9), który nie został jeszcze zaimplementowany w RxJS do wersji 2.0.20304-beta. – derabbink

+1

expand jest dokładnie tym, czego potrzebujesz. Rozumiem, że ten post jest stary, ale możliwe jest po prostu przeszczepienie operatorów, których potrzebujesz, od przyszłej wersji Rx do własnej wersji Rx. Może wymagać pewnych manipulacji, ale podstawa kodu nie zmieniła się znacząco od wersji sprzed wersji v1. –

+0

Ponadto, poprawianie Rx (jeśli to możliwe) jest dobrą decyzją. Istnieje wiele poprawek i usprawnień w najnowszych wersjach. –

Odpowiedz

4

Postaram się więc rozwiązać twój problem nieco inaczej niż Ty i zabrać trochę swobód, które możesz łatwiej rozwiązać.

Więc 1 rzeczą nie mogę powiedzieć jest to, że jesteś stara się w następujących etapach

  • Pobierz pierwszą listę tweet, która zawiera kolejny url
  • Raz listy Tweet odebranych onNext obecnego obserwatora i uzyskać Kolejny zestaw tweets
    • Czy ta nieskończoność

Czy istnieje działanie użytkownika (uzyskać więcej/przewijanie do dołu). Tak czy siak, to naprawdę ten sam problem. Być może jednak źle odczytuję Twój problem. Oto odpowiedź na to.

function getMyTweets(headUrl) { 
    return Rx.Observable.create(function(observer) { 

     innerRequest(headUrl); 
     function innerRequest(url) { 
      var next = ''; 

      // Some magic get ajax function 
      Rx.get(url).subscribe(function(res) { 
       observer.onNext(res); 
       next = res.refresh_url; 
      }, 
      function() { 
       // Some sweet handling code 
       // Perhaps get head? 
      }, 
      function() { 
       innerRequest(next); 
      }); 
     } 
    }); 
} 

To może nie być odpowiedź, o którą pytasz. Jeśli nie, przepraszam!


Edycja: Po przejrzeniu kodu wygląda na to, że chcesz otrzymać wyniki jako tablicę i obserwować je.

// From the results perform a select then a merge (if ordering does not matter). 
getMyTweets('url') 
    .selectMany(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }); 

// Ensures ordering 
getMyTweets('url') 
    .select(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }) 
    .concat(); 
Powiązane problemy