Camunda Zeebe
??? warning “Experimental module”
**Experimental** module is fully working and tested, but requires additional approbation and usage analytics,
for this reason, API may potentially undergo minor changes before fully stable.
Module for connecting a client and creating workers for an external process orchestrator Camunda 8 (Zeebe)
Dependency¶
Dependency build.gradle
:
Module:
Dependency build.gradle.kts
:
Module:
Configuration¶
Example of a complete client configuration described in the ZeebeClientConfig
class (example values or default values are specified):
zeebe {
client {
executionThreads = 2 //(1)!
keepAlive = "45s" //(2)!
tls = true //(3)!
certificatePath = "/file/path/to/cert.crt" //(4)!
initializationFailTimeout = "15s" //(5)!
grpc {
url = "grpc://localhost:8090" //(6)!
ttl = "1h" //(7)!
maxMessageSize = "4Mib" //(8)!
retryPolicy {
enabled = true //(9)!
attempts = 5 //(10)!
delay = "100ms" //(11)!
delayMax = "5s" //(12)!
stepFactor = 3.0 //(13)!
}
}
http {
url = "http://localhost:8080" //(14)!
}
deployment {
resources = "classpath:bpm" //(15)!
timeout = "45s" //(16)!
}
telemetry {
logging {
enabled = false //(17)!
}
metrics {
enabled = true //(18)!
slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] //(19)!
}
tracing {
enabled = true //(20)!
}
}
}
}
- Maximum number of threads for task workers, by default equal to the number of CPU cores or minimum
2
. - Connection time without reading activity before sending
KeepAlive
check - Whether to use TLS when connecting on a connection
- File path to the certificate file to use when connecting, or use the default system certificate
- Maximum time to wait for initialization of workers to start when the service starts (default is none)
- URL for connection via gRPC
- Time for how long the message should be buffered at the broker over gRPC connection
- Maximum message size over gRPC connection
- Whether the policy of execution repeat in case of connection error is enabled
- Number of attempts
- Delay between attempts
- maximum duration of retries
- Step coefficient for increasing the delay time between attempts
- URL for HTTP connection
- Paths to find resources that will be loaded into the orchestrator after startup
- Maximum time to wait for resources to be loaded
- Enables module logging (default is
false
) - Enables module metrics (default
true
) - Configures SLO for DistributionSummary metrics
- Enables module tracing (default
true
)
zeebe:
client:
executionThreads: 2 #(1)!
keepAlive: "45s" #(2)!
tls: true #(3)!
certificatePath: "/file/path/to/cert.crt" #(4)!
initializationFailTimeout: "15s" #(5)!
grpc:
url: "grpc:#localhost:8090" //(6)!
ttl: "1h" #(7)!
maxMessageSize: "4Mib" #(8)!
retryPolicy:
enabled: true #(9)!
attempts: 5 #(10)!
delay: "100ms" #(11)!
delayMax: "5s" #(12)!
stepFactor: 3.0 #(13)!
http:
url: "http:#localhost:8080" //(14)!
deployment:
resources: "classpath:bpm" #(15)!
timeout: "45s" #(16)!
telemetry:
logging:
enabled: false #(17)!
metrics:
enabled: true #(18)!
slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] #(19)!
tracing:
enabled: true #(20)!
- Maximum number of threads for task workers, by default equal to the number of CPU cores or minimum
2
. - Connection time without reading activity before sending
KeepAlive
check - Whether to use TLS when connecting on a connection
- File path to the certificate file to use when connecting, or use the default system certificate
- Maximum time to wait for initialization of workers to start when the service starts (default is none)
- URL for connection via gRPC
- Time for how long the message should be buffered at the broker over gRPC connection
- Maximum message size over gRPC connection
- Whether the policy of execution repeat in case of connection error is enabled
- Number of attempts
- Delay between attempts
- maximum duration of retries
- Step coefficient for increasing the delay time between attempts
- URL for HTTP connection
- Paths to find resources that will be loaded into the orchestrator after startup
- Maximum time to wait for resources to be loaded
- Enables module logging (default is
false
) - Enables module metrics (default
true
) - Configures SLO for DistributionSummary metrics
- Enables module tracing (default
true
)
Worker¶
Worker is a handler that can perform a specific job in a process. Each time such a job needs to be performed, it is polled by worker.
Configuration¶
There is a default configuration that is applied to all workers at creation
and then the named worker-specific settings (by Type
) are then applied overriding the default settings.
You can change the default settings for all interrupters at the same time by changing the default configuration (default
).
Example of a complete worker configuration is described in the ZeebebeWorkerConfig
class (example values or default values are specified):
zeebe {
worker {
job {
default { //(1)!
enabled = true //(2)!
timeout = "15m" //(3)!
maxJobsActive = 32 //(4)!
requestTimeout = "15s" //(5)!
pollInterval = "100ms" //(6)!
tenantIds = [] //(7)!
streamEnabled = false //(8)!
streamTimeout = "15s" //(9)!
backoff {
minDelay = "100ms" //(11)!
maxDelay = "500ms" //(12)!
factor = 1.0 //(10)!
jitter = 1.3 //(13)!
}
}
}
}
}
- Worker (
Type
) or the name of the default settings (default
) - Whether to include an worker
- Maximum time for an worker to complete a single task
- The maximum number of tasks that will be activated simultaneously for this worker only. This is used to control the speed of the data producer to match the speed of the worker (
backpressure
) - Limitation on the query time used to poll a new task by the worker
- Maximum interval between polling of new tasks. The worker automatically tries to always activate new tasks after the job is finished. If no task can be activated after completion, the performer will poll new tasks periodically
- Specifies the tenant identifiers that can own any entities (e.g., process definition, process instances, etc.) resulting from the execution of this command
- If set to enabled, the worker will use a combination of streaming and polling to activate jobs
- If streaming is enabled, sets the maximum lifetime for this thread
- Sets the minimum repetition delay. Note that due to
jitter
, the repeat delay may be lower than this minimum - Sets the maximum repeat delay. Note that
jitter
may exceed this maximum delay - Sets the delay multiplication factor. The previous delay is multiplied by this factor
- Sets the jitter coefficient. The next delay is varied randomly within the range +/- of this coefficient.
For example, if the next delay is calculated as 1s and
jitter
is 0.1, the actual next delay may be somewhere between 0.9 and 1.1s
zeebe:
worker:
job:
default: #(1)!
enabled: true #(2)!
timeout: "15m" #(3)!
maxJobsActive: 32 #(4)!
requestTimeout: "15s" #(5)!
pollInterval: "100ms" #(6)!
tenantIds: [] #(7)!
streamEnabled: false #(8)!
streamTimeout: "15s" #(9)!
backoff:
minDelay: "100ms" #(11)!
maxDelay: "500ms" #(12)!
factor: 1.0 #(10)!
jitter: 1.3 #(13)!
- Worker (
Type
) or the name of the default settings (default
) - Whether to include an worker
- Maximum time for an worker to complete a single task
- The maximum number of tasks that will be activated simultaneously for this worker only. This is used to control the speed of the data producer to match the speed of the worker (
backpressure
) - Limitation on the query time used to poll a new task by the worker
- Maximum interval between polling of new tasks. The worker automatically tries to always activate new tasks after the job is finished. If no task can be activated after completion, the performer will poll new tasks periodically
- Specifies the tenant identifiers that can own any entities (e.g., process definition, process instances, etc.) resulting from the execution of this command
- If set to enabled, the worker will use a combination of streaming and polling to activate jobs
- If streaming is enabled, sets the maximum lifetime for this thread
- Sets the minimum repetition delay. Note that due to
jitter
, the repeat delay may be lower than this minimum - Sets the maximum repeat delay. Note that
jitter
may exceed this maximum delay - Sets the delay multiplication factor. The previous delay is multiplied by this factor
- Sets the jitter coefficient. The next delay is varied randomly within the range +/- of this coefficient.
For example, if the next delay is calculated as 1s and
jitter
is 0.1, the actual next delay may be somewhere between 0.9 and 1.1s
Declarative¶
You can create declaratively JobWorkers that will perform work within the Zeebe orchestrator.
JobWorker
annotation specifies the value of the worker type (Type
) within the process.
Parameter context¶
You can embed the job context as a method argument. Job Context has task, worker and process metadata available for the current task being executed.
Parameter variable¶
You can embed process variables as method arguments, a process variable is part of the process state and can be set on start or as part of the worker result.
Importantly, if any named variables are specified, only those variables will be passed to receive from the orchestrator.
You can specify a variable name from context, or the default method argument name will be used.
Since all process variables are required to be JSON objects, the method argument can also be any mapping of a JSON object.
Parameter variables¶
You can embed multiple process variables at once as a method argument, as a single object that represents JSON objects in the process state.
Result¶
You can also execute a job with some result of the job execution and pass it as a variable to process context.
The result can be returned as a Map<String, Object>
describing the JSON structure of the response.
Or return the named result as a single variable at once,
which will be analogous to a single key and value in a Map<String, Object>
object.
In this case, it is obligatory to specify the name of the variable in the @JobVariable
annotation:
Errors¶
In case you need to terminate execution with an error, you can throw a JobWorkerException
exception where you can specify,
both the error code and the message and process variables if required.
Imperative.¶
You can also create more low-level workers and work directly with ZeebeClient
contracts and its interface.
Signatures¶
Available signatures for repository methods out of the box:
The T
refers to the type of the return value or Void
.
T myMethod()
Optional<T> myMethod()
CompletionStage<T> myMethod()
CompletionStageMono<T> myMethod()
Project Reactor (add dependency)
By T
we mean the type of the return value, either T?
or Unit
.
myMethod(): T
suspend myMethod(): Deferred<T>
Kotlin Coroutine (add dependency asimplementation
)