Fortunately, if you use generics, you can provide a single configuration that works in all but the most exceptional cases.
First, you will need a serializer:
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
public class KafkaJsonSerializer implements Serializer {
@Override
public void configure(Map map, boolean bool){
//nothing to do
}
@Override
public byte[] serialize(String s, Object o) {
final ObjectMapper objectMapper = new ObjectMapper();
byte[] output;
try {
output = objectMapper.writeValueAsBytes(o);
} catch(Exception e) {
//something went wrong
}
return output;
}
@Override
public void close(){
//nothing to do
}
}
then a deserializer (which MUST be typed):
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
public class KafkaJsonDeserializer<T> implements Deserializer {
private Class<T> clazz;
public KafkaJsonDeserializer(Class clazz){
this.clazz = clazz;
}
@Override
public void configure(Map map, boolean bool){
//nothing to do
}
@Override
public T deserialize(String s, byte[] b) {
final ObjectMapper objectMapper = new ObjectMapper();
T output;
try {
output = objectMapper.readValue(b, clazz);
} catch(Exception e) {
//something went wrong
}
return output;
}
@Override
public void close(){
//nothing to do
}
}
then you need to configure the producer by setting the following option:
value.serializer = KafkaJsonSerializer.class
and in the consumer:
value.deserializer = KafkaJsonDeserializer.class
and finally expose the typed KafkaConsumer via your own typed class:
public class KafkaReceiver<T> {
public Consumer<String, T> kafkaConsumer;
public KafkaReceiver(final Map<String, Object> config, final KafkaJsonDeserializer kafkaJsonDeserializer, final String topic){
kafkaConsumer = new KafkaConsumer<>(config, new StringDeserializer(), kafkaJsonDeserializer);
//subscribe to the desired topic
kafkaConsumer.subscribe(Collections.singleton(topic));
//perform an initial poll to initialize this consumer, remember not to actually commit messages yet!
kafkaConsumer.poll(Duration.ZERO);
}
}
Now you can create a typed consumer with:
KafkaReceiver<YOUR_CLASS> receiver = new KafkaReceiver<>(YOUR_CONFIG, new KafkaJsonDeserializer(YOUR_CLASS.class), YOUR_TOPIC);
All of this is unfortunately necessary due to the fact that the generics in Java do not allow you to specify at COMPILE time what the ACTUAL type would be, which means that for those cases such as a deserializer where you MUST know the EXACT type at compile time, you have to work around all this with some generics wrapping.
No comments:
Post a Comment
With great power comes great responsibility