2015-11-16 11 views
5

Po prostu próbuję zrozumieć, jak wygląda model programowania. Scenariusz: używam Pub/Sub + Dataflow do analizy instrumentów na forum internetowym. Mam strumienia danych pochodzących z pub/Sub, który wygląda tak:Długo żyjący stan z przepływem danych Google

ID | TS | EventType 
1 | 1 | Create 
1 | 2 | Comment 
2 | 2 | Create 
1 | 4 | Comment 

I chcę skończyć ze strumieniem pochodzących z przepływu danych, który wygląda tak:

ID | TS | num_comments 
1 | 1 | 0 
1 | 2 | 1 
2 | 2 | 0 
1 | 4 | 2 

Chcę zadanie, które czy ten pakiet jest uruchamiany jako proces strumieniowy, a nowe liczby są zapełniane w miarę pojawiania się nowych zdarzeń. Moje pytanie brzmi: gdzie jest idiomatyczne miejsce do przechowywania stanu dla bieżącego identyfikatora tematu i czy liczy się komentarz? Zakładając, że tematy mogą żyć przez lata. Obecne pomysły są:

  • Napisz „” aktualny wpis do tematu i id do BigTable w zapytaniu DoFn co obecna liczba komentarz dla identyfikatora tematu kapie Nawet kiedy piszę to ja nie jestem. wentylator.
  • Jakoś wykorzystać wejścia boczne? Wygląda na to, że może to jest odpowiedź, ale jeśli tak, to nie rozumiem w pełni.
  • Skonfiguruj zadanie transmisji strumieniowej za pomocą globalnego okna z wyzwalaczem, który uruchamia się za każdym razem, gdy jest pobierany rekord, i polegaj na Dataflow, aby zachować historię całego panelu. (Nieograniczona wymóg przechowywania?)

EDIT: Właśnie w celu wyjaśnienia, nie będę miał żadnych problemów wykonawczych któregokolwiek z tych trzech strategii, albo milion różnych innych sposobów działania, jestem bardziej zainteresowany tym, co jest Najlepszym sposobem na zrobienie tego za pomocą Dataflow jest . Co będzie najbardziej odporne na awarię, konieczność ponownego przetworzenia historii na zasypkę, itp.

EDIT2: Obecnie występuje błąd w usłudze przepływu danych, w której aktualizacje kończą się niepowodzeniem w przypadku dodania danych wejściowych do transformacji spłaszczenia, co będzie oznaczać musisz odrzucić i odbudować dowolny stan naliczony w zadaniu, jeśli zmienisz zadanie, które obejmuje dodanie czegoś do operacji spłaszczania.

Odpowiedz

7

Powinieneś być w stanie użyć wyzwalaczy i kombajnu, aby to osiągnąć.

PCollection<ID> comments = /* IDs from the source */; 
PCollection<KV<ID, Long>> commentCounts = comments 
    // Produce speculative results by triggering as data comes in. 
    // Note that this won't trigger after *every* element, but it will 
    // trigger relatively quickly (as the system divides incoming data 
    // into work units). You could also throttle this with something 
    // like: 
    // AfterProcessingTime.pastFirstElementInPane() 
    //  .plusDelayOf(Duration.standardMinutes(5)) 
    // which will produce output every 5 minutes 
    .apply(Window.triggering(
      Repeatedly.forever(AfterPane.elementCountAtLeast(1))) 
     .accumulatingFiredPanes()) 
    // Count the occurrences of each ID 
    .apply(Count.perElement()); 

// Produce an output String -- in your use case you'd want to produce 
// a row and write it to the appropriate source 
commentCounts.apply(new DoFn<KV<ID, Long>, String>() { 
    public void processElement(ProcessContext c) { 
    KV<ID, Long> element = c.element(); 
    // This includes details about the pane of the window being 
    // processed, and including a strictly increasing index of the 
    // number of panes that have been produced for the key.   
    PaneInfo pane = c.pane(); 
    return element.key() + " | " + pane.getIndex() + " | " + element.value(); 
    } 
}); 

zależności od danych, można również czytać całych komentarzy od źródła, wyodrębnić identyfikator, a następnie użyj Count.perKey() uzyskać liczniki dla każdego identyfikatora. Jeśli chcesz bardziej skomplikowaną kombinację, możesz przyjrzeć się definiowaniu niestandardowego CombineFn i użyć Combine.perKey.

+0

w prawo, więc jest to numer 3 na mojej liście potencjalnych wdrożeń. Moje pytanie brzmi: czy to dobry pomysł *? Stan tutaj jest niejawnie obsługiwany przez przepływ danych. Co się stanie, jeśli będę musiał ponownie uruchomić pracę? Jak wdrożyć historyczne zasypki? – bfabry

+1

W zależności od wprowadzonych zmian może być dostępna opcja [Aktualizuj istniejący potok] (https://cloud.google.com/dataflow/pipelines/updating-a-pipeline). Jeśli zmiany są bardziej znaczące, wymienione podejście działa, jeśli używasz niestandardowego źródła, które umożliwia odczytanie wszystkich starych danych. –

+0

Niestandardowe źródło jako sposób radzenia sobie z zalewami itp. To ciekawy pomysł. Wydaje się, że to rozwiązuje to pytanie. Czy jest to dobry pomysł, aby mieć ten stan, który właśnie rośnie na zawsze? Co się stanie, jeśli temat forum może zostać zamknięty, czy istnieje sposób, aby powiedzieć "nie będą już wydarzenia, na których nam zależy", aby zostały odrzucone? – bfabry

2

Od BigQuery nie obsługuje nadpisywania wiersze, jednym ze sposobów, aby przejść o to napisać zdarzenia do BigQuery i wyszukiwania danych za pomocą COUNT:

SELECT ID, COUNT (NUM_COMMENTS) z tabeli grupy przez ID;

Możesz także wykonywać agregacje per_kart liczby num_comments w Dataflow przed wpisaniem wpisów do BigQuery; powyższe zapytanie nadal będzie działać.

+0

Dzięki za wykonanie zdjęcia :-). Dla celów problemu możesz zignorować, że miejscem docelowym jest BQ. Miejsce docelowe powinno być jednak dołączone tylko. Obliczenia w rzeczywistości są również bardziej skomplikowane niż tylko suma i wolelibyśmy być wyrażeni w naszym ETL w przepływie danych zamiast skomplikowanych (i kosztownych w uruchamianiu) zapytań BQ. Ponadto to rozwiązanie nie dałoby nam historii szeregów czasowych dotyczących liczby komentarzy na dany temat. – bfabry

Powiązane problemy