2016-10-26 9 views
6

Próbuję przeczytać w jakimś jsonie, wywnioskować schemat i napisać go ponownie jako parkiet do s3 (s3a). Z jakiegoś powodu, około jednej trzeciej drogi przez część zapisu przebiegu, iskry zawsze błędy z błędem zawartym poniżej. Nie mogę znaleźć żadnych oczywistych przyczyn problemu: nie ma w nim pamięci; nie ma długich przerw w GC. Wydaje się, że nie ma żadnych dodatkowych komunikatów o błędach w dziennikach poszczególnych executorów.iskka: SAXParseException podczas pisania do parkietu na s3

Skrypt działa dobrze na innym zbiorze danych, który mam, który ma bardzo podobną strukturę, ale kilka rzędów wielkości mniejszych.

Używam iskry 2.0.1-hadoop-2.7 i używam FileOutputCommitter. Wersja algorytmu nie ma znaczenia.

Edycja: To nie wydaje się być problemem w źle sformatowanych plikach json lub uszkodzonych. Rozpakowałem i przeczytałem każdy plik oddzielnie, bez żadnego błędu.

Oto uproszczona wersja skryptu:

object Foo { 

    def parseJson(json: String): Option[Map[String, Any]] = { 
    if (json == null) 
     Some(Map()) 
    else 
     parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, Any]]) 
     } 
    } 
} 

// read in as text and parse json using json4s 
val jsonRDD: RDD[String] = sc.textFile(inputPath) 
    .map(row -> Foo.parseJson(row)) 

// infer a schema that will encapsulate the most rows in a sample of size sampleRowNum 
val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, sampleRowNum) 

// get documents compatibility with schema 
val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD 
    .map(js => (js, Infer.getSchemaCompatibility(schema, Infer.inferSchema(js)).toBoolean)) 
    .repartition(partitions) 

val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD 
    .filter { case (js: String, compatible: Boolean) => compatible } 
    .map { case (js: String, _: Boolean) => js } 

// create a dataframe from documents with compatible schema 
val dataFrame: DataFrame = spark.read.schema(schema).json(jsonCompatibleRDD) 

To uzupełnia wcześniejszy schemat wnioskowania kroki pomyślnie. Sam błąd występuje na ostatniej linii, ale przypuszczam, że może objąć przynajmniej bezpośrednio poprzedzającego statemnt, jeśli nie wcześniej:

org.apache.spark.SparkException: Task failed while writing rows 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: Failed to commit task 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258) 
    ... 8 more 
    Suppressed: java.lang.NullPointerException 
     at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147) 
     at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113) 
     at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112) 
     at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569) 
     at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:282) 
     at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:258) 
     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1354) 
     ... 9 more 
Caused by: com.amazonaws.AmazonClientException: Unable to unmarshall response (Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler). Response Code: 200, Response Text: OK 
    at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:738) 
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:399) 
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232) 
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528) 
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3480) 
    at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:604) 
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:962) 
    at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:1147) 
    at org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:1136) 
    at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:142) 
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) 
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) 
    at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:400) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117) 
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:267) 
    ... 13 more 
Caused by: com.amazonaws.AmazonClientException: Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler 
    at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:150) 
    at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:279) 
    at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:75) 
    at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:72) 
    at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62) 
    at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31) 
    at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:712) 
    ... 29 more 
Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; XML document structures must start and end within the same entity. 
    at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source) 
    at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source) 
    at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source) 
    at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source) 
    at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source) 
    at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source) 
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.endEntity(Unknown Source) 
    at org.apache.xerces.impl.XMLDocumentScannerImpl.endEntity(Unknown Source) 
    at org.apache.xerces.impl.XMLEntityManager.endEntity(Unknown Source) 
    at org.apache.xerces.impl.XMLEntityScanner.load(Unknown Source) 
    at org.apache.xerces.impl.XMLEntityScanner.skipChar(Unknown Source) 
    at org.apache.xerces.impl.XMLDocumentScannerImpl$PrologDispatcher.dispatch(Unknown Source) 
    at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source) 
    at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source) 
    at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source) 
    at org.apache.xerces.parsers.XMLParser.parse(Unknown Source) 
    at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source) 
    at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:141) 
    ... 35 more 

Oto mój conf:

spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G -XX:+HeapDumpOnOutOfMemoryError 
spark.executor.memory 16G 
spark.executor.uri https://s3.amazonaws.com/foo/spark-2.0.1-bin-hadoop2.7.tgz 
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem 
spark.hadoop.fs.s3a.buffer.dir /raid0/spark 
spark.hadoop.fs.s3n.buffer.dir /raid0/spark 
spark.hadoop.fs.s3a.connection.timeout 500000 
spark.hadoop.fs.s3n.multipart.uploads.enabled true 
spark.hadoop.parquet.block.size 2147483648 
spark.hadoop.parquet.enable.summary-metadata false 
spark.jars.packages com.databricks:spark-avro_2.11:3.0.1 
spark.local.dir /raid0/spark 
spark.mesos.coarse false 
spark.mesos.constraints priority:1 
spark.network.timeout 600 
spark.rpc.message.maxSize 500 
spark.speculation false 
spark.sql.parquet.mergeSchema false 
spark.sql.planner.externalSort true 
spark.submit.deployMode client 
spark.task.cpus 1 

Odpowiedz

1

SAXParseException może wskazywać źle sformatowany plik XML. Ponieważ zadanie kończy się mniej więcej w jednej trzeciej konsekwentnie, oznacza to, że prawdopodobnie kończy się niepowodzeniem w tym samym miejscu za każdym razem (plik, którego partycja zajmuje mniej więcej jedną trzecią długości listy partycji).

Czy możesz wkleić swój skrypt? Możliwe, że uda się owinąć krok Sparka w pętlę try/catch, która wyświetli plik, jeśli wystąpi ten błąd, co pozwoli łatwo przybliżyć problem.

+0

Dodano fragment. Może miejscem, w którym można spróbować catch, jest metoda 'Foo.parseJson'? – Luke

1

Z kłody:

spowodowane: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; Struktury dokumentów XML muszą zaczynać się i kończyć w obrębie tego samego elementu.

i

Spowodowany przez: com.amazonaws.AmazonClientException: Nie można przetworzyć dokument XML z klasy obsługi com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser $ ListBucketHandler

Wygląda na to, że masz uszkodzony/niepoprawnie sformatowany plik, a Twój błąd rzeczywiście występuje podczas odczytu części zadania. Możesz to potwierdzić, próbując innej operacji, która wymusi odczyt, taki jak count().

Po potwierdzeniu, celem będzie znalezienie uszkodzonego pliku. Możesz to zrobić, wyświetlając listę plików s3, sc.parallelize(), a następnie próbując odczytać pliki w niestandardowej funkcji, używając map().

import boto3 
from pyspark.sql import Row  

def scanKeys(startKey, endKey): 
    bucket = boto3.resource('s3').Bucket('bucketName') 
    for obj in bucket.objects.filter(Prefix='prefix', Marker=startKey): 
     if obj.key < endKey: 
      yield obj.key 
     else: 
      return 

def testFile(s3Path): 
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=key) 
    body = s3obj.get()['Body'] 
    ... 
    logic to test file format, or use a try/except and attempt to parse it 
    ... 
    if fileFormatedCorrectly == True: 
     return Row(status='Good', key = s3Path) 
    else: 
     return Row(status='Fail', key = s3Path) 


keys = list(scanKeys(startKey, endKey)) 
keyListRdd = sc.parallelize(keys, 1000) 
keyListRdd.map(testFile).filter(lambda x: x.asDict.get('status') == 'Fail').collect() 

ten powróci drogi S3 na niepoprawnie sformatowane pliki

+0

Jeśli ścieżką s3 jest "nazwa_kompletu/a/b/c/d.txt", należy użyć opcji 'bucket = 'nazwa_kompletu'' i' klucz =' a/b/c/d.txt'' – David

+1

. Czy możesz edytować komentarz, aby usunąć moje nazwisko? – David

+1

I widzę, że używasz iskry scala. Ten link może pomóc w uzyskaniu wszystkich obiektów s3 https://gist.githubusercontent.com/pjrt/f1cad93b154ac8958e65/raw/7b0b764408f145f51477dc05ef1a99e8448bce6d/S3Puller.scala który był z http://tech.kinja.com/how-not-to- pull-from-s3-using-apache-spark-1704509219 – David

2

mogę myśleć o trzech możliwych przyczyn tego problemu.

  1. Wersja JVM. AWS SDK sprawdza następujące. "1.6.0_06", "1.6.0_13", "1.6.0_17", "1.6.0_65", "1.7.0_45". Jeśli używasz jednego z nich , spróbuj uaktualnić.
  2. Stary pakiet SDK AWS. Przejdź do https://github.com/aws/aws-sdk-java/issues/460, aby uzyskać obejście tego problemu.
  3. Jeśli w katalogu, w którym piszesz te pliki, znajduje się wiele plików, możesz uderzyć w numer https://issues.apache.org/jira/browse/HADOOP-13164. Rozważ zwiększenie limitu czasu do większych wartości.
Powiązane problemy