2014-11-23 14 views
5

Tło: Mam funkcję:Sekwencjonowanie Scala Futures z ograniczonym równoległości (bez aprowizacji z ExecutorContexts)

def doWork(symbol: String): Future[Unit] 

która inicjuje pewne skutki uboczne, aby pobrać dane i przechowywać je i uzupełnia przyszłego gdy jego done . Jednak infrastruktura zaplecza ma ograniczenia użytkowania, tak że nie więcej niż 5 z tych żądań może być wykonywanych równolegle. Mam listę symboli N, które trzeba uzyskać poprzez:

var symbols = Array("MSFT",...) 

ale chcę sekwencjonowania je tak, że nie więcej niż 5 realizują jednocześnie. Biorąc pod uwagę:

val allowableParallelism = 5 

moje obecne rozwiązanie jest (zakładając, że pracuję z async/czekają):

val symbolChunks = symbols.toList.grouped(allowableParallelism).toList 
    def toThunk(x: List[String]) =() => Future.sequence(x.map(doWork)) 
    val symbolThunks = symbolChunks.map(toThunk) 
    val done = Promise[Unit]() 
    def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match { 
    case Nil => done.success() 
    case x::xs => x().onComplete(_ => procThunks(xs)) 
    } 
    procThunks(symbolThunks) 
    await { done.future } 

ale z oczywistych powodów, nie jestem strasznie zadowolony z niego. Czuję, że to powinno być możliwe z fałdami, ale za każdym razem, gdy próbuję, kończę z niecierpliwością tworząc Futures. Wypróbowałem też wersję z RxScala Observables, używając concatMap, ale to też wydawało się przesadą.

Czy istnieje lepszy sposób na osiągnięcie tego?

+0

Dodam, że byłoby jeszcze lepiej, jeśli za każdym razem Future zakończeniu nowej rozpoczęto, zamiast czekać na całej kohorty/grupy, aby zakończyć. – experquisite

+0

to blokowanie IO, a następnie zapakowane w Future {} lub czy Asynchronizacja IO nie używa wątku podczas oczekiwania na zdalnym serwerze? Jeśli to blokowanie, wówczas ustalona pula wątków z 5 wątkami wydaje mi się najprostszym rozwiązaniem. Ale używaj tej puli tylko do blokowania IO i nic więcej. –

+0

Wsparcie IO dla doWork() nie jest blokujące, działa na wątkach, nad którymi nie mam kontroli, które zapakowałem do Observables na różnych poziomach abstrakcji. – experquisite

Odpowiedz

5

Mam przykład jak to zrobić ze strumieniem scalaz. Jest to całkiem sporo kodu, ponieważ wymaga on konwersji scala Future to scalaz Task (abstrakcja dla odroczonego obliczania). Jednak wymagane jest dodanie go do projektu raz. Inną opcją jest użycie zadania do zdefiniowania "doWork". Osobiście preferuję zadanie do budowania programów asynchronicznych.

import scala.concurrent.{Future => SFuture} 
    import scala.util.Random 
    import scala.concurrent.ExecutionContext.Implicits.global 


    import scalaz.stream._ 
    import scalaz.concurrent._ 

    val P = scalaz.stream.Process 

    val rnd = new Random() 

    def doWork(symbol: String): SFuture[Unit] = SFuture { 
    Thread.sleep(rnd.nextInt(1000)) 
    println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}") 
    } 

    val symbols = Seq("AAPL", "MSFT", "GOOGL", "CVX"). 
    flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}")) 

    implicit class Transformer[+T](fut: => SFuture[T]) { 
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = { 
     import scala.util.{Failure, Success} 
     import scalaz.syntax.either._ 
     Task.async { 
     register => 
      fut.onComplete { 
      case Success(v) => register(v.right) 
      case Failure(ex) => register(ex.left) 
      } 
     } 
    } 
    } 

    implicit class ConcurrentProcess[O](val process: Process[Task, O]) { 
    def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = { 
     val actions = 
     process. 
      zipWith(f)((data, f) => f(data)) 

     val nestedActions = 
     actions.map(P.eval) 

     merge.mergeN(concurrencyLevel)(nestedActions) 
    } 
    } 

    val workChannel = io.channel((s: String) => doWork(s).toTask) 

    val process = Process.emitAll(symbols).concurrently(5)(workChannel) 

    process.run.run 

