2016-12-23 38 views
7

Próbuję przetworzyć plik dziennika. Najpierw odczytałem plik dziennika i podzieliłem te pliki zgodnie z moimi wymaganiami i zapisałem każdą kolumnę w oddzielnym JavaRDD. Teraz muszę przekonwertować te JavaRDD na DataFrames dla przyszłych operacji. Jest to kod, co starałem dotąd:Konwersja JavaRDD do DataFrame w Spark java

  SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt"); 
     JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0])); 
     System.out.println(urlrdd.take(1)); 
     SQLContext sql = new SQLContext(sc); 

i jest to sposób w jaki próbuję przekonwertować JavaRDD do DataFrame:

DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class); 

Jednak powyższa linia nie jest mylące o working.I Model.class.

Czy ktoś może mnie zasugerować.

Dzięki.

Odpowiedz

9

Import:

import java.io.Serializable; 

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 

CRE zjadł klasę POJO dla adresu URL. Polecam wam pisać dla linii Log, który składa się z zawartości, daty, czasu, sposobu, cel, etc .. jako członkowie

public static class Url implements Serializable { 
    private String value; 

    public String getValue() { 
    return value; 
    } 

    public void setValue(String value) { 
    this.value = value; 
    } 
} 

Utwórz RDD URL obiektów z pliku tekstowego

JavaRDD<Url> urlsRDD = spark.read() 
    .textFile("/Users/karuturi/Downloads/log.txt") 
    .javaRDD() 
    .map(new Function<String, Url>() { 
    @Override 
    public Url call(String line) throws Exception { 
     String[] parts = line.split("\\t"); 
     Url url = new Url(); 
     url.setValue(parts[0].replaceAll("[", "")); 
     return url; 
    } 
    }); 

Tworzenie DataFrame z RDD

Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class); 

RDD to DataFrame - Spark 2.0
RDD to DataFrame - Spark 1.6

+0

Powyższy kod został napisany w Sparku 2.0+. – mrsrinivas

+0

Co jeśli chcę przekonwertować 'JavaRDD' z' SparseVector'? –

1

można bezpośrednio odczytać plik, używając SqlContext bezpośrednio

Zastosowanie metody SqlContext

czytać Więcej informacji można śledzić ten link

https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#creating-dataframes

Albo można importować

import sqlContext.implicits.*; 

Następnie użyj metody toDF() na RTD, aby przekształcić ją w ramkę danych.

+2

importowanie polecenia sqlContext.implicits._ nieobsługującego w iskrze java – user4342532

+0

Tak, przepraszam, widziałem to. Najlepszą alternatywą jest użycie sqlContext do odczytania pliku. Ponieważ konwersja rdd na ramkę danych używa refleksji, aby zredukować dodatkowe obliczenia, użyj sqlContext do odczytania pliku. –

4

Wystarczy flatmap danych według 7 tabeli kolumny i użyć fragmentu kodu poniżej

String[] columns = new String[7] {"clumn1","column2","column3","column4","column5","column6","column7"}; 
List<String> tableColumns = Arrays.asList(columns); 

StrucType schema = createSchema(tableColumns); 

    public StructType createSchema(List<String> tableColumns){ 

     List<StructField> fields = new ArrayList<StructField>(); 
     for(String column : tableColumns){   

       fields.add(DataTypes.createStructField(column, DataTypes.StringType, true));    

     } 
     return DataTypes.createStructType(fields); 
    } 

sqlContext.createDataFrame(urlRDD, schema); 
3

można zrobić coś podobnego (jestem konwersji w locie od Scala więc przepraszam żadnych literówek):

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 

JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() { 
    @Override 
    public Row call(String record) throws Exception { 
     return RowFactory.create(record()); 
    } 
} 
// now you wish to create the target schema. This is basically a list of 
// fields (each field would be a column) which you are adding to a StructType 
List<StructField> fields = new ArrayList<>(); 
StructField field = DataTypes.createStructField("url", DataTypes.StringType, true); 
fields.add(field); 
StructType schema = DataTypes.createStructType(fields); 

// now you can create the dataframe: 
DataFrame df= sqlContext.createDataFrame(rowRDD, schema);  

kilka dodatkowych uwag:

  • Czemu flatmaping kiedy biorą tylko jodły t element? Mogłeś po prostu zrobić:

    JavaRDD<String> urlrdd=diskfile.flatMap(line -> line.split("\t")[0]);

  • Zakładam, w prawdziwym życiu, co chcesz usunąć „[” z adresu URL (można łatwo zrobić to na mapie).

  • Jeśli przenosisz się do wersji iskry 2.0 lub nowszej, to zamiast sqlContext powinieneś używać iskry (iskry).

  • Można utworzyć pojedynczą ramkę danych ze wszystkimi kolumnami. Możesz to zrobić, dodając do schematu wszystkie pola (tzn. Zamiast dodawać pojedyncze pola, dodaj je wszystkie). Zamiast używać urlrdd, użyj pliku dyskowego i wykonaj podział wewnątrz kreacji "publiczne wywołanie wiersza".To byłoby coś takiego:

    JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() { @override public Row call(String record) throws Exception { String[] recs = record.split("\t") return RowFactory.create(recs[0], recs[1], ...); } });

  • Można go utworzyć bezpośrednio: Wystarczy użyć

    sqlContext.read.option("sep","\t").csv.load(filename,schema)

Powiązane problemy