java - Kafka consumer.poll call not returning kafka ConsumerRecords -




I'm new to Kafka. I'm working on a PoC in which I need to send a ProducerRecord<String, paymnt> to a Kafka topic, where paymnt is a POJO. I am able to publish the record and can see the messages being delivered to the Kafka topic:

d:\kafka\kafka_2.11-0.11.0.0\bin\windows>kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
test:2:0
test:1:0
test:0:4

However, on the consumer side I'm not able to retrieve the same record. When I debug the consumer code, I see the thread blocking on the call to consumer.poll().

consumer class

public class consumer {    public static void main(string args[]) throws ioexception {     properties props = new properties();     kafkaconsumer<string, paymnt> consumer = null;     props.put("bootstrap.servers", "localhost:9092");     props.put("batch.size", 16384);     props.put("buffer.memory", 33554432);     props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");     props.put("value.deserializer", "com.org.kafkapro.paymentdeserializer");     props.put("enable.auto.commit", "false");     props.put("group.id", "test-consumer-group");     try {         consumer =new kafkaconsumer<string, paymnt>(props);         consumer.subscribe(arrays.aslist("test"));             while(true){              consumerrecords<string, paymnt> records = consumer.poll(200);             (consumerrecord<string,paymnt> record : records)             {                 system.out.println(record.value().tostring());             }              consumer.commitasync();             }      }     catch(exception ex){         ex.printstacktrace();     }finally{         consumer.commitsync();         consumer.close();     } } } 

paymentdeserializer class

 package com.org.kafkapro;     import java.io.bytearrayinputstream;     import java.io.ioexception;     import java.io.objectinput;     import java.io.objectinputstream;     import java.util.map;      import org.apache.kafka.common.serialization.deserializer;      public class paymentdeserializer implements deserializer<paymnt> {          public paymentdeserializer(){          }          public void close() {             // todo auto-generated method stub          }          public void configure(map<string, ?> arg0, boolean arg1) {             // todo auto-generated method stub          }          public paymnt deserialize(string arg0, byte[] arg1) {             // todo auto-generated method stub             bytearrayinputstream bis = new bytearrayinputstream(arg1);             objectinputstream in = null;             paymnt h2 = null;             try {                 in = new objectinputstream(bis);             } catch (ioexception e1) {                 // todo auto-generated catch block                 e1.printstacktrace();             }             try {                  h2 = (paymnt) in.readobject();             } catch (classnotfoundexception e) {                 // todo auto-generated catch block                 e.printstacktrace();             } catch (ioexception e) {                 // todo auto-generated catch block                 e.printstacktrace();             }             return h2;         }  } 

paymnt class

/**
 * Payment POJO. It implements {@link java.io.Serializable} because instances are
 * written with {@code ObjectOutputStream.writeObject}, which rejects classes that
 * are not serializable.
 */
public class paymnt implements java.io.Serializable {

    // Fixed so that recompiling the class does not change the serialized form's version.
    private static final long serialVersionUID = 1L;

    // fields, getters & setters
}

serializer

    public class paymentserializer implements serializer<paymnt> {           public paymentserializer(){          }          public void close() {             // todo auto-generated method stub          }          public void configure(map<string, ?> arg0, boolean arg1) {             // todo auto-generated method stub          }          public byte[] serialize(string arg0, paymnt payment) {             // todo auto-generated method stub             try {                 bytearrayoutputstream baos = new bytearrayoutputstream();                 objectoutputstream oos = new objectoutputstream(baos);                 oos.writeobject(payment);                 oos.close();                 byte[] b= baos.tobytearray();                 return b;             } catch (ioexception e) {                 return new byte[0];             }          }  } 

I'd appreciate any help. Thank you.





wiki

Comments

Popular posts from this blog

Asterisk AGI Python Script to Dialplan does not work -

python - Read npy file directly from S3 StreamingBody -

kotlin - Out-projected type in generic interface prohibits the use of metod with generic parameter -