2015-11-29 24 views
7

Oceniam Spark SQL, aby zaimplementował prosty moduł raportowania (kilka prostych agregacji na danych Avro już przechowywanych w HDFS). Nie mam wątpliwości, że Spark SQL może dobrze pasować zarówno do moich wymagań funkcjonalnych, jak i niefunkcjonalnych.Jak przyspieszyć testy jednostkowe SQL Spark?

Jednak, oprócz wymagań produkcyjnych, chcę się upewnić, że moduł będzie testowalny. Stosujemy podejście BDD z bardzo skoncentrowanymi scenariuszami, co oznacza, że ​​ten moduł będzie wymagał uruchomienia dziesiątek/setek zapytań SQL w przypadku bardzo prostych danych (zapisy 1..10).

Aby uzyskać przybliżone pojęcie o wydajności mogę oczekiwać od Spark SQL w trybie lokalnym, ja szybko prototyp kilka testów:

  1. select count(*) from myTable
  2. select key, count(*) from myTable group by key

Pierwszy test Trwa średnio 100ms, ale drugi trwa 500ms. Taka wydajność jest niedopuszczalna, ponieważ spowodowałaby zbyt powolny zestaw testów.

Dla porównania, mogę uruchomić ten sam test w ciągu 10 ms za pomocą Cruncha i jego MemPipeline (1500ms z MRPipeline w trybie lokalnym), a także 1500ms z Hive w trybie osadzonym. Spark SQL jest więc nieco szybszy niż MR w trybie lokalnym, ale wciąż jest sposobem na powolne budowanie dobrych zestawów testowych.

Czy można przyspieszyć Spark SQL w trybie lokalnym?

Czy istnieje lepszy/szybszy sposób na przetestowanie modułu Spark SQL?

(nie posiadają wyprofilowane wykonanie jeszcze ale ponieważ groupBy().countByKey() na RDD trwa 40ms średnio Spodziewam się że winowajcą jest optymalizator kwerendy)


Moja szybkie & brudny kodu testu w następujący sposób:

SparkConf sparkConf = new SparkConf() 
       .setMaster("local[4]") 
       .setAppName("poc-sparksql"); 

    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) { 
     SQLContext sqlCtx = new SQLContext(ctx); 

     for (int i = 0; i < ITERATIONS; i++) { 
      Stopwatch testCaseSw = new Stopwatch().start(); 

      DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro"); 
      df.registerTempTable("myTable"); 
      DataFrame result = sqlCtx.sql("select count(*) from myTable"); 

      System.out.println("Results: " + result.collectAsList()); 
      System.out.println("Elapsed: " + testCaseSw.elapsedMillis()); 
     } 

     for (int i = 0; i < ITERATIONS; i++) { 
      Stopwatch testCaseSw = new Stopwatch().start(); 

      DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro"); 
      df.registerTempTable("myTable"); 
      DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a "); 

      System.out.println("Results: " + result.collectAsList()); 
      System.out.println("Elapsed: " + testCaseSw.elapsedMillis()); 
     } 
} 
+0

Czy rozważyć buforowanie? – eliasah

+0

Jeśli testujesz różne kwerendy na tych samych danych, załaduj dane raz .., a następnie kwerendy .. –

+0

Według moich testów buforowanie nie pomaga (wywołanie sql jest powolne). Bardziej myślałem o czymś w rodzaju możliwości wyłączenia niektórych optymalizacji. Nie widzę buforowania jako rozwiązania, ponieważ 1- "dobry" test ma swój wkład mający na celu ułatwienie zrozumienia danego zachowania, więc każdy test ma inne wejście.Podany przeze mnie niedbały kod nie próbuje naśladować tego, co zrobiłby pakiet testowy (automatyczne serializowanie avro tabeli korniszonów itp.) - zapytania SQL są deterministyczne, jeśli dane wejściowe były zawsze takie same, a następnie buforowałem zebrane dane wyjściowe, a nie input –

Odpowiedz

0

Jeśli szukasz optymalizacji poziomu ms, istnieją różne wskaźniki.

  1. Odczytaj dane raz i pamięć podręczną, a następnie kilka razy zapytanie SQL. wewnętrzna oznacza obciążenie pętli "tarła nowe zadanie w everyIteartion"
DataFrame df = sqlCtx.load("/tmp/test.avro","com.databricks.spark.avro"); 
df.registerTempTable("myTable"); 
df.cache() 

for (int i = 0; i < ITERATIONS; i++) { 
     Stopwatch testCaseSw = new Stopwatch().start(); 
     DataFrame result = sqlCtx.sql("select count(*) from myTable"); 
     // Dont do printLn inside the loop , save the output in some hashMap and print it later once the loop is complete 
     System.out.println("Results: " + result.collectAsList()); 
     System.out.println("Elapsed: " + testCaseSw.elapsedMillis()); 
} 
  1. wyekstrahowania System.out.println zewnątrz pętli, tak jak jego spożywania trochę czasu.

Proszę spojrzeć: http://bytepadding.com/big-data/spark/understanding-spark-through-map-reduce/