2015-06-02 15 views
5

Vorrei assegnare ciascuna riga del mio input un- che dovrebbe essere un numero da 0 a N - 1, dove N è il numero di righe nell'input.zipWithIndex su Apache Flink

Approssimativamente, mi piacerebbe essere in grado di fare qualcosa di simile al seguente:

val data = sc.textFile(textFilePath, numPartitions) 
val rdd = data.map(line => process(line)) 
val rddMatrixLike = rdd.zipWithIndex.map { case (v, idx) => someStuffWithIndex(idx, v) } 

Ma in Apache Flink. È possibile?

+0

Questa è una domanda interessante. Proverò a proporre un'implementazione. –

risposta

6

Questa è ora una parte della versione 0.10-SNAPSHOT di Apache Flink. Esempi per zipWithIndex(in) e zipWithUniqueId(in) sono disponibili nel Flink documentation ufficiale.

5

Ecco una semplice implementazione della funzione:

public class ZipWithIndex { 

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); 

    DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input"); 

    // count elements in each partition 
    DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() { 
     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception { 
      long cnt = 0; 
      for (String v : values) { 
       cnt++; 
      } 
      out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt)); 
     } 
    }); 

    DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() { 
     long start = 0; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts"); 
      Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() { 
       @Override 
       public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) { 
        return ZipWithIndex.compare(o1.f0, o2.f0); 
       } 
      }); 
      for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) { 
       start += offsets.get(i).f1; 
      } 
     } 

     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception { 
      for(String v: values) { 
       out.collect(new Tuple2<Long, String>(start++, v)); 
      } 
     } 
    }).withBroadcastSet(counts, "counts"); 
    result.print(); 

} 

public static int compare(int x, int y) { 
    return (x < y) ? -1 : ((x == y) ? 0 : 1); 
} 
} 

Questo è come funziona: sto usando la prima operazione mapPartition() di andare oltre tutti gli elementi nelle partizioni per contare quanti elementi sono lì . Ho bisogno di conoscere il numero di elementi in ogni partizione per impostare correttamente gli offset quando si assegnano gli ID agli elementi. Il risultato del primo mapPartition è un DataSet che contiene i mapping. Trasmetto questo DataSet a tutti i secondi operatori mapPartition() che assegneranno gli ID agli elementi dall'input. Nel metodo open() del secondo mapPartition() Sto calcolando l'offset per ciascuna partizione.

Probabilmente darò il codice a Flink (dopo averlo discusso con gli altri committer).

+0

Grazie Robert! Potresti anche spiegare in poche parole come funziona? Per esempio. perché usiamo 'getRuntimeContext(). getIndexOfThisSubtask()' e perché i conteggi di radiodiffusione di ogni partizione potrebbero aiutare? –

+0

Buon punto. Aggiungerò presto qualche descrizione. –

+0

Descrizione aggiunta –

Problemi correlati