Kiedy będziesz miał wszystkie te przemiany w zakresie, w zasadzie wszystko, czego potrzebujesz to:

val workChannel = io.channel((s: String) => doWork(s).toTask) 

    val process = Process.emitAll(symbols).concurrently(5)(workChannel) 

dość krótki i self-decribing

+0

Dzięki, starannie unikam scalazu, ponieważ jestem całkiem nowy na scala, ale wygląda to dobrze ... – experquisite

3

Chociaż już masz doskonałą odpowiedź , Myślałem, że nadal mogę przedstawić opinię lub dwie na ten temat.

Pamiętam, że widziałem gdzieś (na czyimś blogu) "używaj aktorów dla stanu i korzystaj z kontraktów terminowych na współbieżność".

Więc moją pierwszą myślą byłoby jakoś wykorzystać aktorów. Mówiąc dokładniej, miałbym głównego aktora z routerem uruchamiającym wielu pracowników, z liczbą robotników ograniczonych zgodnie z allowableParallelism. Zakładając więc, mam

def doWorkInternal (symbol: String): Unit 

który działa od Ciebie doWork podjętą „poza przyszłości”, musiałbym coś wzdłuż tych linii (bardzo prymitywny, nie odrywając wiele szczegółów pod uwagę, i praktycznie kopiuje kod z Akka dokumentacja):

import akka.actor._ 

case class WorkItem (symbol: String) 
case class WorkItemCompleted (symbol: String) 
case class WorkLoad (symbols: Array[String]) 
case class WorkLoadCompleted() 

class Worker extends Actor { 
    def receive = { 
     case WorkItem (symbol) => 
      doWorkInternal (symbol) 
      sender() ! WorkItemCompleted (symbol) 
    } 
} 

class Master extends Actor { 
    var pending = Set[String]() 
    var originator: Option[ActorRef] = None 

    var router = { 
     val routees = Vector.fill (allowableParallelism) { 
      val r = context.actorOf(Props[Worker]) 
      context watch r 
      ActorRefRoutee(r) 
     } 
     Router (RoundRobinRoutingLogic(), routees) 
    } 

    def receive = { 
     case WorkLoad (symbols) => 
      originator = Some (sender()) 
      context become processing 
      for (symbol <- symbols) { 
       router.route (WorkItem (symbol), self) 
       pending += symbol 
      } 
    } 

    def processing: Receive = { 
     case Terminated (a) => 
      router = router.removeRoutee(a) 
      val r = context.actorOf(Props[Worker]) 
      context watch r 
      router = router.addRoutee(r) 
     case WorkItemCompleted (symbol) => 
      pending -= symbol 
      if (pending.size == 0) { 
       context become receive 
       originator.get ! WorkLoadCompleted 
      } 
    } 
} 

można zapytać głównego aktora z ask i otrzymać WorkLoadCompleted w przyszłości.

Ale myślenie o "stanie" (liczby jednoczesnych żądań w przetwarzaniu), aby gdzieś się ukryć, wraz z implementacją niezbędnego kodu, aby go nie przekraczać, oto coś z "przyszłego pośrednika bramy", jeśli nie " t umysł imperatyw styl i Zmienne (używany tylko wewnętrznie chociaż) budowle:

object Guardian 
{ 
    private val incoming = new collection.mutable.HashMap[String, Promise[Unit]]() 
    private val outgoing = new collection.mutable.HashMap[String, Future[Unit]]() 
    private val pending = new collection.mutable.Queue[String] 

