2014-11-21 8 views
10

Próbuję użyć Spark Streaming z Kafki (wersja 1.1.0), ale praca Spark utrzymuje upaść z powodu tego błędu:Spark: Nie można obliczyć podział, nie znaleziono blok

14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) 

Jedynym istotnym informacje uzyskać z dzienników jest taka:

14/11/21 12:34:18 INFO MemoryStore: Block input-0-1416573258200 stored as bytes to memory (size 85.8 KB, free 2.3 GB) 
14/11/21 12:34:18 INFO BlockManagerMaster: Updated info of block input-0-1416573258200 
14/11/21 12:34:18 INFO BlockGenerator: Pushed block input-0-1416573258200 
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)] 
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
14/11/21 12:37:35 INFO BlockManagerInfo: Added input-0-1416573258200 in memory on ********:43117 (size: 85.8 KB, free: 2.3 GB) 
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)] 
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found 

Przykładowy kod:

SparkConf conf = new SparkConf(); 
JavaSparkContext sc = new JavaSparkContext(conf); 
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000)); 
jssc.checkpoint(checkpointDir); 

HashMap<String, Integer> topics = new HashMap<String, Integer>(); 
topics.put(KAFKA_TOPIC, 1); 

HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
kafkaParams.put("group.id", "spark-streaming-test"); 
kafkaParams.put("zookeeper.connect", ZOOKEEPER_QUORUM); 
kafkaParams.put("zookeeper.connection.timeout.ms", "1000"); 
kafkaParams.put("auto.offset.reset", "smallest"); 

JavaPairReceiverInputDStream<String, String> kafkaStream = 
    KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevels.MEMORY_AND_DISK_SER); 

JavaPairDStream<String, String> streamPair = kafkaStream.flatMapToPair(...).reduceByKey(...); 

Nie jestem pewien, jaka jest przyczyna tego problemu.

+0

Jaka jest wydajność pracy? Czy to opóźnia się? – maasg

+0

Nie, nie pozostaje w tyle. – Bobby

+0

Czy znalazłeś już rozwiązanie? Mam taki sam problem z Kafka/Spark Streaming 1.2 – bibac

Odpowiedz

1
+0

Witaj na tej stronie. Podczas gdy łącze pomaga, zasób może zostać przeniesiony lub usunięty. Dobrą praktyką jest więc nie podawać linku bez wyjaśnienia, co link prowadzi do rozwiązania. Zobacz [odpowiedź] – mins

+0

Tak, próbowałem tego i to nie pomaga. – Bobby

+0

Naprawiłem ten problem, aby zmniejszyć dane wejściowe w odbiorniku. Myślę, że jeden z możliwych powodów, że dane wejściowe mogą przekroczyć możliwości przetwarzania. –

0

Sprawdź następujące elementy.

1) Czy utworzyć kontekst strumieniowego właściwie jak w

def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

// Do additional setup on context that needs to be done, 
// irrespective of whether it is being started or restarted 
context. ... 

// Start the context 
context.start() 
context.awaitTermination() 

Twój inicjalizacji jest nieprawidłowy.

Zapraszamy do obejrzenia poniższej

np kod na recoverableNetworkCount App

2) Jeżeli masz włączony write nieruchomości naprzód log "spark.streaming.receiver.writeAheadLog.enable"

3) Sprawdź stabilność przesyłania strumieniowego w interfejsie przesyłania strumieniowego. czas przetwarzania < interwał wsadowy.

Powiązane problemy