mam trochę kodu:Spark Scala uzyskać dane z powrotem z rdd.foreachPartition
println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
val lastRevs = distinctFileGidsRDD.
foreachPartition(iter => {
SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
while(iter.hasNext) {
val item = iter.next()
//println(item(0))
println("String: "+item(0).toString())
val jsonStr = DB.readOnly { implicit session =>
sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
map { resultSet => resultSet.string(1) }.single.apply()
}
println("\nJSON: "+jsonStr)
}
})
println("\nEND Last Revs Class: "+ lastRevs.getClass)
Wyjścia kod (z ciężkimi edycje) coś takiego:
BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...)
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...)
...
JSON: None()
END Last Revs Class: void
Pytanie 1: Jak można Dostaję wartość lastRevs do użytecznego formatu, takiego jak ciąg JSON/null lub opcja jak Some/None?
PYTANIE 2: Moje preferencje: Czy istnieje inny sposób uzyskania danych partycji w formacie podobnym do RDD (zamiast formatu iteratora)?
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
z http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
Pytanie 3: Czy metoda uzyskiwania danych, które używam metody sane (podany śledzę powyższy link)? (Pomijając fakt, że jest to JDBC systemu scalikejdbc, teraz będzie to kluczowy magazyn wartości innego typu niż ten prototyp.)
Nie rozumiem pytanie. 'lastRevs' powinno być' Unit', ponieważ '.forEachPartition' jest używane tylko dla jego efektu ubocznego (funkcja to T => Unit). Myślę, że chcesz przekształcić dane, np. Używając 'mapPartitions'. Chciałbym zrozumieć, jaki jest tutaj ostateczny cel, ponieważ indywidualne pytania nie mają większego sensu (dla mnie) – maasg
@maasg: Tak. Oto odpowiedź, której szukam - mapPartitions. Znalazłem inny przykład na http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine. – codeaperature