Publish to public DSH stream¶
This page describes the steps to publish messages to a Kafka topic that is part of a public DSH stream. You will create a very simple Kafka producer in Python that sends messages to a public DSH stream every five seconds.
Prerequisites¶
Before you can follow this tutorial, you need the following:
- On the DSH:
- Access to a tenant on a DSH platform
- A public DSH stream. See Adding a DSH stream for more information:
- Your tenant needs the “Read/Write” permissions for this DSH stream.
- Use the default values for replication factor, number of partitions, retained messages, and topic level for partitioning.
- You also need an API client that has “PUB” permission and “SUB” permission on the MQTT topics for this public DSH stream if you want to access it via MQTT.
- Access to the Harbor container image registry, and the username and CLI secret for your user. See Accessing Harbor for more information.
- The Grafana service. See Requesting Prometheus and Grafana for more information.
- On your machine:
- A Unix-based system, for example Linux, MacOS or Windows Subsystem for Linux
- Docker CLI
- Protoc, the Protocol Buffer Compiler
Key concepts¶
If you want to interact with a public DSH stream, then you need to understand some key concepts. Take your time to read through the sections below, and to discover the details in the reference pages.
Public DSH stream¶
A public DSH stream is a collection of at least two Kafka topics. If a tenant ‘automator’ requested a public DSH stream ‘temperature’, then this DSH stream contains at least the following Kafka topics:
- stream.temperature.automator: The Kafka topic that the tenant ‘automator’ publishes messages to with a Kafka producer.
- stream.temperature.dsh: The Kafka topic to which the DSH publishes messages that come in via the Messaging API.
You can share a public DSH stream with multiple tenants, who then receive their own Kafka topic inside the DSH stream. However, only the DSH can write to a *.dsh Kafka topic.
Note
See DSH stream for more information.
Messaging API¶
You can expose a public DSH stream via the DSH’s Messaging API. If you do so, external clients can publish messages to the public DSH stream via MQTT or HTTP, and they can subscribe to the DSH stream:
- The messages in a public DSH stream support the MQTT protocol. As a consequence, these messages have metadata specifically for MQTT.
- The DSH publishes messages that come in via the Messaging API to a specific topic in a public DSH stream. In a DSH stream called ‘temperature’, the DSH writes these messages to the Kafka topic stream.temperature.dsh.
- When an external HTTP client or MQTT client subscribes to an MQTT topic in a public DSH stream, it consumes the messages across all Kafka topics in that DSH stream, provided that these messages have a matching MQTT topic.
Note
See Messaging API for more information.
Message envelopes¶
In order to support the MQTT protocol, messages in a public DSH stream contain metadata for “MQTT topic”, “Quality of Service”, “retained”, and the identity of the publisher. For that reason, a Kafka producer must wrap the messages in Protobuf message envelopes:
- The KeyEnvelope contains the metadata, and the DataEnvelope contains the message itself; the sketch after the note below shows what this can look like in Python.
- These message envelopes allow the Messaging API to handle these messages correctly.
- The Kafka producer must also apply a specific partitioning scheme to ensure that messages with the same MQTT topic end up in the same Kafka partition. The Messaging API follows the same partitioning scheme. That way, you maintain the order and history of messages.
Note
See DSH stream and Message envelopes for more information.
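To make the envelopes less abstract, the sketch below shows what wrapping one message could look like in Python, once you have generated the envelopes_pb2 module later in this tutorial. Every field name in it is an assumption for illustration; take the real names from your generated module and the Message envelopes reference.

```python
# Illustrative sketch only: all field names below are assumptions.
# Check the generated envelopes_pb2 module for the real names.
import envelopes_pb2  # generated from envelopes.proto (see "Create your files")


def wrap(mqtt_topic: str, payload: bytes, tenant: str, publisher: str):
    """Wrap a payload and its MQTT metadata in the Protobuf envelopes."""
    key = envelopes_pb2.KeyEnvelope()
    key.key = mqtt_topic                      # assumed: the MQTT topic of the message
    key.header.qos = 0                        # assumed: best-effort Quality of Service
    key.header.retained = False               # assumed: the MQTT 'retained' flag
    key.header.identifier.tenant = tenant     # assumed: identity of the publisher
    key.header.identifier.publisher = publisher

    data = envelopes_pb2.DataEnvelope()
    data.payload = payload                    # assumed: the message itself

    # A Kafka producer sends the serialized envelopes as the record key and value.
    return key.SerializeToString(), data.SerializeToString()
```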
Create your files¶
The steps below describe how you can create the files for your container image.
Working directory¶
Open the Terminal, create a directory for this tutorial, and enter it:
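For example (the directory name is only a suggestion; any name works):

mkdir python-publish-stream    # the directory name is just an example
cd python-publish-stream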
Bash scripts¶
In the working directory, create two bash files:
- set_up_config.sh: Set up the configuration to connect to the DSH’s Kafka cluster.
- entrypoint.sh: Execute the set_up_config.sh script, and then execute the Python script in the current shell to ensure that termination signals are handled properly.
Bash script for configuration¶
This script configures the SSL connection with the Kafka cluster. It stores the keys and certificates in the /home/dsh/pki/ directory of your container, and stores information about the Kafka configuration as environment variables.
In the working directory, create a file set_up_config.sh, with the contents of the Script to configure SSL for Kafka.
Bash script for entrypoint¶
This script is the default executable for your service’s container. It executes the set_up_config.sh script, and then executes any subsequent commands in the current shell.
In the working directory, create a file called entrypoint.sh, with the contents below:
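A minimal sketch of what this entrypoint can look like, assuming set_up_config.sh sits in the same directory inside the container (adjust the path to match your Dockerfile):

```bash
#!/bin/bash
# Source the configuration script so that the variables it sets
# remain available to the process started below.
source ./set_up_config.sh   # path is an assumption; match it to your Dockerfile

# Replace this shell with the command passed to the container
# (the CMD instruction in the Dockerfile), so that termination
# signals such as SIGTERM reach that process directly.
exec "$@"
```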
Protobuf message envelopes¶
This file defines the message envelopes that Kafka clients must use to interact with public DSH streams. It defines the KeyEnvelope for the message metadata, and the DataEnvelope for the payload. In order to use these envelopes, you need to create a Python module from the Protobuf file:
- In the working directory, create a file envelopes.proto, with the contents of the Protobuf file for message envelopes.
- In the Terminal, execute the command below. It creates the Python module file envelopes_pb2.py that you will import in the next step.
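A typical invocation looks like this, assuming you run it from the working directory where envelopes.proto lives:

protoc --python_out=. envelopes.proto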
Python script¶
This script does the actual work of producing messages to the tenant’s Kafka topic inside the public DSH stream:
- It retrieves the configuration for the SSL connection and the Kafka cluster.
- It creates a Producer object.
- It calculates the partition, using the topic level depth and number of partitions of the DSH stream, and the MQTT topic of the message.
- It wraps the metadata and the value in the appropriate envelopes.
- It publishes the message and creates a log entry to register the success or failure to publish messages.
main.py (the full 139-line script; the line numbers referenced below refer to this file)
Some aspects of this script are worth noting:
- It’s recommended to make your container responsive to termination signals (lines 14–17 and line 93).
- The script logs results to standard error output for the sake of demonstration (lines 49–53). However, this isn’t a recommended way of working if you deploy custom services to a production environment, because it is too chatty.
- The partitioner uses the MQTT topic, the topic level depth, and the number of partitions to calculate the partition that the producer should write the message to (lines 58–60); the sketch after this list shows the same logic in isolation:
  - It first reduces the MQTT topic to the number of levels defined by the topic level depth. Note that the MQTT topics in question don’t have a leading forward slash (/).
  - It then calculates the hash of the reduced MQTT topic, using the murmur2 function from the standard Kafka library. You have to add a bitmask & 0x7fffffff to account for the difference between Java and Python: the DSH uses the Java implementation of the murmur2 function to calculate the partition, and Java has no unsigned integer types, while Python doesn’t have that limitation. The bitmask makes sure that there is no difference in the hashes between Java and Python.
  - It then applies the modulo operation to the hash, using the number of partitions as the divisor.
  - Make sure that you use this logic when you calculate a partition, and don’t forget to add the bitmask in Python. If the message ends up in the wrong partition, MQTT clients can’t access it.
- If a Kafka producer publishes messages to a public DSH stream, then it must wrap them in message envelopes. The script imports the Python module that we created in the previous step (line 7) and populates the fields in the wrap() function (lines 63–89, called on line 125).
- The script uses the PRODUCE_STREAM_TOPIC and MQTT_TOPIC environment variables that you define in the service definition. See Deploy the custom service below for more information.
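The following sketch isolates the partitioning logic described above. It assumes the kafka-python package for the murmur2 function (the import path can differ between versions) and uses example values for the topic level depth and the number of partitions.

```python
# A sketch of the partitioning scheme, not the tutorial's exact main.py code.
from kafka.partitioner.default import murmur2  # import path may differ per kafka-python version


def partition_for(mqtt_topic: str, topic_level_depth: int, num_partitions: int) -> int:
    # 1. Reduce the MQTT topic (no leading forward slash) to the configured number of levels.
    reduced = "/".join(mqtt_topic.split("/")[:topic_level_depth])
    # 2. Hash the reduced topic with murmur2 and apply the 0x7fffffff bitmask,
    #    so the result matches the Java implementation that the DSH uses.
    hashed = murmur2(reduced.encode("utf-8")) & 0x7FFFFFFF
    # 3. Map the hash onto the available partitions with a modulo operation.
    return hashed % num_partitions


# Example values: a topic level depth of 3 and 6 partitions are assumptions;
# use the values of your own DSH stream.
print(partition_for("house/kitchen/sensor", 3, 6))
```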
Dockerfile¶
This file contains the instructions and commands to assemble an image using Docker.
In the working directory, create a file Dockerfile with the contents below:
Some aspects of this Dockerfile are worth noting:
- It’s important that you don’t run commands in your image as the root user:
- Running commands as root raises security issues.
- As a solution, you can add a new user and set it as the default user to run commands (line 14 and 26).
- It’s recommended that you assign your tenant’s user ID to the new user:
- In the menu bar of the DSH Console, navigate to “Resources” > “Overview” to see the user ID of your tenant.
- You can add your tenant’s user ID as an environment variable (line 5).
- Specify the user ID via the environment variable when you create the user (line 14), and when you switch to the user (line 26).
- The container executes the entrypoint.sh script (line 27) by default:
  - As described in Bash script for entrypoint, this script executes the configuration script, and then uses the parameters passed to the script as commands.
  - The CMD instruction passes the python command to entrypoint.sh, with the location of your Python script as a parameter (line 28).
  - This is a standard way to set environment variables, and to make sure that all processes in the container are responsive to termination signals.
- The Python script uses the following Python packages:
  - confluent-kafka for the Kafka producer
  - google and protobuf for the message envelopes
  - kafka_python for the murmur2 hashing function
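If you write the Dockerfile yourself, you would typically install these packages with pip, for example as shown below (package names as published on PyPI; the protobuf package also provides the google.protobuf modules):

pip install confluent-kafka protobuf kafka-python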
Build the image¶
Log in to the DSH container image registry¶
First, log in to the Harbor container image registry of the DSH. Execute the command below, and enter the CLI secret for your user when prompted:
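A typical login command looks like this (the registry host is the same one used in the build step below):

docker login -u <your-Harbor-username> registry.cp.kpn-dsh.com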
- Replace <your-Harbor-username> with your actual Harbor username.
- See Accessing Harbor for more information about Harbor and the credentials.
Build and push the container image¶
Now that you have access, you can actually build the container image using Docker, and push it to the DSH’s container image registry:
docker build -t registry.cp.kpn-dsh.com/<your-tenant-name>/python-publish-stream:1.0.0 .
docker push registry.cp.kpn-dsh.com/<your-tenant-name>/python-publish-stream:1.0.0
- Replace <your-tenant-name> with the name of your tenant on the DSH.
- It’s recommended to tag your container images. For that reason, the code snippet uses the -t (or --tag) option, and the image has the name:tag format.
- It’s recommended to use semantic versioning for the tag, which applies the pattern <major>.<minor>.<patch> for version numbers.
- You’re free to choose a different name for your image.
Deploy the custom service¶
Finally, you need to add a custom service, and set up the service definition:
- Click “Services” > “Overview” in the menu bar of the DSH Console.
- Click the “+ Service” button at the top of the “Services” overview page.
- Enter the name for the service, for example ‘python-publish-to-stream’.
- Edit the JSON file for the service definition so that it has the form in the code snippet below. Don’t forget to replace the variables with the correct values:
  - <your-tenant-name>: Your tenant’s name.
  - <tenant-user-ID>: Your tenant’s user ID. You can find it in the DSH Console, on the “Resources” overview page.
  - <tenant-topic-in-public-stream>: The name of your tenant’s Kafka topic in the public DSH stream that you created for this tutorial. It has the format stream.<public-DSH-stream-name>.<your-tenant-name>.
  - Use the name and tag for the container image that you pushed in the previous step.
- Click “Start service” if the service definition looks good to you.
The Python script uses the environment variables in the service definition (line 6–9):
- PRODUCE_STREAM_TOPIC: The destination to write messages to. Make sure that you fill out the full name of your Kafka topic here.
- MQTT_TOPIC: The MQTT topic to write messages to. For the demonstration, we use the MQTT topic ‘house/kitchen/sensor’, but you can change it. Make sure that your MQTT topic doesn’t have a leading forward slash (/).
Inspect the service¶
When you start the service, the DSH automatically redirects you to the details page of your service. You can also reach this page by clicking “Services” > “Overview” in the menu of the DSH Console, and then clicking the relevant line for your service in the overview page.
Grafana¶
You can inspect the output of the service:
- Navigate to the details page of the service if you aren’t already there.
- Under “Running tasks”, click the button with the blue “Page” icon at the right of the running task.
- In a new browser tab, the DSH leads you to the correct query in Grafana for your service’s logs:
- Scroll down to inspect the log entries.
- It may take a minute before log entries start coming in.
- Click the “Live” button at the top right of your Grafana page to see the log entries in real time, or you can refresh the page manually.
- If all goes well, you’ll see the following messages appear:
  - The output of the set_up_config.sh script
  - The message <timestamp> Message delivered to <stream-topic-address> [<kafka-partition>] @ <offset>, as defined in the Python script.
Now stop your service:
- Head back to the details page of your service.
- Click the “Stop” button at the top right of the page.
- Go back to the log entries in Grafana. The logs should show <timestamp> Received SIGTERM, shutting down., as defined in the Python script.
Kafdrop¶
You can use Kafdrop to inspect your Kafka topic:
- First, deploy the Kafdrop app if you haven’t done so already. If you already deployed Kafdrop, then you can skip the following steps:
- In the menu bar of the DSH Console, click “Services” > “App Catalog”.
- Click the title of the “Kafdrop” app, and then the “Configure & Deploy” button.
- Fill out the configuration form correctly, and click the “Deploy” button.
- Once the DSH has deployed your Kafdrop app, you can open it:
- Click “Services” > “Overview”, and then click the name that you chose for your Kafdrop app.
- On the details page of your Kafdrop app, click the icon next to the link under “Services & resources” to navigate to your Kafdrop app.
- Log in if necessary.
- Now, you can inspect your Kafka topic:
  - In Kafdrop, click the name of your Kafka topic. You can find it in the table under “Topics”, and the name has the format stream.<public-DSH-stream-name>.<your-tenant-name>.
  - Click the “View Messages” button on the details page of your Kafka topic.
  - Click the “View Messages” button again on the “Topic Messages” page. Make sure that you select the correct partition.
  - Kafdrop displays the messages in your topic, with their timestamp and message content.
  - You can use the buttons above the list to navigate through the list.
Cmd Line¶
You can use the Cmd Line app to inspect the complete DSH stream:
- First, deploy the Cmd Line app if you haven’t done so already. If you already deployed Cmd Line, then you can skip the following steps:
- In the menu bar of the DSH Console, click “Services” > “App Catalog”.
- Click the title of the “Cmd Line” app, and then the “Configure & Deploy” button.
- Fill out the configuration form correctly, and click the “Deploy” button.
- Once the DSH has deployed your Cmd Line app, you can open it:
- Click “Services” > “Overview”, and then click the name that you chose for your Cmd Line app.
- On the details page of your Cmd Line app, click the icon next to the link under “Services & resources” to navigate to your Cmd Line app.
- Log in if necessary.
- Now, you can inspect your public DSH stream:
  - Enter the command dshkcl consume stream.<public-DSH-stream-name>.* | jq .
  - The Cmd Line app will display all the messages in the public DSH stream, in the JSON format.
  - Check the documentation at the top of the Cmd Line app to discover other commands.
Congratulations: you have deployed your first Kafka producer for a public DSH stream, in Python. Next, you can deploy a Kafka consumer of a public DSH stream in Python.
Further reading¶
Check out the following resources to find out more about Kafka producers in Python:
- The DSH’s Python SDK repository on GitHub. This tutorial uses several files from this repository, so it’s a good place to start.
- The Python Producer Class tutorial by Confluent
- The documentation for the Python Producer class by Confluent