2015-12-31 8 views
12

Jestem nowy RxJS i próbuję napisać aplikację, która będzie wykonywać następujące rzeczy:RxJS, jak odpytywać API do ciągłego sprawdzania dostępności zaktualizowanych rekordów przy użyciu dynamicznego timestamp

  1. Na obciążenia sprawiają, że żądanie AJAX (podrobione jako fetchItems() dla uproszczenia) w celu pobrania listy elementów.
  2. Co sekundę, wykonaj żądanie AJAX, aby uzyskać przedmioty.
  3. Podczas sprawdzania nowych pozycji, TYLKO elementy zostały zmienione po zwrocie ostatniego znacznika czasu.
  4. Nie powinno być żadnych stanów zewnętrznych względem obserwowalnych.

Moja first attempt była bardzo prosta i spełnione cele 1, 2 i 4.

var data$ = Rx.Observable.interval(1000) 
    .startWith('run right away') 
    .map(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`, but 
    // I am not yet tracking the `modifiedSince` timestamp yet so all items will always be returned 
    return fetchItems(); 
    }); 

Teraz jestem podekscytowany, że było łatwo, to nie może być tak, że znacznie trudniej zrealizować cel 3 ... kilka godzin później to where I am at:

var modifiedSince = null; 
var data$ = Rx.Observable.interval(1000) 
    .startWith('run right away') 
    .flatMap(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince` 
    return fetchItems(modifiedSince); 
    }) 
    .do(function(item) { 
    if(item.updatedAt > modifiedSince) { 
     modifiedSince = item.updatedAt; 
    } 
    }) 
    .scan(function(previous, current) { 
    previous.push(current); 
    return previous; 
    }, []); 

rozwiązuje ten cel 3, ale cofa się na strzał 4. jestem teraz przechowującej stan poza obserwowalny.

Zakładam, że globalny modifiedSince i blok .do() nie są najlepszym sposobem na osiągnięcie tego. Wszelkie wskazówki będą mile widziane.

EDYCJA: mam nadzieję, że wyjaśniono, czego szukam z tym pytaniem.

+0

skąd wiesz pozycja się zmieniła i co raz ostatni wystąpił zmiana? Chodzi o zarządzanie stanem, jeśli dobrze określisz swój stan, operator 'scan' powinien pomóc w utrzymaniu tego stanu przez całe życie twojego obserwowalnego. – user3743222

+0

@ user3743222 Mam zaktualizowane pytanie, aby wyjaśnić. Muszę uzyskać max 'updatedAt' z' fetchItems (modifiedSince) 'i podać tę wartość do' fetchItems' przy następnym wywołaniu. – user774031

Odpowiedz

2

Oto inne rozwiązanie, które nie wykorzystuje zamknięcia ani "stanu zewnętrznego".

zrobiłem następującą hipotezę:

  • fetchItems zwraca Rx.Observable przedmiotów, nie znaczy tablicą elementów

To sprawia, że ​​korzystanie z operatorem expand który pozwala emitować wartości, które śledzić związek rekurencyjny typu x_n+1 = f(x_n). Przekazujesz x_n+1, zwracając obserwowalny, który emituje tę wartość, na przykład Rx.Observable.return(x_n+1) i możesz zakończyć rekursję, zwracając Rx.Observable.empty(). Tutaj wydaje się, że nie masz warunku końcowego, więc to będzie działać wiecznie.

scan pozwala również emitować wartości następujące po zależności rekursywnej (x_n+1 = f(x_n, y_n)). Różnica polega na tym, że scan zmusza cię do użycia funkcji syncronicznej (więc x_n+1 jest zsynchronizowany z y_n), podczas gdy z expand możesz użyć asynchronicznej funkcji w postaci obserwowalnej.

Kod nie jest testowany, więc bądź na bieżąco, jeśli to działa, czy nie.

Wymagane dokumenty: expand, combineLatest

