Basic Configuration

This page lists the Camel Kafka Connector configuration options that are not part of the camel-catalog material or of the Kafka Connect framework. For connector-specific configuration, see the documentation page of the individual connector.

For a Sink connector the basic options are:

| Name | Description | Default | Priority |
|---|---|---|---|
| camel.sink.marshal | The Camel data format name to use to marshal data to the destination. | null | HIGH |
| camel.sink.unmarshal | The Camel data format name to use to unmarshal data from the topic. | null | HIGH |
| camel.sink.contentLogLevel | Log level for the record’s content. Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF. | OFF | HIGH |
| camel.beans.aggregate | A reference to an aggregate bean, in the form of #class: | null | MEDIUM |
| camel.aggregation.size | The size of the aggregation, to be used in combination with camel.beans.aggregate. | 10 | MEDIUM |
| camel.aggregation.timeout | The timeout of the aggregation, to be used in combination with camel.beans.aggregate. | 500L | MEDIUM |
| camel.error.handler | The error handler to use. Possible values are 'no' and 'default'. | default | MEDIUM |
| camel.error.handler.max.redeliveries | The maximum number of redeliveries to attempt when the default error handler is used. | 0 | MEDIUM |
| camel.error.handler.redelivery.delay | The initial redelivery delay, in milliseconds, when the default error handler is used. | 1000L | MEDIUM |
| camel.remove.headers.pattern | The pattern of the headers to exclude from the exchange. | null | MEDIUM |
| camel.idempotency.enabled | Whether idempotency is enabled. | false | LOW |
| camel.idempotency.repository.type | The idempotent repository type to use. Possible values are memory and kafka. | memory | LOW |
| camel.idempotency.expression.type | How idempotency is evaluated. Possible values are body and header. | body | LOW |
| camel.idempotency.expression.header | The name of the header to evaluate when camel.idempotency.expression.type is set to header. | null | LOW |
| camel.idempotency.memory.dimension | The size of the in-memory idempotent repository. | 100 | LOW |
| camel.idempotency.kafka.topic | The Kafka topic name to use for the idempotent repository. | kafka_idempotent_repository | LOW |
| camel.idempotency.kafka.bootstrap.servers | A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live. | localhost:9092 | LOW |
| camel.idempotency.kafka.max.cache.size | The maximum size of the local key cache. | 1000 | LOW |
| camel.idempotency.kafka.poll.duration.ms | The poll duration, in milliseconds, of the Kafka consumer. | 100 | LOW |
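As an illustration of how a few of the sink options above might be combined, the following sketch collects them in a map and renders them in the key=value form used by Kafka Connect properties files. Only the option names and their documented defaults come from the table. The data format name json-jackson, the chosen values, and the helper to_properties are assumptions made for this example, and the connector class, topics, and endpoint settings a real connector needs are deliberately left out.

```python
# Sink options from the table above. Values marked "assumed" are
# illustrative choices, not defaults or recommendations.
sink_options = {
    "camel.sink.marshal": "json-jackson",          # assumed data format name
    "camel.sink.contentLogLevel": "DEBUG",         # one of the listed log levels
    "camel.error.handler": "default",              # documented default
    "camel.error.handler.max.redeliveries": 3,     # assumed value (default is 0)
    "camel.error.handler.redelivery.delay": 2000,  # milliseconds, assumed value
}


def to_properties(options):
    """Render a dict as key=value lines, one option per line (hypothetical helper)."""
    return "\n".join(f"{key}={value}" for key, value in options.items())


properties_text = to_properties(sink_options)
print(properties_text)
```

The resulting text could be appended to a connector properties file alongside the connector-specific settings.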

For a Source connector the basic options are:

| Name | Description | Default | Priority |
|---|---|---|---|
| camel.source.marshal | The Camel data format name to use to marshal data to the destination. | null | HIGH |
| camel.source.unmarshal | The Camel data format name to use to unmarshal data from the source. | null | HIGH |
| camel.source.contentLogLevel | Log level for the record’s content. Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF. | OFF | HIGH |
| camel.source.maxBatchPollSize | The maximum number of messages retrieved in a single poll(). | 1000L | MEDIUM |
| camel.source.maxPollDuration | The maximum time, in milliseconds, spent in a single call to poll(). | 1000L | MEDIUM |
| camel.source.pollingConsumerQueueSize | The size of the internal hand-off queue between the polling consumer and the producers sending data into the queue. | 1000L | MEDIUM |
| camel.source.pollingConsumerBlockTimeout | The timeout, in milliseconds, to apply when the producer is blocked because the internal queue is full. If the value is 0 or negative, no timeout is used. | 0L | MEDIUM |
| camel.source.pollingConsumerBlockWhenFull | Whether to block any producer if the internal queue is full. | true | MEDIUM |
| camel.source.camelMessageHeaderKey | The name of a Camel message header containing a unique key that can be used as the Kafka message key. If this is not specified, the Kafka message will not have a key. | null | MEDIUM |
| camel.beans.aggregate | A reference to an aggregate bean, in the form of #class: | null | MEDIUM |
| camel.aggregation.size | The size of the aggregation, to be used in combination with camel.beans.aggregate. | 10 | MEDIUM |
| camel.aggregation.timeout | The timeout of the aggregation, to be used in combination with camel.beans.aggregate. | 500L | MEDIUM |
| camel.error.handler | The error handler to use. Possible values are 'no' and 'default'. | default | MEDIUM |
| camel.error.handler.max.redeliveries | The maximum number of redeliveries to attempt when the default error handler is used. | 0 | MEDIUM |
| camel.error.handler.redelivery.delay | The initial redelivery delay, in milliseconds, when the default error handler is used. | 1000L | MEDIUM |
| camel.remove.headers.pattern | The pattern of the headers to exclude from the exchange. | null | MEDIUM |
| camel.idempotency.enabled | Whether idempotency is enabled. | false | LOW |
| camel.idempotency.repository.type | The idempotent repository type to use. Possible values are memory and kafka. | memory | LOW |
| camel.idempotency.expression.type | How idempotency is evaluated. Possible values are body and header. | body | LOW |
| camel.idempotency.expression.header | The name of the header to evaluate when camel.idempotency.expression.type is set to header. | null | LOW |
| camel.idempotency.memory.dimension | The size of the in-memory idempotent repository. | 100 | LOW |
| camel.idempotency.kafka.topic | The Kafka topic name to use for the idempotent repository. | kafka_idempotent_repository | LOW |
| camel.idempotency.kafka.bootstrap.servers | A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live. | localhost:9092 | LOW |
| camel.idempotency.kafka.max.cache.size | The maximum size of the local key cache. | 1000 | LOW |
| camel.idempotency.kafka.poll.duration.ms | The poll duration, in milliseconds, of the Kafka consumer. | 100 | LOW |
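The idempotency options depend on one another, so a small consistency check can help before a configuration is deployed. The sketch below encodes one reading of the table: the repository type and expression type must be one of the listed values, and a header name is expected when the expression type is header. The function check_idempotency and the header name orderId are hypothetical, and this is not validation performed by the connector itself.

```python
def check_idempotency(options):
    """Return a list of problems found in the idempotency-related options."""
    problems = []
    enabled = str(options.get("camel.idempotency.enabled", "false")).lower()
    if enabled != "true":
        return problems  # nothing to check when idempotency is disabled

    repo = options.get("camel.idempotency.repository.type", "memory")
    if repo not in ("memory", "kafka"):
        problems.append(f"unknown repository type: {repo}")

    expr = options.get("camel.idempotency.expression.type", "body")
    if expr not in ("body", "header"):
        problems.append(f"unknown expression type: {expr}")
    if expr == "header" and not options.get("camel.idempotency.expression.header"):
        problems.append("expression type is header but no header name is set")
    return problems


# Source options using the Kafka-backed repository with its documented defaults.
source_options = {
    "camel.source.maxPollDuration": "500",  # milliseconds, assumed value
    "camel.idempotency.enabled": "true",
    "camel.idempotency.repository.type": "kafka",
    "camel.idempotency.kafka.topic": "kafka_idempotent_repository",
    "camel.idempotency.kafka.bootstrap.servers": "localhost:9092",
    "camel.idempotency.expression.type": "header",
}

before = check_idempotency(source_options)  # header name is still missing
source_options["camel.idempotency.expression.header"] = "orderId"  # hypothetical header
after = check_idempotency(source_options)
print(before, after)
```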

For options specific to a single connector, see the Connectors list.

Message Level Headers

Certain sink components allow further configuration and behavior adjustments through specific headers set on the message sent via Kafka. For example, the file sink connector lets the user set the file name either through the configuration property or through the header CamelHeader.CamelFileName. This is usually noted in the connector documentation, but as a rule of thumb, users should also check the Camel component documentation to verify which message headers are available. If a header is available, you can set it directly from your Kafka producer by prefixing its name with the string CamelHeader. (for example, CamelHeader.CamelFileName or CamelHeader.CamelFilePath) and including it in the Kafka message headers.
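The naming rule can be sketched as a small helper that turns Camel header names into Kafka message headers. The function to_kafka_headers and the file name report.txt are hypothetical. The list of (name, bytes) pairs is the shape that common Python Kafka clients accept for a headers argument, which is an assumption about the producer in use. Sending the message is omitted so the example stays self-contained.

```python
CAMEL_HEADER_PREFIX = "CamelHeader."


def to_kafka_headers(camel_headers):
    """Prefix each Camel header name and encode its value as UTF-8 bytes."""
    return [
        (CAMEL_HEADER_PREFIX + name, str(value).encode("utf-8"))
        for name, value in camel_headers.items()
    ]


headers = to_kafka_headers({"CamelFileName": "report.txt"})
print(headers)  # [('CamelHeader.CamelFileName', b'report.txt')]
```

A producer would then attach this list to the record it sends to the topic read by the sink connector.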