2016-03-09 10 views
5

Buduję RDD z listy adresów URL, a następnie próbuję pobrać dane z niektórymi asynchronicznymi połączeniami http. Potrzebuję wszystkich wyników przed wykonaniem innych obliczeń. Idealnie, muszę wykonać wywołania http na różnych węzłach w celu skalowania rozważań.Praca z iskrami za pomocą asynchronicznego wywołania HTTP

zrobiłem coś takiego:

//init spark 
val sparkContext = new SparkContext(conf) 
val datas = Seq[String]("url1", "url2") 

//create rdd 
val rdd = sparkContext.parallelize[String](datas) 

//httpCall return Future[String] 
val requests = rdd.map((url: String) => httpCall(url)) 

//await all results (Future.sequence may be better) 
val responses = requests.map(r => Await.result(r, 10.seconds)) 

//print responses 
response.collect().foreach((s: String) => println(s)) 

//stop spark 
sparkContext.stop() 

tego dzieła, ale Spark praca nigdy skończyć!

Zastanawiam się, jakie są najlepsze praktyki dotyczące radzenia sobie z Przyszłością przy użyciu Spark (lub Future [RDD]).

Myślę, że ten przypadek użycia wygląda całkiem zwyczajnie, ale nie znalazł jeszcze żadnej odpowiedzi.

poważaniem

Odpowiedz

4

ta sprawa wygląda dość powszechne stosowanie

Nie bardzo, bo to po prostu nie działa jak (prawdopodobnie) spodziewać. Ponieważ każde zadanie działa na standardowej Scali Iterators, operacje te zostaną zgniecione razem. Oznacza to, że wszystkie operacje będą blokowane w praktyce. Zakładając, że masz trzy adresy URL [ „x”, „y”, „z”] Ty kod zostanie wykonane w następującej kolejności:

Await.result(httpCall("x", 10.seconds)) 
Await.result(httpCall("y", 10.seconds)) 
Await.result(httpCall("z", 10.seconds)) 

można łatwo odtworzyć samo zachowanie lokalnie. Jeśli chcesz wykonywać swój kod asynchronicznie powinien obsługiwać ten jawnie przy użyciu mapPartitions:

rdd.mapPartitions(iter => { 
    ??? // Submit requests 
    ??? // Wait until all requests completed and return Iterator of results 
}) 

ale jest to stosunkowo trudne. Nie ma gwarancji, że wszystkie dane dla danej partycji mieszczą się w pamięci, więc prawdopodobnie będziesz potrzebował również mechanizmu batchowania.

Wszystko, co powiedziałem, nie mogłem odtworzyć problemu, który opisałeś, może być problem z konfiguracją lub problem z samym httpCall.

Na marginesie, że pojedynczy limit czasu na zabicie całego zadania nie wygląda na dobry pomysł.

1

To przyniesie pracę.

Nie można oczekiwać, że obiekty żądania będą dystrybuowane, a odpowiedzi zbierane w klastrze przez inne węzły. Jeśli to zrobisz, iskierka wzywająca do przyszłości nigdy się nie skończy. Futures nigdy nie zadziałają w tym przypadku.

Jeśli twoja mapa() składa prośby o synchronizację (http), zbierz odpowiedzi w ramach tego samego wywołania akcji/transformacji, a następnie poddaj wyniki (odpowiedzi) dalszemu mapowaniu/zmniejszeniu/innym połączeniom.

W twoim przypadku, przepisz logikę, zbierz odpowiedzi dla każdego połączenia zsynchronizowanego i usuń pojęcie kontraktów terminowych, wtedy wszystko powinno być w porządku.

+0

Problem polega na tym, że nie powinno dojść do przesunięcia danych między 'request' i' response', więc obie transformacje powinny być wykonywane na tym samym etapie, stąd te same executory i konteksty. – zero323

1

W końcu zrobiłem to za pomocą scalaj-http zamiast Dispatch. Połączenia są synchroniczne, ale pasuje to do mojego przypadku użycia.

Wydaje mi się, że zadanie Spark nigdy nie zostało zakończone przy użyciu opcji Wysłanie, ponieważ połączenie HTTP nie zostało poprawnie zamknięte.

Pozdrawiam

1

mogłam znaleźć łatwy sposób na osiągnięcie tego celu. Ale po kilku powtórzeniach jest to, co zrobiłem i działa na ogromną listę zapytań. Zasadniczo użyliśmy tego do wykonania operacji wsadowej dla ogromnego zapytania na wiele podrzędnych zapytań.

// Break down your huge workload into smaller chunks, in this case huge query string is broken 
// down to a small set of subqueries 
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing 
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq) 

// Then map each one those to a Spark Task, in this case its a Future that returns a string 
val tasks: RDD[Future[String]] = queries.map(query => { 
    val task = makeHttpCall(query) // Method returns http call response as a Future[String] 
    task.recover { 
     case ex => logger.error("recover: " + ex.printStackTrace()) } 
    task onFailure { 
     case t => logger.error("execution failed: " + t.getMessage) } 
    task 
}) 

// Note:: Http call is still not invoked, you are including this as part of the lineage 

// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it 
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved 

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] => 
    val searchFuture: Future[Iterator[String]] = Future sequence f 
    Await.result(searchFuture, threadWaitTime.seconds) 
} 

// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage. 
// When you perform any action on that Rdd, then at that point, 
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and 
// collect those data in a single rdd. 

Jeśli nie chcesz, aby wykonać dowolną transformację od zawartości jak parsowania ładowność reakcji itp Następnie można użyć foreachPartition zamiast mapPartitions do wykonywania tych wszystkich http natychmiast wywołuje.

Powiązane problemy