The Distributed SQL Blog

Thoughts on distributed databases, open source, and cloud native

Streaming Changes From YugabyteDB to Downstream Databases

In this blog post, we will stream data to downstream databases using YugabyteDB’s Change Data Capture (CDC) feature, introduced in YugabyteDB 2.13. The pipeline publishes changes to Kafka and then streams them to sinks such as MySQL, PostgreSQL, and Elasticsearch.

YugabyteDB CDC Downstream Databases Topology

The diagram below represents how data will flow from YugabyteDB to Kafka and then further to the sink databases. 

YugabyteDB Downstream Databases Topology

Specifically, the Debezium Connector for YugabyteDB continuously captures changes from YugabyteDB and publishes them to a separate Kafka topic for each table. The JDBC Sink connector then reads the data from those topics and writes it to the sink according to its configuration.
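To make the one-topic-per-table mapping concrete, here is a minimal Python sketch. It assumes the usual Debezium convention of naming topics `<server name>.<schema>.<table>`; the server name `dbserver1`, the schema `public`, and the table names are hypothetical placeholders, not values taken from this post.

```python
def topic_for_table(server_name: str, schema: str, table: str) -> str:
    """Return the Kafka topic a table's change events are assumed to land in."""
    return f"{server_name}.{schema}.{table}"


def topics_for_tables(server_name: str, qualified_tables: list[str]) -> dict[str, str]:
    """Map each 'schema.table' name to its own topic (one topic per table)."""
    mapping = {}
    for qualified in qualified_tables:
        schema, table = qualified.split(".", 1)
        mapping[qualified] = topic_for_table(server_name, schema, table)
    return mapping


if __name__ == "__main__":
    # Hypothetical server and table names, used only for illustration.
    tables = ["public.test", "public.orders"]
    for table, topic in topics_for_tables("dbserver1", tables).items():
        print(f"{table} -> {topic}")
```

A sink connector's `topics` property would then list whichever of these topic names it should consume.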

YugabyteDB CDC Downstream Databases Example

In this example, we will deploy all the connectors to a single Kafka Connect instance that writes to and reads from Kafka in order to push data to the downstream sinks.

  • For starters, clone the repository from GitHub:

  • Navigate to the cloned repository and start the Docker containers using the docker-compose-init.yaml file:

Now that all the components have started, we will begin to deploy the required connectors.

  • Next, log in to ysqlsh:

Create a table:

  • Create a stream ID:

Copy the stream ID that the above command prints.

  • Now you can deploy the connector. Make sure to replace the stream ID in the following command with the one you copied:
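As a rough illustration of what this deployment involves, the Python sketch below assembles a source connector definition and the HTTP request that would register it with Kafka Connect. Everything specific here is an assumption: the connector name, host names, ports, credentials, the table `public.test`, and the REST address `http://localhost:8083` are hypothetical, and the property names follow the Debezium Connector for YugabyteDB as I understand it, so check them against the repository's own configuration. The stream ID is left as a placeholder for the value you copied.

```python
import json
import urllib.request

# Assumption: Kafka Connect exposes its REST API here in the Docker setup.
CONNECT_URL = "http://localhost:8083/connectors"


def build_source_connector(stream_id: str) -> dict:
    """Build a hypothetical Debezium YugabyteDB source connector definition."""
    return {
        "name": "ybconnector",  # hypothetical connector name
        "config": {
            "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
            "database.hostname": "yugabyte-host",            # hypothetical host
            "database.port": "5433",
            "database.master.addresses": "yugabyte-host:7100",
            "database.user": "yugabyte",
            "database.password": "yugabyte",
            "database.dbname": "yugabyte",
            "database.server.name": "dbserver1",             # prefix for topic names
            "table.include.list": "public.test",             # hypothetical table
            "database.streamid": stream_id,                  # the ID copied earlier
        },
    }


def build_deploy_request(connector: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Prepare (but do not send) the POST that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    connector = build_source_connector("<your-stream-id>")
    request = build_deploy_request(connector)
    print(request.get_method(), request.full_url)
    print(json.dumps(connector, indent=2))
    # To actually deploy: urllib.request.urlopen(request)
```

The request is only constructed, not sent, so the sketch can be inspected without a running Kafka Connect instance.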

Deploying the PostgreSQL sink connector

  • Next, deploy the sink connector for PostgreSQL:

The JSON file deployed above should look like this:

  • This JSON file sets up a connector with the following configuration properties:
    • `name`: The logical name given to the sink connector.
    • `connector.class`: The connector to use. We are using a JDBC sink connector in this example.
    • `topics`: The topics to read the data from.
    • `dialect.name`: The dialect to follow. This varies depending on the sink database being used.
    • `table.name.format`: The name of the table in the sink database.
    • `transforms.unwrap.type`: The transformation that makes the Kafka message interpretable by the JDBC sink connector.
    • `auto.create`: Whether to create the table in the sink database if it does not already exist.
    • `insert.mode`: The insert mode to use. The JDBC sink connector supports `insert`, `upsert`, and `update`.
    • `pk.fields`: Comma-separated list of columns that together make up the primary key.

You can read more about the JDBC Sink connector and its configuration options here.
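For orientation, the properties listed above could be assembled roughly as in the following Python sketch. It is not the file from the repository: the connector name, topic, table, connection URL, and credentials are hypothetical, and the transform class name is my assumption about the YugabyteDB-specific unwrap transform. `connection.url` and `pk.mode` do not appear in the list above; they are included because the JDBC sink connector needs a target and `upsert` mode needs to know where the key comes from.

```python
import json


def build_postgres_sink(topic: str, table: str, pk_fields: list[str]) -> dict:
    """Assemble a hypothetical JDBC sink connector definition for PostgreSQL."""
    return {
        "name": "jdbc-sink-pg",  # hypothetical logical name
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics": topic,
            "dialect.name": "PostgreSqlDatabaseDialect",
            "table.name.format": table,
            # Hypothetical connection details for the sink database.
            "connection.url": "jdbc:postgresql://postgres-host:5432/postgres",
            "connection.user": "postgres",
            "connection.password": "postgres",
            # Unwrap the change event so the sink sees plain column values.
            "transforms": "unwrap",
            "transforms.unwrap.type":
                "io.debezium.connector.yugabytedb.transforms.YBExtractNewRecordState",
            "auto.create": "true",
            "insert.mode": "upsert",
            "pk.mode": "record_key",  # assumption: key columns come from the record key
            "pk.fields": ",".join(pk_fields),
        },
    }


if __name__ == "__main__":
    sink = build_postgres_sink("dbserver1.public.test", "test", ["id"])
    print(json.dumps(sink, indent=2))
```

Only the dialect and connection details are PostgreSQL-specific; the remaining properties carry over to other JDBC sinks.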

Deploying the MySQL sink connector

  • Next, deploy the sink connector for MySQL:
    • This step is trickier because you need to create the target tables before deploying the sink connector.
    • First, connect to MySQL (password: `debezium`):

    • Execute the following commands:

    • Deploy the connector configuration:
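The MySQL sink differs from the PostgreSQL one mainly in its dialect and connection details. The Python sketch below shows one way to express that as overrides on a shared base. The base values, host name, and database name are hypothetical, and turning `auto.create` off is my assumption, based only on the fact that the target tables are created by hand in this section.

```python
import json


def mysql_overrides(database: str) -> dict:
    """Return the settings assumed to differ for a MySQL sink."""
    return {
        "dialect.name": "MySqlDatabaseDialect",
        # Hypothetical host; the password matches the one used to connect above.
        "connection.url": f"jdbc:mysql://mysql-host:3306/{database}",
        "connection.password": "debezium",
        # Assumption: tables are pre-created, so the connector need not create them.
        "auto.create": "false",
    }


def with_overrides(base_config: dict, overrides: dict) -> dict:
    """Return a new config where override values replace base values."""
    return {**base_config, **overrides}


if __name__ == "__main__":
    # Hypothetical shared settings for any JDBC sink in this pipeline.
    base = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "dbserver1.public.test",
        "auto.create": "true",
        "insert.mode": "upsert",
    }
    print(json.dumps(with_overrides(base, mysql_overrides("inventory")), indent=2))
```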

Deploying the Elasticsearch sink connector

  • Now deploy the sink connector for Elasticsearch:
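Elasticsearch is not a JDBC target, so its sink uses a different connector class. The following Python sketch shows what such a definition might contain, assuming the Confluent Elasticsearch sink connector. The connector name, topic, and Elasticsearch address are hypothetical, and the unwrap transform class is the same assumption as for the JDBC sinks.

```python
import json


def build_elasticsearch_sink(topic: str, es_url: str) -> dict:
    """Assemble a hypothetical Elasticsearch sink connector definition."""
    return {
        "name": "elastic-sink",  # hypothetical logical name
        "config": {
            "connector.class":
                "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "tasks.max": "1",
            "topics": topic,
            "connection.url": es_url,
            # Use the record key as the document ID so updates overwrite in place.
            "key.ignore": "false",
            "transforms": "unwrap",
            "transforms.unwrap.type":
                "io.debezium.connector.yugabytedb.transforms.YBExtractNewRecordState",
        },
    }


if __name__ == "__main__":
    sink = build_elasticsearch_sink("dbserver1.public.test", "http://elasticsearch-host:9200")
    print(json.dumps(sink, indent=2))
```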

Replicating data across all sinks with YugabyteDB

  • You can now insert data into YugabyteDB and it will start replicating across all the sinks you have just configured. In your ysqlsh, execute the following commands:

  • Now go to your sinks one by one to confirm that the data has been replicated:
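If a sink stays empty, it can help to confirm that its connector is actually running before inspecting the database. Kafka Connect reports this through its `GET /connectors/<name>/status` REST endpoint. The Python sketch below builds that URL and interprets a status document; the base address and connector name are hypothetical, and the sample response is hand-written for illustration rather than captured from a real run.

```python
def status_url(connector_name: str, base: str = "http://localhost:8083") -> str:
    """Build the Kafka Connect status URL for a connector (base is an assumption)."""
    return f"{base}/connectors/{connector_name}/status"


def is_healthy(status: dict) -> bool:
    """True only if the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok


if __name__ == "__main__":
    # Hand-written sample in the shape of a Kafka Connect status response.
    sample = {
        "name": "jdbc-sink-pg",
        "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
        "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
    }
    print(status_url(sample["name"]))
    print("healthy" if is_healthy(sample) else "needs attention")
```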

PostgreSQL

You will need to connect to the PostgreSQL database first:

MySQL

Elasticsearch

Conclusion

In this post, we set up a pipeline that keeps YugabyteDB in sync with other databases as well as an Elasticsearch instance. Identifiers and names are kept the same across all instances so that the data is easy to follow from source to sink.

Head over to Change Data Capture (CDC) to learn more about YugabyteDB CDC downstream databases. You can also join the YugabyteDB Community Slack channel for a deeper discussion with over 5,500 developers, engineers, and architects.
