7

Tutto funziona bene a livello locale quando lo faccio nel modo seguente:rotto Errore tubo provoca lo streaming di lavoro elastico MapReduce su AWS a fallire

cat input | python mapper.py | sort | python reducer.py 

Tuttavia, quando si esegue il lavoro di streaming MapReduce su AWS Elastic MapReduce, il lavoro non viene completare con successo. Il mapper.py viene eseguito parzialmente (lo so a causa della scrittura su stderr lungo la strada). Il mapper è interrotta da un errore di "Broken Pipe", che sono in grado di recuperare dal syslog del tentativo compito dopo non avere superato:

java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done 
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1 
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds. 
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation 
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child 
java.io.IOException: log:null 
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s] 
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null 
HOST=null 
USER=hadoop 
HADOOP_USER=null 
last Hadoop input: |null| 
last tool output: |text/html 1| 
Date: Mon Mar 26 07:19:05 UTC 2012 
java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task 
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written 

Ecco mapper.py. Si noti che ho scritto su stderr per fornire me con le informazioni di debug:

#!/usr/bin/env python 

import sys 
from warc import ARCFile 

def main(): 
    warc_file = ARCFile(fileobj=sys.stdin) 
    for web_page in warc_file: 
     print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) #For debugging 
     print '%s\t%s' % (web_page.header.content_type, 1) 
    print >> sys.stderr, 'done' #For debugging 
if __name__ == "__main__": 
    main() 

Ecco cosa ottengo in stderr per il tentativo compito quando il mapper.py viene eseguito:

text/html 1 
text/html 1 
text/html 1 

In sostanza, il il ciclo scorre 3 volte e poi si interrompe bruscamente senza che Python lanci alcun errore. (Nota: è dovrebbe essere l'output di migliaia di righe). Anche un'eccezione non rilevata dovrebbe apparire in stderr.

Poiché MapReduce funziona perfettamente sul mio computer locale, suppongo che questo sia un problema con il modo in cui Hadoop tratta l'output che sto stampando da mapper.py. Ma non ho idea di quale potrebbe essere il problema.

risposta

9

Il processo di streaming (il tuo script Python) sta terminando prematuramente. Questo può essere fatto per l'input pensante è completo (ad esempio interpretando un EOF) o un'eccezione ingerita. Ad ogni modo, Hadoop sta cercando di passare da STDIN al tuo script, ma poiché l'applicazione è terminata (e quindi STDIN non è più un descrittore di file valido), ricevi un errore BrokenPipe. Suggerirei di aggiungere tracce di stderr nel tuo script per vedere quale linea di input sta causando il problema. codifica Felice,

-Geoff

+4

babonk, può fornisci dettagli su come hai risolto il tuo problema usando questo consiglio? –

+0

Stesso. Apparentemente ho un errore simile qui: http: // StackOverflow.it/questions/18556270/aws-elastic-mapreduce-doesnt-seem-to-be-converting-the-streaming-to-j corretto, e dato che funziona quando si esegue il piping, sono in perdita per come " risolvere "per lo streaming. – Mittenchops

1

non ho alcuna esperienza con Hadoop su AWS ma ho avuto lo stesso errore su un cluster Hadoop regolare - e nel mio caso il problema era come ho iniziato python -mapper ./mapper.py -reducer ./reducer.py ha funzionato, ma -mapper python mapper.py didn' t.

Sembra inoltre utilizzare un pacchetto python non standard warc inviare i file necessari allo streamjob? -cacheFiles o -cacheArchive potrebbe essere utile.

+0

Come si includono pacchetti Python non standard? In particolare, il mapreduce di AWS elastico non sembra rendere disponibili opzioni come i file di cache. – Mittenchops

6

Questo è detto sopra, ma lasciatemi tentare di chiarire: è necessario bloccare su stdin, anche se non ne hai bisogno! Questo è non uguale ai tubi di Linux, quindi non lasciarti ingannare. Quello che succede, intuitivamente, è che lo Streaming resiste al tuo eseguibile, quindi dice "aspetta qui mentre vado a ricevere input per te". Se il tuo eseguibile si ferma per qualsiasi motivo prima che Streaming ti invii il 100% dell'input, Streaming dice "Ehi, dove è andato quell'eseguibile che mi sono alzato in piedi? ... Hmmmm ... il tubo è rotto, permettimi di sollevare quell'eccezione !" Quindi, ecco alcune codice Python, tutto ciò che fa è quello che fa il gatto, ma si noterà, questo codice non uscire fino a quando tutti gli input viene elaborato, e che è il punto chiave:

#!/usr/bin/python 
import sys 

while True: 
    s = sys.stdin.readline() 
    if not s: 
     break 
    sys.stdout.write(s) 
+1

Stavo ricevendo questo errore perché non stavo facendo nulla con l'input. Ho aggiunto questo codice (anche se non fa nulla per me) e l'errore è andato. – schoon

Problemi correlati