
Ruby

There are several Kafka clients for Ruby, but we recommend using rdkafka-ruby.

rdkafka-ruby

It's a wrapper around the excellent librdkafka and supports all the features in the latest Kafka release. The main feature we need from a client library is support for SASL/SCRAM authentication, since this is what CloudKarafka uses.

For a complete code example that gets you going quickly, see https://github.com/CloudKarafka/ruby-kafka-example
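
As a minimal sketch of what a connection can look like with rdkafka-ruby, assuming your connection details are exposed through the CLOUDKARAFKA_* environment variables used throughout this guide and that your cluster uses SCRAM-SHA-256 (the same mechanism as in the Karafka examples below):

require 'rdkafka'

config = Rdkafka::Config.new(
  :"bootstrap.servers" => ENV['CLOUDKARAFKA_BROKERS'],
  :"security.protocol" => 'SASL_SSL',
  :"sasl.mechanisms"   => 'SCRAM-SHA-256',
  :"sasl.username"     => ENV['CLOUDKARAFKA_USERNAME'],
  :"sasl.password"     => ENV['CLOUDKARAFKA_PASSWORD']
)

producer = config.producer
# The topic name is a placeholder; on shared plans it must be prefixed
# with your username (see the shared cluster section below).
producer.produce(
  topic:   "#{ENV['CLOUDKARAFKA_USERNAME']}-test",
  payload: 'hello from rdkafka-ruby'
).wait
producer.close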

Karafka

https://github.com/karafka/karafka

Karafka is a framework for building Ruby and Rails applications on top of Apache Kafka. It lets you capture everything that happens in your systems at large scale, providing a seamless and stable core for consuming and processing this data, without having to focus on things outside your business domain. Karafka not only handles incoming messages but also provides tools for building complex data-flow applications that both receive and send messages.

Note: The library Karafka and the service CloudKarafka have similar names, but there is no relation between them.

Example App

Karafka has an example app here: https://github.com/karafka/example-app
This example only requires a few small changes in order to work with CloudKarafka. The changes that need to be made depend on whether you are running a dedicated or a shared plan on CloudKarafka.

Dedicated cluster configuration

If you are running a dedicated cluster, you only need to edit the Kafka config in the setup phase of Karafka, like this:

config.kafka.seed_brokers = ENV['CLOUDKARAFKA_BROKERS']&.split(",")&.map { |b| "kafka://#{b}" }
config.kafka.sasl_scram_username = ENV['CLOUDKARAFKA_USERNAME']
config.kafka.sasl_scram_password = ENV['CLOUDKARAFKA_PASSWORD']
config.kafka.sasl_scram_mechanism = "sha256"
config.kafka.ssl_ca_certs_from_system = true
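
For reference, CLOUDKARAFKA_BROKERS is a comma-separated list of host:port pairs. The hostnames below are hypothetical; use the broker list from your CloudKarafka console:

# Hypothetical broker list -- copy the hosts and port shown in your CloudKarafka console
ENV['CLOUDKARAFKA_BROKERS'] = 'host-01.srvs.cloudkafka.com:9094,host-02.srvs.cloudkafka.com:9094'
ENV['CLOUDKARAFKA_BROKERS'].split(',').map { |b| "kafka://#{b}" }
# => ["kafka://host-01.srvs.cloudkafka.com:9094", "kafka://host-02.srvs.cloudkafka.com:9094"]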

Shared cluster configuration

If you are running a shared cluster, make the changes shown below.

Edit karafka.rb to look like the following:

# frozen_string_literal: true

# Non Ruby on Rails setup
ENV['RACK_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] ||= ENV['RACK_ENV']
Bundler.require(:default, ENV['KARAFKA_ENV'])
Karafka::Loader.load(Karafka::App.root)
require 'active_support/core_ext/hash'

class CloudKarafkaTopicMapper
  def initialize(prefix)
    @prefix = "#{prefix}-"
  end

  def incoming(topic)
    topic.to_s.gsub(@prefix, '')
  end

  def outgoing(topic)
    "#{@prefix}#{topic}"
  end
end

class App < Karafka::App
  setup do |config|

    config.topic_mapper = CloudKarafkaTopicMapper.new(ENV['CLOUDKARAFKA_USERNAME'])
    config.consumer_mapper = proc { |name| "#{ENV['CLOUDKARAFKA_USERNAME']}-#{name}" }
    config.kafka.seed_brokers = ENV['CLOUDKARAFKA_BROKERS']&.split(",")&.map { |b| "kafka://#{b}" }
    config.kafka.sasl_scram_username = ENV['CLOUDKARAFKA_USERNAME']
    config.kafka.sasl_scram_password = ENV['CLOUDKARAFKA_PASSWORD']
    config.kafka.sasl_scram_mechanism = "sha256"
    config.kafka.ssl_ca_certs_from_system = true

    config.client_id = "example_app"
    # Enable those 2 lines if you use Rails and want to use hash with indifferent access for
    # Karafka params
    # require 'active_support/hash_with_indifferent_access'
    # config.params_base_class = HashWithIndifferentAccess
  end

  after_init do
    WaterDrop.setup { |config| config.deliver = !Karafka.env.test? }
  end
end

Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)

# Consumer group defined with the 0.6+ routing style (recommended)
App.consumer_groups.draw do
  consumer_group :batched_group do
    batch_fetching true

    topic :xml_data do
      consumer XmlMessagesConsumer
      batch_consuming false
      parser XmlParser
    end

    topic :inline_batch_data do
      consumer InlineBatchConsumer
      batch_consuming true
    end

    topic :callbacked_data do
      consumer CallbackedConsumer
      batch_consuming true
    end
  end

  # A ping-pong implementation using karafka-sidekiq backend
  # @note The backend is totally optional, if you disable it, the game will
  # work as well
  consumer_group :async_pong do
    topic :ping do
      consumer Pong::PingConsumer
      backend :sidekiq
    end

    topic :pong do
      consumer Pong::PongConsumer
      backend :sidekiq
    end
  end
end

App.boot!

What we have done is add a new topic mapper, CloudKarafkaTopicMapper, and add the Kafka configuration needed to connect to CloudKarafka using SASL/SCRAM.
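
To illustrate what the mapper does, assuming a username of myuser:

mapper = CloudKarafkaTopicMapper.new('myuser')
mapper.outgoing(:xml_data)          # => "myuser-xml_data"  -- the topic name used on the broker
mapper.incoming('myuser-xml_data')  # => "xml_data"         -- the topic name used in your routes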

You will also need to edit the file lib/tasks/sender.rake and add the topic prefix to all topics, as in the sketch below. The topic prefix is your username followed by a dash, because all topics on shared clusters are prefixed with your username.
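
A sketch of what such a prefixed task could look like; the task name and payload are placeholders, and it assumes the WaterDrop 1.x SyncProducer API used by this generation of Karafka and that the app (karafka.rb) has already been loaded:

namespace :sender do
  desc 'Send an example payload to a prefixed topic on a shared cluster'
  task :example do
    prefix = "#{ENV['CLOUDKARAFKA_USERNAME']}-"
    # Shared clusters only accept topic names that start with your username
    WaterDrop::SyncProducer.call('{ "value": 1 }', topic: "#{prefix}inline_batch_data")
  end
end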

Using Docker?

If you are using Docker, the setting config.kafka.ssl_ca_certs_from_system will not work.

You will need to set the CA certificate explicitly with the setting config.kafka.ssl_ca_cert_file_path. Download the CA certificate here: https://www.cloudkarafka.com/certs/cloudkarafka.ca
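
For example, inside the setup block, assuming you saved the downloaded file next to karafka.rb as cloudkarafka.ca:

# Replace config.kafka.ssl_ca_certs_from_system = true with an explicit path
config.kafka.ssl_ca_cert_file_path = 'cloudkarafka.ca'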