2013-05-01 16 views
8

Mam składnik WWW, który subskrybuje strumień.Jak buforować zdarzenia strumieniowe?

Ponieważ element sieciowy jest ponownie tworzony za każdym razem, gdy jest wyświetlany, muszę wyczyścić subskrybenta i ponownie go odtworzyć.

Teraz dodaję wszystkich abonentów do listy i removed() metody cyklu życia robię:

subscriptions.forEach((sub) => sub.cancel()); 

Teraz do problemu: gdy składnik Web nie jest wyświetlana, nie ma jeden słucha strumienia. Problem polega na tym, że w komponencie brakuje danych/zdarzeń, które nie są wyświetlane.

Potrzebuję buforowania. Zdarzenia muszą być buforowane i wysyłane natychmiast po zarejestrowaniu odbiorcy. According to the documentation, buforowanie odbywa się do momentu zarejestrowania detektora:

Sterownik buforuje wszystkie przychodzące zdarzenia do momentu zarejestrowania subskrybenta.

To działa, ale problemem jest to, że słuchacz będzie w pewnym momencie usunięte i ponownie zarejestrowany i wydaje się to nie powoduje buforowanie.

Wygląda na to, że buforowanie odbywa się tylko początkowo, a nie później, nawet jeśli nie ma wszystkich słuchaczy.

Pytanie brzmi: w jaki sposób mogę buforować w tej sytuacji, w której słuchacze mogą zniknąć i wrócić?

Odpowiedz

11

Uwaga: zwykle nie powinno być możliwe ponowne przesłanie do strumienia, który już został zamknięty. Wydaje się, że to błąd, o którym zapomnieliśmy naprawić.

Nie jestem zaznajomiony z komponentami sieci, ale mam nadzieję, że zajmę się twoim problemem z następującą sugestią.

Jednym ze sposobów (a jest ich oczywiście wiele) jest utworzenie nowego strumienia dla każdego subskrybenta (takiego jak html-events do), który zatrzymuje pierwotny strumień.

Powiedzmy, że origin to oryginalny Strumień. Następnie zaimplementuj program pobierający stream, który zwraca nowy strumień połączony z origin:

Kod niewyszukany.

Stream origin; 
var _subscription; 
final _listeners = new Set<StreamController>(); 

_addListener(controller) { 
    _listeners.add(controller); 
    if (_subscription == null) { 
    _subscription = origin.listen((event) { 
     // When we emit the event we want listeners to be able to unsubscribe 
     // or add new listeners. In order to avoid ConcurrentModificationErrors 
     // we need to make sure that the _listeners set is not modified while 
     // we are iterating over it with forEach. Here we just create a copy with 
     // toList(). 
     // Alternatively (more efficient) we could also queue subscription 
     // modification requests and do them after the forEach. 
     _listeners.toList().forEach((c) => c.add(event)); 
    }); 
    } 
    _subscription.resume(); // Just in case it was paused. 
} 
_removeListener(controller) { 
    _listeners.remove(controller); 
    if (_listeners.isEmpty) _subscription.pause(); 
} 

Stream get stream { 
    var controller; 
    controller = new StreamController(
     onListen:() => _addListener(controller), 
     onCancel:() => _removeListener(controller)); 
    return controller.stream; 
} 

Jeśli zachodzi potrzeba natychmiastowego buforowania zdarzeń, należy rozpocząć subskrypcję od razu, a nie leniwie, jak w przykładowym kodzie.

+1

Dokładnie to, co chciałem. Dziękuję Ci! Myślę, że zawinę to w implementację 'BufferedStreamController' (lub czegoś), ponieważ potrzebuję tego w kilku miejscach. –

+1

Edytował kod: Zapomniałem o równoczesnych modyfikacjach. Kiedy wydarzenie jest emitowane, chcemy, aby słuchacze mogli subskrybować lub wypisać się. –