2013-09-05 20 views
6

Vorrei poter creare un InputFormat personalizzato che legge i file di sequenza, ma espone inoltre il percorso e l'offset del file all'interno del file in cui si trova il record.Estendi SequenceFileInputFormat per includere nome file + offset

Per fare un passo indietro, ecco il caso d'uso: Ho un file di sequenza contenente dati di dimensioni variabili. Le chiavi sono per lo più irrilevanti e i valori sono fino a un paio di megabyte contenenti una varietà di campi diversi. Vorrei indicizzare alcuni di questi campi in elasticsearch insieme al nome del file e all'offset. In questo modo, posso interrogare quei campi da elasticsearch, e quindi usare il nome e l'offset del file per tornare al file di sequenza e ottenere il record originale, invece di archiviarlo in ES.

Ho questo intero processo che funziona come un singolo programma java. La classe SequenceFile.Reader fornisce opportunamente i metodi getPosition e seek per far sì che ciò accada.

Tuttavia, saranno coinvolti molti terabyte di dati, quindi sarà necessario convertirlo in un lavoro MapReduce (probabilmente solo per la mappa). Dal momento che le chiavi effettive nel file di sequenza sono irrilevanti, l'approccio che speravo di fare sarebbe stato creare un InputFormat personalizzato che estenda o utilizzi in qualche modo SquenceFileInputFormat, ma invece di restituire le chiavi effettive, restituisce invece una chiave composta costituita dal file e offset.

Tuttavia, ciò si sta dimostrando più difficile nella pratica. Sembra che dovrebbe essere possibile, ma date le API reali e ciò che è esposto, è difficile. Qualche idea? Forse un approccio alternativo che dovrei prendere?

risposta

5

Nel caso in cui qualcuno incontri un problema simile, ecco la soluzione che ho trovato. Alla fine ho semplicemente duplicato parte del codice in SequenceFileInputFormat/RecordReader e semplicemente modificandolo. Avevo sperato di scrivere sia una sottoclasse o di un decoratore o qualcosa ... in questo modo non è abbastanza, ma funziona:

SequenceFileOffsetInputFormat.java:

import java.io.IOException; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 

public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> { 

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> { 

     private SequenceFile.Reader in; 
     private long start; 
     private long end; 
     private boolean more = true; 
     private PathOffsetWritable key = null; 
     private Writable k = null; 
     private V value = null; 
     private Configuration conf; 

     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
      FileSplit fileSplit = (FileSplit) split; 
      conf = context.getConfiguration(); 
      Path path = fileSplit.getPath(); 
      FileSystem fs = path.getFileSystem(conf); 
      this.in = new SequenceFile.Reader(fs, path, conf); 
      try { 
       this.k = (Writable) in.getKeyClass().newInstance(); 
       this.value = (V) in.getValueClass().newInstance(); 
      } catch (InstantiationException e) { 
       throw new IOException(e); 
      } catch (IllegalAccessException e) { 
       throw new IOException(e); 
      } 
      this.end = fileSplit.getStart() + fileSplit.getLength(); 

      if (fileSplit.getStart() > in.getPosition()) { 
       in.sync(fileSplit.getStart()); 
      } 

      this.start = in.getPosition(); 
      more = start < end; 

      key = new PathOffsetWritable(path, start); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (!more) { 
       return false; 
      } 
      long pos = in.getPosition(); 

      more = in.next(k, value); 
      if (!more || (pos >= end && in.syncSeen())) { 
       key = null; 
       value = null; 
       more = false; 
      } else { 
       key.setOffset(pos); 
      } 
      return more; 
     } 

     @Override 
     public PathOffsetWritable getCurrentKey() { 
      return key; 
     } 

     @Override 
     public V getCurrentValue() { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (end == start) { 
       return 0.0f; 
      } else { 
       return Math.min(1.0f, (in.getPosition() - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void close() throws IOException { 
      in.close(); 
     } 

    } 

    @Override 
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
     return new SequenceFileOffsetRecordReader<V>(); 
    } 

    @Override 
    public List<InputSplit> getSplits(JobContext context) throws IOException { 
     return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context); 
    } 

    @Override 
    public long getFormatMinSplitSize() { 
     return SequenceFile.SYNC_INTERVAL; 
    } 


} 

PathOffsetWritable.java:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 

public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> { 

    private Text t = new Text(); 
    private Path path; 
    private long offset; 

    public PathOffsetWritable(Path path, long offset) { 
     this.path = path; 
     this.offset = offset; 
    } 

    public Path getPath() { 
     return path; 
    } 

    public long getOffset() { 
     return offset; 
    } 

    public void setPath(Path path) { 
     this.path = path; 
    } 

    public void setOffset(long offset) { 
     this.offset = offset; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     t.readFields(in); 
     path = new Path(t.toString()); 
     offset = in.readLong(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     t.set(path.toString()); 
     t.write(out); 
     out.writeLong(offset); 
    } 

    @Override 
    public int compareTo(PathOffsetWritable o) { 
     int x = path.compareTo(o.path); 
     if (x != 0) { 
      return x; 
     } else { 
      return Long.valueOf(offset).compareTo(Long.valueOf(o.offset)); 
     } 
    } 


} 
Problemi correlati