2015-10-12 14 views
13

Scenariusz: Moje dane wejściowe będą wielokrotnymi małymi XML-ami i mam na celu odczytanie tych XML-ów jako RDD. Wykonaj sprzężenie z innym zestawem danych i utwórz RDD i wyślij dane wyjściowe jako plik XML.Przetwarzanie XML w Spark

Czy można odczytać XML za pomocą iskry, załadować dane jako RDD? Jeśli to możliwe, to jak będzie czytany XML.

Sample XML:

<root> 
    <users> 
     <user> 
       <account>1234<\account> 
       <name>name_1<\name> 
       <number>34233<\number> 
     <\user> 
     <user> 
       <account>58789<\account> 
       <name>name_2<\name> 
       <number>54697<\number> 
     <\user>  
    <\users> 
<\root> 

Jak to zostanie załadowany do RDD?

+7

BTW, Twój XML nie jest wcale XML.Musisz zastąpić wszystkie '\\' do '/' –

+0

Hi Pavani! Zaczynam od tego ćwiczenia na Sparku i chcę wiedzieć, że Rozwiązania są bardziej zaawansowane w klasie. Czy możesz mi pomóc? –

Odpowiedz

15

Tak, możliwe, ale szczegóły będą się różnić w zależności od wybranej metody.

  • Jeśli pliki są małe, jak już wspomniano, najprostszym rozwiązaniem jest załadowanie danych z wykorzystaniem SparkContext.wholeTextFiles. Ładuje dane jako RDD[(String, String)], gdzie pierwszym elementem jest ścieżka, a druga zawartość pliku. Następnie analizujesz każdy plik indywidualnie, tak jak w trybie lokalnym.
  • W przypadku większych plików można użyć Hadoop input formats.
    • Jeśli struktura jest prosta, można podzielić rekordy za pomocą textinputformat.record.delimiter. Możesz znaleźć prosty przykład: here. Wejście nie jest XML, ale powinna dać ci i pomysł, jak postępować
    • Inaczej Kornak zapewnia XmlInputFormat
  • Wreszcie możliwe jest, aby odczytać plik, używając SparkContext.textFile i dostosować później na płycie obejmującym między partycjami. Koncepcyjnie oznacza to coś podobnego do tworzenia przesuwne okno lub partitioning records into groups of fixed size:

    • korzystanie mapPartitionsWithIndex partycje do identyfikacji rekordy podziale między partycjami, zbierać połamane rekordy
    • użycie sekund mapPartitionsWithIndex do naprawy uszkodzonych rekordów

Edytuj:

Jest też stosunkowo nowy spark-xml pakiet, który pozwala wyodrębnić konkretne zapisy w etykiet:

val df = sqlContext.read 
    .format("com.databricks.spark.xml") 
    .option("rowTag", "foo") 
    .load("bar.xml") 
4

Oto sposób, aby go wykonać -> Użyłem HadoopInputFormats do odczytu danych XML w iskry jak wyjaśnił zero323.

Dane wejściowe ->

<root> 
    <users> 
     <user> 
      <account>1234<\account> 
      <name>name_1<\name> 
      <number>34233<\number><\user> 
     <user> 
      <account>58789<\account> 
      <name>name_2<\name> 
      <number>54697<\number> 
     <\user> 
    <\users> 
<\root> 

Kod do odczytu wejścia XML ->

Dostaniesz kilka słoików w tym link

//---------------spark_import 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 

//----------------xml_loader_import 
import org.apache.hadoop.io.LongWritable 
import org.apache.hadoop.io.Text 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{ LongWritable, Text } 
import com.cloudera.datascience.common.XmlInputFormat 

object Tester_loader { 
    case class User(account: String, name: String, number: String) 
    def main(args: Array[String]): Unit = { 

    val sparkHome = "/usr/big_data_tools/spark-1.5.0-bin-hadoop2.6/" 
    val sparkMasterUrl = "spark://SYSTEMX:7077" 

    var jars = new Array[String](3) 

    jars(0) = "/home/hduser/Offload_Data_Warehouse_Spark.jar" 
    jars(1) = "/usr/big_data_tools/JARS/Spark_jar/avro/spark-avro_2.10-2.0.1.jar" 

    val conf = new SparkConf().setAppName("XML Reading") 
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
     .setMaster("local") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 
     .setSparkHome(sparkHome) 
     .set("spark.executor.memory", "512m") 
     .set("spark.default.deployCores", "12") 
     .set("spark.cores.max", "12") 
     .setJars(jars) 

    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    //---------------------------------loading user from XML 

    val pages = readFile("src/input_data", "<user>", "<\\user>", sc) //calling function 1.1 

    val xmlUserDF = pages.map { tuple => 
     { 
     val account = extractField(tuple, "account") 
     val name = extractField(tuple, "name") 
     val number = extractField(tuple, "number") 

     User(account, name, number) 
     } 
    }.toDF() 
    println(xmlUserDF.count()) 
    xmlUserDF.show() 
    } 