var modifiedSinceInitValue = // put your date here 
var polling_frequency = // put your value here 
var initial_state = {modifiedSince: modifiedSinceInitValue, itemArray : []} 
function max(property) { 
    return function (acc, current) { 
    acc = current[property] > acc ? current[property] : acc; 
    } 
}  
var data$ = Rx.Observable.return(initial_state) 
    .expand (function(state){ 
      return fetchItem(state.modifiedSince) 
        .toArray() 
        .combineLatest(Rx.Observable.interval(polling_frequency).take(1), 
        function (itemArray, _) { 
         return { 
         modifiedSince : itemArray.reduce(max('updatedAt'), modifiedSinceInitValue), 
         itemArray : itemArray 
         } 
        } 

    }) 
+0

Z niewielkimi zmianami w składni, które robią dokładnie to, czego szukam. Wielkie dzięki za twoją pomoc w tej sprawie. – user774031

1

Jak o tym:

var interval = 1000; 
function fetchItems() { 
    return items; 
} 

var data$ = Rx.Observable.interval(interval) 
    .map(function() { return fetchItems(); }) 
    .filter(function(x) {return x.lastModified > Date.now() - interval} 
    .skip(1) 
    .startWith(fetchItems()); 

To powinno filtrować źródła tylko dla nowych elementów, a także rozpocząć można wyłączyć z pełnej kolekcji. Po prostu zapisz funkcję filtra, aby była odpowiednia dla twojego źródła danych.

Lub przekazując argument do fetchItems:

var interval = 1000; 
function fetchItems(modifiedSince) { 
    var retVal = modifiedSince ? items.filter(function(x) {return x.lastModified > modifiedSince}) : items 
    return retVal; 
} 

var data$ = Rx.Observable.interval(interval) 
    .map(function() { return fetchItems(Date.now() - interval); }) 
    .skip(1) 
    .startWith(fetchItems()); 
+0

To nie rozwiąże mojego problemu z przekazaniem parametru "modifiedSince" do 'fetchItems'. Zaktualizowałem to pytanie, aby to wyjaśnić. – user774031

+0

Każdy element powinien mieć swój ostatni Zmodyfikowany zestaw, gdy go modyfikujesz. Twoja funkcja fetchItems powinna zwrócić wszystkie elementy. Następnie przy początkowym obciążeniu akceptujesz je wszystkie. Następnie akceptujesz tylko te, które zostały zmodyfikowane. Możesz także dodać go jako argument do fetchItems, jeśli chcesz tylko pobrać nowe. –

1

Wydaje się, oznacza to, że modifiedSince jest częścią państwa nosisz, więc powinien pojawić się w scan. Dlaczego nie przesuwasz akcji do do również w skanie? Twój materiał siewny będzie wtedy {modifiedSince: null, itemArray: []}.

Niepoprawnie, po prostu pomyślałem, że to może nie działać, ponieważ musisz podać modifiedSince z powrotem do funkcji fetchItem, która jest wcześniejsza. Nie masz tutaj cyklu? Oznacza to, że musisz użyć przedmiotu, aby przerwać ten cykl. Możesz też spróbować zamknąć enkapsulację modifiedSince w zamknięciu. Coś jak

function pollItems (fetchItems, polling_frequency) { 
var modifiedSince = null; 
var data$ = Rx.Observable.interval(polling_frequency) 
    .startWith('run right away') 
    .flatMap(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince` 
    return fetchItems(modifiedSince); 
    }) 
    .do(function(item) { 
    if(item.updatedAt > modifiedSince) { 
     modifiedSince = item.updatedAt; 
    } 
    }) 
    .scan(function(previous, current) { 
    previous.push(current); 
    return previous; 
    }, []); 

return data$; 
} 

muszę wybiec z okazji nowego roku, jeśli to nie pomoże, mogę dać kolejną szansę później (być może za pomocą operatora expand, druga wersja scan).

+0

Tak, można to zrobić w zamknięciu, to jest bardziej ćwiczenie w nauce RxJS. Miałem nadzieję, że można to zrobić bez przechowywania zewnętrznego stanu. – user774031

+0

Stan zewnętrzny nie jest przestępstwem. Jeśli spojrzysz na implementację operatorów RxJS, na przykład dla 'scan', to powinno również trzymać akumulator w zamknięciu. Podobnie jak używanie przedmiotów nie jest przestępstwem. Ale musisz się upewnić, że bronisz się przed pułapkami. Ale mimo wszystko spróbuję jutro uzyskać coś bezpaństwowca. – user3743222

Powiązane problemy