serialization - How to fix "java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord" in Spark Streaming Kafka Consumer?


  • spark 2.0.0
  • apache kafka 0.10.1.0
  • scala 2.11.8
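
(The build setup is not shown in the post; assuming an sbt project, the dependencies matching these versions would look roughly like the sketch below. This is an assumption added for context, not part of the original question.)

// build.sbt (sketch; versions inferred from the list above, not taken from the post)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.0.0",
  "org.apache.spark" %% "spark-streaming"            % "2.0.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0"
)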

When using Spark Streaming with the Kafka integration (Kafka broker version 0.10.1.0), the following Scala code fails with this exception:

16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

Why does this happen, and how can I fix it?


Code:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark._
import org.apache.commons.codec.StringDecoder
import org.apache.spark.streaming._

object KafkaConsumer_spark_test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("KafkaConsumer_spark_test").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("./checkpoint")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("local1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.map(record => (record.key, record.value))
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Exception:

16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/11/13 12:55:20 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying
16/11/13 12:55:20 ERROR JobScheduler: Error running job streaming job 1479012920000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The ConsumerRecord object is what the DStream receives. When you try to print it directly, the error occurs because the object is not serializable. Instead, you should extract the values from the ConsumerRecord object and print those.

Instead of stream.print(), do:

stream.map(record => record.value().toString).print()

This should solve the problem.
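
For reference, here is a minimal end-to-end sketch of the corrected program, assuming the same topic (local1), broker (localhost:9092) and consumer settings as in the question; the val name keyValues is just illustrative. Note that in the original code the result of stream.map(record => (record.key, record.value)) is discarded because it is never assigned to anything, so stream.print() still operates on the raw ConsumerRecord stream.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaConsumer_spark_test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaConsumer_spark_test").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("./checkpoint")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("local1"), kafkaParams)
    )

    // Extract the (serializable) String key and value before anything that has
    // to ship records to the driver; ConsumerRecord itself is not serializable.
    val keyValues = stream.map(record => (record.key, record.value))
    keyValues.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

This works because print() internally calls take() on the underlying KafkaRDD (visible in the stack trace above), which serializes the first few elements and sends them to the driver; once the records are mapped to plain String keys and values, that serialization succeeds.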

