7

Wszystko działa prawidłowo lokalnie kiedy zrobić w następujący sposób:Broken pipe Błąd powoduje strumieniowego Elastic MapReduce pracę na AWS niepowodzenie

cat input | python mapper.py | sort | python reducer.py 

Jednak gdy uruchamiam zadanie strumieniowe MapReduce na AWS Elastic MapReduce, zadanie nie zakończyć pomyślnie. The mapper.py przebiega w połowie drogi (wiem o tym z powodu pisania na adres stderr po drodze). Element odwzorowujący zostaje przerwana przez błąd „Broken pipe”, które jestem w stanie odzyskać od syslog zamachu zadań po niepowodzeniu:

java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done 
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1 
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds. 
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation 
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child 
java.io.IOException: log:null 
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s] 
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null 
HOST=null 
USER=hadoop 
HADOOP_USER=null 
last Hadoop input: |null| 
last tool output: |text/html 1| 
Date: Mon Mar 26 07:19:05 UTC 2012 
java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task 
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written 

Oto mapper.py. Zauważ, że piszę do stderr, aby zapewnić sobie debugowania info:

#!/usr/bin/env python 

import sys 
from warc import ARCFile 

def main(): 
    warc_file = ARCFile(fileobj=sys.stdin) 
    for web_page in warc_file: 
     print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) #For debugging 
     print '%s\t%s' % (web_page.header.content_type, 1) 
    print >> sys.stderr, 'done' #For debugging 
if __name__ == "__main__": 
    main() 

Oto co mam na stderr na próbie zadań gdy mapper.py jest prowadzony:

text/html 1 
text/html 1 
text/html 1 

Zasadniczo, Pętla przechodzi 3 razy, a następnie zatrzymuje się nagle bez pytona rzucającego żadnego błędu. (Uwaga: to powinno być wyprowadzać tysiące linii). Nawet nieprzechwycony wyjątek powinien pojawić się w stderr.

Ponieważ MapReduce działa zupełnie dobrze na moim lokalnym komputerze, domyślam się, że jest to problem z tym, jak Hadoop radzi sobie z wydrukami, które drukuję z mapper.py. Ale nie mam pojęcia, jaki może być problem.

Odpowiedz

9

Proces przesyłania strumieniowego (skrypt w języku Python) kończy się przedwcześnie. Może to być spowodowane tym, że wprowadzanie myślowe jest kompletne (na przykład interpretacja EOF) lub wyjątek połknięty. Tak czy inaczej, Hadoop próbuje wprowadzić do STDIN skrypt, ale ponieważ aplikacja została zakończona (a tym samym STDIN nie jest już prawidłowym deskryptorem pliku), pojawia się błąd BrokenPipe. Sugerowałbym dodanie do skryptu śladów stderr, aby zobaczyć, która linia wprowadzania powoduje problem. Szczęśliwy kodowanie,

-Geoff

+4

babonk, puszka podasz szczegółowe informacje o tym, jak rozwiązałeś swój problem, korzystając z tej porady? –

+0

To samo. Najwyraźniej mam podobny błąd tutaj: http: // stackoverflow.com/questions/18556270/aws-elastic-mapreduce-nie-wydaje się poprawnie przetwarzać-streaming-na-j, i biorąc pod uwagę, że działa, gdy jest podłączony, nie mam pojęcia, jak to zrobić " napraw "to dla streamingu. – Mittenchops

1

mam żadnego doświadczenia z Hadoop na AWS, ale miałem ten sam błąd w regularnych klastrze Hadoop - iw moim przypadku problem był jak zacząłem pyton -mapper ./mapper.py -reducer ./reducer.py pracował ale -mapper python mapper.py didn” t.

Wydaje się również, że używasz nietypowego pakietu pythonowego warc czy przesyłasz niezbędne pliki do streamjob? -cacheFiles lub -cacheArchive może być pomocne.

+0

Jak dołączyć niestandardowe pakiety Pythona? Zwłaszcza elastyczna aktualizacja map AWS wydaje się nie udostępniać opcji takich jak pliki cache. – Mittenchops

6

To powiedziane powyżej, ale pozwól mi spróbować wyjaśnić - musisz zablokować na stdin, nawet jeśli go nie potrzebujesz! To jest , a nie tak samo jak Linuksa, więc niech cię to nie zmyli. To, co się dzieje intuicyjnie, polega na tym, że Streaming wstaje z twojego pliku wykonywalnego, a następnie mówi: "czekaj tutaj, a ja pójdę po ciebie". Jeśli twój plik wykonywalny zatrzyma się z jakiegokolwiek powodu, zanim Streaming wyśle ​​ci 100% danych wejściowych, Streaming mówi: "Hej, gdzie ten plik wykonywalny podszedł, że wstałem? ... Hmmmm ... fajka jest zepsuta, pozwól mi podnieść ten wyjątek ! " Tak, tu jest jakiś kod python, wszystko robi to, co robi kot, ale musisz pamiętać, że ten kod nie będzie zjazd aż wszystkie wejścia są przetwarzane, i to jest kluczowy punkt:

#!/usr/bin/python 
import sys 

while True: 
    s = sys.stdin.readline() 
    if not s: 
     break 
    sys.stdout.write(s) 
+1

Otrzymałem ten błąd, ponieważ nic nie robiłem z wprowadzeniem. Dodałem ten kod (mimo że nic dla mnie nie robi) i błąd zniknął. – schoon

Powiązane problemy