Przede wszystkim pozwól mi podkreślić, że jestem całkiem nowy zarówno dla Sparka, jak i Scali. Próbowałem zbadać obiecany występ Sparka, próbując migrować jedną z map Hadoop/zmniejszyć pracę, którą wykonałem w przeszłości. Zadanie to zajmuje 14 minut na Hadoop przy użyciu maszyn 3x r3.2xlarge dla wejścia 16 skompresowanych plików bzip po 170 MB każdy. Przetłumaczyłem go Scala/Spark najlepiej, że mogłem na coś takiego:Gdzie jest wąskie gardło wydajności w tym kodzie Spark/Scala?
val conceptData = spark.textFile(inputPath)
val result = conceptData.repartition(60).cache()
.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head))))))
.reduceByKey((a,b) => combine(a,b))
.map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2))
result.saveAsTextFile(outputPath)
def print(tuples: List[(String, Any)]): String =
{
tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _)
}
def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) =
{
(a._1 + b._1,a._2 ++ b._2)
}
object JsonUtil {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
mapper.readValue[T](json)
}
}
użyłem komendy repartycyjnego na początku ustawić partycje do 60, ponieważ czytałem gdzieś, że dobrze jest mieć 2-3 partycje na rdzeń. Używam tego zadania Spark na tych samych maszynach r3.2xlarge 3x (każdy ma 8 rdzeni i 58G dostępne), więc mogę przesłać swoją pracę w sposób następujący:
spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...)
I zajęło więcej niż 1 godzinę, aby uruchomić przez to samo wejście ... Nie jestem pewien, czy problem występuje w konfiguracji Scala lub Spark, więc każda pomoc jest mile widziana.
poważaniem, Augusto
EDIT 1: Średnie czasy dla niektórych operacji:
Odczytywanie plików z S3: ~ 2 minut
flatMap: ~ 11 minut
reduceByKey :> 1 godzina
Używane klucze to ścieżki S3, więc mogą stać się dość długie, nie wiem, czy to ma znaczenie.
EDIT 2: I podstawiony funkcję reduceByKey
z .reduceByKey((a,b) => a)
a zadanie kończy się pod 10min więc nie musi być coś naprawdę złego w combine
funkcji
kilka pytań. 1. Czy możesz powiedzieć, jakiej wersji Sparka używasz? 2. Zakładam, że działasz w trybie Standalone. 3. Czy spojrzałeś na interfejs Spark? Co to mówi ? Na koniec, czy próbowałeś uruchomić każdą z tych operacji od Scala REPL? Możesz to zrobić, dzieląc pierwszą linię. Spark jest znacznie szybszy. Krótki rzut oka na twój kod mówi, że właśnie analizujesz dane 3G i uruchamiasz MR. Jeśli używasz Spark 1.1+ możesz załadować 'conceptData' jako tabelę SparkSQL, a następnie odczytać wartość tam i sprawdzić, ile czasu to trwa. –
1) Spark 1.2 2) Autonomiczny 3) Interfejs użytkownika wskazywał, że do wykonania linii flatMap wykonanie zajęło około 7 minut, to było po upływie większości czasu. – Augusto
Co powiesz na wykonanie następujących czynności w REPL. 'val conceptData = spark.textFile (inputPath)', po którym następuje 'conceptData.count' i zobacz, jak długo to trwa. Powinieneś użyć tych samych ustawień ze swojego REAL-a Scala (tj. Iskra/bin/iskr-shell -executor-memory 58G ..) Zakładam też, że ładujesz się z HDFS lokalnie (na EC2). –