22/02/2020

[Java] Kafka send and receive JSON messages

If you wish to send and receive JSON messages via Apache Kafka, you need to provide a way to convert between your POJOs to JSON and viceversa.

Fortunately, if you use generics, you can provide a single configuration that works in all but the most exceptional cases. 


First, you will need a serializer:

 import com.fasterxml.jackson.databind.ObjectMapper;  
 import java.util.Map;  
 import org.apache.kafka.common.serialization.Serializer;  
   
 public class KafkaJsonSerializer implements Serializer {  
   
  @Override  
  public void configure(Map map, boolean bool){  
   //nothing to do  
  }  
   
  @Override  
  public byte[] serialize(String s, Object o) {  
     
   final ObjectMapper objectMapper = new ObjectMapper();  
   
   byte[] output;  
   
   try {  
    output = objectMapper.writeValueAsBytes(o);  
   } catch(Exception e) {  
    //something went wrong  
   }  
   
   return output;  
  }  
   
  @Override  
  public void close(){  
   //nothing to do  
  }  
 }  


then a deserializer (which MUST be typed):

 import com.fasterxml.jackson.databind.ObjectMapper;   
  import java.util.Map;   
  import org.apache.kafka.common.serialization.Deserializer;   
     
  public class KafkaJsonDeserializer<T> implements Deserializer {   
     
  private Class<T> clazz;   
     
  public KafkaJsonDeserializer(Class clazz){   
   this.clazz = clazz;   
  }   
     
  @Override   
  public void configure(Map map, boolean bool){   
   //nothing to do   
  }   
     
  @Override   
  public T deserialize(String s, byte[] b) {   
      
   final ObjectMapper objectMapper = new ObjectMapper();   
     
   T output;   
     
   try {   
   output = objectMapper.readValue(b, clazz);   
   } catch(Exception e) {   
   //something went wrong   
   }   
     
   return output;   
  }   
     
  @Override   
  public void close(){   
   //nothing to do   
  }   
}   


then you need to configure the producer by setting the following option:

value.serializer = KafkaJsonSerializer.class

and in the consumer:

value.deserializer = KafkaJsonDeserializer.class

and finally expose the typed KafkaConsumer via your own typed class:

 public class KafkaReceiver<T> {  
   
  public Consumer<String, T> kafkaConsumer;  
   
  public KafkaReceiver(final Map<String, Object> config, final KafkaJsonDeserializer kafkaJsonDeserializer, final String topic){  
   kafkaConsumer = new KafkaConsumer<>(config, new StringDeserializer(), kafkaJsonDeserializer);  
   
   //subscribe to the desired topic
   kafkaConsumer.subscribe(Collections.singleton(topic));  
   
   //perform an initial poll to initialize this consumer, remember not to actually commit messages yet!
   kafkaConsumer.poll(Duration.ZERO);  
  }  
 }  

Now you can create a typed consumer with:

 KafkaReceiver<YOUR_CLASS> receiver = new KafkaReceiver<>(YOUR_CONFIG, new KafkaJsonDeserializer(YOUR_CLASS.class), YOUR_TOPIC);  

All of this is unfortunately necessary due to the fact that the generics in Java do not allow you to specify at COMPILE time what the ACTUAL type would be, which means that for those cases such as a deserializer where you MUST know the EXACT type at compile time, you have to work around all this with some generics wrapping.

No comments:

Post a Comment

With great power comes great responsibility