2015-06-24 20 views
32

Próbuję uruchomić losową klasyfikację lasu za pomocą Spark ML api, ale mam problemy z utworzeniem właściwego wejścia ramki danych do potoku.Jak utworzyć poprawną ramkę danych do klasyfikacji w Spark ML

Oto przykładowe dane:

age,hours_per_week,education,sex,salaryRange 
38,40,"hs-grad","male","A" 
28,40,"bachelors","female","A" 
52,45,"hs-grad","male","B" 
31,50,"masters","female","B" 
42,40,"bachelors","male","B" 

wiek i hours_per_week są liczbami całkowitymi, podczas gdy inne funkcje, w tym etykiety salaryRange są kategoryczne (String)

ładuje plik CSV (pozwala wywołać sample.csv) można wykonać przez Spark csv library w następujący sposób:

val data = sqlContext.csvFile("/home/dusan/sample.csv") 

Domyślnie wszystkie kolumny są importowane jako ciąg tak musimy zmienić „wiek” i „hours_per_week” INT:

val toInt = udf[Int, String](_.toInt) 
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week"))) 

Wystarczy sprawdzić jak schemat wygląda teraz:

scala> dataFixed.printSchema 
root 
|-- age: integer (nullable = true) 
|-- hours_per_week: integer (nullable = true) 
|-- education: string (nullable = true) 
|-- sex: string (nullable = true) 
|-- salaryRange: string (nullable = true) 

Następnie pozwala ustawić krzyż walidator i rurociągu:

val rf = new RandomForestClassifier() 
val pipeline = new Pipeline().setStages(Array(rf)) 
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator) 

błąd pojawia się po uruchomieniu tej linii:

val cmModel = cv.fit(dataFixed) 

java.lang.IllegalArgumentException: Pole "funkcje" nie istnieje.

Możliwe jest ustawienie kolumny i kolumny z cechami w RandomForestClassifier, jednak mam 4 kolumny jako predyktory (funkcje) nie tylko jeden.

W jaki sposób powinienem uporządkować ramkę danych, aby etykiety i kolumny były poprawnie uporządkowane?

Dla wygody tutaj jest pełny kod:

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.ml.classification.RandomForestClassifier 
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator 
import org.apache.spark.ml.tuning.CrossValidator 
import org.apache.spark.ml.Pipeline 
import org.apache.spark.sql.DataFrame 

import org.apache.spark.sql.functions._ 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 


object SampleClassification { 

    def main(args: Array[String]): Unit = { 

    //set spark context 
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local"); 
    val sc = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    import sqlContext.implicits._ 
    import com.databricks.spark.csv._ 

    //load data by using databricks "Spark CSV Library" 
    val data = sqlContext.csvFile("/home/dusan/sample.csv") 

    //by default all columns are imported as string so we need to change "age" and "hours_per_week" to Int 
    val toInt = udf[Int, String](_.toInt) 
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week"))) 


    val rf = new RandomForestClassifier() 

    val pipeline = new Pipeline().setStages(Array(rf)) 

    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator) 

    // this fails with error 
    //java.lang.IllegalArgumentException: Field "features" does not exist. 
    val cmModel = cv.fit(dataFixed) 
    } 

} 

Dzięki za pomoc!

+0

Nie wiesz o języku scala, ale gdzie ustawiasz etykiety i funkcje z zestawu danych jak LabeledPoint (etykiety, lista (funkcje)), sprawdź przykład w https://spark.apache.org/docs/latest/mllib -linear-methods.html –

+0

@ABC, Proszę sprawdzić mój komentarz w poniższym pytaniu. –

+0

sprawdź ten przykład: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala gdzie val model = pipeline.fit (training .toDF()) wykorzystuje ramkę danych w potoku –

Odpowiedz

29

Wystarczy upewnić się, że masz kolumnę "features" w swoim dataframe że jest typu VectorUDF jak pokazano poniżej:

scala> val df2 = dataFixed.withColumnRenamed("age", "features") 
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string] 

scala> val cmModel = cv.fit(df2) 
java.lang.IllegalArgumentException: requirement failed: Column features must be of type [email protected] but was actually IntegerType. 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37) 
    at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50) 
    at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71) 
    at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118) 
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164) 
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164) 
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) 
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) 
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) 
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164) 
    at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142) 
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59) 
    at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76) 

Edit1

Zasadniczo istnieje potrzeba być dwoma polami w ramce danych "funkcje" dla wektora elementów i "etykiety" dla etykiet instancji. Instancja musi być typu Double.

