2016-12-20

I am getting the error below with Kafka 0.10.1.0 and Spark 2.0.2: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

private val spark = SparkSession.builder()
  .master("local[*]")
  .appName(job.name)
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.streaming.receiver.maxRate", 10000)
  .config("spark.streaming.kafka.maxRatePerPartition", 10000)
  .config("spark.streaming.kafka.consumer.cache.maxCapacity", 1)
  .config("spark.streaming.kafka.consumer.cache.initialCapacity", 1)
  .getOrCreate()

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> config.getString("kafka.hosts"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> job.name,
  "auto.offset.reset" -> config.getString("kafka.offset"),
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
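For context, parameters like these are normally passed to KafkaUtils.createDirectStream from the spark-streaming-kafka-0-10 integration. A minimal sketch of how the stream would be wired up (the topic name, batch interval, and processing logic are assumptions, not from the original post):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Batch interval and topic name are illustrative assumptions.
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val topics = Seq("my-topic")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent, // let Spark distribute Kafka partitions across executors
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // placeholder processing for each micro-batch
  rdd.map(record => (record.key, record.value)).count()
}

ssc.start()
ssc.awaitTermination()
```

Each Kafka partition is read by a CachedKafkaConsumer on the executor side, which is where the ConcurrentModificationException in the stack trace below is thrown when two task threads hit the same cached consumer.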

Exception:

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

There is already a mail thread about this, but no resolution yet: https://www.mail-archive.com/[email protected]/msg56566.html

Answer


I ran into the same error and could not find a solution. Instead, I use "--executor-cores 1" with spark-submit to avoid the problem. If anyone finds a solution, please post it.
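The same workaround can be expressed in the SparkSession builder instead of on the spark-submit command line. A hedged sketch (the app name here is an assumption): spark.executor.cores is the configuration equivalent of --executor-cores, capping each executor at one concurrent task so no two task threads share the same cached KafkaConsumer.

```scala
import org.apache.spark.sql.SparkSession

// Assumed sketch: configuration equivalent of "--executor-cores 1".
// One task slot per executor means only one thread can touch a
// given cached KafkaConsumer at a time.
val spark = SparkSession.builder()
  .appName("kafka-job")                // illustrative name, not from the post
  .config("spark.executor.cores", "1") // one concurrent task per executor
  .getOrCreate()
```

Note that spark.executor.cores only applies on a cluster manager (standalone/YARN/Mesos); in local mode, as in the question's master("local[*]"), the number of task threads is set by the master string itself, so master("local[1]") would be the local-mode equivalent.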
