Deploy services in Python

This chapter contains several tutorials that you can use to get started with services in Python on the DSH:

Kafka via SSL

If you add a custom service to your tenant in the DSH, it’s isolated inside your tenant:

  • By default, it can connect to other services inside your tenant only.
  • You can add vhosts to allow connections outside your tenant, namely with other tenants or the public Internet.

However, every platform contains one Kafka cluster that is shared among all tenants on that platform:

  • Your service needs to set up an SSL connection with the Kafka cluster if it wants to interact with it. The DSH contains a Public Key Infrastructure (PKI) that handles the certificates and keys for these SSL connections.
  • Additionally, your service needs the address of the Kafka servers, and it’s recommended to assign a Kafka client ID to your service. The sketch after this list shows roughly how these pieces fit together in a client.
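
For illustration, this is roughly what such a connection looks like from Python. This is a minimal sketch assuming the confluent-kafka client package; the broker address, client ID, and certificate paths are placeholders, and on the DSH the script described below derives the real values for you.

from confluent_kafka import Producer  # assumption: the confluent-kafka package is installed

producer = Producer({
    "bootstrap.servers": "broker-0.kafka.marathon.mesos:9091",  # placeholder broker address
    "client.id": "mytenant_myservice",                          # placeholder Kafka client ID
    "security.protocol": "SSL",
    "ssl.ca.location": "/home/dsh/pki/ca.crt",                  # CA certificate of the DSH PKI
    "ssl.certificate.location": "/home/dsh/pki/client.crt",     # signed client certificate
    "ssl.key.location": "/home/dsh/pki/client.key",             # client private key
})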

Setting up the SSL connection correctly, and retrieving the right details to connect to the Kafka cluster, is no easy feat. For that reason, the KPN DSH team created a bash script that does this for you:

  • It retrieves environment variables, such as the tenant name, DNS name, and task ID of your service.
  • It retrieves the server certificate and public key for the Kafka cluster from the DSH’s PKI, and stores them.
  • It generates a public/private key pair and a certificate signing request for your service, and requests that the DSH’s PKI verify and sign it, yielding a client certificate.
  • It stores the following information as environment variables, so that your custom service can retrieve them:
    • The location of the certificates and your service’s keys
    • The tenant name and the DNS name
    • The Kafka configuration for your service, in JSON

In the tutorials, the script set_up_config.sh takes care of configuring the SSL connection with the Kafka cluster. It stores the keys and certificates in the /home/dsh/pki/ directory of your container.
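
If you want to check that the stored key material loads correctly, Python’s standard ssl module can build a mutual-TLS context from those files. A minimal sketch; the paths match where the script stores the files:

import ssl

# build a mutual-TLS context from the files stored by set_up_config.sh
context = ssl.create_default_context(cafile="/home/dsh/pki/ca.crt")
context.load_cert_chain(
    certfile="/home/dsh/pki/client.crt",
    keyfile="/home/dsh/pki/client.key",
)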

Script to configure SSL for Kafka

This script configures the SSL connection with the Kafka cluster. It stores the keys and certificates in the /home/dsh/pki/ directory of your container, and stores information about the Kafka configuration as environment variables. You can use this script in your containers to set up the SSL connection.

set_up_config.sh
#!/usr/bin/env bash
medir=${0%/*}   # directory containing this script (unused below)

############################## Functions ##############################
function section() {
    if [ -z "$QUIET_ENTRY" ]; then
        echo -e "\n******"
        if [ $# -gt 0 ]; then
            for ar in "$@"; do
                echo "****** $ar"
            done
        else
            echo "******"
        fi
        echo -e "******\n"
    fi
}

function slash2dn() {
    reslt=""
    # change IFS to use '/' as word separator
    SAVEIFS="$IFS"
    IFS='/'
    # read components of the marathon id separated by '/' as words
    for part in $(echo "$1"); do
        if [ -n "$part" ]; then
            # append non-empty words in reverse order, separated by dots
            reslt="$part.$reslt"
        fi
    done
    # restore original IFS
    IFS="$SAVEIFS"
    # print the result removing the dot at the end
    echo "${reslt%.}"
}

function checkenv() {
    return_code=0
    for arg in "$@"; do
        # use bash indirect expansion to test whether the named variable is empty or unset
        if [ -z "${!arg}" ]; then
            echo "ERROR: Missing environment variable: ${arg}" && return_code=1
        fi
    done
    return $return_code
}

function checkerror() {
    code=$?
    if [ $code -ne 0 ]; then
        echo "ERROR: Exiting because the last shell command returned a non-zero exit code: $code"
        exit 1
    fi
}

############################## Main ##############################

section "Setting up DSH tenant certificates and tenant configuration for secure Kafka"

### Prerequisites ###
section "Check required environment variables provided by DSH."
checkenv KAFKA_CONFIG_HOST DSH_SECRET_TOKEN MESOS_TASK_ID MARATHON_APP_ID DSH_CA_CERTIFICATE
checkerror
echo "KAFKA_CONFIG_HOST  : $KAFKA_CONFIG_HOST"
echo "DSH_SECRET_TOKEN   : XXXXXXXXXX${DSH_SECRET_TOKEN:10}"
echo "MESOS_TASK_ID      : $MESOS_TASK_ID"
echo "MARATHON_APP_ID    : $MARATHON_APP_ID"
echo "DSH_CA_CERTIFICATE :"
echo "$DSH_CA_CERTIFICATE"

### PKI dir ###
section "Create directory to store PKI files."
PKI_CONFIG_DIR=/home/dsh/pki
mkdir -p $PKI_CONFIG_DIR
echo "PKI_CONFIG_DIR=$PKI_CONFIG_DIR"

### Tenant name ###
section "Determine the tenant name. It is the 1st component of MARATHON_APP_ID"
TENANT_NAME=$(echo ${MARATHON_APP_ID} | cut -d / -f 2)
echo "TENANT_NAME=$TENANT_NAME"

### DNS name ###
section "Determine the host name. It is the components of MARATHON_APP_ID" "in reverse, with 'marathon.mesos' suffix"
DNS_NAME=$(slash2dn mesos/marathon/$MARATHON_APP_ID)
echo "DNS_NAME=$DNS_NAME"

### Client certificate ###
section "Store the DSH CA CERTIFICATE in ${PKI_CONFIG_DIR}/ca.crt."
echo "${DSH_CA_CERTIFICATE}" >$PKI_CONFIG_DIR/ca.crt
openssl x509 -in $PKI_CONFIG_DIR/ca.crt -text -noout

section "For our certificate, we request the subject DN from the KAFKA_CONFIG_HOST" \
    "Typically using this:" \
    'curl --cacert "$PKI_CONFIG_DIR/ca.crt" -s "https://$KAFKA_CONFIG_HOST/dn/$TENANT_NAME/$MESOS_TASK_ID"'

echo curl --cacert "$PKI_CONFIG_DIR/ca.crt" "https://$KAFKA_CONFIG_HOST/dn/$TENANT_NAME/$MESOS_TASK_ID"
DN=$(curl --cacert "$PKI_CONFIG_DIR/ca.crt" "https://$KAFKA_CONFIG_HOST/dn/$TENANT_NAME/$MESOS_TASK_ID")
checkerror
echo -e "\nDN=$DN"

section "Use OpenSSL to generate our CSR = Certificate Signing Request, which will become our x509 certificate" \
    "For this we first generate public/private key pair" \
    'openssl genrsa -out ${PKI_CONFIG_DIR}/client.key 4096' \
    'Then generate CSR itself with certificate subject name = $DN we received from KAFKA_CONFIG_HOST' \
    'but separated with "/" instead of ","' \
    'openssl req -key ${PKI_CONFIG_DIR}/client.key -new -out ${PKI_CONFIG_DIR}/client.csr -subj "/${DN//,//}"'
CERTIFICATE_SUBJECT_NAME="/${DN//,//}"
echo CERTIFICATE_SUBJECT_NAME="${CERTIFICATE_SUBJECT_NAME}"

section "Generate public/private key pair in $PKI_CONFIG_DIR/client.key"
openssl genrsa -out $PKI_CONFIG_DIR/client.key 4096
checkerror
echo "..Contents of key is not logged to standard output for security reasons."

section "Generate csr in $PKI_CONFIG_DIR/client.csr"
openssl req -key $PKI_CONFIG_DIR/client.key -new -out $PKI_CONFIG_DIR/client.csr -subj "${CERTIFICATE_SUBJECT_NAME}"
checkerror
openssl req -in $PKI_CONFIG_DIR/client.csr -text

section "Now we HTTP POST the request to KAFKA_CONFIG_HOST for verification and signature" \
    "it responds with our newly minted x509 cert" \
    'curl --cacert ${PKI_CONFIG_DIR}/ca.crt -o ${PKI_CONFIG_DIR}/client.crt -s --data-binary @${PKI_CONFIG_DIR}/client.csr -H "X-Kafka-Config-Token: ${DSH_SECRET_TOKEN}" "https://${KAFKA_CONFIG_HOST}/sign/${TENANT_NAME}/${MESOS_TASK_ID}"'
echo curl --cacert ${PKI_CONFIG_DIR}/ca.crt -o ${PKI_CONFIG_DIR}/client.crt -s --data-binary @${PKI_CONFIG_DIR}/client.csr -H "X-Kafka-Config-Token: ${DSH_SECRET_TOKEN}" "https://${KAFKA_CONFIG_HOST}/sign/${TENANT_NAME}/${MESOS_TASK_ID}"
curl --cacert ${PKI_CONFIG_DIR}/ca.crt -o ${PKI_CONFIG_DIR}/client.crt -s --data-binary @${PKI_CONFIG_DIR}/client.csr -H "X-Kafka-Config-Token: ${DSH_SECRET_TOKEN}" "https://${KAFKA_CONFIG_HOST}/sign/${TENANT_NAME}/${MESOS_TASK_ID}"
checkerror

section "Store certificate in $PKI_CONFIG_DIR/client.crt"
openssl x509 -in $PKI_CONFIG_DIR/client.crt -text -noout

### Java keystores ###
section "Creating java style keystores. They will only be created when the 'keytool' is on the path."
if [ -x "$(command -v keytool)" ]; then
    echo "keytool is installed, continue with generating keystores."
    DSH_KEYSTORE="$PKI_CONFIG_DIR/keystore.jks"
    DSH_KEYSTORE_PASSWORD=$(openssl rand -base64 32)
    DSH_TRUSTSTORE="$PKI_CONFIG_DIR/truststore.jks"
    DSH_TRUSTSTORE_PASSWORD=$(openssl rand -base64 32)

    echo "Create pkcs12 from pem key file"
    openssl pkcs12 -export -out "${PKI_CONFIG_DIR}/client.key.p12" -inkey "${PKI_CONFIG_DIR}/client.key" \
        -in "${PKI_CONFIG_DIR}/client.crt" -certfile "${PKI_CONFIG_DIR}/ca.crt" -password pass:${DSH_KEYSTORE_PASSWORD}

    echo "Create keystore"
    keytool -importkeystore -deststorepass ${DSH_KEYSTORE_PASSWORD} -destkeypass ${DSH_KEYSTORE_PASSWORD} \
        -destkeystore "${DSH_KEYSTORE}" -srckeystore "${PKI_CONFIG_DIR}/client.key.p12" -srcstoretype PKCS12 \
        -srcstorepass "${DSH_KEYSTORE_PASSWORD}" -alias 1

    echo "Create truststore"
    keytool -import -alias "dsh ca" -keystore "${DSH_TRUSTSTORE}" -file "${PKI_CONFIG_DIR}/ca.crt" \
        -storepass "${DSH_TRUSTSTORE_PASSWORD}" -noprompt
else
    echo "keytool is not installed, continue without generating keystores."
fi

### Tenant configuration ###
section "Tenant configuration can be requested as follows (with proper TLS params)" \
    "3 formats are available: json, shell, and java" \
    'curl https://${KAFKA_CONFIG_HOST}/kafka/config/${TENANT_NAME}/${MESOS_TASK_ID}?format=json'

echo curl --cacert ${PKI_CONFIG_DIR}/ca.crt --key ${PKI_CONFIG_DIR}/client.key --cert ${PKI_CONFIG_DIR}/client.crt "https://${KAFKA_CONFIG_HOST}/kafka/config/${TENANT_NAME}/${MESOS_TASK_ID}?format=json"
JSON_TENANT_CONFIG=$(curl --cacert ${PKI_CONFIG_DIR}/ca.crt --key ${PKI_CONFIG_DIR}/client.key --cert ${PKI_CONFIG_DIR}/client.crt "https://${KAFKA_CONFIG_HOST}/kafka/config/${TENANT_NAME}/${MESOS_TASK_ID}?format=json")

### Export variables ###
section "Export environment variables to make them available for reuse."
export DSH_PKI_CACERT="$PKI_CONFIG_DIR/ca.crt"
export DSH_PKI_KEY="$PKI_CONFIG_DIR/client.key"
export DSH_PKI_CERT="$PKI_CONFIG_DIR/client.crt"
export TENANT_NAME
export JSON_TENANT_CONFIG
export DNS_NAME

echo DSH_PKI_CACERT="$DSH_PKI_CACERT"
echo DSH_PKI_KEY="$DSH_PKI_KEY"
echo DSH_PKI_CERT="$DSH_PKI_CERT"

[ ! -z "${DSH_KEYSTORE}" ] && export DSH_KEYSTORE && echo "DSH_KEYSTORE=${DSH_KEYSTORE}"
[ ! -z "${DSH_KEYSTORE_PASSWORD}" ] && export DSH_KEYSTORE_PASSWORD && echo "DSH_KEYSTORE_PASSWORD=XXXX"
[ ! -z "${DSH_TRUSTSTORE}" ] && export DSH_TRUSTSTORE && echo "DSH_TRUSTSTORE=${DSH_TRUSTSTORE}"
[ ! -z "${DSH_TRUSTSTORE_PASSWORD}" ] && export DSH_TRUSTSTORE_PASSWORD && echo "DSH_TRUSTSTORE_PASSWORD=XXXX"

echo TENANT_NAME="$TENANT_NAME"
echo DNS_NAME="$DNS_NAME"
echo JSON_TENANT_CONFIG="$JSON_TENANT_CONFIG"

### End ###
section "Completed setup tenant certificates and tenant configuration"

Protobuf file for message envelopes

If a Kafka producer wants to publish messages to a public DSH stream, it needs to wrap the message key in a KeyEnvelope and the message payload in a DataEnvelope. The envelopes.proto file below contains the definitions for these envelopes, which are used in the Publish to DSH stream and Consume from DSH stream examples.

Tip

See Message envelopes for more information

envelopes.proto
syntax = "proto3";
package com.kpn.dsh.messages.common;

option java_package = "com.kpn.dsh.messages.common";

import "google/protobuf/any.proto";

message KeyEnvelope {
    KeyHeader header = 1;           // header is mandatory on stream topics
    string key = 2;                 // MQTT topic (minus prefixes) on stream topics
    reserved 3;                     // deprecated field
}

message KeyHeader {
    // identifies the message origin
    Identity identifier = 1;

    // marks the message as 'retained'
    // this makes the Latest Value Store keep a copy of the message in memory for later retrieval
    bool retained = 2;

    // the QoS with which the message is handled within the system
    QoS qos = 3;
}

// identifies the data origin
message Identity {
    string  tenant    = 1;
    oneof publisher {
        string free_form   = 2;     // for 'backwards' compatibility or producers not matching the publishers below
        string user        = 3;     // reserved for MQTT side publishes ('humans')
        string client      = 4;     // reserved for MQTT side publishes ('devices')
        string application = 5;     // to be used when producing data from containers
    }
}

// System QoS identifiers
enum QoS {
    BEST_EFFORT  = 0;   // might be dropped in case of resources running low (~ highest throughput)
    RELIABLE     = 1;   // will *never* be dropped and retried until success (~ lowest throughput)
}

message DataEnvelope {
    oneof kind {                      // main payload for data messages;
                                      // leave empty for DELETE semantics in Latest Value Store
        bytes payload = 1;
    }

    map<string, string> tracing = 2; // tracing data: used for passing span contexts between applications
    // tenant-specific fields are ONLY allowed in the 500-1000 range
}
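
As an illustration of how these definitions are used from Python, the sketch below wraps a key and a payload in their envelopes. It assumes you have compiled the proto file with protoc --python_out=., which produces an envelopes_pb2 module; the tenant, application, and topic names are placeholders:

import envelopes_pb2  # generated by: protoc --python_out=. envelopes.proto

# wrap the record key in a KeyEnvelope
key_envelope = envelopes_pb2.KeyEnvelope()
key_envelope.header.identifier.tenant = "mytenant"         # placeholder tenant name
key_envelope.header.identifier.application = "my-service"  # containers publish as 'application'
key_envelope.header.retained = False
key_envelope.header.qos = envelopes_pb2.BEST_EFFORT
key_envelope.key = "some/mqtt/topic"                       # MQTT topic minus prefixes

# wrap the payload in a DataEnvelope; leaving 'payload' unset would mean
# DELETE semantics in the Latest Value Store
data_envelope = envelopes_pb2.DataEnvelope()
data_envelope.payload = b"hello DSH"

# the serialized envelopes become the Kafka record key and value
key_bytes = key_envelope.SerializeToString()
value_bytes = data_envelope.SerializeToString()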