2013-04-17 19 views
15

Szukam przykładu, który używa nowego interfejsu API do odczytywania i zapisywania plików sekwencji.Czytanie i pisanie plików sekwencji przy użyciu Hadoop 2.0 Apis

Skutecznie Muszę wiedzieć, jak korzystać z tych funkcji

createWriter(Configuration conf, org.apache.hadoop.io.SequenceFile.Writer.Option... opts) 

Stara definicja nie działa dla mnie:

SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass()); 

Podobnie muszę wiedzieć, jaki będzie kod do czytania plik sekwencji, jak follwoing jest przestarzała:

SequenceFile.Reader(fs, path, conf); 

Oto sposób wykorzystać ten sam -

String uri = args[0]; 
    Configuration conf = new Configuration(); 
    Path path = new Path(uri); 

    IntWritable key = new IntWritable(); 
    Text value = new Text(); 

    CompressionCodec Codec = new GzipCodec(); 
    SequenceFile.Writer writer = null; 
    Option optPath = SequenceFile.Writer.file(path); 
    Option optKey = SequenceFile.Writer.keyClass(key.getClass()); 
    Option optVal = SequenceFile.Writer.valueClass(value.getClass()); 
    Option optCom = SequenceFile.Writer.compression(CompressionType.RECORD, Codec); 

     writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom); 

Odpowiedz

12
public class SequenceFilesTest { 
    @Test 
    public void testSeqFileReadWrite() throws IOException { 
    Configuration conf = new Configuration(); 
    FileSystem fs = FileSystem.getLocal(conf); 
    Path seqFilePath = new Path("file.seq"); 
    SequenceFile.Writer writer = SequenceFile.createWriter(conf, 
      Writer.file(seqFilePath), Writer.keyClass(Text.class), 
      Writer.valueClass(IntWritable.class)); 

    writer.append(new Text("key1"), new IntWritable(1)); 
    writer.append(new Text("key2"), new IntWritable(2)); 

    writer.close(); 

    SequenceFile.Reader reader = new SequenceFile.Reader(conf, 
      Reader.file(seqFilePath)); 

    Text key = new Text(); 
    IntWritable val = new IntWritable(); 

    while (reader.next(key, val)) { 
     System.err.println(key + "\t" + val); 
    } 

    reader.close(); 
    } 
} 
+1

to nadal używać starego API 'czytnika SequenceFile.Reader = new Reader (fs, seqFilePath, conf);' –

+0

Dzięki za wysiłek –

+0

Co masz na myśli przez starego API (mapred vs MapReduce to? kod używa niczego z tych pakietów) –

-4

Musisz ustawić SequenceFile jako formatu wejściowego

job.setInputFormatClass(SequenceFileInputFormat.class); 

znajdziesz przykład czytając formularz SeequnceFile HDFS here.

7

spóźnię się o więcej niż rok odpowiedzieć, ale po prostu zaczęło z Hadoop 2.4.1 :)

Poniżej znajduje się kod, ktoś może uznać za użyteczne.

