2015-10-21 9 views
7

Próbuję obliczyć histogram wszystkich kolumn z pliku CSV za pomocą Spark Scala.Jak uzyskać histogram wszystkich kolumn w dużym pliku CSV/RDD [Array [double]] przy użyciu Apache Spark Scala?

Znalazłem, że DoubleRDDFunctions wspierające Histogram. Więc zakodowałem jak po, aby uzyskać histogram wszystkich kolumn.

  1. liczyć kolumna Get
  2. Tworzenie RDD[double] każdej kolumny i obliczyć histogram każdego RDD użyciu DoubleRDDFunctions

    var columnIndexArray = Array.tabulate(rdd.first().length) (_ * 1) 
    
    val histogramData = columnIndexArray.map(columns => { 
        rdd.map(lines => lines(columns)).histogram(6) 
    }) 
    

Czy to dobry sposób? Czy ktoś może zaproponować lepsze sposoby rozwiązania tego problemu?

Z góry dziękuję.

Odpowiedz

5

Niezupełnie lepiej, ale alternatywny sposób jest konwertowanie RDD do DataFrame i użycie UDF histogram_numeric.

Przykład Dane:

import scala.util.Random 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.{callUDF, lit, col} 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.hive.HiveContext 

val sqlContext = new HiveContext(sc) 

Random.setSeed(1) 

val ncol = 5 

val rdd = sc.parallelize((1 to 1000).map(
    _ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble)) 
)) 

val schema = StructType(
    (1 to ncol).map(i => StructField(s"x$i", DoubleType, false))) 

val df = sqlContext.createDataFrame(rdd, schema) 
df.registerTempTable("df") 

zapytania:

val nBuckets = 3 
val columns = df.columns.map(
    c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c)) 
val histograms = df.select(columns: _*) 

histograms.printSchema 

// root 
// |-- x1: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x2: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x3: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x4: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x5: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 

histograms.select($"x1").collect() 

// Array([WrappedArray([0.16874313309969038,334.0], 
// [0.513382068667877,345.0], [0.8421388886903808,321.0])]) 
+1

Daje to org.apache.spark.sql.AnalysisException: undefined function histogram_numeric. Używam iskry 1.5.1 –

+0

UDF wymagają HiveContext. – zero323

+0

dzięki ... Edytowałem nazwę zmiennej w Twojej odpowiedzi. –

1

The (scala API) transformacja, countByValue powinni robić to, co chcesz

tak na przykład w celu wygenerowania danych histogramu dla pierwszej kolumniew RDD:

val histCol1 = RDD.map(record => record.col_1).countByValue() 

w wyrażenie powyżej, rekord odnosi się tylko do wiersza danych w RDD, instancji klasy sprawy, która ma pole col_1

i tak histCol1 zwróci tabeli mieszania (Scala MAP), w której klucze są unikatowe wartości w kolumnie 1 (col_1), a wartości są oczywiście częstotliwości każdej unikatowej wartości

+0

Dzięki za propozycję. Ale muszę również podać rozmiar wiadra. Maksymalne wiadra 10. countByValue() będzie działać wydajniej niż podwójny histogram RDD? –

+0

"rozmiar wiadra" jest zwracany przez countByValue - każda wartość jest wielkości wiadra, natomiast klucz to nazwa wiadra – doug

+0

czy możemy poprawić rozmiar wiadra na jedną wartość? zamiast rozważać odrębną liczbę. Nie potrzebuję wszystkich różnych liczb, potrzebuję histogramu z maksymalnymi zasobnikami 10. –

Powiązane problemy