stworzyć „możliwości” Pola Vector typu najpierw utworzyć udf jak pokazano poniżej:

val toVec4 = udf[Vector, Int, Int, String, String] { (a,b,c,d) => 
    val e3 = c match { 
    case "hs-grad" => 0 
    case "bachelors" => 1 
    case "masters" => 2 
    } 
    val e4 = d match {case "male" => 0 case "female" => 1} 
    Vectors.dense(a, b, e3, e4) 
} 

Teraz też zakodować „label” pole, utworzyć inną udf jak pokazano poniżej:

val encodeLabel = udf[Double, String](_ match { case "A" => 0.0 case "B" => 1.0}) 

teraz możemy przekształcić oryginalny dataframe stosując te dwa udf:

val df = dataFixed.withColumn(
    "features", 
    toVec4(
    dataFixed("age"), 
    dataFixed("hours_per_week"), 
    dataFixed("education"), 
    dataFixed("sex") 
) 
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label") 

Należy pamiętać, że mogą istnieć dodatkowe kolumny/pola obecne w dataframe, ale w tym przypadku wybrałem tylko features i label:

scala> df.show() 
+-------------------+-----+ 
|   features|label| 
+-------------------+-----+ 
|[38.0,40.0,0.0,0.0]| 0.0| 
|[28.0,40.0,1.0,1.0]| 0.0| 
|[52.0,45.0,0.0,0.0]| 1.0| 
|[31.0,50.0,2.0,1.0]| 1.0| 
|[42.0,40.0,1.0,0.0]| 1.0| 
+-------------------+-----+ 

Teraz jej do połowy, aby ustawić odpowiednie parametry dla algorytmu uczenia, aby działa.

+0

Czy masz szansę pokazać, jak mogę utworzyć kolumnę o nazwie "funkcje" typu VectorUDF z moich danych? –

+1

@DusanGrubjesic: Dodałem przykłady kodu. Proszę sprawdzić ** EDIT1 ** – tuxdna

+0

to jest naprawdę świetne! Po prostu nie jestem pewien, w jaki sposób możemy przekazać informacje do klasyfikatora z ML, że teraz te e3 i e4 są kategoryczne cechy nie numeryczne? Ponieważ w aplecie "low level" mllib można było przekazać ** categoricalFeaturesInfo ** z indeksami i liczbą kategorii cech kategorycznych. W ml api "wysokiego poziomu" należy go pobrać bezpośrednio ze schematu. –

0

Zgodnie z dokumentacją dotyczącą iskry na mllib - losowych drzewach, wydaje mi się, że powinieneś zdefiniować mapę cech, której używasz, a punkty powinny być etykietą.

To powie algorytmowi, która kolumna powinna być używana jako prognoza, a które są funkcjami.

https://spark.apache.org/docs/latest/mllib-decision-tree.html

+1

. W pakiecie znajduje się stara api ** mllib **, a punkty powinny być rzeczywiście oznaczone jako LabeledPoint. Jednak próbuję użyć nowego api znajdującego się w pakiecie * ml *, ponieważ obsługuje on potoki, krzyżową walidację itd. Ten nowy api wykorzystuje DataFrame jako dane wejściowe. na przykład porównaj te dwa: [RandomForestClassifier] (https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.ml.classification.RandomForestClassifier) ​​od ** ml ** która używa DataFrame i RandomForestModel (https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel) od ** mllib ** –

45

Od wersji 1.4 urządzenia Spark można użyć transformatora org.apache.spark.ml.feature.VectorAssembler. Wystarczy podać nazwy kolumn, które mają być funkcjami.

val assembler = new VectorAssembler() 
    .setInputCols(Array("col1", "col2", "col3")) 
    .setOutputCol("features") 

i dodaj go do swojego rurociągu.

+1

[Odpowiedź tuxdna] (http://stackoverflow.com/a/31102246/1281433) wyjaśnił szczegóły problemu i jakie rozwiązanie ma wyglądać. ** Ta odpowiedź ** pokazuje, w jaki sposób można to osiągnąć. –

+1

Nie działałoby, ponieważ niektóre funkcje są typu String. Świetne rozwiązanie dla ściśle danych liczbowych. – gstvolvr

+2

@gstvolvr Najpierw musisz użyć 'StringIndexer', aby przekonwertować ciągi na numeryczne. Być może warto dodać ten krok do odpowiedzi dla jasności. – max

Powiązane problemy