14/02/2024

[Java] Workflow management in microservices

Also published on Medium.

Simple workflow management with checkpointing, error handling, and error correction to allow the interoperability of components.

Current applications are implemented as connected microservices that model workflows which support the use cases they were built for. These workflows are the concrete implementation of the abstract series of operations handled locally by a component or distributed across a deployment. In either case, when a failure happens it’s up to the application design to provide resilience.

Even when the application follows RESTful web application principles and the twelve-factor app concepts, the approach to resilience and error correction is left open and has to be worked out case by case, as the solution depends on the use case.

Here, we propose a simple framework that adds asynchronous workflow capabilities with checkpointing to a distributed application. It is independent of the storage solution used, but it is based on Java and to some extent the Spring framework.

The crucial step is to sketch all the workflows that describe the application and to identify the failure points that provide a recovery option, as well as the idempotency requirements for such operations. For example, all calls to external components must be considered, as well as some local operations such as writing to a file or database.

Once these steps have been identified, the next step is to determine which ones are recoverable (for example, should the failed call be attempted again or is it too late or unnecessary) and what the idempotency requirements are, as sometimes repeating an operation can lead to data or process duplication, making a simple retry of a call potentially more harmful than the initial failure.

For the steps that can be managed and safely retried, it is then necessary to track information about the operation and its input in persistent storage. This can be achieved by storing a serialized version of the input (or a key-value collection with one entry per input value) together with a schematic representation of the call itself: the operation name, an ID, and contextual data that is not used as input but that helps identify and reconstruct the failed operation so that it can be retriggered later.
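As an illustration of what such a record could look like, here is a minimal sketch. The names OperationCheckpoint, CheckpointStore, and InMemoryCheckpointStore are assumptions made for this example and are not part of any library; the in-memory store only stands in for whatever persistent storage the application uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical checkpoint record: what to call, with which input, plus identifying context. */
final class OperationCheckpoint {
    final String id = UUID.randomUUID().toString(); // ID used to find the operation later
    final String operationName;                     // schematic name of the call
    final Map<String, String> input;                // key-value form of the call input
    final Map<String, String> context;              // not used as input, helps diagnosis

    OperationCheckpoint(String operationName, Map<String, String> input, Map<String, String> context) {
        this.operationName = operationName;
        this.input = Map.copyOf(input);
        this.context = Map.copyOf(context);
    }
}

/** Storage-agnostic contract; a real implementation would write to a database or a file. */
interface CheckpointStore {
    void save(OperationCheckpoint checkpoint);
    Optional<OperationCheckpoint> findById(String id);
}

/** In-memory stand-in, for illustration only (nothing survives a restart). */
final class InMemoryCheckpointStore implements CheckpointStore {
    private final Map<String, OperationCheckpoint> byId = new LinkedHashMap<>();

    @Override public void save(OperationCheckpoint checkpoint) {
        byId.put(checkpoint.id, checkpoint);
    }

    @Override public Optional<OperationCheckpoint> findById(String id) {
        return Optional.ofNullable(byId.get(id));
    }
}
```

Keeping the store behind an interface is one way to stay independent of the storage solution: the workflow code only sees save and findById.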

It is then necessary to provide some trigger mechanism to restart an operation from the last successful checkpoint; this could be anything that allows a component to reconstruct the saved operation input data and retrigger it.
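One possible shape for such a trigger is a registry that maps the stored operation name back to executable code. The sketch below is only an assumption about how this could be done; OperationRegistry and its methods are hypothetical names, and the saved input is represented as a plain key-value map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry that maps a stored operation name back to executable code. */
final class OperationRegistry {
    private final Map<String, Function<Map<String, String>, String>> handlers = new HashMap<>();

    /** Associates an operation name with the code that performs it. */
    void register(String operationName, Function<Map<String, String>, String> handler) {
        handlers.put(operationName, handler);
    }

    /** Rebuilds the call from its saved name and input, then runs it again. */
    String retrigger(String operationName, Map<String, String> savedInput) {
        Function<Map<String, String>, String> handler = handlers.get(operationName);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for " + operationName);
        }
        return handler.apply(savedInput);
    }
}
```

The caller of retrigger could be a scheduled job, an admin endpoint, or a message listener; the registry itself does not care which.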

Finally, the pattern for each checkpointed workflow would become:

1. Start the workflow

2. When a checkpointed operation is reached, store its data in persistent storage (in a separate transaction)

3. Trigger the operation (either synchronously or asynchronously)

4. In case of error, depending on the failure type, attempt any of the following as relevant for the specific use case:

a. Retry the operation

b. Modify the operation input and retry it

c. Provide a crafted operation response without retrying it

d. Mark the operation as failed and raise an appropriate error. The caller would need to handle such a case, and the operation can be attempted (if necessary) again later on, possibly after manual investigation into the issue
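The steps above can be sketched as a small runner. This is a simplified illustration under stated assumptions, not a complete implementation: CheckpointedRunner is a hypothetical name, the journal list stands in for persistent storage, and only options a (retry) and d (mark as failed and raise) are shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical runner for one checkpointed operation; names are illustrative. */
final class CheckpointedRunner {
    /** Stand-in for persistent storage: records each state change of the operation. */
    final List<String> journal = new ArrayList<>();

    <I, O> O run(String operationName, I input, Function<I, O> operation, int maxAttempts) {
        // Step 2: persist the checkpoint before triggering
        // (a real store would commit this in a separate transaction).
        journal.add("PENDING " + operationName + " " + input);
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Step 3: trigger the operation.
                O result = operation.apply(input);
                journal.add("COMPLETED " + operationName);
                return result;
            } catch (RuntimeException e) {
                // Step 4a: record the error and try again; only safe for idempotent operations.
                lastError = e;
                journal.add("ERROR " + operationName + " attempt=" + attempt);
            }
        }
        // Step 4d: mark the operation as failed and raise so the caller can handle it.
        journal.add("FAILED " + operationName);
        throw new IllegalStateException("Operation failed: " + operationName, lastError);
    }
}
```

Because the PENDING entry is written before the first attempt, a failed operation leaves enough information behind to be investigated and retriggered later.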

There are a couple of important pitfalls when implementing such a framework using Java/Spring:

1- Methods with the @Transactional annotation that declare a different isolation level or different propagation requirements will not respect those settings if the caller of such a method resides in the same class. The annotation only takes effect for calls received via the proxy that is automatically generated by Spring. This can be easily solved by providing a Bean that implements an executor class, which can invoke the desired method via a lambda:

    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Isolation;
    import org.springframework.transaction.annotation.Propagation;
    import org.springframework.transaction.annotation.Transactional;

    import java.util.function.Supplier;

    /**
     * Since Spring ignores transaction settings for methods called within the same class,
     * we need a separate service to run isolated transactions. It can be called from
     * anywhere simply by supplying the method to execute.
     */
    @Service
    public class TransactionHandlerService {

        /**
         * Runs the given method in the same transaction as the caller.
         *
         * @param supplier the method to execute
         * @param <T> the type of the result
         * @return the result of the invoked method
         */
        @Transactional(propagation = Propagation.REQUIRED)
        public <T> T runInSameTransaction(Supplier<T> supplier) {
            return supplier.get();
        }

        /**
         * Runs the given void method in the same transaction as the caller.
         *
         * @param runnable the method to execute
         */
        @Transactional(propagation = Propagation.REQUIRED)
        public void runVoidInSameTransaction(Runnable runnable) {
            runnable.run();
        }

        /**
         * Runs the given method in a separate transaction.
         *
         * @param supplier the method to execute
         * @param <T> the type of the result
         * @return the result of the invoked method
         */
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        public <T> T runInNewTransaction(Supplier<T> supplier) {
            return supplier.get();
        }

        /**
         * Runs the given method in a separate transaction with strict isolation.
         *
         * @param supplier the method to execute
         * @param <T> the type of the result
         * @return the result of the invoked method
         */
        @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
        public <T> T runInNewTransactionIsolated(Supplier<T> supplier) {
            return supplier.get();
        }
    }

We can then execute the appropriate call by supplying the desired method as input to the executor, for example:

transactionHandlerService.runInNewTransaction(() -> methodToExecute(input));
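To see why a call from within the same class bypasses the transaction settings, the following plain-Java sketch reproduces the effect with a JDK dynamic proxy. It is an analogy only: Spring may use a different proxying mechanism, and OrderService, OrderServiceImpl, and ProxyDemo are hypothetical names invented for this example. The interceptor marks the place where a framework would apply the transactional behaviour.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface OrderService {
    void outer();
    void inner();
}

final class OrderServiceImpl implements OrderService {
    @Override public void outer() {
        // Self-invocation: goes straight to this object, not through the proxy.
        inner();
    }

    @Override public void inner() { }
}

final class ProxyDemo {
    /** Wraps the target in a JDK proxy that records every intercepted method name. */
    static OrderService wrap(OrderService target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            // This is where a framework would apply transaction settings.
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> intercepted = new ArrayList<>();
        OrderService service = wrap(new OrderServiceImpl(), intercepted);
        service.outer();
        System.out.println(intercepted); // prints [outer]
        service.inner();
        System.out.println(intercepted); // prints [outer, inner]
    }
}
```

The call to inner() made from outer() never reaches the interceptor, which is why moving the call into a separate bean restores the expected behaviour.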

2- Synchronization and locks on classes and methods only block execution until the protected section has been completed. This can have undesired effects when mixed with different transactionality settings.

For example, the following parallel execution:

methodA() calls syncMethod() then calls somethingElse()

methodB() calls syncMethod() then calls somethingElse()

What happens:

1. methodA opens Transaction A and enters syncMethod

2. methodB opens Transaction B and waits on syncMethod

3. methodA exits syncMethod and starts executing somethingElse

4. methodB enters syncMethod while somethingElse completes and commits Transaction A

5. methodB exits syncMethod and starts executing somethingElse

6. methodB completes somethingElse and commits Transaction B

Now the important part is point 4. If the syncMethod requires fresh data from other executions, methodB will NOT see that data when it enters syncMethod since Transaction A has NOT yet been committed.

Synchronization only blocks a thread from entering the protected code block, but as soon as the resource is free, the next waiting thread will enter immediately.

This means that, to allow other methods to read the latest data while in the protected section, any operation done in the synchronized method must be executed in a separate transaction that commits before the protected section is left, for example using the TransactionHandlerService helper from the previous point.
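The difference can be illustrated with a toy model. The sketch below is a single-threaded simulation of the interleaving described above, not real transaction handling: ToyStore, Tx, and SequenceDemo are hypothetical names, and the store assumes that uncommitted writes are visible only to the transaction that made them. The protected section hands out the next value of a counter.

```java
/** Toy store where uncommitted writes are invisible to other transactions. */
final class ToyStore {
    private int committedCounter = 0;

    final class Tx {
        private Integer pending; // uncommitted write, visible only to this transaction

        int read() { return pending != null ? pending : committedCounter; }

        void write(int value) { pending = value; }

        void commit() {
            if (pending != null) {
                committedCounter = pending;
                pending = null;
            }
        }
    }

    Tx begin() { return new Tx(); }
}

final class SequenceDemo {
    /** The protected section: read the counter and store the next value. */
    static synchronized int nextValue(ToyStore.Tx tx) {
        int next = tx.read() + 1;
        tx.write(next);
        return next;
    }

    /** Commit happens after the protected section, as with a caller-owned transaction. */
    static int[] commitAfterSection(ToyStore store) {
        ToyStore.Tx a = store.begin();
        ToyStore.Tx b = store.begin();
        int first = nextValue(a);  // A leaves the section, still uncommitted
        int second = nextValue(b); // B enters and cannot see A's write
        a.commit();
        b.commit();
        return new int[] {first, second};
    }

    /** Commit happens inside the protected section, as with a separate transaction per call. */
    static synchronized int nextValueInOwnTransaction(ToyStore store) {
        ToyStore.Tx tx = store.begin();
        int next = tx.read() + 1;
        tx.write(next);
        tx.commit();
        return next;
    }

    public static void main(String[] args) {
        int[] late = commitAfterSection(new ToyStore());
        System.out.println(late[0] + ", " + late[1]); // prints 1, 1

        ToyStore store = new ToyStore();
        int first = nextValueInOwnTransaction(store);
        int second = nextValueInOwnTransaction(store);
        System.out.println(first + ", " + second); // prints 1, 2
    }
}
```

With the commit deferred until after the protected section, both callers receive the same value; committing inside the section lets the second caller see the first caller's result.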
