2017-07-19 20 views
10

Pracujemy z spark 1.6 i staramy się zachować globalną tożsamość dla podobnych zdarzeń. Może istnieć kilka "grup" zdarzeń o identycznym ID (w przykładzie jako liczba, litery są dodawane tylko dla unikalności). I wiemy, że niektóre z tych wydarzeń są podobne, więc jesteśmy w stanie je połączyć. Chcemy zachować coś takiego:Stwórz stanowy łańcuch dla różnych zdarzeń i przypisz globalny identyfikator w iskrze

Z -> 1, 2, 3 
X -> 4 

więc w przyszłości, jeżeli pewne zdarzenia z identyfikatorem 4 przyjdą możemy przypisać X jako globalnej tożsamości.

Proszę sprawdzić przykład dla lepszego zobrazowania:

Powiedzmy mamy jakieś dane strumieniowe przyjście do pracy zapłonowej.

Ponieważ wydarzenie 1 to nasz pierwszy występ, chcemy przypisać 1 to Z. Następnie wiemy, że 1b i 2c są podobne. więc chcemy zachować mapowanie. To samo dotyczy 2e i 3f, więc potrzebujemy mapowania 3-2. Na razie mamy 3 pary: 1->Z, 2->1, .

I chcemy stworzyć "historyczną" ścieżkę: Z <- 1 <- 2 <- 3 Na koniec będziemy mieli wszystkie zdarzenia z ID = Z.

1a -> Z 
1b -> Z 
2c -> Z 
2d -> Z 
2e -> Z 
3f -> Z 
3g -> Z 
3h -> Z 
4i -> X 

Staraliśmy się wykorzystać mapwithstate ale jedyną rzeczą, byliśmy w stanie to zrobić, że 2->1 i 3->2. Z mapwithstate nie byliśmy w stanie uzyskać stanu dla "rodzica" w stanie dla bieżącego zdarzenia - np. bieżące zdarzenie 3 z rodzicem 2 i nie może uzyskać 2 -> 1 ani żadnego 1 -> Z.

Czy możliwe jest wykonanie globalnego mapowania? Wypróbowaliśmy już akumulatory i emisję, ale wygląda na to, że nie nadaje się zbyt dobrze. I nie byliśmy w stanie zastąpić zdarzeń 1 dla pierwszego mapowania i zdarzeń 2 dla drugiego mapowania z Z.

Jeśli pojawi się nowe zdarzenie 5 i będzie podobne z 3h, na przykład musimy ponownie przypisać mapowanie 5->Z.

+1

Myślę, że największym problemem do pokonania jest Sekwencyjność. Co się stanie, jeśli '1b' i' 2c' będą przetwarzane równolegle? – maasg

+0

@maasg Dzięki za komentarz !! Zapomniałem wspomnieć, że robimy agregację z 'groupBy'. '1b' i' 2c' są zgrupowane i wiemy, że '1b' i' 2c' są połączone i są podobne. abyśmy mogli zaktualizować '1b' tą informacją (to samo dla' 2c', gdzie wiemy, że '2c' jest podobne z' 1b'). Ale problem dotyczy '2d', ponieważ wiemy, że' 2' ma rodzica '1' (dla poprzedniego zdarzenia przechowujemy to w' stanie'), ale musimy wiedzieć, że '1' ma nadrzędne' Z' (i 'Z' jest również "nadrzędny" dla '2'), który jest" root ". – VladoDemcak

+1

Jak ważne jest podobieństwo pary do znalezienia sekwencji 'Z <-1 <-2 <-3'? Co się stanie, jeśli np. Przetworzymy '1a, 3f, 2c'? Czy będziemy mieć 'Z <- 1 <- 2' i' X <- 3' b/c nie mamy relacji "2> 3" w chwili 3 nadejdzie? – maasg

Odpowiedz

2

Poniżej znajduje się rozwiązanie dla danego problemu, używając zmiennego odniesienia do "stanu" RDD, który za każdym razem aktualizujemy za pomocą nowych wyników.

Używamy transform do oznaczania przychodzącego strumienia zdarzeń z unikalnym identyfikatorem globalnym, wykonując sprzężenie podobieństwa. Jest to połączenie "ręcznie", w którym używamy produktu z dwóch zestawów danych i porównujemy każdą pozycję parami.