    //------------------------------------Functions 

    def readFile(path: String, start_tag: String, end_tag: String, sc: SparkContext) = { 
    val conf = new Configuration() 
    conf.set(XmlInputFormat.START_TAG_KEY, start_tag) 
    conf.set(XmlInputFormat.END_TAG_KEY, end_tag) 
    val rawXmls = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable], 
     classOf[Text], conf) 

    rawXmls.map(p => p._2.toString) 
    } 

    def extractField(tuple: String, tag: String) = { 
    var value = tuple.replaceAll("\n", " ").replace("<\\", "</") 

    if (value.contains("<" + tag + ">") && value.contains("</" + tag + ">")) { 

     value = value.split("<" + tag + ">")(1).split("</" + tag + ">")(0) 

    } 
    value 
    } 
} 

Output ->

+-------+------+------+ 
|account| name|number| 
+-------+------+------+ 
| 1234|name_1| 34233| 
| 58789|name_2| 54697| 
+-------+------+------+ 

Otrzymany wynik jest w dataframes można przekonwertować je do RDD jak na swoje wymagania jak this->

val xmlUserRDD = xmlUserDF.toJavaRDD.rdd.map { x => (x.get(0).toString(),x.get(1).toString(),x.get(2).toString()) } 

Proszę ocenić go, jeśli mogłoby to pomóc jak niektórzy.

3

To ci pomoże.

package packagename; 

import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.SparkSession; 

import com.databricks.spark.xml.XmlReader; 

public class XmlreaderSpark { 
    public static void main(String arr[]){ 
    String localxml="file path"; 
    String booksFileTag = "user"; 

    String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse"; 
    System.out.println("warehouseLocation" + warehouseLocation); 
    SparkSession spark = SparkSession 
       .builder() 
       .master("local") 
       .appName("Java Spark SQL Example") 
       .config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", warehouseLocation) 
       .enableHiveSupport().config("set spark.sql.crossJoin.enabled", "true") 
       .getOrCreate(); 
    SQLContext sqlContext = new SQLContext(spark); 

    Dataset<Row> df = (new XmlReader()).withRowTag(booksFileTag).xmlFile(sqlContext, localxml); 
    df.show(); 

    } 
} 

Trzeba dodać tę zależność w pom.xml:

<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-xml_2.10</artifactId> 
    <version>0.4.0</version> 
</dependency> 

i plik wejściowy nie jest w odpowiednim formacie.

Dzięki.

3

Istnieją dwie dobre opcje dla prostych przypadkach:

  • wholeTextFiles. Użyj metody odwzorowania w parserze XML, który może być parserem ściągającym Scala XML (szybciej do kodu) lub parsera Pull SAX (lepsza wydajność).
  • Hadoop streaming XMLInputFormat które należy zdefiniować początkowy i końcowy znacznik <user></user> aby je przetwarzać, jednak tworzy jedną partycję za tagiem użytkownika
  • spark-xml package jest zbyt dobrym rozwiązaniem.

Wszystkie opcje są ograniczone do przetwarzania tylko prostych plików XML, które można interpretować jako zestaw danych z wierszami i kolumnami.

Jeśli jednak sprawimy, że będzie trochę skomplikowana, te opcje nie będą przydatne.

Na przykład, jeśli masz jeden podmiot tam:

<root> 
    <users> 
    <user>...</users> 
    <companies> 
    <company>...</companies> 
</root> 

Teraz trzeba wygenerować 2 RDD i zmienić parser do rozpoznawania tag <company>.

To tylko prosty przypadek, ale kod XML może być znacznie bardziej złożony i trzeba będzie wprowadzić więcej zmian.

Aby rozwiązać tę złożoność, zbudowaliśmy Flexter na szczycie Apache Spark, aby usunąć ból z processing XML files on Spark. Polecam również przeczytać o converting XML on Spark to Parquet. Ten ostatni post zawiera również kilka próbek kodu, które pokazują, w jaki sposób można odpytywać dane wyjściowe za pomocą SparkSQL.

Zastrzeżenie: Pracuję dla Sonra

+1

Proponuję dodać zastrzeżenie, że jesteś współzałożycielem tej firmy. – Davos