2016-03-17 11 views
5

Chcę wykonać niektóre preprocessing na moje dane i chcę upuścić wiersze, które są rzadkie (dla niektórych wartości progowych).Jak upuszczać wiersze ze zbyt dużą liczbą wartości NULL?

Na przykład mam tabeli ramek danych z 10 funkcji i mam wiersz z 8 wartości null, a następnie chcę upuścić go.

Znalazłem kilka powiązanych tematów, ale nie mogę znaleźć żadnych przydatnych informacji do mojego celu.

stackoverflow.com/questions/3473778/count-number-of-nulls-in-a-row

Przykłady jak w linku powyżej nie będą działać dla mnie, ponieważ chcę, aby to zrobić wstępne przetwarzanie automatycznie. Nie mogę zapisać nazw kolumn i zrobić coś odpowiednio.

Czy jest to tak, aby wykonać tę operację usuwania bez używania nazw kolumn w Apache Spark ze scala?

Odpowiedz

3

data testu:

case class Document(a: String, b: String, c: String) 
val df = sc.parallelize(Seq(new Document(null, null, null), new Document("a", null, null), new Document("a", "b", null), new Document("a", "b", "c"), new Document(null, null, "c"))).df 

z UDF

Remiksowanie odpowiedź przez David i moją wersją RDD poniżej, można to zrobić za pomocą UDF, że trwa rzędu:

def nullFilter = udf((x:Row) => {Range(0, x.length).count(x.isNullAt(_)) < 2}) 
df.filter(nullFilter(struct(df.columns.map(df(_)) : _*))).show 

Z RDD

Można go przekształcić w pętlę rdd wiersza w wierszu i policzyć liczbę zerową.

sqlContext.createDataFrame(df.rdd.filter(x=> Range(0, x.length).count(x.isNullAt(_)) < 2), df.schema).show 
+1

Mogę to zrobić bez konwersji na RDD. Czekaj. –

2

To odkurzacz z UDF:

import org.apache.spark.sql.functions.udf 
def countNulls = udf((v: Any) => if (v == null) 1; else 0;)) 
df.registerTempTable("foo") 

sqlContext.sql(
    "select " + df.columns.mkString(", ") + ", " + df.columns.map(c => { 
    "countNulls(" + c + ")" 
    }).mkString(" + ") + "as nullCount from foo" 
).filter($"nullCount" > 8).show 

przypadku dokonywania ciągi zapytań sprawia, że ​​jesteś zdenerwowany, to można spróbować to:

var countCol: org.apache.spark.sql.Column = null 
df.columns.foreach(c => { 
    if (countCol == null) countCol = countNulls(col(c)) 
    else countCol = countCol + countNulls(col(c)) 
}); 

df.select(Seq(countCol as "nullCount") ++ df.columns.map(c => col(c)):_*) 
    .filter($"nullCount" > 8) 
+0

Jestem dla rozwiązań, które nie wymagają przełączania do RDD, nie jestem zbyt chętni do budowania zapytań opartych na łańcuchach. Czy można to zrobić bez tego? –

+0

Całkowicie się z tobą zgadzam. Próbujesz tego teraz. Najgorszy przypadek, ja myślę, że mogę zrobić UDF, który bierze tablicę kolumn i wypluwa liczbę za jednym zamachem. Daj mi sekundę. –

+0

Myślę, że mogę mieć rozwiązanie mieszania obu odpowiedzi w górę. Czekaj. –

1

Oto alternatywa w Spark 2.0:

val df = Seq((null,"A"),(null,"B"),("1","C")) 
     .toDF("foo","bar") 
     .withColumn("foo", 'foo.cast("Int")) 

df.show() 

+----+---+ 
| foo|bar| 
+----+---+ 
|null| A| 
|null| B| 
| 1| C| 
+----+---+ 

df.where('foo.isNull).groupBy('foo).count().show() 

+----+-----+ 
| foo|count| 
+----+-----+ 
|null| 2| 
+----+-----+ 
+0

Może mam niewłaściwe pytanie, ale nie wydaje się, aby odpowiedzieć na to pytanie: 1) Przegląda tylko jedno pole. 2) Wymaga podania pola. –

1

Jestem zaskoczony, że nie ma odpowiedzi na pytania d, że Spark SQL jest wyposażony w kilka standardowych funkcji, które spełniają wymagania:

Na przykład mam tabelę danych z 10 funkcji i mam wiersz z 8 wartością pustą, a następnie chcę go upuścić.

Można użyć jednego z wariantów DataFrameNaFunctions.drop metody z minNonNulls odpowiednio ustawione, powiedzmy 2.

kroplę (minNonNulls: int, cols SEQ [String]): DataFrame Zwraca nowy DataFrame, która upuszcza wiersze zawierające mniej niż minNonNulls wartości inne niż NULL i inne niż NaN w określonych kolumnach.

i spełnienia zmienność w nazwach kolumn jak w przypadku wymogu:

nie mogę napisać nazwy kolumn i coś zrobić odpowiednio.

można po prostu użyć Dataset.columns:

kolumny: Array [String] Zwraca wszystkie nazwy kolumn jako tablicę.


powiedzmy masz następujący zestaw danych z 5 funkcji (kolumny) i kilku rzędach prawie wszystkie null s.

val ns: String = null 
val features = Seq(("0","1","2",ns,ns), (ns, ns, ns, ns, ns), (ns, "1", ns, "2", ns)).toDF 
scala> features.show 
+----+----+----+----+----+ 
| _1| _2| _3| _4| _5| 
+----+----+----+----+----+ 
| 0| 1| 2|null|null| 
|null|null|null|null|null| 
|null| 1|null| 2|null| 
+----+----+----+----+----+ 

// drop rows with more than (5 columns - 2) = 3 nulls 
scala> features.na.drop(2, features.columns).show 
+----+---+----+----+----+ 
| _1| _2| _3| _4| _5| 
+----+---+----+----+----+ 
| 0| 1| 2|null|null| 
|null| 1|null| 2|null| 
+----+---+----+----+----+ 
Powiązane problemy