Należy pamiętać, że jest to kosztowny proces. Istnieje wiele części, które można zmienić, w zależności od specyfiki oczekiwanego strumienia. Na przykład możemy zastąpić globalny stan RDD przez lokalny map i zastosować map-side do łączenia szybszego podobieństwa, ale to bardzo zależy od oczekiwanej liczności zbioru unikalnych identyfikatorów.

To było trudniejsze, niż się początkowo spodziewałem. Traktuj to tylko jako punkt wyjścia do bardziej niezawodnego rozwiązania. Na przykład operacja union na stanie RDD wymaga regularnego sprawdzania punktów, aby uniknąć wzrostu kontroli DAG. (Jest dużo miejsca na ulepszenia - ale to jest ponad rozsądny wysiłek, aby udzielić odpowiedzi.)

Tu naszkicować rdzeń rozwiązania dla całego badanego notebooka zobaczyć UniqueGlobalStateChains.snb

// this mutable reference points to the `states` that we keep across interations  
@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD 

// we assume an incoming Event stream. Here we prepare it for the global id-process 
@transient val eventsById = eventStream.map(event => (event.id, event)) 
@transient val groupedEvents = eventsById.groupByKey() 

// this is the core of the solution. 
// We transform the incoming events into tagged events. 
// As a by-product, the mutable `states` reference will get updated with the latest state mapping. 
// the "chain" of events can be reconstructed ordering the states by timestamp 

@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) => 
    val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}}       
    val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)} 

    val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events 
    val similarityJoinMap = newEventIds.cartesian(currentMappings) 
     .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)} 
     .collectAsMap 
    //val similarityBC = sparkContext.broadcast(similarityJoinMap)     
    val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId()))) 
    newGlobalKeys.cache() //avoid lazy evaluation to generate multiple global ids 

    val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) => 
             events.map(event => (event.id,event.payload, globalKey)) 
            } 
    val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))} 
    currentState = newStates    
    states.unpersist(false)        
    states = newStates.union(states) 
    states.cache()        
    newTaggedEvents 
    } 

Biorąc pod uwagę to sekwencja wejściowa:

"1|a,1|b,3|c", "2|d,2|e,2|f", "3|g,3|h,3|i,4|j", "5|k", "4|f,1|g", "6|h" 

Dostajemy:

oznaczone wydarzenia z globalny identyfikator:

--- 
1|a: gen-4180,1|b: gen-4180,3|c: gen-5819 
--- 
2|d: gen-4180,2|e: gen-4180,2|f: gen-4180 
--- 
3|g: gen-4180,3|h: gen-4180,3|i: gen-4180,4|j: gen-5819 
--- 
5|k: gen-5819 
--- 
1|g: gen-2635,4|f: gen-4180 
--- 
6|h: gen-5819 

I możemy zrekonstruować łańcuch zdarzeń, które pochodzą z globalnej ID:

gen-4180: 1<-2<-3<-4 
gen-2635: 1 
gen-5819: 3<-4<-5<-6 

-o-

+0

woow niewiarygodna odpowiedź. Poszanowanie! Potrwa to kilka godzin, aż to zrozumiem :). Dziękuję również za notebooka, na pewno go wypróbuję i pobawię się tym. Mam na razie dwa pytania: '1)' Przyjąłbym następujący wynik: 'gen-4180: 1 <-2 <-3 <-4, gen-5819: 5 <-6'. Ale prawdopodobnie jest to związane z twoim '# comment77490972_45191359', prawda? '2)' jaki był cel 'sparkContext.broadcast (podobanieJoinMap)' skomentowałeś? Tylko do testowania? – VladoDemcak

+0

@VladoDemcak Q1) w odniesieniu do sekwencji, dodałem "3c" w pierwszej iteracji, aby sprawdzić scenariusz, w którym zdarzenia nadchodzą wcześniej. '3c' pobiera' 3 | c: gen-5819' i uruchamia łańcuch '3-4-5' (Jeśli moja interpretacja obszaru problemu jest poprawna, pls popraw mnie, jeśli to źle) Q2) emisja: to jest szybkość do łączenia, używając 'broadcast join'. Tylko optymalizacja perf. Usunąłem go, gdy miałem do czynienia z dziwnymi wynikami, ale wynikało to z leniwej oceny generującej wiele "globalId" dla tych samych zdarzeń. Zobacz '.catch' i notatkę obok niego. – maasg

+0

@VladoDemcak w zeszycie, zmienna 'currentState' jest używana tylko do celów wyświetlania (pobierz dane z' transform' i do widżetów notatnika w celu wizualizacji.) – maasg

Powiązane problemy