Java - Kafka topic details not displaying in Spark
I have written a topic in Kafka, my-topic, and I am trying to fetch the topic's messages in Spark. I am having difficulty displaying the Kafka topic details and I get a long list of errors. I am using Java to fetch the data.
Below is my code:
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkConsumingKafka {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sampleapp");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "different id allotted different stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("my-topic");

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Turn each Kafka record into a (key, value) pair
        JavaPairDStream<String, String> jPairDStream = stream.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                        return new Tuple2<>(record.key(), record.value());
                    }
                });

        // Print every pair; foreach runs on the executors (the same JVM in local mode)
        jPairDStream.foreachRDD(jPairRDD -> {
            jPairRDD.foreach(rdd -> {
                System.out.println("key=" + rdd._1() + " value=" + rdd._2());
            });
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
I am getting the error below:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/09/04 11:41:15 INFO SparkContext: Running Spark version 2.1.0
17/09/04 11:41:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/04 11:41:15 INFO SecurityManager: Changing view acls to: 11014525
17/09/04 11:41:15 INFO SecurityManager: Changing modify acls to: 11014525
17/09/04 11:41:15 INFO SecurityManager: Changing view acls groups to:
17/09/04 11:41:15 INFO SecurityManager: Changing modify acls groups to:
17/09/04 11:41:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(11014525); groups with view permissions: Set(); users with modify permissions: Set(11014525); groups with modify permissions: Set()
17/09/04 11:41:15 INFO Utils: Successfully started service 'sparkDriver' on port 56668.
17/09/04 11:41:15 INFO SparkEnv: Registering MapOutputTracker
17/09/04 11:41:15 INFO SparkEnv: Registering BlockManagerMaster
17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/09/04 11:41:15 INFO DiskBlockManager: Created local directory at C:\Users\11014525\AppData\Local\Temp\blockmgr-cba489b9-2458-455a-8c03-4c4395a01d44
17/09/04 11:41:15 INFO MemoryStore: MemoryStore started with capacity 896.4 MB
17/09/04 11:41:16 INFO SparkEnv: Registering OutputCommitCoordinator
17/09/04 11:41:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/09/04 11:41:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.16.202.21:4040
17/09/04 11:41:16 INFO Executor: Starting executor ID driver on host localhost
17/09/04 11:41:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56689.
17/09/04 11:41:16 INFO NettyBlockTransferService: Server created on 172.16.202.21:56689
17/09/04 11:41:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/09/04 11:41:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManagerMasterEndpoint: Registering block manager 172.16.202.21:56689 with 896.4 MB RAM, BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 WARN KafkaUtils: overriding enable.auto.commit to false for executor
17/09/04 11:41:16 WARN KafkaUtils: overriding auto.offset.reset to none for executor
17/09/04 11:41:16 WARN KafkaUtils: overriding executor group.id to spark-executor-different id allotted different stream
17/09/04 11:41:16 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@23a3407b
17/09/04 11:41:16 INFO MappedDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO MappedDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO MappedDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO MappedDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@140030a9
17/09/04 11:41:16 INFO ForEachDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO ForEachDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO ForEachDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO ForEachDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@65041548
17/09/04 11:41:16 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
    at json.exceltojson.SparkConsumingKafka.main(SparkConsumingKafka.java:56)
17/09/04 11:41:16 INFO ReceiverTracker: ReceiverTracker stopped
17/09/04 11:41:16 INFO JobGenerator: Stopping JobGenerator immediately
17/09/04 11:41:16 INFO RecurringTimer: Stopped timer for JobGenerator after time -1
17/09/04 11:41:16 INFO JobGenerator: Stopped JobGenerator
17/09/04 11:41:16 INFO JobScheduler: Stopped JobScheduler
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
    at json.exceltojson.SparkConsumingKafka.main(SparkConsumingKafka.java:56)
17/09/04 11:41:16 INFO SparkContext: Invoking stop() from shutdown hook
17/09/04 11:41:16 INFO SparkUI: Stopped Spark web UI at http://172.16.202.21:4040
17/09/04 11:41:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/09/04 11:41:16 INFO MemoryStore: MemoryStore cleared
17/09/04 11:41:16 INFO BlockManager: BlockManager stopped
17/09/04 11:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
17/09/04 11:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/09/04 11:41:16 INFO SparkContext: Successfully stopped SparkContext
17/09/04 11:41:16 INFO ShutdownHookManager: Shutdown hook called
17/09/04 11:41:16 INFO ShutdownHookManager: Deleting directory C:\Users\11014525\AppData\Local\Temp\spark-37334cdc-9680-4801-8e50-ef3024ed1d8a
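pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
    <version>2.1.1</version>
</dependency>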
From the log, your Spark version is 2.1.0. You have not shared the build file with your other dependencies, but it looks like you have both spark-streaming-kafka-0-8_2.11-2.1.0.jar and spark-streaming-kafka-0-10_2.11-2.1.0.jar on the classpath, so the wrong class is being loaded. If you are using Maven, you need the dependencies below. Please check and update your project.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
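To confirm which jar is actually supplying a class at runtime (the stack trace goes through org.apache.kafka.clients.consumer.ConsumerConfig), you can ask the JVM where the class was loaded from. The sketch below uses only the JDK (Class.forName and ProtectionDomain.getCodeSource); WhichJar and locate are hypothetical names, not part of Spark or Kafka. Run it with the same classpath as the Spark job: if ConsumerConfig resolves to a kafka-clients 0.8.x jar, the old client is the one being used.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper (JDK only): reports where a class was loaded from.
public class WhichJar {

    /**
     * Returns the jar/directory URL a class came from, "bootstrap" for JDK
     * classes (which have no CodeSource), or "not found" if it is not on the classpath.
     */
    public static String locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "bootstrap";
            }
            URL location = src.getLocation();
            return location.toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        }
    }

    public static void main(String[] args) {
        // Classes involved in the failing call; adjust the list as needed
        String[] names = {
            "org.apache.kafka.clients.consumer.ConsumerConfig",
            "org.apache.spark.streaming.kafka010.KafkaUtils"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

From the Maven side, `mvn dependency:tree -Dincludes=org.apache.kafka` lists the Kafka artifacts Maven resolved and which dependency pulled each one in.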
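Edit:
Since you have edited the question and posted your dependencies, I am editing my answer. You are using Kafka version 0.8.* while spark-streaming-kafka is version 0.10.*. The "partition.assignment.strategy" error most likely comes from the old 0.8.x Kafka client being picked up by the 0.10 integration, which expects a 0.10 client. Please use the same version for all Kafka dependencies, and keep the Scala suffix consistent as well: your pom mixes _2.10 artifacts (kafka_2.10, spark-streaming-kafka-0-10_2.10) with _2.11 (spark-streaming_2.11). Please use the org.apache.kafka dependency below: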
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
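Both mismatches in the question's pom.xml, a Kafka 0.8 artifact next to the 0.10 integration and Scala 2.10 artifacts next to 2.11 ones, can be spotted mechanically. The sketch below is a hypothetical helper, not a Maven or Spark API: the coordinates are typed in by hand, and the rule "org.apache.kafka artifacts must be 0.10 or newer when spark-streaming-kafka-0-10 is present" is an assumption taken from the connector's name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker for the two version rules; not part of any build tool.
public class DependencyCheck {

    /** A Maven coordinate (groupId, artifactId, version). */
    static final class Dep {
        final String groupId, artifactId, version;

        Dep(String groupId, String artifactId, String version) {
            this.groupId = groupId;
            this.artifactId = artifactId;
            this.version = version;
        }
    }

    // Scala binary-version suffix such as "_2.10" or "_2.11" at the end of an artifactId
    private static final Pattern SCALA_SUFFIX = Pattern.compile("_(2\\.\\d+)$");

    /** Returns human-readable problems; an empty list means no mismatch was detected. */
    static List<String> check(List<Dep> deps) {
        List<String> problems = new ArrayList<>();

        // Rule 1: every Scala-suffixed artifact should use the same Scala binary version
        Set<String> scalaVersions = new TreeSet<>();
        for (Dep d : deps) {
            Matcher m = SCALA_SUFFIX.matcher(d.artifactId);
            if (m.find()) {
                scalaVersions.add(m.group(1));
            }
        }
        if (scalaVersions.size() > 1) {
            problems.add("mixed Scala versions: " + scalaVersions);
        }

        // Rule 2 (assumed): the kafka-0-10 connector needs Kafka artifacts at 0.10 or newer
        boolean usesKafka010 = deps.stream()
                .anyMatch(d -> d.artifactId.startsWith("spark-streaming-kafka-0-10"));
        if (usesKafka010) {
            for (Dep d : deps) {
                if (d.groupId.equals("org.apache.kafka") && !atLeast(d.version, 0, 10)) {
                    problems.add(d.artifactId + ":" + d.version + " is older than 0.10");
                }
            }
        }
        return problems;
    }

    /** True if a "major.minor..." version string is >= the given major.minor. */
    static boolean atLeast(String version, int major, int minor) {
        String[] parts = version.split("\\.");
        int maj = Integer.parseInt(parts[0]);
        int min = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return maj > major || (maj == major && min >= minor);
    }

    public static void main(String[] args) {
        // The dependencies from the question's pom.xml
        List<Dep> question = Arrays.asList(
                new Dep("org.apache.spark", "spark-streaming_2.11", "2.1.0"),
                new Dep("commons-lang", "commons-lang", "2.6"),
                new Dep("org.apache.kafka", "kafka_2.10", "0.8.2.0"),
                new Dep("org.apache.spark", "spark-streaming-kafka-0-10_2.10", "2.1.1"));
        check(question).forEach(System.out::println);
    }
}
```

For the question's pom this reports both the mixed _2.10/_2.11 suffixes and kafka_2.10:0.8.2.0; the dependency set recommended in this answer (all _2.11, kafka_2.11 0.10.2.0) passes both checks.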