2015-08-21 21 views

Odpowiedz

1

Niestety, to była moja wina, ja już znalazłem funkcję withColumn(String colName, Column col) co powinno rozwiązać mój problem

+0

Jedynym problemem związanym z withColumn jest to, że trudno będzie sekwencyjnie pobierać elementy z listy i dodawać je do wybranych wierszy. Jeśli masz sposób na zrobienie tego, prawdopodobnie jest lepszy w ten sposób, ale twoje pytanie jest ogólne, aby powiedzieć;) – Niemand

+0

Dlaczego, będę konwertować moją listę przede wszystkim do obiektu kolumny i dodać go jak drugi argument funkcji. Czy to nie jest w porządku? ... – Guforu

+0

Interesujące. Opublikuj, jak to zrobiłeś po zakończeniu. – Niemand

5

Należy prawdopodobnie przekształcić swoją listę do jednego RDD kolumny i zastosować dołączyć na critetia pickeg przez Ciebie. Prosta konwersja DataFrame:

val df1 = sparkContext.makeRDD(yourList).toDF("newColumn") 

Jeśli trzeba utworzyć dodatkową kolumnę do wykonania przyłączenia na można dodać więcej kolumn, mapowanie listy:

val df1 = sparkContext.makeRDD(yourList).map(i => (i, fun(i)).toDF("newColumn", "joinOnThisColumn") 

nie jestem zaznajomiony z wersji Java, ale zalecana spróbuj użyć JavaSparkContext.parallelize(yourList) i zastosuj podobne operacje odwzorowania w oparciu o this doc.

+0

ok, dziękuję, spróbuję Twojego rozwiązania. Jednak znalazłem także funkcję API Java, a nie Scala. Dziękuję bardzo. – Guforu

1

Ten wątek jest trochę stary, ale spotkałem się z podobną sytuacją używając Java. Myślę, że bardziej niż cokolwiek, było pojęciowe niezrozumienie tego, jak powinienem podejść do tego problemu.

Aby rozwiązać problem, utworzyłem proste POJO, aby pomóc w nowej kolumnie dla zestawu danych (w przeciwieństwie do próby zbudowania na istniejącym). Myślę, że koncepcyjnie, nie zrozumiałem, że najlepiej było wygenerować zestaw danych podczas wstępnej lektury, gdzie trzeba dodać dodatkową kolumnę. Mam nadzieję, że to pomoże komuś w przyszłości.

Rozważmy następujący:

 JavaRDD<MyPojo> myRdd = dao.getSession().read().jdbc("jdbcurl","mytable",someObject.getProperties()).javaRDD().map(new Function<Row,MyPojo>() { 

         private static final long serialVersionUID = 1L; 

         @Override 
         public MyPojo call(Row row) throws Exception { 
         Integer curDos = calculateStuff(row); //manipulate my data 

         MyPojo pojoInst = new MyPojo(); 

         pojoInst.setBaseValue(row.getAs("BASE_VALUE_COLUMN")); 
         pojoInst.setKey(row.getAs("KEY_COLUMN")); 
         pojoInst.setCalculatedValue(curDos); 

         return dos; 
         } 
        }); 

     Dataset<Row> myRddRFF = dao.getSession().createDataFrame(myRdd, MyPojo.class); 

//continue load or other operation here... 
1

Oto przykład, gdzie mieliśmy datę kolumny i chciał, aby dodać kolejną kolumnę z miesiąca.

Dataset<Row> newData = data.withColumn("month", month((unix_timestamp(col("date"), "MM/dd/yyyy")).cast("timestamp"))); 

Mamy nadzieję, że to pomoże!

Pozdrawiam!

Powiązane problemy