This guide covers how to run Kafka Connect in standalone mode on a
server in AWS, using a hosted Kafka cluster at CloudKarafka.
You need a server running Ubuntu in your AWS account that you can access
over SSH. To run Kafka Connect without memory issues, the server needs at
least 2 GB of memory.
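You can check the available memory on the server before you start; a quick sanity check (exact numbers will vary with your instance type):

```shell
# Show total and available memory in human-readable units;
# Kafka Connect needs roughly 2 GB to run comfortably
free -h
```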
Create the Kafka cluster at cloudkarafka.com. Make sure to select a subnet that doesn't conflict with the subnet your machines (in your own account) are using.
See this guide on how to set up VPC peering connections: Guide: VPC Peering
Kafka Connect is available both in the Apache Kafka package and in the
Confluent Platform download. This guide uses the Apache Kafka
package, but the setup should be quite similar if you choose
Confluent Platform.
Below is a link to Apache Kafka version 2.5.1, the latest version at the
time of writing; there may be newer versions available. Choose the
version that you want:
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.1/kafka_2.13-2.5.1.tgz
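On a headless server the tarball can be fetched directly with wget. The URL below points at the Apache archive, since older releases are removed from the regular mirrors; treat the exact URL as an assumption and adjust to a current mirror if needed:

```shell
# Download the Kafka 2.5.1 tarball (URL may need adjusting to a current mirror)
wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.13-2.5.1.tgz
```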
Download the tarball and extract it into /opt:
tar -xzvf kafka_2.13-2.5.1.tgz -C /opt
Next, configure the standalone worker by editing connect-standalone.properties:
# /opt/kafka_2.13-2.5.1/config/connect-standalone.properties
bootstrap.servers=10.56.72.161:9092,10.56.72.51:9092,10.56.72.225:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/opt/connect.offsets
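Before starting Connect, it can be worth verifying that the server can actually reach the cluster. A quick check (using one of the broker IPs from the config above; substitute your own cluster's addresses) is to list topics with the CLI that ships with Kafka:

```shell
# List topics on the cluster to confirm network connectivity to the brokers
/opt/kafka_2.13-2.5.1/bin/kafka-topics.sh \
  --bootstrap-server 10.56.72.161:9092 \
  --list
```

If this hangs or fails, revisit the VPC peering and security group setup before going further.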
You have now configured Kafka Connect, but we have yet to configure any connectors to actually move data. Kafka Connect has many connectors available to help you move data around; for this guide we will only configure two, but that should be enough to get you going with configuring others.
The first is the console sink, which just prints everything coming into a topic to standard out, so you can see what's happening:
# /opt/kafka_2.13-2.5.1/config/connect-console-sink.properties
name=guide-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=connect-test
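To give the console sink something to print, you can produce a few test records to the connect-test topic with the console producer bundled with Kafka (broker address as above; adjust to your cluster):

```shell
# Each line piped in (or typed interactively) becomes one record on connect-test
echo "hello from kafka connect" | /opt/kafka_2.13-2.5.1/bin/kafka-console-producer.sh \
  --bootstrap-server 10.56.72.161:9092 \
  --topic connect-test
```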
The second connector forwards all records from a Kafka topic to a queue in
RabbitMQ. The RabbitMQ connector doesn't come bundled with Kafka Connect,
so you need to download the library first:
cd /opt
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-rabbitmq-sink/versions/1.3.0-preview/confluentinc-kafka-connect-rabbitmq-sink-1.3.0-preview.zip
unzip confluentinc-kafka-connect-rabbitmq-sink-1.3.0-preview.zip
Then the jar files for the connector need to be added to the plugin path so Kafka Connect can load them. This is done in /opt/kafka_2.13-2.5.1/config/connect-standalone.properties:
# /opt/kafka_2.13-2.5.1/config/connect-standalone.properties
plugin.path=/opt/confluentinc-kafka-connect-rabbitmq-sink-1.3.0-preview/lib
Time to configure the connector
# /opt/kafka_2.13-2.5.1/config/connect-rabbitmq-sink.properties
name=rabbitmq-sink-connector
topics=connect-guide
tasks.max=1
connector.class=io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
confluent.topic.bootstrap.servers=10.56.72.161:9092,10.56.72.51:9092,10.56.72.225:9092
confluent.topic.replication.factor=1
rabbitmq.host=test-serious-butterfly.rmq.cloudamqp.com
rabbitmq.virtual.host=gtwvkvpq
rabbitmq.username=gtwvkvpq
rabbitmq.password=*************************************
rabbitmq.exchange=amq.topic
rabbitmq.routing.key=kafka.connect
rabbitmq.delivery.mode=PERSISTENT
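If the connect-guide topic doesn't exist on the cluster yet, you can create it up front (the partition and replication values here are illustrative; pick values that suit your cluster):

```shell
# Create the topic the RabbitMQ sink will read from
/opt/kafka_2.13-2.5.1/bin/kafka-topics.sh \
  --bootstrap-server 10.56.72.161:9092 \
  --create --topic connect-guide \
  --partitions 3 --replication-factor 3
```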
This will make the connector publish all records from the topic connect-guide to RabbitMQ on host test-serious-butterfly.rmq.cloudamqp.com, vhost gtwvkvpq. All messages will be published to the exchange amq.topic with the routing key kafka.connect. To see the messages, you need to create a queue on that vhost and bind it to the exchange.
One thing to note here is that value.converter must be the ByteArrayConverter, otherwise the connector will not be able to publish the messages to RabbitMQ.
Now that we have configured Kafka Connect, the console sink and the RabbitMQ sink, it's time to run it:
/opt/kafka_2.13-2.5.1/bin/connect-standalone.sh \
/opt/kafka_2.13-2.5.1/config/connect-standalone.properties \
/opt/kafka_2.13-2.5.1/config/connect-console-sink.properties \
/opt/kafka_2.13-2.5.1/config/connect-rabbitmq-sink.properties
Unless you have changed the log4j.properties file, the log level is INFO, which produces a lot of output in your terminal. Below are some key rows to look for:
[2020-08-26 07:05:32,795] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:69)
.....
[2020-08-26 07:05:33,940] INFO Added plugin 'io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:191)
....
[2020-08-26 07:05:47,511] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2020-08-26 07:05:48,339] INFO Creating connector rabbitmq-sink-connector of type io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector (org.apache.kafka.connect.runtime.Worker:253)
[2020-08-26 07:05:48,342] INFO Instantiated connector rabbitmq-sink-connector with version 1.3.0 of type class io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnector (org.apache.kafka.connect.runtime.Worker:256)
[2020-08-26 07:05:48,343] INFO RabbitMQSinkConnectorConfig values:
confluent.license =
confluent.topic = _confluent-command
confluent.topic.bootstrap.servers = [10.56.72.161:9092, 10.56.72.51:9092, 10.56.72.225:9092]
confluent.topic.replication.factor = 1
rabbitmq.delivery.mode = PERSISTENT
rabbitmq.exchange = amq.topic
rabbitmq.host = test-serious-butterfly.rmq.cloudamqp.com
rabbitmq.port = 5672
rabbitmq.username = gtwvkvpq
rabbitmq.virtual.host = gtwvkvpq
(io.confluent.connect.rabbitmq.sink.RabbitMQSinkConnectorConfig:347)
...
[2020-08-26 07:05:49,269] INFO Creating task rabbitmq-sink-connector-0 (org.apache.kafka.connect.runtime.Worker:421)
...
[2020-08-26 07:05:49,314] INFO Opening connection to test-serious-butterfly.rmq.cloudamqp.com:5672/gtwvkvpq (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:49)
[2020-08-26 07:05:49,573] INFO Creating Channel (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:62)
[2020-08-26 07:05:49,702] INFO WorkerSinkTask{id=rabbitmq-sink-connector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:305)
...
[2020-08-26 07:06:49,310] INFO Starting publishing batches to RabbitMQ (io.confluent.connect.rabbitmq.sink.RabbitMQSinkTask:93)
Now, all messages are just sent to an exchange with a routing key,
so you cannot see any messages yet. RabbitMQ drops all messages going into
an exchange if there are no queue bindings for it. We need to fix that.
Log in to the RabbitMQ Management Interface, select the correct vhost
and create a queue. Open the queue and scroll down until you find Bindings.
Using # as the routing key will route all messages going into the exchange
to your queue. Read more about RabbitMQ routing here:
https://www.cloudamqp.com/blog/part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html
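If you prefer the command line over the management interface, the queue and binding can also be created through the RabbitMQ management HTTP API. This sketch assumes the management plugin is reachable over HTTPS on the same host, and uses the vhost and username from the connector config; the queue name my-queue and the PASSWORD placeholder are examples to substitute:

```shell
# Create a durable queue on the vhost via the management API
curl -u gtwvkvpq:PASSWORD -X PUT \
  "https://test-serious-butterfly.rmq.cloudamqp.com/api/queues/gtwvkvpq/my-queue" \
  -H "Content-Type: application/json" \
  -d '{"durable": true}'

# Bind the queue to amq.topic with routing key "#" (matches all routing keys)
curl -u gtwvkvpq:PASSWORD -X POST \
  "https://test-serious-butterfly.rmq.cloudamqp.com/api/bindings/gtwvkvpq/e/amq.topic/q/my-queue" \
  -H "Content-Type: application/json" \
  -d '{"routing_key": "#"}'
```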
And we are done: all records going into your Kafka cluster on the topic
connect-guide will end up in the terminal on your server and in the
RabbitMQ queue you just created.
Running the connector directly from your terminal isn't ideal,
and even running the process as a background task isn't good enough,
since it won't be restarted if it fails.
To fix that, let's create a systemd service and let systemd manage the process:
# /etc/systemd/system/kafkaconnect_rabbitmq.service
[Unit]
Description=Kafka Connect RabbitMQ
After=network.target
[Service]
Type=simple
PIDFile=/var/run/kafkaconnect.pid
User=ubuntu
Group=ubuntu
ExecStart=/opt/kafka_2.13-2.5.1/bin/connect-standalone.sh /opt/kafka_2.13-2.5.1/config/connect-standalone.properties /opt/kafka_2.13-2.5.1/config/connect-rabbitmq-sink.properties
Restart=on-failure
SyslogIdentifier=kafkaconnect
[Install]
WantedBy=multi-user.target
Now enable the service and start it:
sudo systemctl enable kafkaconnect_rabbitmq
sudo systemctl start kafkaconnect_rabbitmq
And now the service will start automatically every time the server is rebooted.
To check the status of the service
sudo systemctl status kafkaconnect_rabbitmq
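Since the service runs under systemd, its log output can also be followed with journalctl, which is handy when debugging connector problems:

```shell
# Follow the Kafka Connect service's log output (Ctrl-C to stop)
sudo journalctl -u kafkaconnect_rabbitmq -f
```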