2015-01-03 12 views
7

Przede wszystkim pozwól mi podkreślić, że jestem całkiem nowy zarówno dla Sparka, jak i Scali. Próbowałem zbadać obiecany występ Sparka, próbując migrować jedną z map Hadoop/zmniejszyć pracę, którą wykonałem w przeszłości. Zadanie to zajmuje 14 minut na Hadoop przy użyciu maszyn 3x r3.2xlarge dla wejścia 16 skompresowanych plików bzip po 170 MB każdy. Przetłumaczyłem go Scala/Spark najlepiej, że mogłem na coś takiego:Gdzie jest wąskie gardło wydajności w tym kodzie Spark/Scala?

val conceptData = spark.textFile(inputPath) 
val result = conceptData.repartition(60).cache() 
    .map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)}) 
    .flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head)))))) 
    .reduceByKey((a,b) => combine(a,b)) 
    .map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2)) 
result.saveAsTextFile(outputPath) 

def print(tuples: List[(String, Any)]): String = 
{ 
    tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _) 
} 

def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) = 
{ 
    (a._1 + b._1,a._2 ++ b._2) 
} 

object JsonUtil { 
    val mapper = new ObjectMapper() with ScalaObjectMapper 
    mapper.registerModule(DefaultScalaModule) 
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) 

    def fromJson[T](json: String)(implicit m : Manifest[T]): T = { 
    mapper.readValue[T](json) 
    } 
} 

użyłem komendy repartycyjnego na początku ustawić partycje do 60, ponieważ czytałem gdzieś, że dobrze jest mieć 2-3 partycje na rdzeń. Używam tego zadania Spark na tych samych maszynach r3.2xlarge 3x (każdy ma 8 rdzeni i 58G dostępne), więc mogę przesłać swoją pracę w sposób następujący:

spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...) 

I zajęło więcej niż 1 godzinę, aby uruchomić przez to samo wejście ... Nie jestem pewien, czy problem występuje w konfiguracji Scala lub Spark, więc każda pomoc jest mile widziana.

poważaniem, Augusto

EDIT 1: Średnie czasy dla niektórych operacji:

Odczytywanie plików z S3: ~ 2 minut

flatMap: ~ 11 minut

reduceByKey :> 1 godzina

Używane klucze to ścieżki S3, więc mogą stać się dość długie, nie wiem, czy to ma znaczenie.

EDIT 2: I podstawiony funkcję reduceByKey z .reduceByKey((a,b) => a) a zadanie kończy się pod 10min więc nie musi być coś naprawdę złego w combine funkcji

+0

kilka pytań. 1. Czy możesz powiedzieć, jakiej wersji Sparka używasz? 2. Zakładam, że działasz w trybie Standalone. 3. Czy spojrzałeś na interfejs Spark? Co to mówi ? Na koniec, czy próbowałeś uruchomić każdą z tych operacji od Scala REPL? Możesz to zrobić, dzieląc pierwszą linię. Spark jest znacznie szybszy. Krótki rzut oka na twój kod mówi, że właśnie analizujesz dane 3G i uruchamiasz MR. Jeśli używasz Spark 1.1+ możesz załadować 'conceptData' jako tabelę SparkSQL, a następnie odczytać wartość tam i sprawdzić, ile czasu to trwa. –

+0

1) Spark 1.2 2) Autonomiczny 3) Interfejs użytkownika wskazywał, że do wykonania linii flatMap wykonanie zajęło około 7 minut, to było po upływie większości czasu. – Augusto

+0

Co powiesz na wykonanie następujących czynności w REPL. 'val conceptData = spark.textFile (inputPath)', po którym następuje 'conceptData.count' i zobacz, jak długo to trwa. Powinieneś użyć tych samych ustawień ze swojego REAL-a Scala (tj. Iskra/bin/iskr-shell -executor-memory 58G ..) Zakładam też, że ładujesz się z HDFS lokalnie (na EC2). –

Odpowiedz

0

ten zszedł do moich noobish umiejętności programowania Scala - to trwa tylko 15 min, kiedy zmienił się następującym bardziej wydajnych Scala:

val conceptData = spark.textFile(inputPath).repartition(24) 

val result = conceptData.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)}) 
    .flatMap(metrics => metrics._2.map(t => (t._1,(1, List(metrics._1+"\u200e"+ t._2.head))))) 
    .reduceByKey((a,b) => (a._1 + b._1, a._2:::b._2)) 
    .map(t=> t._1 + "\t" + t._2._1 + "\t" + t._2._2.mkString("\u200f")) 

Można chyba jeszcze ulec dalszej poprawie. W każdym razie, dzięki za pomoc dla wszystkich.

poważaniem,

Augusto

0

opiera się na fakcie, że większość czasu spędza po flatMap, podejrzewam, że shuffle spowalnia, a nie wykorzystanie procesora. Możesz spróbować uruchomić zadanie przy mniejszej liczbie partycji. Inną rzeczą, którą możesz spróbować zastąpić reduceByKey() z foldByKey(), która jest skojarzona, ale nie jest przemienna, co oznacza, że ​​musi utrzymywać porządek RDD podczas działania kombajnu, a to może tłumaczyć mniejszy ruch sieciowy podczas przetasowania.

+0

Próbowałem używać foldByKey i praca umiera, jeśli tego używam. Próbowałem go dwa razy i stało się to w tym samym momencie (po 27 minutach (- jedyny komunikat o błędzie, który mogę znaleźć to: 'executor.CoarseGrainedExecutorBackend: Driver Disassociated' – Augusto

Powiązane problemy