2015-11-21 14 views
5

mam nieskończony strumień zdarzeń:Jak pogrupować zdarzenia przychodzące z nieskończonego strumienia?

(timestamp, session_uid, traffic) 

tj

... 
(1448089943, session-1, 10) 
(1448089944, session-1, 20) 
(1448089945, session-2, 50) 
(1448089946, session-1, 30) 
(1448089947, session-2, 10) 
(1448089948, session-3, 10) 
... 

Te wydarzenia Chcę grupować według session_uid i obliczyć sumę ruchu dla każdej sesji.

Napisałem przepływ akka-streams, który działa dobrze z wykorzystaniem strumienia skończonego groupBy (mój kod bazowy na this przykład z książki kucharskiej). Ale z nieskończonym strumieniem nie zadziała, ponieważ funkcja groupBy powinna przetwarzać cały strumień przychodzący i dopiero po tym będzie gotowy do zwrócenia wyniku.

Myślę, że powinienem wdrożyć grupowanie z limitem czasu, tj. Jeśli nie otrzymam zdarzenia z określonym strumieniem_początkowym więcej niż 5 minut od ostatniego, powinienem zwrócić zgrupowane zdarzenia dla tej wartości session_uid. Ale jak to zaimplementować, użyj tylko akka-streams?

+0

używam 'Akka-streams' (jak wspomniano w tagach). –

+0

Zamiast limitu czasu, czy liczba aktualizacji jest dobra? Na przykład. tworzyć zgrupowanie co 10 000 aktualizacji. –

+0

Jeśli masz nieskończoną parę, w jaki sposób grupa może "przetworzyć cały strumień przychodzący, a dopiero potem będzie gotowy do zwrócenia wyniku"? Jeśli strumień jest naprawdę nieskończony, to nigdy się nie wydarzy. –

Odpowiedz

3

wymyśliłem nieco gnarly rozwiązania, ale myślę, że to załatwia sprawę.

Zasadniczą ideą jest użycie metody Źródło keepAlive jako licznika czasu, który wyzwoli zakończenie.

Ale aby to zrobić, musimy najpierw trochę streścić dane. Timer będzie trzeba wysłać za spust lub inną wartość krotki z oryginalnego źródła, zatem:

sealed trait Data 

object TimerTrigger extends Data 
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data 

następnie przekształcić nasze źródło krotek do źródła wartości. jeszcze użyjemy groupBy zrobić grupowania podobnych do przypadku skończonej stream:

val originalSource : Source[(Long, String, Int), Unit] = ??? 

type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid 

val groupedDataSource : Source[IDGroup, Unit] = 
    originalSource.map(t => Value(t._1, t._2, t._3)) 
       .groupBy(_.session_uid) 

Najtrudniejsze jest obsługa ugrupowań, które są po prostu krotki: (String, Source[Value,Unit]). Musimy timer powiadomić nas, jeśli upłynął czas więc potrzebujemy innego abstrakcję wiedzieć, czy jesteśmy jeszcze computing czy mamy zakończone obliczeń ze względu na timeout:

sealed trait Sum { 
    val sum : Int 
} 
case class StillComputing(val sum : Int) extends Sum 
case class ComputedSum(val sum : Int) extends Sum 

val zeroSum : Sum = StillComputing(0) 

Teraz możemy spuścić źródło każdego Grupa. keepAlive wyśle ​​TimerTrigger, jeśli źródło wartości nie wytworzy czegoś po timeOut. Data z keepalive jest następnie wzór porównywane albo TimerTrigger lub nową wartość z oryginalnego źródła:

val evaluateSum : ((Sum , Data)) => Sum = { 
    case (runningSum, data) => { 
    data match { 
     case TimerTrigger => ComputedSum(runningSum.sum) 
     case v : Value => StillComputing(runningSum.sum + v.traffic) 
    } 
    } 
}//end val evaluateSum 

type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid 

def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult = 
    idGroup._1 -> idGroup._2.keepAlive(timeOut,() => TimerTrigger) 
          .scan(zeroSum)(evaluateSum) 
          .collect {case c : ComputedSum => c.sum} 
          .runWith(Sink.head) 

Kolekcja jest stosowana do częściowego funkcji, która pasuje tylko gotowy sumy, dlatego Sink zostanie osiągnięta tylko po tym, jak timer wystrzelił.

Następnie zastosować tę procedurę obsługi do każdej grupy, która wychodzi:

val timeOut = FiniteDuration(5, MINUTES) 

val sumSource : Source[SumResult, Unit] = 
    groupedDataSource map handleGroup(timeOut) 

Mamy teraz źródłem (String,Future[Int]) która jest session_uid i Przyszłość sumę ruchu dla tego identyfikatora.

Tak jak powiedziałem, zawiłe, ale spełnia wymagania. Ponadto, nie jestem całkowicie pewien, co się stanie, jeśli uid, który był już zgrupowany i został przekroczony limit czasu, ale wtedy pojawia się nowa wartość z tym samym uid.

+0

Dziękuję bardzo! Twoja odpowiedź daje mi kilka przydatnych pomysłów, ale ma kilka problemów w kontekście mojego zadania. Na przykład ma [problem z pamięcią zużycia] (http://stackoverflow.com/questions/33865423/is-groupby-leaking-in-akka-stream) w funkcji 'groupBy'. –

+0

@maxd Każde rozwiązanie będzie miało potencjalny problem z pamięcią, ponieważ liczba aktywnych uids może się powiększyć i dlatego liczba działających sum może wzrosnąć. Ale jesteś mile widziany, szczęśliwy hacking. –

-1

może można go zaimplementować prosty aktora

case class SessionCount(name: String) 

class Hello private() extends Actor { 
    var sessionMap = Map[String, Int]() 

    override def receive: Receive = { 
    case (_, session: String, _) => 
     sessionMap = sessionMap + (session -> (sessionMap.getOrElse(session, 0) + 1)) 

    case SessionCount(name: String) => sender() ! sessionMap.get(name).getOrElse(0) 
    } 
} 


object Hello { 
    private val actor = ActorSystem.apply().actorOf(Props(new Hello)) 
    private implicit val timeOver = Timeout(10, TimeUnit.SECONDS) 
    type Value = (String, String, String) 

    def add(value: Value) = actor ! value 

    def count(name:String) = (actor ? SessionCount(name)).mapTo[Int] 
} 
+0

Wiem, że możliwym do rozwiązania moim zadaniem jest użycie tylko aktorów Akka, ale myślę, że strumienie akka powinny również znaleźć rozwiązanie dla mojego zadania. I chcę to znaleźć. FYI: Przykładowy kod oblicza liczbę sesji, ale nie całkowity ruch dla każdej sesji. Co więcej, nie obsługiwał on nieskończonego strumienia poprawnie. –

+0

jak to jest używać 'words.forearch (e => Hello.add (e))' –

1

To wydaje się być w przypadku zastosowania do Source.groupedWithin „Chunk się ten strumień na grupy elementów otrzymanych w oknie czasowym, lub ograniczone przez daną liczbę elementów, co nastąpi wcześniej”

def groupedWithin(n: Int, d: FiniteDuration): Source[List[Out], Mat] 

Here's the link to the docs

+0

Jest to również niedopuszczalne, ponieważ sesja może występować w kilku porcjach. –

Powiązane problemy