2012-02-15 13 views
6

Sto cercando di imparare a utilizzare l'API Python di Yelp per MapReduce, MRJob. Il loro semplice esempio di contatore di parole ha senso, ma sono curioso di sapere come gestire un'applicazione che coinvolge più input. Per esempio, invece di contare semplicemente le parole in un documento, moltiplicando un vettore per matrice. Sono venuto con questa soluzione, quali funzioni, ma sente stupido:Più ingressi con MRJob

class MatrixVectMultiplyTast(MRJob): 
    def multiply(self,key,line): 
      line = map(float,line.split(" ")) 
      v,col = line[-1],line[:-1] 

      for i in xrange(len(col)): 
        yield i,col[i]*v 

    def sum(self,i,occurrences): 
      yield i,sum(occurrences) 

    def steps(self): 
      return [self.mr (self.multiply,self.sum),] 

if __name__=="__main__": 
    MatrixVectMultiplyTast.run() 

Questo codice viene eseguito ./matrix.py < input.txt e il motivo per cui funziona è che la matrice salvata in input.txt da colonne, con il valore del vettore corrispondente nella parte fine della linea.

Quindi, la seguente matrice e vettore:

enter image description here

sono rappresentati come input.txt come:

enter image description here

In breve, come andrei circa memorizzazione della matrice e vector più naturalmente in file separati e passandoli entrambi in MRJob?

risposta

3

Se hai bisogno di elaborare i dati grezzi contro un altro (o lo stesso row_i, row_j) insieme di dati, è possibile:

1) Creare un secchio S3 per archiviare una copia dei tuoi dati. Passa il percorso di questa copia alla tua classe di attività, ad es. self.options.bucket e self.options.my_datafile_copy_location nel codice sottostante. Avvertenza: Purtroppo, sembra che l'intero file debba essere "scaricato" sulle macchine task prima di essere processato. Se le connessioni vacilla o impiega troppo tempo per caricarsi, questo lavoro potrebbe fallire. Ecco un codice Python/MRJob per farlo.

mettere questo nella funzione mapper:

d1 = line1.split('\t', 1) 
v1, col1 = d1[0], d1[1] 
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
bucket = conn.get_bucket(self.options.bucket) # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING) 
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip() 
### CAVEAT: Needs to get the whole file before processing the rest. 
for line2 in data_copy.split('\n'): 
    d2 = line2.split('\t', 1) 
    v2, col2 = d2[0], d2[1] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
conn.close() 

2) creare un dominio SimpleDB, e memorizzare tutti i dati in là. Leggi qui il boto e SimpleDB: http://code.google.com/p/boto/wiki/SimpleDbIntro

Il codice mapper sarebbe simile a questa:

dline = dline.strip() 
d0 = dline.split('\t', 1) 
v1, c1 = d0[0], d0[1] 
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME) 
for item in domain: 
    v2, c2 = item.name, item['column'] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
sdb.close() 

Questa seconda opzione può eseguire meglio se si dispone di grandi quantità di dati, dal momento che può fare le richieste per ogni riga di dati anziché l'intero importo in una volta. Tieni presente che i valori SimpleDB possono avere una lunghezza massima di 1024 caratteri, pertanto potresti dover comprimere/decomprimere tramite un metodo se i valori dei dati sono più lunghi.

1

A mio avviso, non usereste MrJob a meno che non desideriate sfruttare il cluster Hadoop oi servizi Hadoop da Amazon, anche se l'esempio utilizza i file locali.

MrJob utilizza principalmente "Hadoop streaming" per inviare il lavoro.

Ciò significa che tutti gli input specificati come file o cartelle da Hadoop sono trasmessi in streaming al programma di mappatura e risultati successivi al riduttore. Tutti i mapper ottengono una porzione di input e considerano schematicamente tutti gli input come analoghi in modo da analizzare ed elaborare in modo uniforme la chiave, valore per ogni slice di dati.

Derivando da questa comprensione, gli input sono schematicamente gli stessi del mappatore.L'unico modo per includere due dati schematici diversi è quello di interlacciarli nello stesso file in modo tale che il mappatore possa capire quali sono i dati vettoriali e quali dati matrice.

You are actually doing it already. 

Si può semplicemente migliorare avendo un identificatore se una linea è un dato matrice o un dato vettoriale. Una volta visualizzati i dati vettoriali, vengono applicati i dati della matrice precedenti.

matrix, 1, 2, ... 
matrix, 2, 4, ... 
vector, 3, 4, ... 
matrix, 1, 2, ... 
..... 

Ma il processo che hai citato funziona bene. Devi avere tutti i dati schematici in un singolo file.

Questo ha ancora problemi però. K, la mappa V riduce le prestazioni quando lo schema completo è presente in una singola riga e contiene un'unità di elaborazione singola completa.

Da quanto ho capito, lo state già facendo correttamente, ma suppongo che Map-Reduce non sia un meccanismo adatto per questo tipo di dati. Spero che qualcuno lo chiarisca ulteriormente.

2

La risposta effettiva alla tua domanda è che mrjob non supporta ancora lo schema di join in streaming di hadoop, che è quello di leggere la variabile di ambiente map_input_file (che espone la proprietà map.input.file) per determinare quale tipo di file tu si stanno occupando in base al suo percorso e/o nome.

Si potrebbe ancora essere in grado di tirarlo fuori, se si può facilmente rilevare da solo la lettura dei dati stessi quale tipo appartiene, come viene visualizzato in questo articolo:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

Tuttavia questo non è sempre possibile ...

In caso contrario, myjob sembra fantastico e vorrei poter aggiungere supporto per questo in futuro. Fino ad allora questo è praticamente un rompicapo per me.

1

Questo è il modo in cui utilizzo più input e in base al nome del file apportare le modifiche appropriate nella fase di mapper.

Programma Runner:

from mrjob.hadoop import * 


#Define all arguments 

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S') 
hadoop_bin = '/usr/bin/hadoop' 
mode = 'hadoop' 
hs = HadoopFilesystem([hadoop_bin]) 

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"] 

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin] 
aargs.extend(input_file_names) 
aargs.extend(['-o',output_dir]) 
print aargs 
status_file = True 

mr_job = MRJob(args=aargs) 
with mr_job.make_runner() as runner: 
    runner.run() 
os.environ['HADOOP_HOME'] = '' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 

Il MRJob Classe:

class MR_Job(MRJob): 
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value' 
    def mapper(self, _, line): 
    """ 
    This function reads lines from file. 
    """ 
    try: 
     #Need to clean email. 
     input_file_name = get_jobconf_value('map.input.file').split('/')[-2] 
       """ 
       Mapper code 
       """ 
    except Exception, e: 
     print e 

    def reducer(self, email_id,visitor_id__date_time): 
    try: 
     """ 
       Reducer Code 
       """ 
    except: 
     pass 


if __name__ == '__main__': 
    MRV_Email.run() 
Problemi correlati