2012-06-11 7 views
6

Un pattern comune nel mio elaborazione dei dati è raggruppare per un gruppo di colonne, applicare un filtro, quindi appiattire nuovamente. Per esempio:Apache Pig: prefisso dello spazio dei nomi strip (: :) dopo l'operazione di gruppo

my_data_grouped = group my_data by some_column; 
my_data_grouped = filter my_data_grouped by <some expression>; 
my_data = foreach my_data_grouped flatten(my_data); 

Il problema qui è che se my_data inizia con uno schema simile (c1, c2, c3) dopo questa operazione si avrà uno schema simile (mydata :: c1, c2 mydata ::, MYDATA :: c3). C'è un modo per rimuovere facilmente il prefisso "mydata ::" se le colonne sono univoche?

So che posso fare qualcosa di simile:

my_data = foreach my_data generate c1 as c1, c2 as c2, c3 as c3; 

Tuttavia che diventa scomodo e difficile da mantenere per insiemi di dati con un sacco di colonne ed è impossibile per i set di dati con le colonne variabili.

risposta

1

È possibile inserire l'istruzione 'AS' sulla stessa riga di 'foreach'.

cioè

my_data_grouped = group my_data by some_column; 
my_data_grouped = filter my_data_grouped by <some expression>; 
my_data = FOREACH my_data_grouped FLATTEN(my_data) AS (c1, c2, c3); 

Tuttavia, questa è proprio la stessa cosa che farlo su 2 linee, e non alleviare il problema di 'insiemi di dati con le colonne variabili.

3

Se tutti i campi di uno schema hanno lo stesso set di prefissi (ad esempio group1 :: id, group1 :: amount, ecc.) È possibile ignorare il prefisso quando si fa riferimento a campi specifici (e basta fare riferimento a id, importo, ecc.)

in alternativa, se siete ancora alla ricerca di spogliare uno schema di un unico livello di prefisso è possibile utilizzare un UDF come questo:

public class RemoveGroupFromTupleSchema extends EvalFunc<Tuple> { 

@Override 
public Tuple exec(Tuple input) throws IOException { 
    Tuple result = input; 
    return result; 
} 


@Override 
public Schema outputSchema(Schema input) throws FrontendException { 
    if(input.size() != 1) { 
     throw new RuntimeException("Expected input (tuple) but input does not have 1 field"); 
    } 

    List<Schema.FieldSchema> inputSchema = input.getFields(); 
    List<Schema.FieldSchema> outputSchema = new ArrayList<Schema.FieldSchema>(inputSchema); 
    for(int i = 0; i < inputSchema.size(); i++) { 
     Schema.FieldSchema thisInputFieldSchema = inputSchema.get(i); 
     String inputFieldName = thisInputFieldSchema.alias; 
     Byte dataType = thisInputFieldSchema.type; 

     String outputFieldName; 
     int findLoc = inputFieldName.indexOf("::"); 
     if(findLoc == -1) { 
      outputFieldName = inputFieldName; 
     } 
     else { 
      outputFieldName = inputFieldName.substring(findLoc+2); 
     } 
     Schema.FieldSchema thisOutputFieldSchema = new Schema.FieldSchema(outputFieldName, dataType); 
     outputSchema.set(i, thisOutputFieldSchema); 
    } 

    return new Schema(outputSchema); 
} 
} 
+0

Come usare questo UDF? Grazie in anticipo. –

Problemi correlati