    def doWorkGuarded (symbol: String): Future[Unit] = { 
     synchronized { 
      val p = Promise[Unit]() 
      incoming(symbol) = p 
      if (incoming.size <= allowableParallelism) 
       launchWork (symbol) 
      else 
       pending.enqueue (symbol) 
      p.future 
     } 
    } 

    private def completionHandler (t: Try[Unit]): Unit = { 
     synchronized { 
      for (symbol <- outgoing.keySet) { 
       val f = outgoing (symbol) 
       if (f.isCompleted) { 
        incoming (symbol).completeWith (f) 
        incoming.remove (symbol) 
        outgoing.remove (symbol) 
       } 
      } 
      for (i <- outgoing.size to allowableParallelism) { 
       if (pending.nonEmpty) { 
        val symbol = pending.dequeue() 
        launchWork (symbol) 
       } 
      } 
     } 
    } 

    private def launchWork (symbol: String): Unit = { 
     val f = doWork(symbol) 
     outgoing(symbol) = f 
     f.onComplete(completionHandler) 
    } 
} 

doWork teraz jest dokładnie jak twoje, wracając Future[Unit], z myślą, że zamiast używać coś jak

val futures = symbols.map (doWork (_)).toSeq 
val future = Future.sequence(futures) 

które uruchomi futures nie dotyczących allowableParallelism w ogóle, chciałbym zamiast używać

val futures = symbols.map (Guardian.doWorkGuarded (_)).toSeq 
val future = Future.sequence(futures) 

Pomyśl o jakimś hipotetycznym bazy kierowcy dostępu z braku blokowania interfejsu, czyli powracającej futures na wniosek, który jest ograniczony w współbieżności przez budowany przez na przykład pewna pula połączeń - nie chciałbyś, aby zwracała ona kontrakty terminowe nie uwzględniające poziomu paralelizmu i wymagała, abyś się z nimi żonglował, aby utrzymać pod kontrolą kontrolę równoległości.

Ten przykład jest bardziej przykładowy niż praktyczny, ponieważ normalnie nie oczekiwałbym, że interfejs "wychodzący" będzie wykorzystywał takie kontrakty terminowe (co jest ok w przypadku interfejsu "przychodzącego").

+0

Dzięki. Po tym, jak spałem, myślę, że to, co naprawdę muszę zrobić, to streścić to, co próbuję zrobić w kombinatorze i myśleć o właściwym imieniu. To naprawdę jest bardzo podobne do Observable.concatMap, więc może powinienem usiąść i pomyśleć o tym, jak reprezentować typy, których potrzebuję. To, co próbuję zrobić, to w zasadzie "nadać leniwy strumień funkcji, które tworzą przyszłość, łączą się z ukończeniem jednego z tworzeniem następnego, powracając do przyszłości, kiedy wszystkie są zrobione". Biorąc pod uwagę coś takiego, mógłbym dalej generalizować przypadek współbieżności ... – experquisite

1

Po pierwsze, oczywiste jest, że potrzebna jest pewna czysto funkcjonalna osłona wokół Scala Future, która działa skutecznie i działa tak szybko, jak to tylko możliwe. Nazwijmy go Deferred:

import scala.concurrent.Future 
import scala.util.control.Exception.nonFatalCatch 

class Deferred[+T](f:() => Future[T]) { 
    def run(): Future[T] = f() 
} 

object Deferred { 
    def apply[T](future: => Future[T]): Deferred[T] = 
    new Deferred(() => nonFatalCatch.either(future).fold(Future.failed, identity)) 
} 

I tu jest rutyna:

import java.util.concurrent.CopyOnWriteArrayList 
import java.util.concurrent.atomic.AtomicInteger 

import scala.collection.immutable.Seq 
import scala.concurrent.{ExecutionContext, Future, Promise} 
import scala.util.control.Exception.nonFatalCatch 
import scala.util.{Failure, Success} 

