2016-04-30 7 views
6

mam trochę kodu:Spark Scala uzyskać dane z powrotem z rdd.foreachPartition

 println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass) 
     val lastRevs = distinctFileGidsRDD. 
     foreachPartition(iter => { 
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) 
      while(iter.hasNext) { 
      val item = iter.next() 
      //println(item(0)) 
      println("String: "+item(0).toString()) 
      val jsonStr = DB.readOnly { implicit session => 
       sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar". 
       map { resultSet => resultSet.string(1) }.single.apply() 
      } 
      println("\nJSON: "+jsonStr) 
      } 
     }) 
     println("\nEND Last Revs Class: "+ lastRevs.getClass) 

Wyjścia kod (z ciężkimi edycje) coś takiego:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD 
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM 
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...) 
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw 
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...) 
... 
JSON: None() 
END Last Revs Class: void 

Pytanie 1: Jak można Dostaję wartość lastRevs do użytecznego formatu, takiego jak ciąg JSON/null lub opcja jak Some/None?

PYTANIE 2: Moje preferencje: Czy istnieje inny sposób uzyskania danych partycji w formacie podobnym do RDD (zamiast formatu iteratora)?

dstream.foreachRDD { (rdd, time) => 
    rdd.foreachPartition { partitionIterator => 
    val partitionId = TaskContext.get.partitionId() 
    val uniqueId = generateUniqueId(time.milliseconds, partitionId) 
    // use this uniqueId to transactionally commit the data in partitionIterator 
    } 
} 

z http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

Pytanie 3: Czy metoda uzyskiwania danych, które używam metody sane (podany śledzę powyższy link)? (Pomijając fakt, że jest to JDBC systemu scalikejdbc, teraz będzie to kluczowy magazyn wartości innego typu niż ten prototyp.)

+0

Nie rozumiem pytanie. 'lastRevs' powinno być' Unit', ponieważ '.forEachPartition' jest używane tylko dla jego efektu ubocznego (funkcja to T => Unit). Myślę, że chcesz przekształcić dane, np. Używając 'mapPartitions'. Chciałbym zrozumieć, jaki jest tutaj ostateczny cel, ponieważ indywidualne pytania nie mają większego sensu (dla mnie) – maasg

+0

@maasg: Tak. Oto odpowiedź, której szukam - mapPartitions. Znalazłem inny przykład na http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine. – codeaperature

Odpowiedz

4

Aby utworzyć transformację, która wykorzystuje zasoby lokalne do executora (taki jak DB lub połączenie sieciowe), powinieneś użyć rdd.mapPartitions. Pozwala na zainicjowanie kodu lokalnie na executorze i wykorzystanie tych zasobów lokalnych do przetwarzania danych w partycji.

Kod powinien wyglądać następująco:

val lastRevs = distinctFileGidsRDD. 
     mapPartitions{iter => 
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) 
      iter.map{ element => 
      DB.readOnly { implicit session => 
       sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar" 
       .map { resultSet => resultSet.string(1) }.single.apply() 
      } 
      } 
     } 
+0

masz na myśli, że różni się od 'foreachPartition' tym, że używa zasobów Executora zamiast zasobów Kierowcy? To znaczy. kod 'foreachPartition' jest wykonywany na Driver, podczas gdy' mapPartitions' na Executorze ... prawda? – lisak

+2

@lisak Nie, Zarówno 'foreachPartition' jak i' mapPartitions' pozwolą ci uruchomić kod na executorach. Różnica polega na tym, że 'foreachPartition' ma tylko efekty uboczne (np. Zapis do db), podczas gdy' mapPartitions' zwraca wartość. Kluczem tego pytania jest "jak odzyskać dane" stąd "mapPartitions" jest drogą do zrobienia. – maasg

Powiązane problemy