Announcing the Kafka Connect YugabyteDB Sink Connector

Andrew Nelson

For customers who run Kafka as their streaming data platform, the Kafka Connect YugabyteDB Sink Connector handles delivery of data from specific topics to a YugabyteDB instance. As soon as new messages are published, the connector forwards them and automatically adds them to a destination table.

YugabyteDB is a high-performance, distributed SQL database built on a scalable and fault-tolerant design inspired by Google Spanner. Yugabyte’s SQL API (YSQL) supports most of PostgreSQL’s functionality and is wire-protocol compatible with PostgreSQL drivers.

Apache Kafka is a community-developed, distributed streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is built on the abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, it has evolved into a full-fledged event streaming platform.

Kafka Connect makes it easy to move data into and out of Kafka. In this case, the YugabyteDB Sink Connector takes topic data as its source and delivers it to a YugabyteDB instance as its destination (sketched after the list below). Benefits of this connector include:

  • A simple data abstraction for forwarding data from Kafka to YugabyteDB.
  • Flexible, scalable many-to-many interactions between multiple topics and multiple YugabyteDB instances.
  • Reusable and customizable for individual customers’ data pipelines and use cases.
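
Concretely, the connector maps the fields of each JSON message to same-named columns in the destination table. Roughly, using the topic and table from the walkthrough below (the INSERT is illustrative of the equivalent YCQL write the sink performs):

{"key" : "A", "value" : 1, "ts" : 1541559411000}            # message published to the Kafka topic
INSERT INTO demo.test_table (key, value, ts) VALUES (...)   # equivalent YCQL write in YugabyteDB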

Getting Started

There are several easy ways to get started with the integration of Kafka and YugabyteDB, and the Sink Connector is also straightforward to test in a distributed environment. To start your local YugabyteDB cluster, refer to the quick start guide for your chosen environment.

Prepare your environment to compile the YugabyteDB Kafka Sink Connector. For example, on a basic Debian GNU/Linux 9 GCP image:

sudo apt-get update
sudo apt-get install openjdk-8-jdk
sudo apt-get install jq
sudo apt-get install maven
sudo apt-get install git
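
Before building, you can sanity-check that the toolchain is in place (exact versions will vary by image):

java -version
mvn -version
git --version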

Download a copy of our connector from GitHub, build it, and set up the connector and environment libraries (replace ~/kafka-version with the path to your Kafka installation):

git clone https://github.com/yugabyte/yb-kafka-connector.git
cd yb-kafka-connector
mvn clean install -DskipTests
cp ~/yb-kafka-connector/target/yb-kafka-connnector-1.0.0.jar ~/kafka-version/libs/
cd ~/kafka-version/libs/
wget https://central.maven.org/maven2/io/netty/netty-all/4.1.25.Final/netty-all-4.1.25.Final.jar
wget https://central.maven.org/maven2/com/yugabyte/cassandra-driver-core/3.2.0-yb-18/cassandra-driver-core-3.2.0-yb-18.jar
wget https://central.maven.org/maven2/com/codahale/metrics/metrics-core/3.0.1/metrics-core-3.0.1.jar
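
Assuming ~/kafka-version is your Kafka installation directory, as above, a quick listing confirms the connector jar and its dependencies are in place:

ls ~/kafka-version/libs | grep -E 'yb-kafka|netty-all|cassandra-driver|metrics-core'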

For a non-production environment, start ZooKeeper and the Kafka broker from the CLI, forking each process with “&”:

./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
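
If both processes came up cleanly, jps (bundled with the JDK) should list a ZooKeeper QuorumPeerMain entry and a kafka.Kafka entry:

jps -l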

Create a test topic in Kafka:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic
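
You can verify the topic was created:

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic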

Ensure your Kafka Sink Connector is configured to point to your YugabyteDB instance. Replace YB_IP_ENDPOINT below with the address of a node in your cluster (9042 is the default YCQL port):

vi ~/yb-kafka-connector/resources/examples/yugabyte.sink.properties
# Sample yugabyte sink properties.
name=yugabyte-sink
connector.class=com.yb.connect.sink.YBSinkConnector
topics=test_topic
yugabyte.cql.keyspace=demo
yugabyte.cql.tablename=test_table
yugabyte.cql.contact.points=YB_IP_ENDPOINT:9042

Prepare YugabyteDB for the entries made by the Kafka Sink Connector. Ensure $CQLSH_HOST is set to the IP address of your YugabyteDB instance:

yugabyte-install-dir/bin/cqlsh
 Connected to local cluster at 35.193.82.27:9042.
 [cqlsh 5.0.1 | Cassandra 3.9-SNAPSHOT | CQL spec 3.4.2 | Native protocol v4]
 Use HELP for help. 
cqlsh> CREATE KEYSPACE IF NOT EXISTS demo;
cqlsh> CREATE TABLE demo.test_table (key text, value bigint, ts timestamp, PRIMARY KEY (key));
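
A quick DESCRIBE confirms the schema the connector will write into:

cqlsh> DESCRIBE TABLE demo.test_table;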

Load the YugabyteDB Kafka Sink Connector:

./bin/connect-standalone.sh ~/yb-kafka-connector/resources/examples/kafka.connect.properties ~/yb-kafka-connector/resources/examples/yugabyte.sink.properties
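
The kafka.connect.properties file passed as the first argument is the standard Kafka Connect standalone worker configuration; the repo ships a ready-made copy, which looks something like this (values are illustrative):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets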

Create some events in the sample topic to be processed:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
{"key" : "A", "value" : 1, "ts" : 1541559411000}
{"key" : "B", "value" : 2, "ts" : 1541559412000}
{"key" : "C", "value" : 3, "ts" : 1541559413000}

Verify that the events were consumed and written to YugabyteDB:

cqlsh> SELECT * FROM demo.test_table;
 key | value | ts
-----+-------+---------------------------------
   A |     1 | 2018-11-07 02:56:51.000000+0000
   B |     2 | 2018-11-07 02:56:52.000000+0000
   C |     3 | 2018-11-07 02:56:53.000000+0000
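
Because key is the table’s primary key, individual rows can also be fetched directly:

cqlsh> SELECT * FROM demo.test_table WHERE key = 'A';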

For more information, please see YugabyteDB’s Kafka documentation or the project’s GitHub repository. For any questions, please join the integration channel on our Slack instance. For support with the Kafka Sink Connector, please use GitHub issues.

What’s Next?

  • Compare YugabyteDB in depth to databases like CockroachDB, Google Cloud Spanner, and MongoDB.
  • Get started with YugabyteDB on macOS, Linux, Docker, and Kubernetes.
  • Contact us to learn more about licensing and pricing, or to schedule a technical overview.

