Try it out on Kubernetes

You can try CamelKafkaConnector using the Strimzi operator, which simplifies the deployment and management of Kafka clusters on plain Kubernetes. This procedure assumes that you have cluster-admin rights, Internet access and an external registry for pushing images (e.g. quay.io).
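
If you are unsure about your permissions, you can run a quick pre-flight check (a minimal sanity check, assuming kubectl is already pointing at the target cluster):

# verify that the current user has cluster-admin rights
kubectl auth can-i '*' '*' --all-namespaces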

Deploy Kafka and KafkaConnect

First, we create a new namespace and deploy a 3-node Kafka cluster:

NAMESPACE="kafka"
STRIMZI="0.20.0"

# create a new namespace
kubectl create namespace $NAMESPACE && kubectl config set-context --current --namespace=$NAMESPACE

# deploy Strimzi operator
curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/$STRIMZI/strimzi-cluster-operator-$STRIMZI.yaml \
    | sed "s/namespace: .*/namespace: $NAMESPACE/g" | kubectl apply -f -

# deploy Kafka cluster
kubectl apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/$STRIMZI/examples/kafka/kafka-persistent.yaml
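
The Kafka cluster takes a while to start. You can wait for it to become ready before moving on (using the default cluster name my-cluster from the example file):

# wait for the Kafka cluster to be ready
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s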

Next, we build a custom KafkaConnect image that includes all the required connectors (use your own registry and username here):

CKC_VERSION="0.7.0"
STRIMZI_IMG="quay.io/strimzi/kafka:0.20.0-kafka-2.6.0"
REGISTRY_URL="quay.io"
REGISTRY_USR="fvaleri"

TMP="/tmp/my-connect"
BASEURL="https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector"
PLUGINS=(
    "$BASEURL/camel-file-kafka-connector/$CKC_VERSION/camel-file-kafka-connector-$CKC_VERSION-package.zip"
    "$BASEURL/camel-sjms2-kafka-connector/$CKC_VERSION/camel-sjms2-kafka-connector-$CKC_VERSION-package.zip"
)

# download connect plugins
rm -rf $TMP && mkdir -p $TMP/plugins
for url in "${PLUGINS[@]}"; do
    curl -sL $url -o $TMP/plugins/file.zip && unzip -qq $TMP/plugins/file.zip -d $TMP/plugins
    rm -f $TMP/plugins/file.zip
done

# build and push the custom image
echo -e "FROM $STRIMZI_IMG\nCOPY ./plugins/ /opt/kafka/plugins/\nUSER 1001" > $TMP/Dockerfile
sudo podman build --layers=false -t $REGISTRY_USR/my-connect:1.0.0 -f $TMP/Dockerfile $TMP
sudo podman login -u $REGISTRY_USR $REGISTRY_URL
sudo podman push localhost/$REGISTRY_USR/my-connect:1.0.0 $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
sudo podman push localhost/$REGISTRY_USR/my-connect:1.0.0 $REGISTRY_URL/$REGISTRY_USR/my-connect:latest

Note: each plugin must be placed in its own subdirectory under the plugins folder (e.g. /opt/kafka/plugins/my-plugin/<my-plugin-jars-here>).
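
You can double-check that layout before deploying, for example by listing the plugin path inside the image we just built (overriding the image entrypoint, since we only need a directory listing):

# list the plugin directories baked into the custom image
sudo podman run --rm --entrypoint ls localhost/$REGISTRY_USR/my-connect:1.0.0 /opt/kafka/plugins/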

Finally, we deploy a single-node KafkaConnect cluster using our custom image:

kubectl apply -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    # let the operator manage connectors via KafkaConnector resources
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  version: 2.6.0
  image: $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      cpu: 250m
      memory: 512Mi
    limits:
      cpu: 500m
      memory: 1Gi
  jvmOptions:
    gcLoggingEnabled: false
  config:
    group.id: my-connect
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    offset.storage.topic: my-connect-offsets
    config.storage.topic: my-connect-configs
    status.storage.topic: my-connect-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
EOF
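
Once the deployment is done, it is worth verifying that the Connect runtime actually picked up our plugins. One possible check (a sketch, assuming curl is available in the broker image and using the my-connect-connect-api service that Strimzi creates for the Connect REST API):

# wait for the Connect cluster to be ready
kubectl wait kafkaconnect/my-connect --for=condition=Ready --timeout=300s

# list the connector plugins discovered by the Connect runtime
kubectl exec -i my-cluster-kafka-0 -c kafka -- \
    curl -s http://my-connect-connect-api:8083/connector-plugins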

Create a KafkaConnector instance

As soon as the infrastructure is running, we can create an instance of a connector plugin:

kubectl apply -f - <<'EOF'
kind: KafkaConnector
apiVersion: kafka.strimzi.io/v1alpha1
metadata:
  name: file-sink
  labels:
    # must match connect cluster name
    strimzi.io/cluster: my-connect
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
  config:
    topics: my-topic
    camel.sink.url: file:/tmp/?fileName=test.txt&fileExist=Append
EOF

You can check the status of the connector instance using:

kubectl describe kafkaconnector file-sink
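
The describe output is quite verbose; to extract just the connector state (which should be RUNNING once everything is wired up), a jsonpath query also works (assuming the status layout used by Strimzi's KafkaConnector resource):

# print only the connector state
kubectl get kafkaconnector file-sink -o jsonpath='{.status.connectorStatus.connector.state}'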

Check received messages

To test the connector instance, we can send a message to the topic and check that it is written to the file:

# send a message to Kafka
echo "Hello CamelKafkaConnector" | kubectl exec -i my-cluster-kafka-0 -c kafka -- \
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

# read the message from file
POD_NAME=$(kubectl get pods -l strimzi.io/cluster=my-connect -o jsonpath='{.items[0].metadata.name}') && \
    kubectl exec -i $POD_NAME -- cat /tmp/test.txt
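
When you are done experimenting, everything can be torn down with a few deletes (assuming the names used throughout this walkthrough):

# delete the connector, the Connect cluster and the Kafka cluster
kubectl delete kafkaconnector file-sink
kubectl delete kafkaconnect my-connect
kubectl delete kafka my-cluster

# remove the namespace, including the Strimzi operator
kubectl delete namespace $NAMESPACE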