Send and receive JSON objects with the Kafka Java client

Kafka lets us publish and subscribe to records of any type, as long as we supply a way to turn them into bytes and back. Here I’m going to demonstrate how to send Java objects as JSON and how to map an incoming JSON string back into a Java object.

Dependencies needed

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <!-- No version is given here; add one unless a parent POM or BOM manages it -->
</dependency>

Let’s start

The first thing we need to do is create a Serializer and a Deserializer to handle the mapping between JSON and Java objects. To create a serializer class, we implement the org.apache.kafka.common.serialization.Serializer interface, and for the deserializer we implement the org.apache.kafka.common.serialization.Deserializer interface. In kafka-clients 1.1.0, both of these interfaces declare three methods:

  1. configure – called at startup with the client configuration.
  2. serialize/deserialize – handles the serialization or deserialization.
  3. close – called when the Kafka client is closed.

Serializer Implementation

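import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
// Log4j 2 is assumed here; the original does not name the logging library
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class KafkaJsonSerializer<T> implements Serializer<T> {

    private final Logger logger = LogManager.getLogger(this.getClass());

    // ObjectMapper is thread-safe once configured, so one instance is reused
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            // Returning null makes Kafka send a record with a null value
            logger.error("Failed to serialize value for topic " + topic, e);
            return null;
        }
    }

    @Override
    public void close() {
        // Nothing to release
    }
}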

Deserializer Implementation

Notice that, to make the class reusable with different object types, I pass the target type in through the constructor.

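import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
// Log4j 2 is assumed here; the original does not name the logging library
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class KafkaJsonDeserializer<T> implements Deserializer<T> {

    private final Logger logger = LogManager.getLogger(this.getClass());

    // ObjectMapper is thread-safe once configured, so one instance is reused
    private final ObjectMapper mapper = new ObjectMapper();

    // Target class that incoming JSON is mapped to
    private final Class<T> type;

    public KafkaJsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null; // A record with no value has nothing to map
        }
        try {
            return mapper.readValue(bytes, type);
        } catch (Exception e) {
            // Malformed records are logged and surface as null values
            logger.error("Failed to deserialize value from topic " + topic, e);
            return null;
        }
    }

    @Override
    public void close() {
        // Nothing to release
    }
}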
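
The constructor argument is needed because Java erases generic type parameters at runtime: code inside KafkaJsonDeserializer<T> cannot ask what T is, so it has to be handed a Class<T> token. Here is a minimal, dependency-free sketch of that pattern. TypeTokenSketch and its methods are hypothetical names used only for illustration; they are not part of Kafka or Jackson.

```java
public class TypeTokenSketch<T> {

    // Runtime handle on T; without it the class could not know its target type
    private final Class<T> type;

    public TypeTokenSketch(Class<T> type) {
        this.type = type;
    }

    // Name of the target type, available only because the token was passed in
    public String targetTypeName() {
        return type.getSimpleName();
    }

    // Checked conversion: returns null when the value is not an instance of T
    public T convertOrNull(Object value) {
        return type.isInstance(value) ? type.cast(value) : null;
    }

    public static void main(String[] args) {
        TypeTokenSketch<String> strings = new TypeTokenSketch<>(String.class);
        TypeTokenSketch<Integer> ints = new TypeTokenSketch<>(Integer.class);

        System.out.println(strings.targetTypeName());           // String
        System.out.println(ints.convertOrNull(42));              // 42
        System.out.println(ints.convertOrNull("not a number"));  // null

        // Erasure: both instances share one runtime class
        System.out.println(strings.getClass() == ints.getClass()); // true
    }
}
```

Jackson's readValue(bytes, type) plays the role of convertOrNull here: it uses the class token to decide which Java type to build from the JSON.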

Producer example

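import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
Producer<String, CustomObject> kafkaProducer =
  new KafkaProducer<>(
    props,
    new StringSerializer(),
    new KafkaJsonSerializer()
  );
// Send a message; customObject is an instance of your own CustomObject class
kafkaProducer.send(new ProducerRecord<>("TOPIC", "0", customObject));
// send() is asynchronous, so close the producer to flush pending records
kafkaProducer.close();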

Consumer example

Notice that when initializing our KafkaJsonDeserializer, you have to pass the class of the target object as a constructor argument.

Consumer<String, CustomObject> consumer =
        new KafkaConsumer<>(
                props,
                new StringDeserializer(), // the consumer needs a key deserializer, not StringSerializer
                new KafkaJsonDeserializer<CustomObject>(CustomObject.class));
// Subscribe to the topic
consumer.subscribe(Collections.singletonList("TOPIC"));
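
The consumer snippet reuses props, which so far only sets bootstrap.servers. A consumer that calls subscribe() also needs a group.id, and auto.offset.reset decides where a group with no committed offsets starts reading. Below is a small sketch of building such settings with plain java.util.Properties. The class name ConsumerPropsSketch, the group name "json-demo-group" and the choice of "earliest" are my assumptions for illustration, not values from the original project.

```java
import java.util.Properties;

public class ConsumerPropsSketch {

    // Builds consumer settings. The key and value deserializers are passed to
    // the KafkaConsumer constructor in this article, so they are not set here.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Consumers sharing a group.id split the topic's partitions between them
        props.put("group.id", groupId);
        // Start from the oldest available record when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "json-demo-group");
        System.out.println(props.getProperty("group.id")); // json-demo-group
    }
}
```

The resulting Properties object can be passed as the first argument of the KafkaConsumer constructor. Records are then fetched by calling consumer.poll in a loop, and each record's value() comes back already mapped to a CustomObject.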

That’s it.

An example can be found here

Thank you for taking the time to read this.
