Idempotency

What is Idempotency?

The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages: it essentially acts like a Message Filter to filter out duplicates, as reported in the Camel documentation.

From the Enterprise Integration Patterns documentation: Sometimes the same message gets delivered more than once, either because the messaging system is not certain the message has been successfully delivered yet, or because the Message Channel’s quality-of-service has been lowered to improve performance. Message receivers, on the other hand, tend to assume that each message will be delivered exactly once, and tend to cause problems when they repeat processing because of repeat messages. A receiver designed as an Idempotent Receiver handles duplicate messages and prevents them from causing problems in the receiver application.

This is a very useful feature in the integration world and it is an important new feature in the camel-kafka-connector project. Apache Camel provides multiple implementation of the Idempotent Consumer, Camel-Kafka-connector supports the in Memory and Kafka implementations.

Usecases

Suppose you’re using a source connector of any kind. By using the idempotency feature you’ll be able to avoid consuming the same message multiple times.

This means, in the Kafkish language, you won’t ingest the same payload multiple times in the target Kafka topic.

image

In the sink scenario, we’ll stream out of a Kafka topic multiple records, transform/convert/manipulate them and send them to an external system, like a messaging broker, a storage infra, a database etc.

In the Kafka topic used as source we may have multiple repeated records with the same payload or same metadata. Based on this information we can choose to skip the same records while sending data to the external system and for doing this we can leverage the idempotency feature of ckc.

image

Camel-Kafka-connector idempotency configuration

The idempotency feature can be enabled through a number of configuration options. In particular we are talking about:

Name Description Default

camel.idempotency.enabled

If idempotency must be enabled or not

false

camel.idempotency.repository.type

The idempotent repository type to use, possible values are memory and kafka

memory

camel.idempotency.expression.type

How the idempotency will be evaluated: possible values are body and header

body

camel.idempotency.expression.header

The header name that will be evaluated in case of camel.idempotency.expression.type equals to header

null

camel.idempotency.memory.dimension

The Memory dimension of the in memory idempotent Repository

100

camel.idempotency.kafka.topic

The Kafka topic name to use for the idempotent repository

kafka_idempotent_repository

camel.idempotency.kafka.bootstrap.servers

A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live

localhost:9092

camel.idempotency.kafka.max.cache.size

Sets the maximum size of the local key cache

1000

camel.idempotency.kafka.poll.duration.ms

Sets the poll duration (in milliseconds) of the Kafka consumer

100

The in-memory approach has been provided for short running connector workload, while the kafka one is for long running/interruptable connector.

The table is self-explaining.

A typical configuration for the kafka idempotent repository approach could be:

camel.idempotency.enabled=true
camel.idempotency.repository.type=kafka
camel.idempotency.expression.type=body
camel.idempotency.kafka.topic=my.idempotency.topic
camel.idempotency.kafka.bootstrap.servers=localhost:9092
camel.idempotency.kafka.max.cache.size=1500
camel.idempotency.kafka.poll.duration.ms=150

Some of the options can be used with their default value, in this example we’re just listing them for a Kafka idempotent repository configuration.