Ecco una semplice implementazione della funzione:
public class ZipWithIndex {
public static void main(String[] args) throws Exception {
ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input");
// count elements in each partition
DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() {
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
long cnt = 0;
for (String v : values) {
cnt++;
}
out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
}
});
DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() {
long start = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
@Override
public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
return ZipWithIndex.compare(o1.f0, o2.f0);
}
});
for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
start += offsets.get(i).f1;
}
}
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception {
for(String v: values) {
out.collect(new Tuple2<Long, String>(start++, v));
}
}
}).withBroadcastSet(counts, "counts");
result.print();
}
public static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
}
Questo è come funziona: sto usando la prima operazione mapPartition()
di andare oltre tutti gli elementi nelle partizioni per contare quanti elementi sono lì . Ho bisogno di conoscere il numero di elementi in ogni partizione per impostare correttamente gli offset quando si assegnano gli ID agli elementi. Il risultato del primo mapPartition
è un DataSet che contiene i mapping. Trasmetto questo DataSet a tutti i secondi operatori mapPartition()
che assegneranno gli ID agli elementi dall'input. Nel metodo open()
del secondo mapPartition()
Sto calcolando l'offset per ciascuna partizione.
Probabilmente darò il codice a Flink (dopo averlo discusso con gli altri committer).
fonte
2015-06-02 14:26:11
Questa è una domanda interessante. Proverò a proporre un'implementazione. –