Zwykle nie ma potrzeby do tego i lepiej jest użyć UDF ale tutaj jesteś:
Jak funkcję zdefiniować, aby przekazać je do df.rdd.mapPartitions, jeśli chcę, aby utworzyć nowy wiersz z kilku dodatkowych kolumn
należy wziąć Iterator[Row]
i powrócić Iterator[T]
tak w przypadku, gdy sho uld użyć czegoś jak ten
import org.apache.spark.sql.Row
def transformRows(iter: Iterator[Row]): Iterator[Row] = ???
Jak mogę dodać kilka kolumn do obiektu wiersz (lub utworzyć nowy)
Istnieje wiele sposobów dostępu wartości w tym Row.get*
metod Row.toSeq
etc Nowy można utworzyć przy użyciu Row.apply
, Row.fromSeq
, Row.fromTuple
lub RowFactory
. Na przykład:
def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))
Jak utworzyć DataFrame z utworzonej RDD
Jeśli masz RDD[Row]
można użyć SQLContext.createDataFrame
i dostarczenie schematu.
Wyrażając to wszystko razem:
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
val df = sc.parallelize(Seq(
(1.0, 2.0), (0.0, -1.0),
(3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
val newSchema = StructType(df.schema.fields ++ Array(
StructField("z", IntegerType, false), StructField("v", IntegerType, false)))
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
// +---+----+---+---+
// | x| y| z| v|
// +---+----+---+---+
// |1.0| 2.0| -1| 1|
// |0.0|-1.0| -1| 1|
// |3.0| 4.0| -1| 1|
// |6.0|-2.3| -1| 1|
// +---+----+---+---+
Mogę zapytać dlaczego trzeba to? Może jakiś przykładowy kod/wejście/wyjście. Jest to możliwe, ale zwykle są lepsze sposoby. – zero323
Pewnie Mam dwa różne zestawy elementów, jeden jest ogromny (w formie ramki danych), a drugi jest dość mały, a ja mam znaleźć minimalną wartość między tymi dwoma zestawami. Mój pomysł polega na tym, że umieszczam mniejszy zbiór w jakiejś całkiem optymalnej strukturze, przekazuję go do mapPartitions, obliczam pewne wartości dla każdego przedmiotu i umieszczam je "blisko" do innych wartości. –
Tam nie powinno być potrzeby 'mapPartitions'. – zero323