[Java] Schedule tasks with Quartz and Spring

We saw how to schedule tasks with ScheduledExecutorService, and we will now see how to achieve the same result using the Quartz framework.

Much like before, we have two key concepts:
  • job: a job is a runnable class that implements the Job interface
  • trigger: a trigger is a specific schedule for a job. A trigger can be linked to only one job, while a job can be linked to multiple triggers.
Quartz is available on Maven Central (groupId org.quartz-scheduler, artifactId quartz).
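
To make the job/trigger relationship concrete, here is a minimal sketch using plain Quartz 2.x (no Spring wiring), assuming the quartz artifact is on the classpath. The names QuartzSketch, HelloJob, "helloJob", "everySecond" and "everyFiveSeconds" are made up for illustration. One job is registered once and then linked to two triggers:

```java
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzSketch {

    // Hypothetical job: Quartz instantiates it through its public no-arg constructor.
    public static class HelloJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            System.out.println("Fired by trigger " + context.getTrigger().getKey());
        }
    }

    public static void main(String[] args) throws SchedulerException, InterruptedException {
        // A durable job can be stored before any trigger points at it.
        JobDetail job = JobBuilder.newJob(HelloJob.class)
                .withIdentity("helloJob")
                .storeDurably()
                .build();

        // Two triggers, each linked to the same single job.
        Trigger everySecond = TriggerBuilder.newTrigger()
                .withIdentity("everySecond")
                .forJob(job)
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(1)
                        .repeatForever())
                .build();
        Trigger everyFiveSeconds = TriggerBuilder.newTrigger()
                .withIdentity("everyFiveSeconds")
                .forJob(job)
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(5)
                        .repeatForever())
                .build();

        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.addJob(job, false);           // register the job once
        scheduler.scheduleJob(everySecond);     // then attach each trigger to it
        scheduler.scheduleJob(everyFiveSeconds);
        scheduler.start();

        Thread.sleep(6_000);                    // let the triggers fire a few times
        scheduler.shutdown(true);               // wait for running jobs to finish
    }
}
```

In a Spring application the scheduler would normally be provided by the framework rather than created by hand, so treat the StdSchedulerFactory call as a stand-in.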


[Java] Schedule tasks with ScheduledExecutorService

Java offers a ScheduledExecutorService that allows developers to create and schedule tasks.

A task is a piece of code with ONE method to be executed that performs the desired duties. It is NOT necessary for the task class itself to explicitly implement the Runnable interface, since a lambda or a method reference is enough, although the task will be run on one of the scheduler's threads.

An important thing to notice is that by default the scheduler's threads are NOT marked as daemons, so they can keep the JVM alive. It might therefore be worth setting that flag through a custom ThreadFactory, according to your needs.

Another important thing to note is that the scheduler holds on to references to cancelled tasks until their delay elapses, which might lead to memory leaks, so it might be preferable to call setRemoveOnCancelPolicy(true) on the underlying ScheduledThreadPoolExecutor.
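
The points above can be put together in a small sketch that uses only the JDK. The class and method names (SchedulerSketch, newScheduler, runPeriodically, queuedAfterCancel) and the timings are made up for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

class SchedulerSketch {

    /** Builds a single-threaded scheduler with daemon threads and remove-on-cancel enabled. */
    static ScheduledThreadPoolExecutor newScheduler() {
        ThreadFactory daemonFactory = runnable -> {
            Thread thread = new Thread(runnable, "sketch-scheduler");
            thread.setDaemon(true); // this thread will not keep the JVM alive
            return thread;
        };
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, daemonFactory);
        // Remove cancelled tasks from the queue right away instead of
        // keeping them until their delay elapses.
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

    /** Schedules a task every 50 ms; returns true if it fired `runs` times within 5 seconds. */
    static boolean runPeriodically(int runs) {
        ScheduledThreadPoolExecutor scheduler = newScheduler();
        CountDownLatch latch = new CountDownLatch(runs);
        // The task is just a method reference; no class implementing Runnable is needed.
        ScheduledFuture<?> handle =
                scheduler.scheduleAtFixedRate(latch::countDown, 0, 50, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            handle.cancel(false);
            scheduler.shutdownNow();
        }
    }

    /** Schedules a far-future task, cancels it, and reports how many tasks are still queued. */
    static int queuedAfterCancel() {
        ScheduledThreadPoolExecutor scheduler = newScheduler();
        ScheduledFuture<?> handle = scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
        handle.cancel(false);
        int queued = scheduler.getQueue().size();
        scheduler.shutdownNow();
        return queued;
    }

    public static void main(String[] args) {
        System.out.println("fired 3 times: " + runPeriodically(3));
        System.out.println("queued after cancel: " + queuedAfterCancel());
    }
}
```

Note that setRemoveOnCancelPolicy is defined on ScheduledThreadPoolExecutor, not on the ScheduledExecutorService interface, which is why the sketch creates the executor directly.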


[Kafka] Delay poll return for testing scenarios

One of the challenges you will face when testing applications that rely on Kafka to send and receive messages is that the consumer behaviour can seem quite erratic.

This is all due to how the call to the poll method works.

In short, three key factors are considered before the method returns some messages: two consumer configurations and the poll timeout you pass in.
  • fetch.min.bytes: as soon as this many bytes can be read from the desired topic, the consumer's fetch request is ready to be fulfilled and all messages it has retrieved by then will be returned. The default is 1 byte.
  • fetch.max.wait.ms: how long the fetch request is kept waiting if there is not enough data to return (see previous parameter). The default is 500 ms.
  • poll timeout: how long the consumer waits for the fetch request to provide some data before returning. 0 means no wait.
As you can see, in a test environment you might want to control these timings very precisely, to ensure all your send and retrieve message requests are correctly and consistently fulfilled.

I suggest first increasing your fetch.min.bytes configuration to something reasonable according to your test message size.
Then you might want to slow the application down a little with the fetch.max.wait.ms setting, to account for slight delays in message reception by the test broker.
Lastly, set the poll timeout to a value higher than fetch.max.wait.ms, so that your consumer does not return too early while some test messages for the specific test case might still be in the queue. Also remember that, behind the scenes, poll does MUCH MORE than simply retrieving messages, so you want to allow enough time for the various administrative tasks as well.
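
As a rough sketch of the three knobs above, the overrides can be collected in a plain Properties object using the configuration names as string keys, so nothing beyond the JDK is needed here. The class name TestConsumerTimings and all the values are made-up examples, to be tuned to your own test messages:

```java
import java.time.Duration;
import java.util.Properties;

class TestConsumerTimings {

    // Hypothetical values, chosen only for illustration.
    static final int FETCH_MIN_BYTES = 1024;   // wait until about 1 KiB is available
    static final int FETCH_MAX_WAIT_MS = 1000; // or until the broker has waited this long

    // Poll timeout deliberately higher than fetch.max.wait.ms, leaving room
    // for the other work poll does behind the scenes.
    static final Duration POLL_TIMEOUT = Duration.ofMillis(FETCH_MAX_WAIT_MS + 2000L);

    /** Consumer overrides to merge into the rest of your test consumer configuration. */
    static Properties consumerOverrides() {
        Properties props = new Properties();
        props.setProperty("fetch.min.bytes", String.valueOf(FETCH_MIN_BYTES));
        props.setProperty("fetch.max.wait.ms", String.valueOf(FETCH_MAX_WAIT_MS));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerOverrides());
        System.out.println("poll timeout: " + POLL_TIMEOUT);
    }
}
```

These properties still need the usual settings (bootstrap servers, group id, deserializers) before being handed to a KafkaConsumer, and the timeout is what you would pass to consumer.poll(POLL_TIMEOUT).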

[Java] Kafka send and receive JSON messages

If you wish to send and receive JSON messages via Apache Kafka, you need to provide a way to convert your POJOs to JSON and vice versa.

Fortunately, if you use generics, you can provide a single configuration that works in all but the most exceptional cases. 
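
One possible shape for such a generic setup is sketched below. It assumes Jackson (jackson-databind) for the JSON mapping and a kafka-clients version (2.1 or later) where configure and close have default implementations. The names JsonSerdes, JsonSerializer and JsonDeserializer are made up for this example and are not taken from any library:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.io.IOException;

public final class JsonSerdes {

    // ObjectMapper is thread-safe once configured, so a single instance is shared.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JsonSerdes() { }

    /** Turns any POJO that Jackson can handle into JSON bytes. */
    public static final class JsonSerializer<T> implements Serializer<T> {
        @Override
        public byte[] serialize(String topic, T data) {
            if (data == null) {
                return null; // keep null payloads (tombstones) as they are
            }
            try {
                return MAPPER.writeValueAsBytes(data);
            } catch (IOException e) {
                throw new SerializationException("Cannot serialize value for topic " + topic, e);
            }
        }
    }

    /** Reads JSON bytes back into the target type given at construction time. */
    public static final class JsonDeserializer<T> implements Deserializer<T> {
        private final Class<T> type;

        public JsonDeserializer(Class<T> type) {
            this.type = type;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                return MAPPER.readValue(data, type);
            } catch (IOException e) {
                throw new SerializationException("Cannot deserialize value from topic " + topic, e);
            }
        }
    }
}
```

Instances can be passed straight to the KafkaProducer and KafkaConsumer constructors that accept serializer and deserializer objects. The deserializer takes the target class explicitly because the generic type is erased at runtime.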

[Kafka] Exactly once delivery

Among the various configuration settings that you set for your producers and consumers, there is a specific set that is very important if you care about EOD (Exactly Once Delivery).

  • enable.idempotence = true
  • max.in.flight.requests.per.connection <= 5
  • retries > 0
  • acks = "all"
  • isolation.level = "read_committed"

The consumer configuration (isolation.level) is NOT strictly necessary, since messages that are not part of a transaction are not affected by it, but if your producer sends messages as part of a transaction, you might want to avoid reading aborted ones.
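
For reference, the settings listed above could be grouped as in the sketch below, which uses plain Properties with the configuration names as string keys. The class name ExactlyOnceSettings and the concrete values picked within the allowed ranges (5 in-flight requests, Integer.MAX_VALUE retries) are only examples:

```java
import java.util.Properties;

class ExactlyOnceSettings {

    /** Producer-side settings from the list above. */
    static Properties producerSettings() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("max.in.flight.requests.per.connection", "5");   // must be <= 5
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));   // must be > 0
        props.setProperty("acks", "all");
        return props;
    }

    /** Consumer-side setting: only relevant when producers use transactions. */
    static Properties consumerSettings() {
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerSettings());
        System.out.println("consumer: " + consumerSettings());
    }
}
```

Both objects are partial: bootstrap servers, serializers and the other usual settings still have to be added before creating the clients.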

[Java] Testcontainers copy file to container

Once you have your container up and running, you might want to upload some files to it. The outcome of this action is largely dependent on the container type and configuration, but in general you want to use the copyFileToContainer API, paying attention to a couple of things:

1. If the target location in the container does not exist, it will not be automatically created for you. You can work around this by issuing a command to do so:

container.execInContainer("mkdir", "-p", "TARGET_LOCATION");

2. The uploaded files will likely be owned by root, but you won't be running apps in it as root, so you want to set good enough permissions (at least 775) to allow the actual application user to access them:

container.copyFileToContainer(MountableFile.forHostPath(FILE_ON_HOST, 0777), "TARGET_LOCATION");

where FILE_ON_HOST is the path (a String or a java.nio.file.Path) of the resource you are trying to copy to the container. If you have a File object, call toPath() on it first.

[Java] Testcontainers set container name

When you create a new container with Testcontainers, you might want to assign it a name so that it's easier to spot when you list all of them from Docker. This is as simple as:

GenericContainer<?> container = new GenericContainer<>("IMAGE_NAME")
        .withCreateContainerCmdModifier(cmd -> cmd.withName("YOUR_NAME"));