Uwaga: Zawiera skomentowany kod 1.x do odczytu i zapisu pliku sekwencji. Zastanawiałem się, skąd ona podnieść system plików, ale kiedy wykonywany bezpośrednio na klastrze, to podniósł go prawidłowo (prawdopodobnie od podstawowej site.xml jak wspomniano w Configuration

import java.io.File; 
import java.io.IOException; 
import java.io.RandomAccessFile; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.SequenceFile.Reader.Option; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.util.ReflectionUtils; 

public class SequenceFileOperator { 

    private Configuration conf = new Configuration(); 
    /*private FileSystem fs; 
    { 
     try { 
      fs = FileSystem.get(URI.create("hdfs://cldx-1336-1202:9000"), conf); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    }*/ 

    public static void main(String[] args) throws IOException { 
     // TODO Auto-generated method stub 

     if (args == null || args.length < 2) { 

      System.out 
        .println("Following are the possible invocations <operation id> <arg1> <arg2> ..."); 

      System.out 
        .println("1 <absolute path of directory containing documents> <HDFS path of the sequence file"); 

      System.out.println("2 <HDFS path of the sequence file>"); 
      return; 
     } 

     int operation = Integer.valueOf(args[0]); 

     SequenceFileOperator docToSeqFileWriter = new SequenceFileOperator(); 

     switch (operation) { 

     case 1: { 
      String docDirectoryPath = args[1]; 
      String sequenceFilePath = args[2]; 

      System.out.println("Writing files present at " + docDirectoryPath 
        + " to the sequence file " + sequenceFilePath); 

      docToSeqFileWriter.loadDocumentsToSequenceFile(docDirectoryPath, 
        sequenceFilePath); 

      break; 
     } 

     case 2: { 

      String sequenceFilePath = args[1]; 

      System.out.println("Reading the sequence file " + sequenceFilePath); 

      docToSeqFileWriter.readSequenceFile(sequenceFilePath); 

      break; 
     } 

     } 

    } 

    private void readSequenceFile(String sequenceFilePath) throws IOException { 
     // TODO Auto-generated method stub 

     /* 
     * SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(fs, 
     * new Path(sequenceFilePath), conf); 
     */ 
     Option filePath = SequenceFile.Reader.file(new Path(sequenceFilePath)); 
     SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(conf, 
       filePath); 

     Writable key = (Writable) ReflectionUtils.newInstance(
       sequenceFileReader.getKeyClass(), conf); 
     Writable value = (Writable) ReflectionUtils.newInstance(
       sequenceFileReader.getValueClass(), conf); 

     try { 

      while (sequenceFileReader.next(key, value)) { 

       System.out 
         .printf("[%s] %s %s \n", 
           sequenceFileReader.getPosition(), key, 
           value.getClass()); 
      } 
     } finally { 
      IOUtils.closeStream(sequenceFileReader); 
     } 

    } 

    private void loadDocumentsToSequenceFile(String docDirectoryPath, 
      String sequenceFilePath) throws IOException { 
     // TODO Auto-generated method stub 

     File docDirectory = new File(docDirectoryPath); 

     if (!docDirectory.isDirectory()) { 
      System.out 
        .println("Please provide an absolute path of a directory that contains the documents to be added to the sequence file"); 
      return; 
     } 

     /* 
     * SequenceFile.Writer sequenceFileWriter = 
     * SequenceFile.createWriter(fs, conf, new Path(sequenceFilePath), 
     * Text.class, BytesWritable.class); 
     */ 
     org.apache.hadoop.io.SequenceFile.Writer.Option filePath = SequenceFile.Writer 
       .file(new Path(sequenceFilePath)); 
     org.apache.hadoop.io.SequenceFile.Writer.Option keyClass = SequenceFile.Writer 
       .keyClass(Text.class); 
     org.apache.hadoop.io.SequenceFile.Writer.Option valueClass = SequenceFile.Writer 
       .valueClass(BytesWritable.class); 

     SequenceFile.Writer sequenceFileWriter = SequenceFile.createWriter(
       conf, filePath, keyClass, valueClass); 

     File[] documents = docDirectory.listFiles(); 

     try { 
      for (File document : documents) { 

       RandomAccessFile raf = new RandomAccessFile(document, "r"); 
       byte[] content = new byte[(int) raf.length()]; 

       raf.readFully(content); 

       sequenceFileWriter.append(new Text(document.getName()), 
         new BytesWritable(content)); 

       raf.close(); 
      } 
     } finally { 
      IOUtils.closeStream(sequenceFileWriter); 
     } 

    } 
} 
0

do odczytu można użyć

Path path= new Path("/bar"); 
Reader sequenceFileReader = new SequenceFile.Reader(conf,SequenceFile.Reader.file(path)); 
Powiązane problemy