2016-01-25 7 views
5

Spark Streaming from Kafka fails with error "numRecords must not be negative"

This is a strange error, because I can still push data to Kafka and consume messages from Kafka. The exception itself, Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative, is puzzling too. I searched but could not find any resources about it.

Let me describe my cluster. I have one server that acts as both Mesos master and slave, and on it I set up 3 Kafka brokers as shown below. I then run the Spark job on that cluster. I am using Spark 1.5.2.

brokers: 
    id: 0 
    active: true 
    state: running 
    resources: cpus:1.00, mem:1024, heap:512, port:31000 
    failover: delay:1m, max-delay:10m 
    stickiness: period:10m, hostname:test-master 
    task: 
    id: broker-0-c32082d0-a544-4260-b7c4-0239d99f0972 
    state: running 
    endpoint: test-master:31000 
    metrics: 
    collected: 2016-01-25 17:46:47+08 
    under-replicated-partitions: 0 
    offline-partitions-count: 0 
    is-active-controller: 1 

    id: 1 
    active: true 
    state: running 
    resources: cpus:1.00, mem:1024, heap:512, port:31001 
    failover: delay:1m, max-delay:10m 
    stickiness: period:10m, hostname:test-master 
    task: 
    id: broker-1-7b30d6ad-6b19-4420-b743-c6f7f1adfb07 
    state: running 
    endpoint: test-master:31001 
    metrics: 
    collected: 2016-01-25 17:46:31+08 
    under-replicated-partitions: 0 
    offline-partitions-count: 0 
    is-active-controller: 0 

    id: 2 
    active: true 
    state: running 
    resources: cpus:1.00, mem:1024, heap:512, port:31002 
    failover: delay:1m, max-delay:10m 
    stickiness: period:10m, hostname:test-master 
    task: 
    id: broker-2-8ef6437b-79b2-4183-8653-17cf2fe4591f 
    state: running 
    endpoint: test-master:31002 
    metrics: 
    collected: 2016-01-25 17:46:38+08 
    under-replicated-partitions: 0 
    offline-partitions-count: 0 
    is-active-controller: 0 

Then I run a Spark Streaming job that pulls data from Kafka and analyzes it.

I checked that the brokers are working using

kafkacat -b test-master:31001,test-master:31000,test-master:31002 -t bid_event 

That returned data, but when I run the Spark job I get this error:

16/01/25 17:44:52 INFO SparkContext: Running Spark version 1.5.2 
16/01/25 17:44:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
16/01/25 17:44:52 INFO SecurityManager: Changing view acls to: ubuntu 
16/01/25 17:44:52 INFO SecurityManager: Changing modify acls to: ubuntu 
16/01/25 17:44:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); users with modify permissions: Set(ubuntu) 
16/01/25 17:44:53 INFO Slf4jLogger: Slf4jLogger started 
16/01/25 17:44:53 INFO Remoting: Starting remoting 
16/01/25 17:44:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:51816] 
16/01/25 17:44:53 INFO Utils: Successfully started service 'sparkDriver' on port 51816. 
16/01/25 17:44:53 INFO SparkEnv: Registering MapOutputTracker 
16/01/25 17:44:53 INFO SparkEnv: Registering BlockManagerMaster 
16/01/25 17:44:53 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-97e9787b-3a67-4d00-aff6-a5e02b271a74 
16/01/25 17:44:53 INFO MemoryStore: MemoryStore started with capacity 441.9 MB 
16/01/25 17:44:53 INFO HttpFileServer: HTTP File server directory is /tmp/spark-442ac339-2ac7-427f-9e6b-5b5cb18a54dd/httpd-4724a937-4d03-4bd0-99f1-7b9f1129291e 
16/01/25 17:44:53 INFO HttpServer: Starting HTTP Server 
16/01/25 17:44:53 INFO Utils: Successfully started service 'HTTP file server' on port 51817. 
16/01/25 17:44:53 INFO SparkEnv: Registering OutputCommitCoordinator 
16/01/25 17:44:53 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
16/01/25 17:44:53 INFO SparkUI: Started SparkUI at http://10.xxx.xxx.25:4040 
16/01/25 17:44:54 INFO SparkContext: Added JAR file:/home/ubuntu/spark-jobs/./rtb_spark-assembly-1.0-deps.jar at http://10.xxx.xxx.25:51817/jars/rtb_spark-assembly-1.0-deps.jar with timestamp 1453715094219 
16/01/25 17:44:54 INFO SparkContext: Added JAR file:/home/ubuntu/spark-jobs/./rtb-spark.jar at http://10.xxx.xxx.25:51817/jars/rtb-spark.jar with timestamp 1453715094222 
16/01/25 17:44:54 INFO Utils: Copying /home/ubuntu/spark-jobs/./test.conf to /tmp/spark-442ac339-2ac7-427f-9e6b-5b5cb18a54dd/userFiles-cdef27e0-c357-4ebb-adcf-ccf963ff9d60/test.conf 
16/01/25 17:44:54 INFO SparkContext: Added file file:/home/ubuntu/spark-jobs/./test.conf at http://10.xxx.xxx.25:51817/files/test.conf with timestamp 1453715094309 
16/01/25 17:44:54 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Client environment:zookeeper.version=zookeeper C client 3.4.5 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Client environment:host.name=knx-rtb-server-google-test 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Client environment:os.name=Linux 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Client environment:os.arch=3.13.0-76-generic 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Client environment:os.version=#120~precise1-Ubuntu SMP Tue Jan 19 11:09:43 UTC 2016 
I0125 17:44:54.444169 5444 sched.cpp:166] Version: 0.26.0 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Client environment:user.name=ubuntu 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Client environment:user.home=/home/ubuntu 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Client environment:user.dir=/home/ubuntu/spark-jobs 
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):[email protected][email protected]: Initiating client connection, host=test-master:2181 sessionTimeout=10000 watcher=0x7f2ded821210 sessionId=0 sessionPasswd=<null> context=0x7f2d54001470 flags=0 
2016-01-25 17:44:54,444:5202(0x7f2d6e6fb700):[email protected][email protected]: initiated connection to server [10.xxx.xxx.25:2181] 
2016-01-25 17:44:54,446:5202(0x7f2d6e6fb700):[email protected][email protected]: session establishment complete on server [10.xxx.xxx.25:2181], sessionId=0x15278112832012c, negotiated timeout=10000 
I0125 17:44:54.447082 5439 group.cpp:331] Group process (group(1)@10.xxx.xxx.25:28249) connected to ZooKeeper 
I0125 17:44:54.447120 5439 group.cpp:805] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0) 
I0125 17:44:54.447140 5439 group.cpp:403] Trying to create path '/mesos' in ZooKeeper 
I0125 17:44:54.448109 5439 detector.cpp:156] Detected a new leader: (id='28') 
I0125 17:44:54.448246 5439 group.cpp:674] Trying to get '/mesos/json.info_0000000028' in ZooKeeper 
I0125 17:44:54.448755 5440 detector.cpp:482] A new leading master ([email protected]:5050) is detected 
I0125 17:44:54.448832 5440 sched.cpp:264] New master detected at [email protected]:5050 
I0125 17:44:54.448977 5440 sched.cpp:274] No credentials provided. Attempting to register without authentication 
I0125 17:44:54.449766 5440 sched.cpp:643] Framework registered with a636c17f-2b0d-46f7-9b15-5a3d6e9918a4-0003 
16/01/25 17:44:54 INFO MesosSchedulerBackend: Registered as framework ID a636c17f-2b0d-46f7-9b15-5a3d6e9918a4-0003 
16/01/25 17:44:54 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51820. 
16/01/25 17:44:54 INFO NettyBlockTransferService: Server created on 51820 
16/01/25 17:44:54 INFO BlockManagerMaster: Trying to register BlockManager 
16/01/25 17:44:54 INFO BlockManagerMasterEndpoint: Registering block manager 10.xxx.xxx.25:51820 with 441.9 MB RAM, BlockManagerId(driver, 10.xxx.xxx.25, 51820) 
16/01/25 17:44:54 INFO BlockManagerMaster: Registered BlockManager 
16/01/25 17:44:55 INFO ForEachDStream: metadataCleanupDelay = -1 
16/01/25 17:44:55 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1 
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Slide time = 30000 ms 
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Checkpoint interval = null 
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Remember duration = 30000 ms 
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Initialized and validated [email protected]b 
16/01/25 17:44:55 INFO ForEachDStream: Slide time = 30000 ms 
16/01/25 17:44:55 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 
16/01/25 17:44:55 INFO ForEachDStream: Checkpoint interval = null 
16/01/25 17:44:55 INFO ForEachDStream: Remember duration = 30000 ms 
16/01/25 17:44:55 INFO ForEachDStream: Initialized and validated [email protected] 
16/01/25 17:44:55 INFO RecurringTimer: Started timer for JobGenerator at time 1453715100000 
16/01/25 17:44:55 INFO JobGenerator: Started JobGenerator at 1453715100000 ms 
16/01/25 17:44:55 INFO JobScheduler: Started JobScheduler 
16/01/25 17:44:55 INFO StreamingContext: StreamingContext started 
16/01/25 17:45:00 INFO VerifiableProperties: Verifying properties 
16/01/25 17:45:00 INFO VerifiableProperties: Property auto.commit.interval.ms is overridden to 1000 
16/01/25 17:45:00 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest 
16/01/25 17:45:00 INFO VerifiableProperties: Property group.id is overridden to bid_event_consumer_group_zk 
16/01/25 17:45:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to test-master:2181 
16/01/25 17:45:00 INFO VerifiableProperties: Property zookeeper.session.timeout.ms is overridden to 400 
16/01/25 17:45:00 INFO VerifiableProperties: Property zookeeper.sync.time.ms is overridden to 200 
16/01/25 17:45:00 ERROR JobScheduler: Error generating jobs for time 1453715100000 ms 
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
16/01/25 17:45:00 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook 
16/01/25 17:45:00 INFO JobGenerator: Stopping JobGenerator immediately 
16/01/25 17:45:00 INFO RecurringTimer: Stopped timer for JobGenerator after time 1453715100000 
16/01/25 17:45:00 INFO JobGenerator: Stopped JobGenerator 
16/01/25 17:45:00 INFO JobScheduler: Stopped JobScheduler 
16/01/25 17:45:00 INFO StreamingContext: StreamingContext stopped successfully 
16/01/25 17:45:00 INFO SparkContext: Invoking stop() from shutdown hook 
16/01/25 17:45:00 INFO SparkUI: Stopped Spark web UI at http://10.xxx.xxx.25:4040 
16/01/25 17:45:00 INFO DAGScheduler: Stopping DAGScheduler 
I0125 17:45:00.281819 5579 sched.cpp:1805] Asked to stop the driver 
I0125 17:45:00.281951 5437 sched.cpp:1043] Stopping framework 'a636c17f-2b0d-46f7-9b15-5a3d6e9918a4-0003' 
16/01/25 17:45:00 INFO MesosSchedulerBackend: driver.run() returned with code DRIVER_STOPPED 
16/01/25 17:45:00 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/01/25 17:45:00 INFO MemoryStore: MemoryStore cleared 
16/01/25 17:45:00 INFO BlockManager: BlockManager stopped 
16/01/25 17:45:00 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/01/25 17:45:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/01/25 17:45:00 INFO SparkContext: Successfully stopped SparkContext 
16/01/25 17:45:00 INFO ShutdownHookManager: Shutdown hook called 
16/01/25 17:45:00 INFO ShutdownHookManager: Deleting directory /tmp/spark-442ac339-2ac7-427f-9e6b-5b5cb18a54dd 

Answers

4

I ran into a very similar problem recently in a project with Kafka and Spark Streaming. What helped in my case was manually deleting the Spark Streaming checkpoint files and then starting over.
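The cleanup described above can be sketched as follows, assuming the checkpoint lives on the local filesystem. The path `/tmp/spark-checkpoint` and the helper name `clear_checkpoint` are hypothetical; a checkpoint stored on HDFS or another remote store would need that system's own tooling instead. After the directory is gone, the restarted job no longer resumes from the state saved there.

```python
import shutil
from pathlib import Path


def clear_checkpoint(checkpoint_dir: str) -> bool:
    """Delete a local checkpoint directory; return True if something was removed."""
    path = Path(checkpoint_dir)
    if not path.is_dir():
        # Nothing to clean up (or the path is not a directory).
        return False
    shutil.rmtree(path)
    return True


if __name__ == "__main__":
    # Hypothetical location; use the directory configured for the job's checkpoints.
    removed = clear_checkpoint("/tmp/spark-checkpoint")
    print("checkpoint removed" if removed else "no checkpoint found")
```

Stop the streaming job before running this, then start it again so it begins without the old checkpoint.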

9

Check the offsets of the Kafka topic. The offset provided in the code may be out of range, i.e. it may be smaller than the earliest available offset or greater than the latest offset.
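To make the range check concrete, here is a minimal, library-free sketch. It assumes, going by the stack trace in the question, that the direct stream counts records per partition as `until_offset - from_offset`, so a starting offset beyond the latest available offset produces a negative total. The function names and sample offsets are hypothetical; in a real job the earliest and latest offsets would have to be fetched from the Kafka brokers.

```python
from typing import Dict, Tuple

# (earliest, latest) offsets currently available per partition.
OffsetBounds = Dict[int, Tuple[int, int]]


def num_records(from_offsets: Dict[int, int], until_offsets: Dict[int, int]) -> int:
    """Sum of (until - from) over all partitions; negative if a start is too far ahead."""
    return sum(until_offsets[p] - from_offsets[p] for p in from_offsets)


def clamp_offsets(requested: Dict[int, int], bounds: OffsetBounds) -> Dict[int, int]:
    """Pull each requested starting offset back into [earliest, latest]."""
    return {
        p: min(max(offset, bounds[p][0]), bounds[p][1])
        for p, offset in requested.items()
    }


if __name__ == "__main__":
    bounds = {0: (100, 250), 1: (0, 40)}  # sample broker state (made up)
    stored = {0: 900, 1: 10}              # partition 0 is stale and out of range
    latest = {p: hi for p, (_, hi) in bounds.items()}

    print(num_records(stored, latest))                         # -620
    print(num_records(clamp_offsets(stored, bounds), latest))  # 30
```

Stale starting offsets like the one for partition 0 can plausibly come from an old checkpoint or an external offset store that outlived a recreated topic, which would also explain why the error can appear even when the code passes no explicit offsets.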

+0

Could you give an example of how to ensure this in code? We are facing the same problem, and we do not provide any offset in the code. – user2359997
