
This is part of the Charmed Apache Kafka Tutorial. Please refer to this page for more information and an overview of the content.

Using Kafka Connect for ETL

In this part of the tutorial, we are going to use Kafka Connect - an ETL framework on top of Apache Kafka - to seamlessly move data between different charmed database technologies.

We will follow a step-by-step process for moving data between Canonical Data Platform charms using Kafka Connect. Specifically, we will showcase the use case of loading data from a relational database (PostgreSQL) into a document store and search engine (OpenSearch), entirely using charmed solutions.

By the end, you should be able to use the Kafka Connect and Kafka Connect integrator charms to streamline ETL tasks on Canonical Data Platform charmed solutions.

Prerequisites

We will be deploying different charmed data solutions, including PostgreSQL and OpenSearch. If you require more information or face issues deploying any of the mentioned products, consult the respective documentation:

Check current deployment

Up to this point, we should have a 3-unit Apache Kafka application, related to a 5-unit ZooKeeper application. Run the juju status command to check the current state of the deployment:
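
juju status

The output should be similar to the following: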

Model     Controller  Cloud/Region         Version  SLA          Timestamp
tutorial  overlord    localhost/localhost  3.6.4    unsupported  17:06:15+08:00

App                       Version  Status  Scale  Charm                     Channel        Rev  Exposed  Message
data-integrator                    active      1  data-integrator           latest/stable   78  no       
kafka                     3.6.1    active      3  kafka                     3/stable       195  no       
self-signed-certificates           active      1  self-signed-certificates  1/stable       263  no       
zookeeper                 3.8.4    active      5  zookeeper                 3/stable       149  no       

Unit                         Workload  Agent  Machine  Public address  Ports     Message
data-integrator/0*           active    idle   8        10.38.169.159             
kafka/0                      active    idle   5        10.38.169.139             
kafka/1*                     active    idle   6        10.38.169.92              
kafka/2                      active    idle   7        10.38.169.70
self-signed-certificates/0*  active    idle   9        10.38.169.82                            
zookeeper/0*                 active    idle   0        10.38.169.164             
zookeeper/1                  active    idle   1        10.38.169.81              
zookeeper/2                  active    idle   2        10.38.169.72              
zookeeper/3                  active    idle   3        10.38.169.119             
zookeeper/4                  active    idle   4        10.38.169.215             

Set the necessary kernel properties for OpenSearch

Since we will be deploying the OpenSearch charm, we need to set the kernel parameters required for the OpenSearch charm to function properly, described in detail here. In practice, this means running the following commands:

sudo tee -a /etc/sysctl.conf > /dev/null <<EOT
vm.max_map_count=262144
vm.swappiness=0
net.ipv4.tcp_retries2=5
fs.file-max=1048576
EOT

sudo sysctl -p
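
To confirm the new values are in effect, you can query one of the keys, for example:

sysctl vm.max_map_count

This should print vm.max_map_count = 262144.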

Next, we should set the required model parameters using the juju model-config command:

cat <<EOF > cloudinit-userdata.yaml
cloudinit-userdata: |
  postruncmd:
    - [ 'echo', 'vm.max_map_count=262144', '>>', '/etc/sysctl.conf' ]
    - [ 'echo', 'vm.swappiness=0', '>>', '/etc/sysctl.conf' ]
    - [ 'echo', 'net.ipv4.tcp_retries2=5', '>>', '/etc/sysctl.conf' ]
    - [ 'echo', 'fs.file-max=1048576', '>>', '/etc/sysctl.conf' ]
    - [ 'sysctl', '-p' ]
EOF

juju model-config --file=./cloudinit-userdata.yaml
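
Optionally, verify that the model configuration has been applied by reading the key back:

juju model-config cloudinit-userdata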

Deploy the databases and Kafka Connect charms

Deploy the PostgreSQL, OpenSearch, and Kafka Connect charms:

juju deploy kafka-connect --channel edge
juju deploy postgresql --channel 14/stable
juju deploy opensearch --channel 2/stable --config profile=testing

The OpenSearch charm requires a TLS relation to become active. We will use the self-signed-certificates charm that was deployed earlier, in the Enable Encryption part of this tutorial.
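
If the self-signed-certificates application is not present in your model, you can deploy it now. The 1/stable channel shown here matches the channel in the status output later in this tutorial:

juju deploy self-signed-certificates --channel 1/stable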

Enable TLS

Using the juju status command, you should see that the Kafka Connect and OpenSearch applications are in a blocked state. To activate them, we need to make the necessary integrations using the juju integrate command.

First, activate the OpenSearch application by integrating it with the TLS operator:

juju integrate opensearch self-signed-certificates

Then, activate the Kafka Connect application by integrating it with the Apache Kafka application:

juju integrate kafka kafka-connect

Finally, since we will be using TLS on the Kafka Connect interface, integrate the Kafka Connect application with the TLS operator:

juju integrate kafka-connect self-signed-certificates

Use the juju status --watch 2s command to continuously probe your model’s status. After a couple of minutes, all the applications should be in active|idle state, and you should see an output like the following, with 7 applications and 13 units:

Model     Controller  Cloud/Region         Version  SLA          Timestamp
tutorial  overlord    localhost/localhost  3.6.4    unsupported  17:09:25+08:00

App                       Version  Status  Scale  Charm                     Channel        Rev  Exposed  Message
data-integrator                    active      1  data-integrator           latest/stable   78  no       
kafka                     3.6.1    active      3  kafka                     3/stable       195  no       
kafka-connect                      active      1  kafka-connect             latest/edge     13  no       
opensearch                         active      1  opensearch                2/stable       218  no       
postgresql                14.15    active      1  postgresql                14/stable      553  no       
self-signed-certificates           active      1  self-signed-certificates  1/stable       263  no       
zookeeper                 3.8.4    active      5  zookeeper                 3/stable       149  no       

Unit                         Workload  Agent  Machine  Public address  Ports     Message
data-integrator/0*           active    idle   8        10.38.169.159             
kafka-connect/0*             active    idle   10       10.38.169.23    8083/tcp  
kafka/0                      active    idle   5        10.38.169.139             
kafka/1*                     active    idle   6        10.38.169.92              
kafka/2                      active    idle   7        10.38.169.70              
opensearch/0*                active    idle   11       10.38.169.172   9200/tcp  
postgresql/0*                active    idle   12       10.38.169.121   5432/tcp  Primary
self-signed-certificates/0*  active    idle   9        10.38.169.82              
zookeeper/0*                 active    idle   0        10.38.169.164             
zookeeper/1                  active    idle   1        10.38.169.81              
zookeeper/2                  active    idle   2        10.38.169.72              
zookeeper/3                  active    idle   3        10.38.169.119             
zookeeper/4                  active    idle   4        10.38.169.215             

Load test data

In a real-world scenario, an application would typically write data to a PostgreSQL database. However, for the purposes of this tutorial, we’ll generate test data using a simple SQL script and load it into a PostgreSQL database using the psql command-line tool included with the PostgreSQL charm.

For more information on how to access a PostgreSQL database in the PostgreSQL charm, refer to the Access PostgreSQL page of the Charmed PostgreSQL tutorial.

First, create a SQL script by running the following command:

cat <<EOF > /tmp/populate.sql
CREATE TABLE posts (
  id serial not null primary key,
  content text not null,
  likes int default null,
  created_at timestamp with time zone not null default now()
);

INSERT INTO posts (content, likes) 
VALUES 
  (
    'Charmed Apache Kafka is an open-source operator that makes it easier to manage Apache Kafka, with built-in support for enterprise features.', 
    150
  ), 
  (
    'Apache Kafka is a free, open-source software project by the Apache Software Foundation. Users can find out more at the Apache Kafka project page.', 
    200
  ), 
  (
    'Charmed Apache Kafka is built on top of Juju and reliably simplifies the deployment, scaling, design, and management of Apache Kafka in production', 
    100
  ), 
  (
    'Charmed Apache Kafka is a solution designed and developed to help ops teams and administrators automate Apache Kafka operations from Day 0 to Day 2, across multiple cloud environments and platforms.', 
    1000
  ), 
  (
    'Charmed Apache Kafka is developed and supported by Canonical, as part of its commitment to provide open-source, self-driving solutions, seamlessly integrated using the Operator Framework Juju. Please refer to Charmhub, for more charmed operators that can be integrated by Juju.', 
    60
  );
EOF

Next, copy the populate.sql script to the PostgreSQL unit using the juju scp command:

juju scp /tmp/populate.sql postgresql/0:/home/ubuntu/populate.sql
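
Optionally, you can confirm that the file arrived on the unit:

juju ssh postgresql/0 'ls -l /home/ubuntu/populate.sql'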

Then, follow the Access PostgreSQL tutorial to retrieve the password for the operator user on the PostgreSQL database using the get-password action:

juju run postgresql/leader get-password

As a result, you should see output similar to the following:

...
password: bQOUgw8ZZgUyPA6n

Make note of the password, and use juju ssh to connect to the PostgreSQL unit:

juju ssh postgresql/leader

Once connected to the unit, use the psql command-line tool with the operator user's credentials to create a database named tutorial:

psql --host $(hostname -i) --username operator --password --dbname postgres \
    -c "CREATE DATABASE tutorial"

You will be prompted to enter the password you obtained previously.
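
Optionally, if you prefer not to re-type the password for every command, you can export it through the standard PGPASSWORD environment variable used by psql and drop the --password flag from the following commands (the --password flag always forces a prompt). Replace the placeholder with the password you retrieved earlier:

export PGPASSWORD='<operator-password>'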

Now, we can use the populate.sql script copied to the PostgreSQL unit earlier to create a table named posts and load it with some test data:

cat populate.sql | \
    psql --host $(hostname -i) --username operator --password --dbname tutorial

To ensure that the test data is loaded successfully into the posts table, use the following command:

psql --host $(hostname -i) --username operator --password --dbname tutorial \
    -c 'SELECT COUNT(*) FROM posts'

The output should indicate that the posts table now has five rows:

 count 
-------
     5
(1 row)
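
You can also inspect a row directly; for example, to see the most-liked post:

psql --host $(hostname -i) --username operator --password --dbname tutorial \
    -c 'SELECT id, likes FROM posts ORDER BY likes DESC LIMIT 1'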

Log out from the PostgreSQL unit using the exit command or the Ctrl+D keyboard shortcut.

Deploy and integrate the postgresql-connect-integrator charm

Now that you have sample data loaded into PostgreSQL, it is time to deploy the postgresql-connect-integrator charm to enable integration between the PostgreSQL and Kafka Connect applications. First, deploy the charm in source mode using the juju deploy command, providing the minimum necessary configuration:

juju deploy postgresql-connect-integrator \
    --channel edge \
    --config mode=source \
    --config db_name=tutorial \
    --config topic_prefix=etl_
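
You can double-check the options that were set using the juju config command:

juju config postgresql-connect-integrator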

Each Kafka Connect integrator application needs at least two relations:

  • with the Kafka Connect application
  • with a database charm (e.g. MySQL, PostgreSQL, or OpenSearch)

Integrate both Kafka Connect and PostgreSQL with the postgresql-connect-integrator charm:

juju integrate postgresql-connect-integrator postgresql
juju integrate postgresql-connect-integrator kafka-connect

After a couple of minutes, the juju status command should show the postgresql-connect-integrator application in the active|idle state, with a message indicating that the ETL task is running:

...
postgresql-connect-integrator/0*  active    idle   13       10.38.169.83    8080/tcp  Task Status: RUNNING
...

This means that the integrator application is actively copying data from the source database (named tutorial) into Apache Kafka topics prefixed with etl_. For example, rows in the posts table will be published into the Apache Kafka topic named etl_posts.

Deploy and integrate the opensearch-connect-integrator charm

You are almost done with the ETL task; the only remaining part is to move data from Apache Kafka to OpenSearch. To do that, deploy another Kafka Connect integrator named opensearch-connect-integrator in sink mode:

juju deploy opensearch-connect-integrator \
    --channel edge \
    --config mode=sink \
    --config topics="etl_posts"

The above command deploys an integrator application that moves messages from the etl_posts topic to an OpenSearch index named etl_posts. The etl_posts topic itself is filled by the postgresql-connect-integrator charm we deployed earlier.

To activate the opensearch-connect-integrator, make the necessary integrations:

juju integrate opensearch-connect-integrator opensearch
juju integrate opensearch-connect-integrator kafka-connect

Wait a couple of minutes and run juju status. Now both the opensearch-connect-integrator and postgresql-connect-integrator applications should be in the active|idle state, showing a message indicating that the ETL task is running:

...
opensearch-connect-integrator/0*  active    idle   14       10.38.169.108   8080/tcp  Task Status: RUNNING
postgresql-connect-integrator/0*  active    idle   13       10.38.169.83    8080/tcp  Task Status: RUNNING
...

Verify data transfer

Now it’s time to verify that the data is being copied from the PostgreSQL database to the OpenSearch index. We can use the OpenSearch REST API for that purpose.

First, retrieve the admin user credentials for OpenSearch using the get-password action:

juju run opensearch/leader get-password

As a result, you should see output similar to the following:

...
password: GoCNE5KdFywT4nF1GSrwpAGyqRLecSXC
username: admin

Then, retrieve the OpenSearch unit IP and save it into an environment variable:

OPENSEARCH_IP=$(juju ssh opensearch/0 'hostname -i')
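
Optionally, store the admin password in a variable as well, so you do not have to paste it into every command. This is a convenience sketch that simply parses the action output shown above:

ADMIN_PASSWORD=$(juju run opensearch/leader get-password | grep 'password:' | awk '{print $2}')

You can then use -u admin:$ADMIN_PASSWORD in the curl commands below instead of typing the password.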

Now, using the password obtained above, send a request to the index's _search endpoint, either from your browser or with curl:

curl -u admin:<admin-password> -k -X GET https://$OPENSEARCH_IP:9200/etl_posts/_search

As a result, you should get a JSON response containing the search results, which should include five documents. The hits.total.value field should be 5, as shown in the example output below:

{
  "took": 15,
  "timed_out": false, 
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 5,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      ...
    ]
  }
}
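
Alternatively, the index's _count endpoint returns just the number of documents:

curl -u admin:<admin-password> -k -X GET https://$OPENSEARCH_IP:9200/etl_posts/_count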

Now let’s insert a new post into the PostgreSQL database. First, SSH into the PostgreSQL leader unit:

juju ssh postgresql/leader

Then, insert a new post using the following command and the operator user's password:

psql --host $(hostname -i) --username operator --password --dbname tutorial \
    -c "INSERT INTO posts (content, likes) VALUES ('my new post', 1)"

Log out from the PostgreSQL unit using the exit command or the Ctrl+D keyboard shortcut.

Then, check that the data is automatically copied to the OpenSearch index:

curl -u admin:<admin-password> -k -X GET https://$OPENSEARCH_IP:9200/etl_posts/_search

The response should now show six hits (output truncated):

{
...
  "hits": {
    "total": {
      "value": 6,
      "relation": "eq"
    },
...
}

Congratulations! You have successfully completed an ETL job that continuously moves data from PostgreSQL to OpenSearch, using entirely charmed solutions.
