Consume from Kafka topic of type “scratch”¶
This page describes the steps to consume messages from a Kafka topic of type “scratch” on the DSH. You will create a very simple Kafka consumer in Python that consumes messages from a topic on the DSH’s Kafka cluster.
Prerequisites¶
Before you can follow this tutorial, you need the following:
- On the DSH:
- Access to a tenant on a DSH platform
- A Kafka topic of type “scratch” in that tenant. See Adding a topic for more information. If you already created a Kafka topic for the Publish to Kafka topic of type “scratch” tutorial, then you don’t have to create a new one, and you can simply reuse the existing topic.
- A Kafka producer. See Publish to Kafka topic of type “scratch” to create one in Python.
- Access to the Harbor container image registry, and the username and CLI secret for your user. See Accessing Harbor for more information.
- The Grafana service. See Requesting Prometheus and Grafana for more information.
- On your machine:
- A Unix-based system, for example Linux, MacOS or Windows Subsystem for Linux
- Docker CLI
Create your files¶
The steps below describe how you can create the files for your container image.
Working directory¶
Open the Terminal, create a directory for this tutorial, and enter it:
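For example (the directory name is just a suggestion):

```bash
mkdir python-consume-scratch
cd python-consume-scratch
```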
Bash scripts¶
In the working directory, create two bash files:
- `set_up_config.sh`: Set up the configuration to connect to the DSH's Kafka cluster.
- `entrypoint.sh`: Execute the `set_up_config.sh` script, and then execute the Python script in the current shell to ensure that termination signals are handled properly.
Bash script for configuration¶
This script configures the SSL connection with the Kafka cluster. It stores the keys and certificates in the /home/dsh/pki/ directory of your container, and stores information about the Kafka configuration as environment variables.
In the working directory, create a file set_up_config.sh, with the contents of the Script to configure SSL for Kafka.
Bash script for entrypoint¶
This script is the default executable for your service's container. It executes the set_up_config.sh script, and then executes any subsequent commands in the current shell.
In the working directory, create a file called entrypoint.sh, with the contents below:
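A minimal sketch of what such an entrypoint script could look like (the exact script used in this tutorial may differ):

```bash
#!/bin/bash
# Run the configuration script. Depending on how set_up_config.sh exposes the Kafka
# settings, you may need to source it instead (". ./set_up_config.sh") so that the
# environment variables it sets remain visible to the Python process.
./set_up_config.sh

# Replace the current shell with whatever command the container was started with
# (here: the python command from the Dockerfile's CMD instruction), so that
# termination signals such as SIGTERM reach the Python process directly.
exec "$@"
```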
Python script¶
This script does the actual work of consuming messages from the Kafka topic:
- It retrieves the configuration for the SSL connection and the Kafka cluster.
- It creates a `Consumer` object that consumes messages from the Kafka topic.
- It also creates a log entry to register the success or failure to consume messages.
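To make this concrete, here is a minimal sketch of such a consumer. It assumes the confluent-kafka package and uses hypothetical environment variable names and certificate paths; the line numbers in the notes below refer to the tutorial's full script, not to this sketch.

```python
import os
import signal
import sys

from confluent_kafka import Consumer  # assumes the confluent-kafka package is installed

# Make the container responsive to termination signals (the DSH sends SIGTERM on stop).
running = True

def shutdown(signum, frame):
    global running
    print("Received SIGTERM, shutting down.", file=sys.stderr)
    running = False

signal.signal(signal.SIGTERM, shutdown)

# Hypothetical variable names and paths: use the values and locations that
# set_up_config.sh actually provides in your container.
consumer = Consumer({
    "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    "security.protocol": "SSL",
    "ssl.key.location": "/home/dsh/pki/client.key",
    "ssl.certificate.location": "/home/dsh/pki/client.crt",
    "ssl.ca.location": "/home/dsh/pki/ca.crt",
    "group.id": os.environ["KAFKA_CONSUMER_GROUP"],
    "auto.offset.reset": "latest",
})

# CONSUME_TOPIC is set in the service definition, in the format scratch.<topic-name>.<your-tenant-name>.
consumer.subscribe([os.environ["CONSUME_TOPIC"]])

try:
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}", file=sys.stderr)
            continue
        # Log each received message to standard error, as described in the notes below.
        print(
            f"Message received from {msg.topic()} [{msg.partition()}] @ {msg.offset()}",
            file=sys.stderr,
        )
finally:
    consumer.close()
```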
Some aspects of this script are worth noting:
- It’s recommended to make your container responsive to termination signals (line 8–11 and line 51).
- The script logs results to standard error output for the sake of demonstration (line 71). However, this isn't recommended when you deploy custom services to a production environment, because it is too chatty.
- In the configuration, the `auto.offset.reset` property is set to `latest` (line 45):
- This defines what to do if there is no initial offset in Kafka, or if the current offset doesn't exist anymore, for example because the message was deleted. In the case of `latest`, the offset of the consumer is reset to the latest offset in Kafka.
- In this script, the consumer therefore resets to the latest offset in Kafka, which may cause it to miss any previous messages that were published between the deleted message's offset and the latest offset.
- See `auto.offset.reset` for more information.
- The script doesn't define a value for the `enable.auto.commit` property, so it defaults to `true`:
- The property defines whether the consumer's offset will be periodically committed to Kafka in the background.
- However, in a production environment, you want to control this action:
- You only want to commit a message's offset to Kafka if you're absolutely certain that your service processed the message correctly.
- This will make sure that your service picks up at the right offset in case of a crash, for example during the processing of a message. That way, you don't lose any data.
- See `enable.auto.commit` and `KafkaConsumer.commit()` for more information.
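To make that production-minded approach concrete, the hedged sketch below disables auto-commit and only commits an offset after the message has been processed. The connection settings, topic name, and the process() function are placeholders.

```python
import sys
from confluent_kafka import Consumer

def process(msg):
    # Hypothetical processing step; replace with your own logic.
    print(f"Processing message at offset {msg.offset()}", file=sys.stderr)

consumer = Consumer({
    # Use the same bootstrap servers, SSL settings and group.id as in the main script;
    # the values below are placeholders.
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "auto.offset.reset": "latest",
    "enable.auto.commit": False,  # take control of committing offsets yourself
})
consumer.subscribe(["scratch.example-topic.example-tenant"])  # placeholder topic name

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    process(msg)
    # Only commit after you're certain the message was processed correctly, so that
    # a crash during processing never causes the service to skip data on restart.
    consumer.commit(message=msg, asynchronous=False)
```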
Dockerfile¶
This file contains the instructions and commands to assemble an image using Docker.
In the working directory, create a file Dockerfile with the contents below:
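A minimal sketch of what such a Dockerfile could look like is shown below. The base image, user ID, and file names are assumptions, and the line numbers in the notes that follow refer to the tutorial's original file, not to this sketch.

```dockerfile
# Base image; pick the Python version you need.
FROM python:3.11-slim

# Your tenant's user ID, as shown on the "Resources" overview page of the DSH Console.
ENV TENANT_UID=1234

# Add a non-root user with your tenant's user ID, so the container doesn't run as root.
RUN useradd --create-home --uid ${TENANT_UID} dsh
WORKDIR /home/dsh

# Install the Kafka client library used by the Python script.
RUN pip install --no-cache-dir confluent-kafka

# Copy the scripts into the image and make the bash scripts executable.
# "consumer.py" is a hypothetical file name; use the name of your Python script.
COPY set_up_config.sh entrypoint.sh consumer.py ./
RUN chmod +x set_up_config.sh entrypoint.sh

# Switch to the non-root user.
USER ${TENANT_UID}

# Run the entrypoint script by default, and pass it the python command as a parameter.
ENTRYPOINT ["./entrypoint.sh"]
CMD ["python", "consumer.py"]
```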
Some aspects of this file are worth noting:
- It’s important that you don’t run commands in your image as the root user:
- Running commands as root raises security issues.
- As a solution, you can add a new user and set it as the default user to run commands (lines 14 and 26).
- It’s recommended that you assign your tenant’s user ID to the new user:
- In the menu bar of the DSH Console, navigate to “Resources” > “Overview” to see the user ID of your tenant.
- You can add your tenant’s user ID as an environment variable (line 5).
- Specify the user ID via the environment variable when you create the user (line 14), and when you switch to the user (line 26).
- The container executes the `entrypoint.sh` script (line 27) by default:
- As described in Bash script for entrypoint, this script executes the configuration script, and then uses the parameters passed to it as commands.
- The `CMD` instruction passes the `python` command to `entrypoint.sh`, with the location of your Python script as a parameter (line 28).
- This is a standard way to set environment variables, and to make sure that all processes in the container are responsive to termination signals.
Build the image¶
Log in to the DSH container image registry¶
In the next step, log in to the Harbor container image registry of the DSH. Execute the command below, and enter the CLI secret for your user when prompted:
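A hedged example of such a login command, using the registry address from the build step further down:

```bash
# Registry address assumed from the docker build command below.
docker login registry.cp.kpn-dsh.com -u <your-Harbor-username>
```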
- Replace `<your-Harbor-username>` with your actual Harbor username.
- See Accessing Harbor for more information about Harbor and the credentials.
Build and push the container image¶
Now that you have access, you can actually build the container image using Docker, and push it to the DSH’s container image registry:
docker build -t registry.cp.kpn-dsh.com/<your-tenant-name>/python-consume-scratch:1.0.0 .
docker push registry.cp.kpn-dsh.com/<your-tenant-name>/python-consume-scratch:1.0.0
- Replace `<your-tenant-name>` with the name of your tenant on the DSH.
- It's recommended to tag your container images. For that reason, the code snippet uses the `-t` (or `--tag`) option, and the image has the `name:tag` format.
- It's recommended to use semantic versioning for the tag, which applies the pattern `<major>.<minor>.<patch>` for version numbers.
- You're free to choose a different name for your image.
Deploy the custom service¶
Finally, you need to add a custom service, and set up the service definition:
- Click “Services” > “Overview” in the menu bar of the DSH Console.
- Click the “+ Service” button at the top of the “Services” overview page.
- Enter the name for the service, for example ‘python-consume-from-scratch’.
- Edit the JSON file for the service definition so that it has the form in the code snippet below. Don’t forget to replace the variables with the correct values:
- `<your-tenant-name>`: Your tenant's name
- `<tenant-user-ID>`: Your tenant's user ID. You can find it in the DSH Console, on the "Resources" overview page.
- `<topic-name>`: The name of the Kafka topic of type "scratch" that you created for this tutorial
- Use the name and tag for the container that you pushed in the previous step.
- Click “Start service” if the service definition looks good to you.
Some aspects of this service definition are worth noting:
- The Python script uses the environment variable `CONSUME_TOPIC` (line 7) as the topic to consume messages from. Make sure that you fill out the full name of your Kafka topic here, in the format `scratch.<topic-name>.<your-tenant-name>`.
- The `topics` key (lines 13-15) is optional, but it ensures that the Kafka topic is available on the DSH when you deploy your custom service. See Kafka topics for more information.
Tip
Reuse the Kafka topic that you created for the Publish to Kafka topic of type “scratch” tutorial. That way, this service can consume the messages that your Python producer publishes to the Kafka topic.
Inspect the service¶
When you start the service, the DSH automatically redirects you to the details page of your service. You can also reach this page by clicking “Services” > “Overview” in the menu of the DSH Console, and then clicking the relevant line for your service in the overview page.
Producer¶
Before you can inspect your Kafka consumer service, you need to start a Kafka producer. See Publish to Kafka topic of type “scratch” to find out how you can create and start a Kafka producer in Python.
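If you just want a few test messages on the topic while following that tutorial, a minimal hedged producer sketch could look like this (placeholder connection settings; the topic name follows the same scratch.<topic-name>.<your-tenant-name> format):

```python
import time
from confluent_kafka import Producer

# Placeholder connection settings; in the producer tutorial these come from set_up_config.sh.
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    # plus the SSL settings for the DSH's Kafka cluster
})

topic = "scratch.example-topic.example-tenant"  # placeholder topic name

for i in range(10):
    producer.produce(topic, value=f"test message {i}".encode("utf-8"))
    producer.flush()
    time.sleep(1)
```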
DSH Console¶
In the DSH Console, navigate to the details page of your Kafka consumer:
- Navigate to the details page of the service if you aren’t already there.
- At the bottom of the page, you can see more information about the consumer group lag. This indicates how well the services are catching up with new messages on the Kafka topic.
Grafana¶
If both the Kafka producer and Kafka consumer are running, you can inspect the output of your Kafka consumer:
- Navigate to the details page of the service if you aren’t already there.
- Under “Running tasks”, click the button with the blue “Page” icon at the right of the running task.
- In a new browser tab, the DSH leads you to the correct query in Grafana for your service’s logs:
- Scroll down to inspect the log entries.
- It may take a minute before log entries start coming in.
- Click the “Live” button at the top right of your Grafana page to see the log entries in real time, or you can refresh the page manually.
- If all goes well, you’ll see the following messages appear:
- The output of the `set_up_config.sh` script
- The message `<timestamp> Message received from <scratch-topic-address> [<kafka-partition>] @ <offset>`, as defined in the Python script.
Now stop your service:
- Head back to the details page of your service.
- Click the “Stop” button at the top right of the page.
- Go back to the log entries in Grafana. The logs should show `<timestamp> Received SIGTERM, shutting down.`, as defined in the Python script.
Kafdrop¶
You can also use Kafdrop to inspect your Kafka topic:
- First, deploy the Kafdrop app if you haven’t done so already. If you already deployed Kafdrop, then you can skip the steps below:
- In the menu bar of the DSH Console, click “Services” > “App Catalog”.
- Click the title of the “Kafdrop” app, and then the “Configure & Deploy” button.
- Fill out the configuration form correctly, and click the “Deploy” button.
- Once the DSH deployed your Kafdrop app, you can open it:
- Click “Services” > “Overview”, and then click the name that you chose for your Kafdrop app.
- On the details page of your Kafdrop app, click the icon next to the link under “Services & resources” to navigate to your Kafdrop app.
- Log in if necessary.
- Now, you can inspect your Kafka topic:
- In Kafdrop, click the name of your Kafka topic. You'll find it in the table under "Topics", and the name has the format `scratch.<topic-name>.<your-tenant-name>`.
- Click the "View Messages" button on the details page of your Kafka topic.
- Click the “View Messages” button again on the “Topic Messages” page.
- Kafdrop displays the messages in your topic, with their timestamp and message content.
- You can use the buttons above the list to navigate through the list.
Congratulations: you have deployed your first Kafka consumer in Python. Next, you can deploy producers and consumers in Python that interact with public streams.
Further reading¶
Check out the following resources to find out more about Kafka consumers in Python:
- The DSH’s Python SDK repository on GitHub. This tutorial uses several files from this repository, so it’s a good place to start.
- The Python Consumer Class tutorial by Confluent
- The documentation for the Python Consumer class by Confluent