wymyśliłem nieco gnarly rozwiązania, ale myślę, że to załatwia sprawę.
Zasadniczą ideą jest użycie metody Źródło keepAlive
jako licznika czasu, który wyzwoli zakończenie.
Ale aby to zrobić, musimy najpierw trochę streścić dane. Timer będzie trzeba wysłać za spust lub inną wartość krotki z oryginalnego źródła, zatem:
sealed trait Data
object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data
następnie przekształcić nasze źródło krotek do źródła wartości. jeszcze użyjemy groupBy
zrobić grupowania podobnych do przypadku skończonej stream:
val originalSource : Source[(Long, String, Int), Unit] = ???
type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid
val groupedDataSource : Source[IDGroup, Unit] =
originalSource.map(t => Value(t._1, t._2, t._3))
.groupBy(_.session_uid)
Najtrudniejsze jest obsługa ugrupowań, które są po prostu krotki: (String, Source[Value,Unit])
. Musimy timer powiadomić nas, jeśli upłynął czas więc potrzebujemy innego abstrakcję wiedzieć, czy jesteśmy jeszcze computing czy mamy zakończone obliczeń ze względu na timeout:
sealed trait Sum {
val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum
val zeroSum : Sum = StillComputing(0)
Teraz możemy spuścić źródło każdego Grupa. keepAlive
wyśle TimerTrigger
, jeśli źródło wartości nie wytworzy czegoś po timeOut
. Data
z keepalive jest następnie wzór porównywane albo TimerTrigger lub nową wartość z oryginalnego źródła:
val evaluateSum : ((Sum , Data)) => Sum = {
case (runningSum, data) => {
data match {
case TimerTrigger => ComputedSum(runningSum.sum)
case v : Value => StillComputing(runningSum.sum + v.traffic)
}
}
}//end val evaluateSum
type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid
def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
idGroup._1 -> idGroup._2.keepAlive(timeOut,() => TimerTrigger)
.scan(zeroSum)(evaluateSum)
.collect {case c : ComputedSum => c.sum}
.runWith(Sink.head)
Kolekcja jest stosowana do częściowego funkcji, która pasuje tylko gotowy sumy, dlatego Sink zostanie osiągnięta tylko po tym, jak timer wystrzelił.
Następnie zastosować tę procedurę obsługi do każdej grupy, która wychodzi:
val timeOut = FiniteDuration(5, MINUTES)
val sumSource : Source[SumResult, Unit] =
groupedDataSource map handleGroup(timeOut)
Mamy teraz źródłem (String,Future[Int])
która jest session_uid i Przyszłość sumę ruchu dla tego identyfikatora.
Tak jak powiedziałem, zawiłe, ale spełnia wymagania. Ponadto, nie jestem całkowicie pewien, co się stanie, jeśli uid, który był już zgrupowany i został przekroczony limit czasu, ale wtedy pojawia się nowa wartość z tym samym uid.
używam 'Akka-streams' (jak wspomniano w tagach). –
Zamiast limitu czasu, czy liczba aktualizacji jest dobra? Na przykład. tworzyć zgrupowanie co 10 000 aktualizacji. –
Jeśli masz nieskończoną parę, w jaki sposób grupa może "przetworzyć cały strumień przychodzący, a dopiero potem będzie gotowy do zwrócenia wyniku"? Jeśli strumień jest naprawdę nieskończony, to nigdy się nie wydarzy. –