2012-06-11 7 views
6

Typowym wzorem w moim przetwarzaniu danych jest grupowanie według pewnego zestawu kolumn, stosowanie filtra, a następnie ponowne spłaszczanie. Na przykład:Apache Pig: prefiks paska przestrzeni nazw (: :) po operacji grupowej

my_data_grouped = group my_data by some_column; 
my_data_grouped = filter my_data_grouped by <some expression>; 
my_data = foreach my_data_grouped flatten(my_data); 

Problem polega na tym, że jeśli my_data zaczyna się od schematu podobnego (C1, C2, C3) po tej operacji nie będzie miała takiego schematu (mydata :: C1, C2, MYDATA :: MyData :: c3). Czy istnieje sposób na łatwe usunięcie prefiksu "mydata ::", jeśli kolumny są unikatowe?

wiem, że mogę zrobić coś takiego:

my_data = foreach my_data generate c1 as c1, c2 as c2, c3 as c3; 

jednak, że staje się niewygodne i trudne do utrzymania dla zbiorów danych z wielu kolumn i jest niemożliwy do zbiorów danych o zmiennych kolumn.

Odpowiedz

1

można umieścić „jako” oświadczenie na tej samej linii jako „foreach”.

tj

my_data_grouped = group my_data by some_column; 
my_data_grouped = filter my_data_grouped by <some expression>; 
my_data = FOREACH my_data_grouped FLATTEN(my_data) AS (c1, c2, c3); 

Jednak jest to tak samo jak robią to na 2 linie, a nie łagodzić problem dla "zbiorów danych o zmiennych kolumn.

3

Jeśli wszystkie pola w schemacie mają ten sam zestaw przedrostków (np. Grupa1 :: identyfikator, grupa1 :: liczba, itd.), Można zignorować prefiks, gdy odwołuje się do określonych pól (i po prostu odsyłać je jako identyfikator, kwotę itp.)

Alternatywnie, jeśli nadal poszukuje się rozebrać schemat pojedynczego poziomu poprzedzając można użyć UDF tak:

public class RemoveGroupFromTupleSchema extends EvalFunc<Tuple> { 

@Override 
public Tuple exec(Tuple input) throws IOException { 
    Tuple result = input; 
    return result; 
} 


@Override 
public Schema outputSchema(Schema input) throws FrontendException { 
    if(input.size() != 1) { 
     throw new RuntimeException("Expected input (tuple) but input does not have 1 field"); 
    } 

    List<Schema.FieldSchema> inputSchema = input.getFields(); 
    List<Schema.FieldSchema> outputSchema = new ArrayList<Schema.FieldSchema>(inputSchema); 
    for(int i = 0; i < inputSchema.size(); i++) { 
     Schema.FieldSchema thisInputFieldSchema = inputSchema.get(i); 
     String inputFieldName = thisInputFieldSchema.alias; 
     Byte dataType = thisInputFieldSchema.type; 

     String outputFieldName; 
     int findLoc = inputFieldName.indexOf("::"); 
     if(findLoc == -1) { 
      outputFieldName = inputFieldName; 
     } 
     else { 
      outputFieldName = inputFieldName.substring(findLoc+2); 
     } 
     Schema.FieldSchema thisOutputFieldSchema = new Schema.FieldSchema(outputFieldName, dataType); 
     outputSchema.set(i, thisOutputFieldSchema); 
    } 

    return new Schema(outputSchema); 
} 
} 
+0

Sposób korzystania z tego UDF? Z góry dziękuję. –

Powiązane problemy