
Guide: Kafka Connect

This guide covers how to run Kafka Connect in standalone mode on a server in AWS, using a hosted Kafka cluster at CloudKarafka.
You need a server running Ubuntu in your AWS account that you can access over SSH. To run Kafka Connect without memory issues, the server needs at least 2 GB of memory.

Create a Kafka cluster

Create the Kafka cluster at cloudkarafka.com. Make sure to select a subnet that doesn't conflict with the subnet that the machines in your AWS account are using.

Setup VPC peering

See this guide on how to set up VPC peering connections: Guide: VPC Peering

Download

Kafka Connect is available both in the Apache Kafka package and in the Confluent Platform download. For this guide we will use the Apache Kafka package, but the setup should be quite similar if you choose to use Confluent Platform.
Below is a link to download Apache Kafka version 2.5.1, the latest version at the time of writing; there might be newer versions available, so choose the version that you want: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.1/kafka_2.13-2.5.1.tgz

Download the tarball and extract into /opt
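
As an example, the release can be fetched directly from the Apache archive (any mirror from the link above works just as well):

wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.13-2.5.1.tgz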

tar -xzvf kafka_2.13-2.5.1.tgz -C /opt

Configure

# /opt/kafka_2.13-2.5.1/config/connect-standalone.properties

bootstrap.servers=10.56.72.161:9092,10.56.72.51:9092,10.56.72.225:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/opt/connect.offsets
Note: You must replace the IPs on the first line with the IPs of the brokers in your cluster; they can be found on the details page.
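
To verify that your server can reach the brokers over the VPC peering, you can list the cluster's topics with the admin tool bundled with Kafka (the IP below is the placeholder from the config above, so substitute one of your own brokers):

/opt/kafka_2.13-2.5.1/bin/kafka-topics.sh --bootstrap-server 10.56.72.161:9092 --list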

You have now configured Kafka Connect, but we have yet to configure any connectors to actually move data. Kafka Connect offers many connectors to help you move data around; for this guide we will only configure two, but that should be enough to get you going with the others.

Console Sink

This just prints everything coming into a topic to standard out, so you can see what's happening. (The FileStreamSinkConnector writes to standard out when no file property is configured.)

# /opt/kafka_2.13-2.5.1/config/connect-console-sink.properties

name=guide-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=connect-test
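
If your cluster does not auto-create topics, create connect-test up front (likewise connect-guide, used by the RabbitMQ sink below); the broker IP is again the placeholder from above, and the partition count and replication factor should be adjusted to your cluster:

/opt/kafka_2.13-2.5.1/bin/kafka-topics.sh --bootstrap-server 10.56.72.161:9092 --create --topic connect-test --partitions 1 --replication-factor 1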

RabbitMQ Sink

This forwards all records from a Kafka topic to a queue in RabbitMQ.
The RabbitMQ connector doesn't come bundled with Kafka Connect, so you need to download the library first:

cd /opt
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-rabbitmq-sink/versions/1.3.0-preview/confluentinc-kafka-connect-rabbitmq-sink-1.3.0-preview.zip
unzip confluentinc-kafka-connect-rabbitmq-sink-1.3.0-preview.zip
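
You can check that the connector jars ended up where we will point the plugin path in the next step:

ls /opt/confluentinc-kafka-connect-rabbitmq-sink-1.3.0-preview/lib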

The jar files for the connector then need to be added to the plugin path so that Kafka Connect can load them. This is done in /opt/kafka_2.13-2.5.1/config/connect-standalone.properties:

# /opt/kafka_2.13-2.5.1/config/connect-standalone.properties

plugin.path=/opt/confluentinc-kafka-connect-rabbitmq-sink-1.3.0-preview/lib

Time to configure the connector:

# /opt/kafka_2.13-2.5.1/config/connect-rabbitmq-sink.properties

name=rabbitmq-sink-connector
topics=connect-guide
tasks.max=1
connector.class=io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
confluent.topic.bootstrap.servers=10.56.72.161:9092,10.56.72.51:9092,10.56.72.225:9092
confluent.topic.replication.factor=1
rabbitmq.host=test-serious-butterfly.rmq.cloudamqp.com
rabbitmq.virtual.host=gtwvkvpq
rabbitmq.username=gtwvkvpq
rabbitmq.password=*************************************
rabbitmq.exchange=amq.topic
rabbitmq.routing.key=kafka.connect
rabbitmq.delivery.mode=PERSISTENT

This makes the connector publish all records from the topic connect-guide to RabbitMQ on the host test-serious-butterfly.rmq.cloudamqp.com, vhost gtwvkvpq. All messages are published to the exchange amq.topic using the routing key kafka.connect. In order to see the messages, you need to create a queue on that vhost and bind it to the exchange.

One thing to note here is that value.converter must be the ByteArrayConverter; RabbitMQ message bodies are raw bytes, so with any other converter the connector will not be able to publish the messages.

Run

Once Kafka Connect, the console sink, and the RabbitMQ sink are configured, it's time to run it:

/opt/kafka_2.13-2.5.1/bin/connect-standalone.sh \
/opt/kafka_2.13-2.5.1/config/connect-standalone.properties \
/opt/kafka_2.13-2.5.1/config/connect-console-sink.properties \
/opt/kafka_2.13-2.5.1/config/connect-rabbitmq-sink.properties
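
If the process hits memory limits on a small instance, note that connect-standalone.sh sets the JVM heap through the KAFKA_HEAP_OPTS environment variable when it is unset (in this Kafka version it defaults to a 2 GB max heap, which is why at least 2 GB of memory is recommended above). You can override it before running the command above; the values here are just illustrative:

export KAFKA_HEAP_OPTS="-Xms256M -Xmx1G"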

Unless you have changed the log4j.properties file, the log level is INFO, which means it will log a lot to your terminal. Below are some key lines to keep an eye out for:

[2020-08-26 07:05:32,795] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:69)
.....
[2020-08-26 07:05:33,940] INFO Added plugin 'io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:191)
....
[2020-08-26 07:05:47,511] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2020-08-26 07:05:48,339] INFO Creating connector rabbitmq-sink-connector of type io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector (org.apache.kafka.connect.runtime.Worker:253)
[2020-08-26 07:05:48,342] INFO Instantiated connector rabbitmq-sink-connector with version 1.3.0 of type class io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector (org.apache.kafka.connect.runtime.Worker:256)
[2020-08-26 07:05:48,343] INFO RabbitMQSinkConnectorConfig values:
    confluent.license =
    confluent.topic = _confluent-command
    confluent.topic.bootstrap.servers = [10.56.72.161:9092, 10.56.72.51:9092, 10.56.72.225:9092]
    confluent.topic.replication.factor = 1
    rabbitmq.delivery.mode = PERSISTENT
    rabbitmq.exchange = amq.topic
    rabbitmq.host = test-serious-butterfly.rmq.cloudamqp.com
    rabbitmq.port = 5672
    rabbitmq.username = gtwvkvpq
    rabbitmq.virtual.host = gtwvkvpq
 (io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnectorConfig:347)
...
[2020-08-26 07:05:49,269] INFO Creating task rabbitmq-sink-connector-0 (org.apache.kafka.connect.runtime.Worker:421)
...
[2020-08-26 07:05:49,314] INFO Opening connection to test-serious-butterfly.rmq.cloudamqp.com:5672/gtwvkvpq (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:49)
[2020-08-26 07:05:49,573] INFO Creating Channel (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:62)
[2020-08-26 07:05:49,702] INFO WorkerSinkTask{id=rabbitmq-sink-connector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:305)
...
[2020-08-26 07:06:49,310] INFO Starting publishing batches to RabbitMQ (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:93)

Configure RabbitMQ

Right now, all messages are just sent to an exchange with a routing key, so you cannot see any messages yet: RabbitMQ drops all messages going into an exchange if there are no queue bindings for it. To fix that, log in to the RabbitMQ Management Interface, select the correct vhost, and create a queue. Open the queue and scroll down until you find Bindings. Using # as the routing key will route all messages going into the exchange to your queue; read more about RabbitMQ routing here: https://www.cloudamqp.com/blog/part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html

And we are done: all records going into your Kafka cluster on topic connect-test will show up in the terminal on your server, and all records on topic connect-guide will end up in the RabbitMQ queue you just created.
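
To push some test records through the pipeline, you can use the console producer bundled with Kafka (replace the IP with one of your own brokers); every line you type becomes a record on the topic:

/opt/kafka_2.13-2.5.1/bin/kafka-console-producer.sh --broker-list 10.56.72.161:9092 --topic connect-guide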

Improvements

Running the connector directly from your terminal isn't ideal, and even running the process as a background task isn't good enough, since it won't be restarted if it fails.
To fix that, let's create a systemd service and let systemd manage the process:

# /etc/systemd/system/kafkaconnect_rabbitmq.service

[Unit]
Description=Kafka Connect RabbitMQ

[Service]
Type=simple
User=ubuntu
Group=ubuntu
ExecStart=/opt/kafka_2.13-2.5.1/bin/connect-standalone.sh /opt/kafka_2.13-2.5.1/config/connect-standalone.properties /opt/kafka_2.13-2.5.1/config/connect-rabbitmq-sink.properties
Restart=on-failure
SyslogIdentifier=kafkaconnect

[Install]
WantedBy=multi-user.target

Now reload systemd so it picks up the new unit file, then enable and start the service:

sudo systemctl daemon-reload
sudo systemctl enable kafkaconnect_rabbitmq
sudo systemctl start kafkaconnect_rabbitmq

The service will now start automatically every time the server is rebooted, and thanks to Restart=on-failure the process is restarted if it crashes.

To check the status of the service:

sudo systemctl status kafkaconnect_rabbitmq
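
To follow the service's log output (the SyslogIdentifier set in the unit file also makes the entries easy to find in syslog):

sudo journalctl -u kafkaconnect_rabbitmq -f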