2016-03-22 12 views
10

Ho 3 tubi a cascata (uno aderire contro altri due) come descritto di seguito,generazione personalizzata join logica in cascata garantire MAP_SIDE solo

  • LHSPipe - (maggiore dimensioni)

enter image description here

  • RHSPipes - (dimensioni più piccole che potrebbe in forma per memo ry)

enter image description here

psuedocodarlo come segue, Questo esempio riguarda due si unisce

SE F1DecidingFactor = YES poi registrazione LHSPipe con RHS Lookup # 1 CON (LHSPipe.F1Input = RHS Lookup # 1.Join # F1) e impostare il risultato della ricerca (SET LHSPipe.F1Output = Result # F1) Altrimenti SET LHSPipe.F1Output = N/A

La stessa logica si applica per il calcolo F2.

i risultati attesi,

enter image description here

Questo scenario mi ha costretto ad andare con personalizzato Registrati operazione come if-else decide se aderire o meno.

Considerando lo scenario precedente, vorrei partecipare al join MAP-SIDE (mantenendo RHSPipe in memoria del nodo attività MAP), stavo pensando alle soluzioni possibili seguenti, ognuna ha i suoi pro e contro. Hai bisogno dei tuoi suggerimenti su questi.

Opzione # 1:

CoGroup - Siamo in grado di costruire su misura unire logica utilizzando CoGroup con BufferJoiner seguita da personalizzato join (operazione), ma che wouldnt garantire MAP-SIDE unirsi.

Opzione # 2:

HashJoin - Assicura MAP-SIDE unirsi, ma per quanto vedo personalizzato unire non può essere costruito utilizzando questo.

Si prega di correggere la mia comprensione e suggerire le vostre opinioni per lavorare su questo requisito.

Grazie in anticipo.

+0

È possibile fornire il codice di esempio e anche cosa si desidera fare in join personalizzato? – Ambrish

+0

Anche i dati di input di esempio e l'output previsto saranno utili. – Ambrish

+0

Hai preso in considerazione il partizionamento dei dati in sottoinsiemi? – kpie

risposta

1

Il modo migliore per risolvere questo problema (che posso pensare) è modificare il set di dati più piccolo. È possibile aggiungere un nuovo campo (F1DecidingFactor) al set di dati più piccolo.Il valore di F1Result può vorrei: codice

Sudo

if F1DecidingFactor == "Yes" then 
    F1Result = ACTUAL_VALUE 
else 
    F1Result = "N/A" 

Tabella Risultato

|F1#Join|F1#Result|F1#DecidingFactor| 
| Yes|  0|    True| 
| Yes|  1|   False| 
|  No|  0|    N/A| 
|  No|  1|    N/A| 

Si può fare sopra via a cascata pure.

Dopo questo, puoi fare il tuo lato della mappa.

Se non è possibile modificare un set di dati più piccolo, sono disponibili 2 opzioni per risolvere il problema.

Opzione 1

aggiungere nuovi campi per i vostri piccoli tubi che equivale a voi decidere fattore (cioè F1DecidingFactor_RHS = Yes). Quindi includilo nei criteri di partecipazione. Una volta terminato il join, si avranno valori solo per le righe in cui questa condizione corrisponde. Altrimenti sarà nullo/vuoto. Esempio di codice:

principale Classe

import cascading.operation.Insert; 
import cascading.pipe.Each; 
import cascading.pipe.HashJoin; 
import cascading.pipe.Pipe; 
import cascading.pipe.assembly.Discard; 
import cascading.pipe.joiner.LeftJoin; 
import cascading.tuple.Fields; 

