2012-07-02 12 views
5

Lubię generować wiele krotek z pojedynczej krotki. Mam na myśli: Mam plik z następującymi danymi.Dzielenie krotki na wiele krotek w Pig

>> cat data 
ID | ColumnName1:Value1 | ColumnName2:Value2 

więc załadować go za pomocą następującego polecenia

grunt >> A = load '$data' using PigStorage('|');  
grunt >> dump A;  
(ID,ColumnName1:Value1,ColumnName2:Value2) 

Teraz chcę podzielić to na dwie krotki krotki.

(ID, ColumnName1, Value1) 
(ID, ColumnName2, Value2) 

Czy mogę używać UDF razem z foreach i generować. Coś takiego jak poniższe?

grunt >> foreach A generate SOMEUDF(A) 

EDIT:

krotka wejściowe: (ID1, kolumna1, column2) wyjściowy: dwie krotki (ID1, COLUMN1) i (ID2, column2), więc to jest Lista czy powinienem zwrócić torba?

public class SPLITTUPPLE extends EvalFunc <List<Tuple>> 
{ 
    public List<Tuple> exec(Tuple input) throws IOException { 
     if (input == null || input.size() == 0) 
      return null; 
     try{ 
      // not sure how whether I can create tuples on my own. Looks like I should use TupleFactory. 
      // return list of tuples. 
     }catch(Exception e){ 
      throw WrappedIOException.wrap("Caught exception processing input row ", e); 
     } 
    } 
} 

Czy to podejście jest prawidłowe?

Odpowiedz

10

Można napisać UDF lub użyć skryptu PIG z wbudowanymi funkcjami.

Na przykład:

-- data should be chararray, PigStorage('|') return bytearray which will not work for this example 
inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 

-- split by | and create a row so we can dereference it later 
splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ; 

-- first column is id, rest is converted into a bag and flatten it to make rows 
id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value; 
-- there will be records with (id, id), but id should not have ':' 
id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals; 
final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val); 
dump final; 

testu wejść:

1|c1:11:33|c2:12 
234|c1:21|c2:22 
33|c1:31|c2:32 
345|c1:41|c2:42 

WYJŚCIE

(1,c1,11:33) 
(1,c2,12) 
(234,c1,21) 
(234,c2,22) 
(33,c1,31) 
(33,c2,32) 
(345,c1,41) 
(345,c2,42) 

Mam nadzieję, że to pomaga.

Pozdrawiam.

+0

Wielkie dzięki. Czy mogę zrobić to samo z pisaniem UDF. Aktualizuję pytanie. – FourOfAKind

+0

Tak, możesz. Zobacz następną odpowiedź. – alexeipab

+0

To wielka pomoc. Dziękuję za Twój czas. – FourOfAKind

6

Oto wersja UDF. Wolę wrócić worka:

import java.io.IOException; 

import org.apache.pig.EvalFunc; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.pig.impl.logicalLayer.FrontendException; 
import org.apache.pig.impl.logicalLayer.schema.Schema; 

/** 
* Converts input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag 
* {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...} 
* 
* Default rows separator is '|' and key value separator is ':'. 
* In this implementation white spaces around separator characters are not removed. 
* ID can be made of any character (including sequence of white spaces). 
* @author 
* 
*/ 
public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> { 

    private static final TupleFactory tupleFactory = TupleFactory.getInstance(); 
    private static final BagFactory bagFactory = BagFactory.getInstance(); 

    //Row separator character. Default is '|'. 
    private String rowsSeparator; 
    //Column value separator character. Default i 
    private String columnValueSeparator; 

    public TupleToBagColumnValuePairs() { 
     this.rowsSeparator = "\\|"; 
     this.columnValueSeparator = ":"; 
    } 

    public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) { 
     this.rowsSeparator = rowsSeparator; 
     this.columnValueSeparator = keyValueSeparator; 
    } 

    /** 
    * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray) 
    * @param outputBag Output tuples (id, column, value) are added to this bag 
    * @param id 
    * @param column 
    * @param value 
    * @throws ExecException 
    */ 
    protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException { 
     Tuple outputTuple = tupleFactory.newTuple(); 
     outputTuple.append(id); 
     outputTuple.append(column); 
     outputTuple.append(value); 
     outputBag.add(outputTuple); 
    } 

    /** 
    * Takes column{separator}value from splitInputLine, splits id into column value and adds them to the outputBag as (id, column, value) 
    * @param outputBag Output tuples (id, column, value) should be added to this bag 
    * @param id 
    * @param splitInputLine format column{separator}value, which start from index 1 
    * @throws ExecException 
    */ 
    protected void parseColumnValues(DataBag outputBag, String id, 
      String[] splitInputLine) throws ExecException { 
     for (int i = 1; i < splitInputLine.length; i++) { 
      if (splitInputLine[i] != null) { 
       int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator); 
       if (columnValueSplitIndex != -1) { 
        String column = splitInputLine[i].substring(0, columnValueSplitIndex); 
        String value = null; 
        if (columnValueSplitIndex + 1 < splitInputLine[i].length()) { 
         value = splitInputLine[i].substring(columnValueSplitIndex + 1); 
        } 
        this.addTuple(outputBag, id, column, value); 
       } else { 
        String column = splitInputLine[i]; 
        this.addTuple(outputBag, id, column, null); 
       } 
      } 
     } 
    } 

    /** 
    * input - contains only one field of type chararray, which will be split by '|' 
    * All inputs that are: null or of length 0 are ignored. 
    */ 
    @Override 
    public DataBag exec(Tuple input) throws IOException { 
     if (input == null || input.size() != 1 || input.isNull(0)) { 
      return null; 
     } 

     String inputLine = (String)input.get(0); 
     String[] splitInputLine = inputLine.split(this.rowsSeparator, -1); 

     if (splitInputLine.length > 1 && splitInputLine[0].length() > 0) { 
      String id = splitInputLine[0]; 
      DataBag outputBag = bagFactory.newDefaultBag();    
      if (splitInputLine.length == 1) { // there is just an id in the line 
       this.addTuple(outputBag, id, null, null); 
      } else { 
       this.parseColumnValues(outputBag, id, splitInputLine); 
      } 


      return outputBag; 
     } 
     return null; 
    } 

    @Override 
    public Schema outputSchema(Schema input) { 
     try { 
      if (input.size() != 1) { 
       throw new RuntimeException("Expected input to have only one field"); 
      } 

      Schema.FieldSchema inputFieldSchema = input.getField(0); 
      if (inputFieldSchema.type != DataType.CHARARRAY) { 
       throw new RuntimeException("Expected a CHARARRAY as input"); 
      } 

      Schema tupleSchema = new Schema(); 
      tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY)); 

      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG)); 
     } catch (FrontendException exx) { 
      throw new RuntimeException(exx); 
     } 
    } 

} 

Oto jak to jest stosowane w PIG:

register 'path to the jar'; 
define IdColumnValue myPackage.TupleToBagColumnValuePairs(); 

inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 
result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2); 
dump result; 

Dobrym inspiracji do pisania UDF z torebki widoczne DataFu source code by LinkedIn

0

Można użyć TransposeTupleToBag (UDF z biblioteki DataFu) na wyjściu STRSPLIT, aby zdobyć torbę, a następnie FLATTEN worek, aby utworzyć osobny wiersz na oryginalną kolumnę.