2015-11-23 15 views
9

Jestem nowicjuszem w Scala i iskry, proszę o tym pamiętać :)Jak dodać kolumny do org.apache.spark.sql.Row wewnątrz mapPartitions

Faktycznie, mam trzy pytania

  1. Jak należy zdefiniować funkcję, aby przekazać je do df.rdd.mapPartitions, jeśli chcę, aby utworzyć nowy wiersz z kilku dodatkowych kolumn
  2. Jak mogę dodać kilka kolumn do obiektu wiersz (lub utworzyć nowy)
  3. Jak utworzyć DataFrame z utworzonego RDD

Dziękuję w góry

+0

Mogę zapytać dlaczego trzeba to? Może jakiś przykładowy kod/wejście/wyjście. Jest to możliwe, ale zwykle są lepsze sposoby. – zero323

+0

Pewnie Mam dwa różne zestawy elementów, jeden jest ogromny (w formie ramki danych), a drugi jest dość mały, a ja mam znaleźć minimalną wartość między tymi dwoma zestawami. Mój pomysł polega na tym, że umieszczam mniejszy zbiór w jakiejś całkiem optymalnej strukturze, przekazuję go do mapPartitions, obliczam pewne wartości dla każdego przedmiotu i umieszczam je "blisko" do innych wartości. –

+0

Tam nie powinno być potrzeby 'mapPartitions'. – zero323

Odpowiedz

18

Zwykle nie ma potrzeby do tego i lepiej jest użyć UDF ale tutaj jesteś:

Jak funkcję zdefiniować, aby przekazać je do df.rdd.mapPartitions, jeśli chcę, aby utworzyć nowy wiersz z kilku dodatkowych kolumn

należy wziąć Iterator[Row] i powrócić Iterator[T] tak w przypadku, gdy sho uld użyć czegoś jak ten

import org.apache.spark.sql.Row 

def transformRows(iter: Iterator[Row]): Iterator[Row] = ??? 

Jak mogę dodać kilka kolumn do obiektu wiersz (lub utworzyć nowy)

Istnieje wiele sposobów dostępu wartości w tym Row.get* metod Row.toSeq etc Nowy można utworzyć przy użyciu Row.apply, Row.fromSeq, Row.fromTuple lub RowFactory. Na przykład:

def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1)) 

Jak utworzyć DataFrame z utworzonej RDD

Jeśli masz RDD[Row] można użyć SQLContext.createDataFrame i dostarczenie schematu.

Wyrażając to wszystko razem:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType} 

val df = sc.parallelize(Seq(
    (1.0, 2.0), (0.0, -1.0), 
    (3.0, 4.0), (6.0, -2.3))).toDF("x", "y") 

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow) 

val newSchema = StructType(df.schema.fields ++ Array(
    StructField("z", IntegerType, false), StructField("v", IntegerType, false))) 

sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show 

// +---+----+---+---+ 
// | x| y| z| v| 
// +---+----+---+---+ 
// |1.0| 2.0| -1| 1| 
// |0.0|-1.0| -1| 1| 
// |3.0| 4.0| -1| 1| 
// |6.0|-2.3| -1| 1| 
// +---+----+---+---+ 
+0

Podoba mi się sposób, w jaki napisany jest kod scala :) Dziękujemy! –

+0

@AzatFazulzyanov Dokładniej: lubisz sposób ** zero323 ** zapisuje kod scala (/ iskry)! – javadba

+0

v dobrze zrobione z 'newSchema'. Zapisuję mnie, aby przejść do przebudowy tej konstrukcji – javadba

Powiązane problemy