public class StackHashJoinTestOption2 { 
    public StackHashJoinTestOption2() { 
     Fields f1Input = new Fields("F1Input"); 
     Fields f2Input = new Fields("F2Input"); 
     Fields f1Join = new Fields("F1Join"); 
     Fields f2Join = new Fields("F2Join"); 

     Fields f1DecidingFactor = new Fields("F1DecidingFactor"); 
     Fields f2DecidingFactor = new Fields("F2DecidingFactor"); 
     Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS"); 
     Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS"); 

     Fields lhsJoinerOne = f1DecidingFactor.append(f1Input); 
     Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input); 

     Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join); 
     Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join); 

     Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"); 

     // Large Pipe fields : 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input 
     Pipe largePipe = new Pipe("large-pipe"); 

     // Small Pipe 1 Fields : 
     // F1Join F1Result 
     Pipe rhsOne = new Pipe("small-pipe-1"); 

     // New field to small pipe. Expected Fields: 
     // F1Join F1Result F1DecidingFactor_RHS 
     rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL); 

     // Small Pipe 2 Fields : 
     // F2Join F2Result 
     Pipe rhsTwo = new Pipe("small-pipe-2"); 

     // New field to small pipe. Expected Fields: 
     // F2Join F2Result F2DecidingFactor_RHS 
     rhsTwo = new Each(rhsTwo, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL); 

     // Joining first small pipe. Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS 
     Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin()); 

     // Joining second small pipe. Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS 
     Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin()); 

     Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE); 

     result = new Discard(result, f1DecidingFactorRhs); 
     result = new Discard(result, f2DecidingFactorRhs); 

     // result Pipe should have expected result 
    } 
} 

Opzione 2

Se si vuole avere il valore di default al posto di null/vuoto, allora vorrei suggerire di fare il primo HashJoin con predefinito Joiner seguiti da una funzione per aggiornare le tuple con valori appropriati. Qualcosa di simile:

classe principale

import cascading.pipe.Each; 
import cascading.pipe.HashJoin; 
import cascading.pipe.Pipe; 
import cascading.pipe.joiner.LeftJoin; 
import cascading.tuple.Fields; 

public class StackHashJoinTest { 
    public StackHashJoinTest() { 
     Fields f1Input = new Fields("F1Input"); 
     Fields f2Input = new Fields("F2Input"); 
     Fields f1Join = new Fields("F1Join"); 
     Fields f2Join = new Fields("F2Join"); 

     Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"); 

     // Large Pipe fields : 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input 
     Pipe largePipe = new Pipe("large-pipe"); 

     // Small Pipe 1 Fields : 
     // F1Join F1Result 
     Pipe rhsOne = new Pipe("small-pipe-1"); 

     // Small Pipe 2 Fields : 
     // F2Join F2Result 
     Pipe rhsTwo = new Pipe("small-pipe-2"); 

     // Joining first small pipe. 
     // Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result 
     Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin()); 

     // Joining second small pipe. 
     // Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result 
     Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin()); 

     Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE); 

     // result Pipe should have expected result 
    } 
} 

Aggiornamento Funzione

import cascading.flow.FlowProcess; 
import cascading.operation.BaseOperation; 
import cascading.operation.Function; 
import cascading.operation.FunctionCall; 
import cascading.tuple.Fields; 
import cascading.tuple.TupleEntry; 

public class TestFunction extends BaseOperation<Void> implements Function<Void> { 

    private static final long serialVersionUID = 1L; 

    private static final String DECIDING_FACTOR = "No"; 
    private static final String DEFAULT_VALUE = "N/A"; 

    // Expected Fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output" 
    public TestFunction() { 
     super(Fields.ARGS); 
    } 

    @Override 
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) { 
     TupleEntry arguments = call.getArguments(); 

     TupleEntry result = new TupleEntry(arguments); 

     if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) { 
      result.setString("F1Output", DEFAULT_VALUE); 
     } 

     if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) { 
      result.setString("F2Output", DEFAULT_VALUE); 
     } 

     call.getOutputCollector().add(result); 
    } 

} 

Riferimenti

Questo dovrebbe risolvere il problema. Fammi sapere se questo aiuta.

Problemi correlati