java - Kafka consumer.poll call not returning kafka ConsumerRecords -
i'm new kafka technology..i'm working on poc need send producerrecord<string, paymnt>
kafka topic paymnt pojo..i able publish record & see messages being delivered kafka topic..
d:\kafka\kafka_2.11-0.11.0.0\bin\windows>kafka-run-class.bat kafka.tools.getoffsetshell --broker-list localhost:9092 --topic test --time -1 test:2:0 test:1:0 test:0:4
however on consumer side i'm not able retrieve same record..when debug consumer code,i see thread call blocking on consumer.poll()
consumer class
public class consumer { public static void main(string args[]) throws ioexception { properties props = new properties(); kafkaconsumer<string, paymnt> consumer = null; props.put("bootstrap.servers", "localhost:9092"); props.put("batch.size", 16384); props.put("buffer.memory", 33554432); props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put("value.deserializer", "com.org.kafkapro.paymentdeserializer"); props.put("enable.auto.commit", "false"); props.put("group.id", "test-consumer-group"); try { consumer =new kafkaconsumer<string, paymnt>(props); consumer.subscribe(arrays.aslist("test")); while(true){ consumerrecords<string, paymnt> records = consumer.poll(200); (consumerrecord<string,paymnt> record : records) { system.out.println(record.value().tostring()); } consumer.commitasync(); } } catch(exception ex){ ex.printstacktrace(); }finally{ consumer.commitsync(); consumer.close(); } } }
paymentdeserliazer class
package com.org.kafkapro; import java.io.bytearrayinputstream; import java.io.ioexception; import java.io.objectinput; import java.io.objectinputstream; import java.util.map; import org.apache.kafka.common.serialization.deserializer; public class paymentdeserializer implements deserializer<paymnt> { public paymentdeserializer(){ } public void close() { // todo auto-generated method stub } public void configure(map<string, ?> arg0, boolean arg1) { // todo auto-generated method stub } public paymnt deserialize(string arg0, byte[] arg1) { // todo auto-generated method stub bytearrayinputstream bis = new bytearrayinputstream(arg1); objectinputstream in = null; paymnt h2 = null; try { in = new objectinputstream(bis); } catch (ioexception e1) { // todo auto-generated catch block e1.printstacktrace(); } try { h2 = (paymnt) in.readobject(); } catch (classnotfoundexception e) { // todo auto-generated catch block e.printstacktrace(); } catch (ioexception e) { // todo auto-generated catch block e.printstacktrace(); } return h2; } }
paymnt class
public class paymnt { //fields,getters & setters }
serializer
public class paymentserializer implements serializer<paymnt> { public paymentserializer(){ } public void close() { // todo auto-generated method stub } public void configure(map<string, ?> arg0, boolean arg1) { // todo auto-generated method stub } public byte[] serialize(string arg0, paymnt payment) { // todo auto-generated method stub try { bytearrayoutputstream baos = new bytearrayoutputstream(); objectoutputstream oos = new objectoutputstream(baos); oos.writeobject(payment); oos.close(); byte[] b= baos.tobytearray(); return b; } catch (ioexception e) { return new byte[0]; } } }
appreciate help.thank you
wiki
Comments
Post a Comment