2016-09-28 21 views
5

Powiedzmy mam trochę iterator:Jak radzić sobie ze źródłem, które emituje przyszłość [T]?

val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...) 


I chcę zbudować źródło z tego iteratora:

val source: Source[Future[Int], NotUsed] = 
    Source.fromIterator(() => nextElemIter) 


Więc teraz moje źródło emituje Future s. Nigdy nie widziałem kontrakty były przekazywane pomiędzy etapami w docs Akka lub gdziekolwiek indziej, więc zamiast tego zawsze można zrobić coś takiego:

val source: Source[Int, NotUsed] = 
    Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */) 


A teraz mam stałe źródło emitujące T zamiast Future[T]. Ale to jest czcze i złe.

Jaki jest właściwy sposób radzenia sobie z takimi sytuacjami?

+3

myślę, że 'mapAsync' jest tutaj idealnie w porządku. W końcu jest to dokładnie przeznaczone do tego celu - spłaszczanie przyszłości w strumieniach. –

+1

'mapAsync (1) (tożsamość)' jest właściwym sposobem robienia tego. – expert

+0

@expert edytowane. –

Odpowiedz

4

Odpowiedzi na twoje pytanie bezpośrednio: Zgadzam się z komentarzem Vladimira, że ​​nie ma nic "hacky" o użyciu mapAsync do celów opisanych. Nie mogę wymyślić bardziej bezpośredniej metody rozwijania wartości Future z poziomu podstawowych wartości Int.

odpowiadając na pytanie pośrednio ...

Staraj się trzymać z Futures

strumieni, jako mechanizm współbieżności, są niezwykle użyteczne, gdy wymagane jest ciśnienie wsteczne. Jednak czyste operacje Future mają również swoje zastosowanie w aplikacjach.

Jeśli twoja Iterator[Future[Int]] zamierza wytworzyć znaną, ograniczoną liczbę wartości Future, to możesz chcieć trzymać się przy użyciu kontraktów Futures dla współbieżności.

Wyobraź sobie, że chcesz filtrować, mapować, & zmniejszyć wartości Int.

def isGoodInt(i : Int) : Boolean = ???   //filter 
def transformInt(i : Int) : Int = ???   //map 
def combineInts(i : Int, j : Int) : Int = ??? //reduce 

Futures zapewniają bezpośredni sposób korzystania z tych funkcji:

val finalVal : Future[Int] = 
    Future sequence { 
    for { 
     f <- nextElemIter.toSeq //launch all of the Futures 
     i <- f if isGoodInt(i) 
    } yield transformInt(i) 
    } map (_ reduce combineInts) 

porównaniu z nieco pośredni sposób korzystania z Stream, jak sugeruje:

val finalVal : Future[Int] = 
    Source.fromIterator(() => nextElemIter) 
     .via(Flow[Future[Int]].mapAsync(1)(identity)) 
     .via(Flow[Int].filter(isGoodInt)) 
     .via(Flow[Int].map(transformInt)) 
     .to(Sink.reduce(combineInts)) 
     .run() 
Powiązane problemy