Consume from public DSH stream

This page describes the steps to consume messages from all Kafka topics that are part of a public DSH stream. This Kafka consumer builds on the Kafka producer described in Publish to public DSH stream, so it may be helpful to read its Key concepts. Bear in mind the following when consuming messages from a public DSH stream:

  • A public DSH stream is a collection of Kafka topics, one of which contains messages that come in from HTTP clients and MQTT clients, via the Messaging API. This is the Kafka topic with the suffix dsh.
  • If a Kafka consumer subscribes to a public DSH stream, then it needs to subscribe to all the Kafka topics inside that DSH stream.
  • Producers must wrap messages on a public DSH stream in specific Protobuf envelopes, so a Kafka consumer needs to unwrap these messages correctly to access the message and its metadata.
  • The Protobuf envelopes are a requirement, but the DSH can’t stop producers from publishing messages to the DSH stream that don’t follow this requirement. As a consequence, the Kafka consumer needs to take malformed messages into account.
  • A Kafka consumer always consumes all messages in the DSH stream, irrespective of their MQTT topic. If desired, you can filter messages on the MQTT topic after consuming them, as sketched in the example after this list.
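
A minimal sketch of such a filter is shown below. It assumes the unwrap() function from the Python script later in this tutorial, which returns the MQTT topic path (for example house/kitchen/sensor) as its first field; the prefix value is an arbitrary example.

Example
# Keep only messages whose MQTT topic starts with a given prefix.
# msg_fields is the tuple that the unwrap() function in main.py returns; its first
# element is the MQTT topic path from the KeyEnvelope. The prefix is an arbitrary example.
def matches_mqtt_topic(msg_fields, prefix="house/kitchen/"):
    return bool(msg_fields) and msg_fields[0].startswith(prefix)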

Prerequisites

Before you can follow this tutorial, you need the following:

  • On the DSH:
    • Access to a tenant on a DSH platform
    • A public DSH stream. See Adding a DSH stream for more information:
      • Your tenant needs the “Read/Write” permissions for this DSH stream.
      • Use the default values for replication factor, number of partitions, retained messages, and topic level for partitioning.
      • You also need an API client that has “PUB” permission and “SUB” permission on the MQTT topics for this public DSH stream if you want to access it via MQTT.
    • Access to the Harbor container image registry, and the username and CLI secret for your user. See Accessing Harbor for more information.
    • The Grafana service. See Requesting Prometheus and Grafana for more information.
  • On your machine:
    • A Unix-based system, for example Linux, macOS, or Windows Subsystem for Linux
    • Docker CLI
    • Protoc, the Protocol Buffer Compiler

Create your files

The steps below describe how you can create the files for your container image.

Working directory

Open the Terminal, create a directory for this tutorial, and enter it:

Terminal
mkdir public-dsh-stream-consume
cd public-dsh-stream-consume

Bash scripts

In the working directory, create two bash files:

  • set_up_config.sh: Set up the configuration to connect to the DSH’s Kafka cluster.
  • entrypoint.sh: Execute the set_up_config.sh script, and then execute the Python script in the current shell to ensure that termination signals are handled properly.

Bash script for configuration

This script configures the SSL connection with the Kafka cluster. It stores the keys and certificates in the /home/dsh/pki/ directory of your container, and stores information about the Kafka configuration as environment variables.

In the working directory, create a file set_up_config.sh, with the contents of the Script to configure SSL for Kafka.
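
If you want to verify the result from inside a running container, the optional snippet below (not one of this tutorial’s files) prints the environment variables that the Python script later in this tutorial reads. Depending on the platform configuration, either JSON_TENANT_CONFIG or the KAFKA_* variables are set.

Example
import os

# Variable names taken from the load_config() function in main.py.
for name in ("DSH_PKI_CACERT", "DSH_PKI_KEY", "DSH_PKI_CERT", "MESOS_TASK_ID",
             "JSON_TENANT_CONFIG", "KAFKA_SERVERS", "KAFKA_GROUP_ID"):
    print(f"{name}={os.getenv(name)}")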

Bash script for entrypoint

This script is the default executable for your service’s container. It executes the set_up_config.sh script, and then executes any subsequent commands in the current shell.

In the working directory, create a file called entrypoint.sh, with the contents below:

entrypoint.sh
#!/usr/bin/env bash

# Find the parent directory of the file.
medir=${0%/*}

# Execute the bash script for the SSL and Kafka configuration.
source ${medir}/set_up_config.sh

# Replace the current shell process without creating a new one, and use the parameters passed to the script as the command. The Dockerfile passes the Python command as a parameter.
exec "$@"

Protobuf message envelopes

This file defines the message envelopes that Kafka clients must use to interact with public DSH streams. It defines the KeyEnvelope for the message metadata and the DataEnvelope for the payload. In order to use these envelopes, you need to create a Python module from the Protobuf file:

  • In the working directory, create a file envelopes.proto, with the contents of the Protobuf file for message envelopes.
  • In the Terminal, execute the command below. It creates the Python module file envelopes_pb2.py that you will import in the next step. A quick check of the generated module follows the tip below.
Terminal
protoc -I=. --python_out=. ./envelopes.proto

Tip

If you already created the envelopes_pb2.py file for the Publish to public DSH stream tutorial, then you can reuse that file.
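
As an optional sanity check, you can round-trip a payload through the generated module. The snippet below assumes that DataEnvelope has a bytes payload field, as the Python script in this tutorial expects; check the field names against the envelopes.proto file you downloaded.

Example
import envelopes_pb2 as proto

# Round-trip a payload through the DataEnvelope to confirm the generated module works.
envelope = proto.DataEnvelope()
envelope.payload = b"hello"
raw = envelope.SerializeToString()

parsed = proto.DataEnvelope()
parsed.ParseFromString(raw)
assert parsed.payload == b"hello"
print("envelopes_pb2 round trip OK")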

Python script

This script does the actual work of consuming messages from the public DSH stream:

  • It retrieves the configuration for the SSL connection and the Kafka cluster.
  • It creates a Consumer object that consumes messages from the public DSH stream.
  • It unwraps the envelopes of the messages.
  • It creates a log entry to register the success or failure to consume messages, and the contents of the messages.
main.py
import os
import sys
import json
import signal
import time
import envelopes_pb2 as proto
from confluent_kafka import Consumer

# Handle termination signals gracefully.
def handle_sigterm(signum, frame):
    consumer.close()
    print("Received SIGTERM, shutting down.")
    sys.exit(0)

# Retrieve the configuration that the set_up_config.sh script set.
def load_config():
    # Retrieve the certificates, keys, and the ID of the service from the environment variables.
    pki_cacert = os.environ["DSH_PKI_CACERT"]
    pki_key = os.environ["DSH_PKI_KEY"]
    pki_cert = os.environ["DSH_PKI_CERT"]
    client_id = os.environ["MESOS_TASK_ID"]

    # Retrieve the addresses of the Kafka brokers from the environment variable with the JSON configuration, or from the environment variable with the Kafka servers.
    # Also retrieve the list of Kafka consumer groups provided by the DSH, and use the last one in the list.
    tenant_cfg = os.getenv("JSON_TENANT_CONFIG")
    if tenant_cfg:
        cfg = json.loads(tenant_cfg)
        servers = ",".join(cfg["brokers"])
        group_id = cfg.get("shared_consumer_groups", [None])[-1]
    else:
        servers = os.environ.get("KAFKA_SERVERS")
        group_id = os.environ.get("KAFKA_GROUP_ID")

    # Make sure that the servers and consumer groups are set, because the Consumer object needs them.
    if not (servers and group_id):
        sys.stderr.write("Error: Kafka servers or group ID not set.")
        sys.exit(1)

    return {
        "bootstrap.servers": servers,
        "client.id": client_id,
        "group.id": group_id,
        "security.protocol": "ssl",
        "ssl.key.location": pki_key,
        "ssl.certificate.location": pki_cert,
        "ssl.ca.location": pki_cacert,
        "auto.offset.reset": "latest",
    }

# Unwrap the message envelopes, and assign the values to fields. This function returns a tuple of fields.
def unwrap(msg):
    try:
        envelope_message = proto.DataEnvelope()
        envelope_message.ParseFromString(msg.value())
        envelope_key = proto.KeyEnvelope()
        envelope_key.ParseFromString(msg.key())
        publisher_type = envelope_key.header.identifier.WhichOneof('publisher')
        publisher_value = getattr(envelope_key.header.identifier, publisher_type)
        return (
            envelope_key.key,
            envelope_message.payload.decode('utf8'),
            envelope_key.header.identifier.tenant,
            publisher_type,
            publisher_value,
            str(envelope_key.header.retained),
            str(envelope_key.header.qos)
        )
    # If the message doesn't use message envelopes, or contains an error in the envelope format, then log an error and return an empty tuple.
    except Exception as err:
        sys.stderr.write(f"Error in the message's Protobuf envelopes at offset {msg.offset()} in {msg.topic()}. {err}\n")
        return ()

def consume_messages():
    # Listen for termination signals and load the SSL and Kafka configuration.
    signal.signal(signal.SIGTERM, handle_sigterm)
    config = load_config()

    # Create the Consumer object, and retrieve the subscription pattern for the public DSH stream from the environment variables.
    global consumer
    consumer = Consumer(config)
    topic = os.environ.get("CONSUME_STREAM_TOPIC")
    if not topic:
        sys.stderr.write("Error: CONSUME_STREAM_TOPIC not set.")
        sys.exit(1)

    # Subscribe to the topic. In this case, this is a pattern to subscribe to all Kafka topics in the DSH stream.
    consumer.subscribe([topic])
    sys.stderr.write(f"Subscribed to topic: {topic}\n" )

    # Consume from the Kafka topic, and write the success of message consumption to standard error output, so we can monitor it in Grafana.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                # Unwrap the message envelopes, and retrieve the fields inside the envelopes.
                msg_fields = unwrap(msg)
                if msg_fields:
                    # Log the fields, separated by the pipe symbol.
                    sys.stderr.write("Consumed message: " + " | ".join(msg_fields) + "\n")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

if __name__ == "__main__":
    consume_messages()

Some aspects of this script are worth noting:

  • It’s recommended to make your container responsive to termination signals (the handle_sigterm function and the signal.signal() call).
  • The script logs results to standard error output for the sake of demonstration (the sys.stderr.write() calls). However, this isn’t recommended for custom services in a production environment, because it is too chatty.
  • In the configuration, the auto.offset.reset property is set to latest (in the dictionary that load_config() returns):
    • This defines what to do if there is no initial offset in Kafka, or if the current offset doesn’t exist anymore, for example because the message was deleted. In the case of latest, the offset of the consumer is reset to the latest offset in Kafka.
    • In this script, the consumer will reset to the latest offset in Kafka, which may cause it to miss any previous messages that were published between the deleted message’s offset and the latest offset.
    • See auto.offset.reset for more information.
  • The script doesn’t define the value for the enable.auto.commit property, so it defaults to true:
    • The property defines whether the consumer’s offset will be periodically committed to Kafka in the background.
    • However, in a production environment, you want to control this action:
      • You only want to commit a message’s offset to Kafka if you’re absolutely certain that your service processed the message correctly.
      • This will make sure that your service picks up at the right offset in case of a crash, for example during the processing of a message. That way, you don’t lose any data.
      • See enable.auto.commit and KafkaConsumer.commit() for more information; a sketch of manual offset commits follows this list.
  • Prepare for the possibility that messages aren’t wrapped in envelopes (the except clause in the unwrap() function):
    • Every Kafka producer must use specific message envelopes when it publishes a message to a public DSH stream.
    • However, the DSH can’t refuse messages if they don’t adhere to this rule.
    • As a consequence, it’s possible that the DSH stream contains messages that aren’t wrapped in envelopes, or aren’t wrapped correctly. Trying to unwrap such messages will result in an error.
  • The script uses the CONSUME_STREAM_TOPIC environment variable that you define in the service definition. See Deploy the custom service below for more information.
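
The snippet below is a minimal sketch of manual offset commits with confluent-kafka, not part of this tutorial’s main.py. It assumes you pass in the configuration dictionary that load_config() returns, and a process_message callable that stands in for your own processing logic.

Example
from confluent_kafka import Consumer

def consume_with_manual_commits(config, topic, process_message):
    # Turn off automatic offset commits in the configuration that load_config() returns.
    config = dict(config, **{"enable.auto.commit": False})
    consumer = Consumer(config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                continue
            # Process the message first, and commit its offset only afterwards.
            process_message(msg)
            consumer.commit(message=msg, asynchronous=False)
    finally:
        consumer.close()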

Dockerfile

This file contains the instructions and commands to assemble an image using Docker.

In the working directory, create a file Dockerfile with the contents below:

Dockerfile
# Use the official Python Docker image.
FROM python:3.13

# Create an environment variable for the tenant's user ID.
ENV id=<tenant-user-ID>

# Install the necessary packages for SSL and Kafka.
RUN apt-get update && apt-get install -y --no-install-recommends \
    openssl \
    curl \
    librdkafka-dev

# Create "appuser" user.
RUN useradd --uid $id appuser

# Copy the bash scripts and the Python script.
COPY --chown=$id:$id --chmod=0755 entrypoint.sh /home/dsh/app/
COPY --chown=$id:$id --chmod=0755 set_up_config.sh /home/dsh/app/
COPY --chown=$id:$id --chmod=0755 main.py /home/dsh/app/

# Install the Confluent-Kafka Python package.
WORKDIR /home/dsh/app/
RUN pip install confluent-kafka google protobuf

# Switch from root to appuser, execute the entrypoint bash script, and execute the Python script.
USER $id:$id
ENTRYPOINT ["/bin/bash", "/home/dsh/app/entrypoint.sh"] 
CMD ["python", "/home/dsh/app/main.py"]

Some aspects of this Dockerfile are worth noting:

  • It’s important that you don’t run commands in your image as the root user:
    • Running commands as root raises security issues.
    • As a solution, you can add a new user and set it as the default user to run commands (the RUN useradd and USER instructions).
  • It’s recommended that you assign your tenant’s user ID to the new user:
    • In the menu bar of the DSH Console, navigate to “Resources” > “Overview” to see the user ID of your tenant.
    • You can add your tenant’s user ID as an environment variable (the ENV instruction).
    • Specify the user ID via the environment variable when you create the user (the RUN useradd instruction), and when you switch to the user (the USER instruction).
  • The container executes the entrypoint.sh script by default (the ENTRYPOINT instruction):
    • As described in Bash script for entrypoint, this script executes the configuration script, and then uses the parameters passed to the script as commands.
    • The CMD instruction passes the python command to entrypoint.sh, with the location of your Python script as a parameter.
    • This is a standard way to set environment variables, and to make sure that all processes in the container are responsive to termination signals.
  • The Python script uses the confluent-kafka and protobuf Python packages, which the RUN pip install instruction installs.

Build the image

Log in to the DSH container image registry

First, log in to the Harbor container image registry of the DSH. Execute the command below, and enter the CLI secret for your user when prompted:

Terminal
docker login registry.cp.kpn-dsh.com -u <your-Harbor-username>
  • Replace <your-Harbor-username> with your actual Harbor username.
  • See Accessing Harbor for more information about Harbor and the credentials.

Build and push the container image

Now that you have access, you can build the container image using Docker, and push it to the DSH’s container image registry:

Terminal
docker build -t registry.cp.kpn-dsh.com/<your-tenant-name>/python-consume-stream:1.0.0 .
docker push registry.cp.kpn-dsh.com/<your-tenant-name>/python-consume-stream:1.0.0
  • Replace <your-tenant-name> with the name of your tenant on the DSH.
  • It’s recommended to tag your container images. For that reason, the code snippet uses the -t (or --tag) option, and the image has the name:tag format.
  • It’s recommended to use semantic versioning for the tag, which applies the pattern <major>.<minor>.<patch> for version numbers.
  • You’re free to choose a different name for your image.

Deploy the custom service

Finally, you need to add a custom service, and set up the service definition:

  • Click “Services” > “Overview” in the menu bar of the DSH Console.
  • Click the “+ Service” button at the top of the “Services” overview page.
  • Enter the name for the service, for example ‘python-consume-from-stream’.
  • Edit the JSON file for the service definition so that it has the form in the code snippet below. Don’t forget to replace the variables with the correct values:
    • <your-tenant-name>: Your tenant’s name
    • <tenant-user-ID>: Your tenant’s user ID. You can find it in the DSH Console, on the “Resources” overview page.
    • <public-DSH-stream-name>: The name of the public DSH stream that you created for this tutorial.
    • Use the name and tag for the container that you pushed in the previous step.
  • Click “Start service” if the service definition looks good to you.
Service definition
{
  "name": "python-consume-from-stream",
  "image": "registry.cp.kpn-dsh.com/<your-tenant-name>/python-consume-stream:1.0.0",
  "cpus": 0.1,
  "mem": 256,
  "env": {
    "CONSUME_STREAM_TOPIC": "^stream.<public-DSH-stream-name>.*"
  },
  "instances": 1,
  "singleInstance": false,
  "needsToken": true,
  "user": "<tenant-user-ID>"
}

The Python script uses the environment variable in the service definition (the env field):

  • The variable CONSUME_STREAM_TOPIC defines which topic the Kafka consumer needs to subscribe to.
  • The caret symbol ^ indicates that the string following it is a pattern. See the subscribe() definition in Confluent’s documentation for more information.
  • In this case, the pattern lets the consumer subscribe to all Kafka topics in the public DSH stream; the example after this list illustrates which topic names such a pattern matches.
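
For illustration only, the sketch below shows which topic names such a pattern selects; the topic names are hypothetical examples, and librdkafka evaluates the regular expression against the broker’s topic names in a similar way.

Example
import re

# "weather" stands in for <public-DSH-stream-name>.
pattern = re.compile(r"^stream.weather.*")
topics = ["stream.weather.dsh", "stream.weather.other", "scratch.weather.mytenant"]
print([topic for topic in topics if pattern.match(topic)])
# Prints: ['stream.weather.dsh', 'stream.weather.other']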

Inspect the service

When you start the service, the DSH automatically redirects you to the details page of your service. You can also reach this page by clicking “Services” > “Overview” in the menu of the DSH Console, and then clicking the relevant line for your service in the overview page.

Grafana

You can inspect the output of the service:

  • Navigate to the details page of the service if you aren’t already there.
  • Under “Running tasks”, click the button with the blue “Page” icon at the right of the running task.
  • In a new browser tab, the DSH leads you to the correct query in Grafana for your service’s logs:
    • Scroll down to inspect the log entries.
    • It may take a minute before log entries start coming in.
    • Click the “Live” button at the top right of your Grafana page to see the log entries in real time, or you can refresh the page manually.
  • If all goes well, you’ll see the following messages appear:
    • The output of the set_up_config.sh script
    • The message <timestamp> Consumed message: house/kitchen/sensor | <message> | <tenant-name> | <publisher-type> | <publisher-id> | <retained> | <qos>, as defined in the Python script.

Now stop your service:

  • Head back to the details page of your service.
  • Click the “Stop” button at the top right of the page.
  • Go back to the log entries in Grafana. The logs should show <timestamp> Received SIGTERM, shutting down., as defined in the Python script.

Further reading

Check out the following resources to find out more about Kafka consumers in Python: