serialization - How to fix "java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord" in Spark Streaming Kafka Consumer?
- Spark 2.0.0
- Apache Kafka 0.10.1.0
- Scala 2.11.8
When using Spark Streaming with the Kafka integration (Kafka broker version 0.10.1.0), the following Scala code fails with this exception:
16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
Why does this happen, and how can I fix it?
Code:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark._
import org.apache.commons.codec.StringDecoder
import org.apache.spark.streaming._

object KafkaConsumer_spark_test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("KafkaConsumer_spark_test").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("./checkpoint")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("local1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.map(record => (record.key, record.value))
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Exception:
16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/11/13 12:55:20 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying
16/11/13 12:55:20 ERROR JobScheduler: Error running job streaming job 1479012920000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
The ConsumerRecord objects are what the DStream receives. When you try to print them directly, you get this error because ConsumerRecord is not serializable. Instead, you should extract the values from each ConsumerRecord and print those.
Instead of stream.print(), do:

stream.map(record => record.value().toString).print()

This should solve the problem.
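For reference, here is a minimal sketch of the corrected driver section, assuming the same ssc, topics and kafkaParams as in the question. Note that in the original code the result of stream.map(...) is discarded and stream.print() is then called on the raw ConsumerRecord stream; it is the mapped DStream that should be printed:

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Map each ConsumerRecord to a plain (key, value) tuple and keep the result.
// Strings are serializable, so print() can collect them to the driver safely.
val pairs = stream.map(record => (record.key, record.value))
pairs.print()

ssc.start()
ssc.awaitTermination()

Any transformation that converts the records to serializable types (here, tuples of Strings) before an output action avoids shipping ConsumerRecord itself across the wire.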