Pages

22/02/2020

[Kafka] Delay poll return for testing scenarios

One of the challenges you will face when testing applications that rely on Kafka to send and receive messages, is that the consumer behaviour is quite erratic.

This is all due to how the call to the poll method works.

In short, three key factors are considered before the method returns some messages, the set poll duration and two consumer configurations:
  • fetch.min.bytes: as soon as this many bytes can be read from the desired topic, the consumer's fetch request is ready to be fulfilled and all messages it has retrieved by then will be returned. Default is 1 byte.
  • fetch.max.wait.ms: how long should the fetch request be kept waiting if there is not enough data to return (see previous parameter). default is 500ms.
  • poll timeout: how long will the consumer wait for the fetch request to provide some data before returning. 0 means no wait.
As you can see, in a test environment you might want to control these timings very precisely, to ensure all your send and retrieve message requests are correctly and consistently fulfilled.

I suggest increasing immediately your fetch.min.bytes configuration to something reasonable according to your test message size.
Then you might want to slow the application down a little with the fetch.max.wait.ms setting to account for slight delays in message reception by the test broker.
Lastly, set the poll timeout to a value higher than the fetch.max.wait.ms to ensure your consumer will never return too early while some test messages might still be in the queue for the specific test case. Also remember that behind the scenes, poll does MUCH MORE than simply retrieving messages, therefore you want to allow for enough time to perform the various administrative tasks.

[Java] Kafka send and receive JSON messages

If you wish to send and receive JSON messages via Apache Kafka, you need to provide a way to convert between your POJOs to JSON and viceversa.

Fortunately, if you use generics, you can provide a single configuration that works in all but the most exceptional cases. 

[Kafka] Exactly once delivery

Among the various configuration settings that you set for your producers and consumers, there is a specific set that is very important if you care about EOD (Exactly Once Delivery).

Producers:
  • enable.idempotence = true
  • max.in.flight.requests.per.connection <= 5
  • retries > 0
  • acks = "all"
Consumers:
  • isolation.level = "read_committed"

The consumer configuration is NOT strictly necessary, since messages that are not part of a transaction are not affected by it, but in case your producer would send messages as part of a transaction, you might want to avoid reading aborted ones.

[Java] Testcontainers copy file to container

Once you have your container up and running, you might want to upload some files to it. The outcome of this action is largely dependent on the container type and configuration, but in general you want to use the copyFileToContainer API paying attention to a couple things:

1. If the target location in the container does not exist, it will not be automatically created for you, you can work around this by issuing a command to do so:

container.execInContainer("mkdir", "-p", "TARGET_LOCATION");

2. The uploaded files will likely be owned by root, but you won't be running apps in it as root, so you want to set good enough permissions (at least 775) to allow the actual application user to access them:

container.copyFileToContainer(MountableFile.forHostPath(FILE_ON_HOST, 0777, "TARGET_LOCATION");

where FILE_ON_HOST is a File object representing the resource you are trying to copy to the container.

[Java] Testcontainers set container name

When you create a new container with testcontainers, you might want to assign it a name so that it's easier to spot when you list all of them from docker. This is as simple as:

 GenericContainer container = new GenericContainer<>("IMAGE_NAME")  
                   .withCreateContainerCmdModifier(cmd -> cmd.withName("YOUR_NAME"));  


[Java] Testcontainers start container with fixed port binding

If you're working in Java, you will likely be using docker in your testing process and quite likely you'll be using docker-compose or the amazing testcontainers project so configure and start a bunch of necessary dependencies.

While in a CI/CD environment for obvious reasons it is NOT recommended to fix a port binding for your containers, there might be instances (Kafka anyone?) where it might be preferable to grab a random port BEFORE the container is started AND force the binding to that port. This allows for easier configuration setup while your test suite boots up.

Once you have your random port, you need to configure your FixedHostPortGenericContainer to bind to it:

 int port = findFreePort();  
   
 GenericContainer container = new FixedhostPortGenericContainer<>("IMAGE_NAME")  
                   .withNetworkMode("host")  
                   .withFixedExposedPort(port, port);  
   
 container.setPortBindings(Collections.singletonList(port + ":" + port));  


which will configure the container to run in host network mode and will fix the host and container port to expose and bind to. In the Kafka example you could then set the KAFKA_ADVERTISED_LISTENERS environment variable directly in the container specification, since your port is now fixed.

[Java] Locate resource on classpath

Sometimes you might need to access a resource from your classpath and you might be tempted to use Spring's ClassPathResource which obviously requires you to have Spring available.

An alternative is the getResource method from the Java classloader:

 File myResource;  
   
 try{  
  final URL resource = getClass().getClassLoader().getResource("MY_RESOURCE");  
   
  if(resource != null){  
   myResource = new File(resource.toURI());  
  }  
 } catch(URISyntaxException e){  
  //something went wrong  
 }