Chociaż już masz doskonałą odpowiedź , Myślałem, że nadal mogę przedstawić opinię lub dwie na ten temat.
Pamiętam, że widziałem gdzieś (na czyimś blogu) "używaj aktorów dla stanu i korzystaj z kontraktów terminowych na współbieżność".
Więc moją pierwszą myślą byłoby jakoś wykorzystać aktorów. Mówiąc dokładniej, miałbym głównego aktora z routerem uruchamiającym wielu pracowników, z liczbą robotników ograniczonych zgodnie z allowableParallelism
. Zakładając więc, mam
def doWorkInternal (symbol: String): Unit
który działa od Ciebie doWork
podjętą „poza przyszłości”, musiałbym coś wzdłuż tych linii (bardzo prymitywny, nie odrywając wiele szczegółów pod uwagę, i praktycznie kopiuje kod z Akka dokumentacja):
import akka.actor._
case class WorkItem (symbol: String)
case class WorkItemCompleted (symbol: String)
case class WorkLoad (symbols: Array[String])
case class WorkLoadCompleted()
class Worker extends Actor {
def receive = {
case WorkItem (symbol) =>
doWorkInternal (symbol)
sender() ! WorkItemCompleted (symbol)
}
}
class Master extends Actor {
var pending = Set[String]()
var originator: Option[ActorRef] = None
var router = {
val routees = Vector.fill (allowableParallelism) {
val r = context.actorOf(Props[Worker])
context watch r
ActorRefRoutee(r)
}
Router (RoundRobinRoutingLogic(), routees)
}
def receive = {
case WorkLoad (symbols) =>
originator = Some (sender())
context become processing
for (symbol <- symbols) {
router.route (WorkItem (symbol), self)
pending += symbol
}
}
def processing: Receive = {
case Terminated (a) =>
router = router.removeRoutee(a)
val r = context.actorOf(Props[Worker])
context watch r
router = router.addRoutee(r)
case WorkItemCompleted (symbol) =>
pending -= symbol
if (pending.size == 0) {
context become receive
originator.get ! WorkLoadCompleted
}
}
}
można zapytać głównego aktora z ask
i otrzymać WorkLoadCompleted
w przyszłości.
Ale myślenie o "stanie" (liczby jednoczesnych żądań w przetwarzaniu), aby gdzieś się ukryć, wraz z implementacją niezbędnego kodu, aby go nie przekraczać, oto coś z "przyszłego pośrednika bramy", jeśli nie " t umysł imperatyw styl i Zmienne (używany tylko wewnętrznie chociaż) budowle:
object Guardian
{
private val incoming = new collection.mutable.HashMap[String, Promise[Unit]]()
private val outgoing = new collection.mutable.HashMap[String, Future[Unit]]()
private val pending = new collection.mutable.Queue[String]
def doWorkGuarded (symbol: String): Future[Unit] = {
synchronized {
val p = Promise[Unit]()
incoming(symbol) = p
if (incoming.size <= allowableParallelism)
launchWork (symbol)
else
pending.enqueue (symbol)
p.future
}
}
private def completionHandler (t: Try[Unit]): Unit = {
synchronized {
for (symbol <- outgoing.keySet) {
val f = outgoing (symbol)
if (f.isCompleted) {
incoming (symbol).completeWith (f)
incoming.remove (symbol)
outgoing.remove (symbol)
}
}
for (i <- outgoing.size to allowableParallelism) {
if (pending.nonEmpty) {
val symbol = pending.dequeue()
launchWork (symbol)
}
}
}
}
private def launchWork (symbol: String): Unit = {
val f = doWork(symbol)
outgoing(symbol) = f
f.onComplete(completionHandler)
}
}
doWork
teraz jest dokładnie jak twoje, wracając Future[Unit]
, z myślą, że zamiast używać coś jak
val futures = symbols.map (doWork (_)).toSeq
val future = Future.sequence(futures)
które uruchomi futures nie dotyczących allowableParallelism
w ogóle, chciałbym zamiast używać
val futures = symbols.map (Guardian.doWorkGuarded (_)).toSeq
val future = Future.sequence(futures)
Pomyśl o jakimś hipotetycznym bazy kierowcy dostępu z braku blokowania interfejsu, czyli powracającej futures na wniosek, który jest ograniczony w współbieżności przez budowany przez na przykład pewna pula połączeń - nie chciałbyś, aby zwracała ona kontrakty terminowe nie uwzględniające poziomu paralelizmu i wymagała, abyś się z nimi żonglował, aby utrzymać pod kontrolą kontrolę równoległości.
Ten przykład jest bardziej przykładowy niż praktyczny, ponieważ normalnie nie oczekiwałbym, że interfejs "wychodzący" będzie wykorzystywał takie kontrakty terminowe (co jest ok w przypadku interfejsu "przychodzącego").
Dodam, że byłoby jeszcze lepiej, jeśli za każdym razem Future zakończeniu nowej rozpoczęto, zamiast czekać na całej kohorty/grupy, aby zakończyć. – experquisite
to blokowanie IO, a następnie zapakowane w Future {} lub czy Asynchronizacja IO nie używa wątku podczas oczekiwania na zdalnym serwerze? Jeśli to blokowanie, wówczas ustalona pula wątków z 5 wątkami wydaje mi się najprostszym rozwiązaniem. Ale używaj tej puli tylko do blokowania IO i nic więcej. –
Wsparcie IO dla doWork() nie jest blokujące, działa na wątkach, nad którymi nie mam kontroli, które zapakowałem do Observables na różnych poziomach abstrakcji. – experquisite