2014-12-18 15 views
10

Mam każdy rekord rozłożony na wiele linii w pliku wejściowym (bardzo duży plik).Jak przetwarzać wielowierszowe wpisy wejściowe w Spark

Ex:

Id: 2 
ASIN: 073870
    title: Test tile for this product 
    group: Book 
    salesrank: 168501 
    similar: 5 0738700811 1567184912 1567182813 0738700514 0738700915 
    categories: 2 
    |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484] 
    |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486] 
    reviews: total: 12 downloaded: 12 avg rating: 4.5 
    2001-12-16 cutomer: A11NCO6YTE4BTJ rating: 5 votes: 5 helpful: 4 
    2002-1-7 cutomer: A9CQ3PLRNIR83 rating: 4 votes: 5 helpful: 5 

Jak rozpoznać i przetwarzać każdy rekord linii MULTI IN iskry?

+1

Twój wkład niesamowicie wygląda jak JSON. Możesz chcieć przetworzyć do JSON z jednym rekordem na linię, a następnie wczytać przy pomocy 'SqlContext.jsonFile'. – huitseeker

+1

Musisz utworzyć własną wtyczkę 'InputFormat', która wie, jak podzielić te pliki wielowierszowe bez dzielenia pojedynczego rekordu. Lub, jak sugeruje @huitseeker, możesz go wstępnie przetworzyć w format, który hadoop już wie, jak sobie z nim poradzić. – lmm

+1

@huitseeker Ale to nie jest zgodne z formatem JSON –

Odpowiedz

7

Dokonałem tego, wprowadzając niestandardowy format wejściowy i czytnik płyt.

public class ParagraphInputFormat extends TextInputFormat { 

    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { 
     return new ParagraphRecordReader(); 
    } 
} 

public class ParagraphRecordReader extends RecordReader<LongWritable, Text> { 
    private long end; 
    private boolean stillInChunk = true; 

    private LongWritable key = new LongWritable(); 
    private Text value = new Text(); 

    private FSDataInputStream fsin; 
    private DataOutputBuffer buffer = new DataOutputBuffer(); 

    private byte[] endTag = "\n\r\n".getBytes(); 

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { 
     FileSplit split = (FileSplit) inputSplit; 
     Configuration conf = taskAttemptContext.getConfiguration(); 
     Path path = split.getPath(); 
     FileSystem fs = path.getFileSystem(conf); 

     fsin = fs.open(path); 
     long start = split.getStart(); 
     end = split.getStart() + split.getLength(); 
     fsin.seek(start); 

     if (start != 0) { 
      readUntilMatch(endTag, false); 
     } 
    } 

    public boolean nextKeyValue() throws IOException { 
     if (!stillInChunk) return false; 

     boolean status = readUntilMatch(endTag, true); 

     value = new Text(); 
     value.set(buffer.getData(), 0, buffer.getLength()); 
     key = new LongWritable(fsin.getPos()); 
     buffer.reset(); 

     if (!status) { 
      stillInChunk = false; 
     } 

     return true; 
    } 

    public LongWritable getCurrentKey() throws IOException, InterruptedException { 
     return key; 
    } 

    public Text getCurrentValue() throws IOException, InterruptedException { 
     return value; 
    } 

    public float getProgress() throws IOException, InterruptedException { 
     return 0; 
    } 

    public void close() throws IOException { 
     fsin.close(); 
    } 

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { 
     int i = 0; 
     while (true) { 
      int b = fsin.read(); 
      if (b == -1) return false; 
      if (withinBlock) buffer.write(b); 
      if (b == match[i]) { 
       i++; 
       if (i >= match.length) { 
        return fsin.getPos() < end; 
       } 
      } else i = 0; 
     } 
    } 

} 

ENDTAG identyfikuje koniec każdego rekordu.

9

Jeśli dane multi-line ma określony separator rekordu, można korzystać ze wsparcia Hadoop do ewidencji multi-line, zapewniając separator przez hadoop.Configuration obiektu:

Coś jak to powinno zrobić:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", "id:") 
val dataset = sc.newAPIHadoopFile("/path/to/data", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
val data = dataset.map(x=>x._2.toString) 

Zapewni to RDD[String], gdzie każdy element odpowiada rekordowi. Następnie musisz przeanalizować każdy rekord zgodnie z wymaganiami aplikacji.

Powiązane problemy