
Guide: Kafka Connect

This guide covers how to run Kafka Connect in standalone mode on your own server in AWS, connected to a hosted Kafka cluster at CloudKarafka.
You need a server running Ubuntu in your AWS account that you can access over SSH. To run Kafka Connect without memory issues, the server needs at least 2 GB of memory.

Create a Kafka cluster

Create the Kafka cluster at CloudKarafka, and make sure to select a subnet that doesn't conflict (overlap) with the subnets used by the machines in your AWS account.

Set up VPC peering

See Guide: VPC Peering for how to set up VPC peering connections.


Kafka Connect is included in both the Apache Kafka package and the Confluent Platform download. This guide uses the Apache Kafka package, but the setup should be quite similar if you choose Confluent Platform.
Download Apache Kafka version 2.5.1 from the Apache Kafka downloads page. It was the latest version at the time of writing, but newer versions may be available; choose the version you want and adjust the version numbers in the paths below accordingly.

Download the tarball and extract it into /opt:

tar -xzvf kafka_2.13-2.5.1.tgz -C /opt


# /opt/kafka_2.13-2.5.1/config/

Note: You must replace the IPs on the first line with the IPs of the brokers in your cluster. You can find them on the cluster's details page.
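The worker file itself is not reproduced above. As a rough sketch, a minimal standalone worker config could look like the one this script writes. It assumes the brokers listen on port 9092 and that records on the topic are plain text (hence StringConverter); the file name worker-example.properties and the 10.0.0.x IPs are hypothetical placeholders. On the server you would set CONFIG_DIR to /opt/kafka_2.13-2.5.1/config.

```shell
#!/bin/sh
# Sketch only: writes a minimal Kafka Connect standalone worker config.
# CONFIG_DIR defaults to the current directory; the file name is hypothetical.
CONFIG_DIR="${CONFIG_DIR:-.}"
WORKER_FILE="$CONFIG_DIR/worker-example.properties"

# bootstrap.servers must stay on the first line; the IPs and port 9092 are
# placeholders for the broker IPs from your cluster's details page.
# StringConverter assumes plain-text records (an assumption of this sketch).
# Standalone mode keeps source-connector offsets in the local offsets file.
cat > "$WORKER_FILE" <<'EOF'
bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
EOF

echo "Wrote $WORKER_FILE"
```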

You have now configured the Kafka Connect worker, but no connectors are configured yet to actually move any data. Many connectors are available for moving data in and out of Kafka. This guide configures only two, but that should be enough to get you going and configure others.

Console Sink

This connector simply prints every record written to a topic to standard output, so you can see what's happening.

# /opt/kafka_2.13-2.5.1/config/
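The console sink config is not reproduced above. A minimal sketch, assuming it uses the FileStreamSinkConnector that ships with Apache Kafka (which writes to standard output when no file property is set) and the topic connect-guide; the file name and the connector name console-sink-example are hypothetical.

```shell
#!/bin/sh
# Sketch only: console sink built on Kafka's bundled FileStreamSinkConnector.
# Leaving out the "file" property makes it write records to stdout.
CONFIG_DIR="${CONFIG_DIR:-.}"
# Hypothetical file name.
CONSOLE_SINK_FILE="$CONFIG_DIR/console-sink-example.properties"

cat > "$CONSOLE_SINK_FILE" <<'EOF'
name=console-sink-example
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=connect-guide
EOF

echo "Wrote $CONSOLE_SINK_FILE"
```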


RabbitMQ Sink

This connector forwards all records from a Kafka topic to a queue in RabbitMQ.
The RabbitMQ connector isn't bundled with Kafka Connect, so you need to download the library first:

cd /opt

Next, the connector's JAR files need to be added to the plugin path so that Kafka Connect can load them. This is done in /opt/kafka_2.13-2.5.1/config/

# /opt/kafka_2.13-2.5.1/config/
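A sketch of that step, assuming the connector was unpacked into a subdirectory of a hypothetical /opt/connectors directory and that the worker file is a hypothetical worker-example.properties. plugin.path takes a comma-separated list of parent directories, and Kafka Connect looks for plugins inside each of them.

```shell
#!/bin/sh
# Sketch only: ensure the worker config has a plugin.path entry.
# Both the worker file name and the plugin directory are hypothetical.
CONFIG_DIR="${CONFIG_DIR:-.}"
WORKER_FILE="$CONFIG_DIR/worker-example.properties"
# Parent directory that contains the unpacked RabbitMQ connector directory.
PLUGIN_DIR="${PLUGIN_DIR:-/opt/connectors}"

if grep -q '^plugin.path=' "$WORKER_FILE" 2>/dev/null; then
  # Do not overwrite an existing entry; extend the comma-separated list by hand.
  echo "plugin.path already set in $WORKER_FILE; add $PLUGIN_DIR to it if needed"
else
  echo "plugin.path=$PLUGIN_DIR" >> "$WORKER_FILE"
  echo "Added plugin.path=$PLUGIN_DIR to $WORKER_FILE"
fi
```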


Now it's time to configure the connector:

# /opt/kafka_2.13-2.5.1/config/


This makes the connector publish all records from the topic connect-guide to RabbitMQ on the configured host, vhost gtwvkvpq. All messages are published to the exchange amq.topic with the routing key kafka.connect. To see the messages, you need to create a queue on that vhost and bind it to the exchange.

Note that value.converter must be set to org.apache.kafka.connect.converters.ByteArrayConverter; otherwise, the connector cannot publish the messages to RabbitMQ.
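The connector config is not shown above either. The sketch below writes one that matches the description: topic connect-guide, vhost gtwvkvpq, exchange amq.topic, routing key kafka.connect, and ByteArrayConverter for values. The property names follow the settings visible in the log output further down plus our reading of the Confluent RabbitMQ sink documentation, so double-check them against the docs for your connector version. The host rabbitmq.example.com, the password, the broker IPs and the file name are placeholders.

```shell
#!/bin/sh
# Sketch only: RabbitMQ sink connector config; verify property names against
# the connector's own documentation. Host, password, IPs and file name are placeholders.
CONFIG_DIR="${CONFIG_DIR:-.}"
RABBITMQ_SINK_FILE="$CONFIG_DIR/rabbitmq-sink-example.properties"

cat > "$RABBITMQ_SINK_FILE" <<'EOF'
name=rabbitmq-sink-connector
connector.class=io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector
tasks.max=1
topics=connect-guide
# Values must reach the connector as raw bytes.
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
rabbitmq.host=rabbitmq.example.com
rabbitmq.port=5672
rabbitmq.username=gtwvkvpq
rabbitmq.password=CHANGE_ME
rabbitmq.virtual.host=gtwvkvpq
rabbitmq.exchange=amq.topic
rabbitmq.routing.key=kafka.connect
rabbitmq.delivery.mode=PERSISTENT
# Topic used by the Confluent license check; replace with your broker IPs.
confluent.topic.bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092
confluent.topic.replication.factor=1
EOF

echo "Wrote $RABBITMQ_SINK_FILE"
```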


Now that Kafka Connect, the console sink, and the RabbitMQ sink are configured, it's time to run it:

/opt/kafka_2.13-2.5.1/bin/ \
/opt/kafka_2.13-2.5.1/config/ \
/opt/kafka_2.13-2.5.1/config/ \

Unless you have changed the logging configuration file, the log level is INFO, so Kafka Connect logs a lot to your terminal. Below are some key lines to look out for:

[2020-08-26 07:05:32,795] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:69)
[2020-08-26 07:05:33,940] INFO Added plugin 'io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:191)
[2020-08-26 07:05:47,511] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2020-08-26 07:05:48,339] INFO Creating connector rabbitmq-sink-connector of type io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector (org.apache.kafka.connect.runtime.Worker:253)
[2020-08-26 07:05:48,342] INFO Instantiated connector rabbitmq-sink-connector with version 1.3.0 of type class io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector (org.apache.kafka.connect.runtime.Worker:256)
[2020-08-26 07:05:48,343] INFO RabbitMQSinkConnectorConfig values:
    confluent.license =
    confluent.topic = _confluent-command
    confluent.topic.bootstrap.servers = [,,]
    confluent.topic.replication.factor = 1 = PERSISTENT = amq.topic =
    rabbitmq.port = 5672
    rabbitmq.username = gtwvkvpq = gtwvkvpq
[2020-08-26 07:05:49,269] INFO Creating task rabbitmq-sink-connector-0 (org.apache.kafka.connect.runtime.Worker:421)
[2020-08-26 07:05:49,314] INFO Opening connection to (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:49)
[2020-08-26 07:05:49,573] INFO Creating Channel (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:62)
[2020-08-26 07:05:49,702] INFO WorkerSinkTask{id=rabbitmq-sink-connector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:305)
[2020-08-26 07:06:49,310] INFO Starting publishing batches to RabbitMQ (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:93)

Configure RabbitMQ

At this point, all messages are sent to an exchange with a routing key, so you cannot see any messages yet: RabbitMQ drops messages published to an exchange when no queue binding matches them. To fix that, log in to the RabbitMQ Management Interface, select the correct vhost, and create a queue. Open the queue, scroll down to the Bindings section, and add a binding from the exchange amq.topic.
Using # as the routing key routes all messages published to the exchange to your queue; see the RabbitMQ documentation for more about routing.

And we are done: all records written to the topic connect-guide in your Kafka cluster will end up both in the terminal on your server and in the RabbitMQ queue you just created.
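If you would rather script this than click through the management interface, the RabbitMQ management HTTP API can create the queue and the binding. This sketch only defines two shell functions; the API base URL, the password and the queue name are hypothetical, while the user and vhost default to gtwvkvpq as used in this guide. A vhost name containing / would have to be URL-encoded as %2F.

```shell
#!/bin/sh
# Sketch only: placeholder connection details for the management HTTP API.
RMQ_API="${RMQ_API:-https://rabbitmq.example.com/api}"
RMQ_VHOST="${RMQ_VHOST:-gtwvkvpq}"
RMQ_USER="${RMQ_USER:-gtwvkvpq}"
RMQ_PASS="${RMQ_PASS:-CHANGE_ME}"

# Declare a durable queue: PUT /api/queues/{vhost}/{queue}
create_queue() {
  curl -fsS -u "$RMQ_USER:$RMQ_PASS" -X PUT \
    -H 'content-type: application/json' \
    -d '{"durable":true}' \
    "$RMQ_API/queues/$RMQ_VHOST/$1"
}

# Bind the queue to amq.topic with routing key "#", matching every message:
# POST /api/bindings/{vhost}/e/{exchange}/q/{queue}
bind_all() {
  curl -fsS -u "$RMQ_USER:$RMQ_PASS" -X POST \
    -H 'content-type: application/json' \
    -d '{"routing_key":"#","arguments":{}}' \
    "$RMQ_API/bindings/$RMQ_VHOST/e/amq.topic/q/$1"
}
```

With the variables set, `create_queue connect-guide-queue && bind_all connect-guide-queue` creates a durable queue and binds it to amq.topic with the routing key #.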


Running the connector directly from your terminal isn't ideal, and even running the process as a background task isn't good enough: it won't be restarted if it fails.
To fix that, let's create a systemd service and let systemd run the process:

# /etc/systemd/system/kafkaconnect_rabbitmq.service

[Unit]
Description=Kafka Connect RabbitMQ

[Service]
ExecStart=/opt/kafka_2.13-2.5.1/bin/ /opt/kafka_2.13-2.5.1/config/ /opt/kafka_2.13-2.5.1/config/

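For systemd to restart the process when it fails and to start it at boot, the unit also needs a Restart= setting and an [Install] section; without [Install], systemctl enable has nothing to hook the unit into. A fuller sketch of the unit, assuming connect-standalone.sh (the standalone launcher in Kafka's bin directory) and the hypothetical config file names worker-example.properties, console-sink-example.properties and rabbitmq-sink-example.properties:

```shell
#!/bin/sh
# Sketch only: writes a complete unit file; the config file names are hypothetical.
# UNIT_DIR defaults to the current directory; the real location is /etc/systemd/system.
UNIT_DIR="${UNIT_DIR:-.}"
KAFKA_HOME="/opt/kafka_2.13-2.5.1"
UNIT_FILE="$UNIT_DIR/kafkaconnect_rabbitmq.service"

# Unquoted EOF so that $KAFKA_HOME is expanded into absolute paths.
cat > "$UNIT_FILE" <<EOF
[Unit]
Description=Kafka Connect RabbitMQ
# Wait for networking so the brokers and RabbitMQ are reachable.
After=network-online.target
Wants=network-online.target

[Service]
ExecStart=$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/worker-example.properties $KAFKA_HOME/config/console-sink-example.properties $KAFKA_HOME/config/rabbitmq-sink-example.properties
# Restart the process if it exits with an error or is killed by a signal.
Restart=on-failure
RestartSec=10

[Install]
# Start the service at boot once it has been enabled.
WantedBy=multi-user.target
EOF

echo "Wrote $UNIT_FILE"
```

Copy the generated file to /etc/systemd/system with sudo and run `sudo systemctl daemon-reload` so systemd rereads its unit files. The console sink's output then ends up in the journal, where `journalctl -u kafkaconnect_rabbitmq -f` follows it.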

Now enable the service and start it

sudo systemctl enable kafkaconnect_rabbitmq
sudo systemctl start kafkaconnect_rabbitmq

And now the service will start automatically every time the server is rebooted.

To check the status of the service

sudo systemctl status kafkaconnect_rabbitmq