trait ConcurrencyUtils {  
    def runWithBoundedParallelism[T](parallelism: Int = Runtime.getRuntime.availableProcessors()) 
            (operations: Seq[Deferred[T]]) 
            (implicit ec: ExecutionContext): Deferred[Seq[T]] = 
    if (parallelism > 0) Deferred { 
     val indexedOps = operations.toIndexedSeq // index for faster access 

     val promise = Promise[Seq[T]]() 

     val acc = new CopyOnWriteArrayList[(Int, T)] // concurrent acc 
     val nextIndex = new AtomicInteger(parallelism) // keep track of the next index atomically 

     def run(operation: Deferred[T], index: Int): Unit = { 
     operation.run().onComplete { 
      case Success(value) => 
      acc.add((index, value)) // accumulate result value 

      if (acc.size == indexedOps.size) { // we've done 
       import scala.collection.JavaConversions._ 
       // in concurrent setting next line may be called multiple times, that's why trySuccess instead of success 
       promise.trySuccess(acc.view.sortBy(_._1).map(_._2).toList) 
      } else { 
       val next = nextIndex.getAndIncrement() // get and inc atomically 
       if (next < indexedOps.size) { // run next operation if exists 
       run(indexedOps(next), next) 
       } 
      } 
      case Failure(t) => 
      promise.tryFailure(t) // same here (may be called multiple times, let's prevent stdout pollution) 
     } 
     } 

     if (operations.nonEmpty) { 
     indexedOps.view.take(parallelism).zipWithIndex.foreach((run _).tupled) // run as much as allowed 
     promise.future 
     } else { 
     Future.successful(Seq.empty) 
     } 
    } else { 
     throw new IllegalArgumentException("Parallelism must be positive") 
    } 
} 

W skrócie, możemy uruchomić jak najwięcej operacji początkowo jako dozwolone, a następnie na każdym zakończeniu pracy możemy uruchomić kolejną operację dostępne, jeśli każdy. Tak więc jedyną trudnością jest utrzymanie następnego wskaźnika operacji i akumulatora wyników w ustawieniach współbieżnych. Nie jestem ekspertem od równoczesnych współbieżności, więc daj mi znać, jeśli wystąpią potencjalne problemy w powyższym kodzie. Zwróć uwagę, że zwracana wartość jest również odroczonym obliczeniem, które powinno być run.

Wykorzystanie i testy:

import org.scalatest.{Matchers, FlatSpec} 
import org.scalatest.concurrent.ScalaFutures 
import org.scalatest.time.{Seconds, Span} 

import scala.collection.immutable.Seq 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 
import scala.concurrent.duration._ 

class ConcurrencyUtilsSpec extends FlatSpec with Matchers with ScalaFutures with ConcurrencyUtils { 

    "runWithBoundedParallelism" should "return results in correct order" in { 
    val comp1 = mkDeferredComputation(1) 
    val comp2 = mkDeferredComputation(2) 
    val comp3 = mkDeferredComputation(3) 
    val comp4 = mkDeferredComputation(4) 
    val comp5 = mkDeferredComputation(5) 

    val compountComp = runWithBoundedParallelism(2)(Seq(comp1, comp2, comp3, comp4, comp5)) 

    whenReady(compountComp.run()) { result => 
     result should be (Seq(1, 2, 3, 4, 5)) 
    } 
    } 

    // increase default ScalaTest patience 
    implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds)) 

    private def mkDeferredComputation[T](result: T, sleepDuration: FiniteDuration = 100.millis): Deferred[T] = 
    Deferred { 
     Future { 
     Thread.sleep(sleepDuration.toMillis) 
     result 
     } 
    } 

} 
+0

to nigdy się nie kończy, jeśli operacja Seq jest pusta, w takim przypadku runWithBoundedParallelism powinien zwrócić Future.successful (Seq.empty) – Somatik

+0

@Somatik zachęcamy do poprawy odpowiedzi – Tvaroh

+0

Zastosowano poprawkę do kodu przykładowego – Somatik

Powiązane problemy