Message envelopes¶
If a Kafka client publishes a message to a public DSH stream, it must wrap the data in a message envelope. This ensures that all messages in the public DSH stream have the same format, irrespective of their source.
Note
Message envelopes are a hard requirement for messages on public DSH streams. However, the DSH doesn’t enforce message envelopes on Kafka topics (of type “scratch”) or internal DSH streams.
Rationale¶
A public DSH stream consists of Kafka topics that contain Kafka messages. Two types of clients can write to these streams:
- External MQTT clients and external HTTP clients, which write to the DSH stream via the Messaging API
- Kafka clients, which write to the DSH stream directly
However, messages from external MQTT/HTTP clients contain metadata that isn’t present in Kafka messages by default:
- MQTT topic: The device or the point of information that the message is about. In public DSH streams, the message key of a Kafka message contains the MQTT topic of that message.
- Retained: Whether the DSH should store messages in the Latest Value Store for the MQTT topic in question:
- If a new MQTT client subscribes to the MQTT topic, then it receives the retained message of that topic immediately after subscribing. As a consequence, the MQTT client doesn’t have to wait for an update in the stream before it receives a message.
- HTTP clients can only receive the retained message for any given MQTT topic.
- Quality of Service (QoS): This defines the guarantee of delivery for a specific message. The concept is taken from MQTT. The DSH supports two levels:
- At most once (0): Guarantees a best-effort delivery. The recipient doesn’t acknowledge that it received the message.
- At least once (1): The sender stores the message until the receiver acknowledges that it received the message. It’s possible for a message to be sent or delivered multiple times.
It’s important that the data and the messages are consistent across the public DSH stream, and that clients know which metadata to use when they interact with a DSH stream. For that reason, a Kafka client must use a specific message envelope when it writes to a public DSH stream.
Format¶
The serialization scheme for enveloped streams is Google Protocol Buffers:
- The message key must be wrapped in a key envelope.
- The message payload must be wrapped in a data envelope.
Key envelope¶
The key envelope contains the metadata that allows the Messaging API to figure out how to handle the message properly. In the Protobuf Interface Definition Language (IDL), the key envelope is defined as follows:
syntax = "proto3";
message KeyEnvelope {
KeyHeader header = 1; // The header is mandatory on Kafka topics in public DSH streams
string key = 2; // MQTT topic (minus prefixes) on stream topics
reserved 3; // Deprecated field
}
message KeyHeader {
// Identifies the message origin
Identity identifier = 1;
// Marks the message as 'retained'
// This makes the Latest Value Store keep a copy of the message in memory for later retrieval
bool retained = 2;
// The QOS with which the message is handled within the system
QoS qos = 3;
}
// identifies the data origin
message Identity {
string tenant = 1;
string publisher = 2;
}
// System QOS identifiers
enum QoS {
BEST_EFFORT = 0; // Might be dropped in case of resources running low (~ highest throughput)
RELIABLE = 1; // Will *never* be dropped and retried until success (~ lowest throughput)
}
The header consists of the following:
- The `identifier` field contains the origin of the message:
    - `tenant`: Contains the tenant ID of the message. If a client doesn't set this field correctly, the DSH drops the message: it is neither retained nor forwarded to external MQTT/HTTP clients.
    - `publisher`: A free-form string that identifies the data source. This field is purely informative, and the DSH platform doesn't attach any semantic meaning to it:
        - If an external MQTT/HTTP client publishes the message via the Messaging API, the field contains the client ID of the external client.
        - If a Kafka client publishes the message directly on the DSH stream, it can choose its own naming scheme.
- The `retained` field indicates whether the DSH should store this message in the Latest Value Store.
- The `qos` field indicates the QoS level at which the Messaging API treats the message:
    - `BEST_EFFORT` (0): The DSH may drop messages in rare cases when the platform is under high load. Use this for messages that are refreshed frequently, for example a vehicle that publishes its position every second.
    - `RELIABLE` (1): The DSH retries the delivery of messages until successful. This option has a cost in terms of throughput, so reserve it for messages that the DSH must deliver in all cases.
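To make the structure above concrete, here is an illustrative, hand-rolled sketch of how a key envelope looks on the Protobuf wire. The field numbers match the IDL above, but the helper names are invented for this example; in practice you would use classes generated by `protoc` from the envelope `.proto` file rather than encoding bytes by hand.

```python
# Illustrative hand-rolled Protobuf wire encoding of a KeyEnvelope.
# In real code, use protoc-generated classes; field numbers match the IDL above.

def _varint(n: int) -> bytes:
    """Encode an unsigned integer as a Protobuf varint."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        out.append(bits | (0x80 if n else 0))
        if not n:
            return bytes(out)

def _field(number: int, wire_type: int, payload: bytes) -> bytes:
    """Tag + payload; length-delimited fields (wire type 2) get a length prefix."""
    tag = _varint((number << 3) | wire_type)
    if wire_type == 2:
        return tag + _varint(len(payload)) + payload
    return tag + payload

def key_envelope(tenant: str, publisher: str, key: str,
                 retained: bool = False, qos: int = 0) -> bytes:
    # Identity: tenant = field 1, publisher = field 2
    identity = _field(1, 2, tenant.encode()) + _field(2, 2, publisher.encode())
    # KeyHeader: identifier = field 1, retained = field 2, qos = field 3
    header = _field(1, 2, identity)
    if retained:                      # proto3 omits default (false/0) values
        header += _field(2, 0, _varint(1))
    if qos:
        header += _field(3, 0, _varint(qos))
    # KeyEnvelope: header = field 1, key = field 2 (no leading slash in the key)
    return _field(1, 2, header) + _field(2, 2, key.encode())

envelope = key_envelope("acme", "sensor-1", "a/b/c", retained=True, qos=1)
```

The tenant and publisher values here are placeholders; substitute your own tenant ID and data source identifier.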
Tip
The `qos` field only applies to delivery via the Messaging API. Once a message is available on a DSH stream, it's reliably available to all Kafka consumers, regardless of the QoS value.
The `key` field contains the actual message key:
- It corresponds to the MQTT topic on which the Messaging API exposes the message, minus the prefix and the name of the DSH stream.
- For example, an external MQTT/HTTP client subscribes to the MQTT topic `/tt/weather/a/b/c` via the Messaging API:
    - `/tt/` is the prefix.
    - `weather` is the DSH stream.
    - `a/b/c` is the MQTT topic path.
- When a Kafka client publishes a message directly on this public DSH stream, it should use `a/b/c` as the key for that same MQTT topic. Note that there is no leading forward slash (`/`) in the key.
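The mapping above can be sketched as a small helper. The function name and signature are hypothetical, and it assumes the conventional `/<prefix>/<stream>/<topic path>` layout shown in the example:

```python
# Hypothetical helper: derive the Kafka message key from a full MQTT topic,
# assuming the "/<prefix>/<stream>/<topic path>" layout described above.

def mqtt_topic_to_key(mqtt_topic: str, prefix: str, stream: str) -> str:
    expected = f"/{prefix}/{stream}/"
    if not mqtt_topic.startswith(expected):
        raise ValueError(f"topic {mqtt_topic!r} is not on stream {stream!r}")
    # The Kafka key has no leading forward slash.
    return mqtt_topic[len(expected):]

print(mqtt_topic_to_key("/tt/weather/a/b/c", "tt", "weather"))  # a/b/c
```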
Data envelope¶
The data envelope contains the actual data of the message. In the Protobuf IDL, the data envelope is defined as follows:
syntax = "proto3";
message DataEnvelope {
oneof kind { // Main payload for data messages;
// Leave empty for DELETE semantics in Latest Value Store
bytes payload = 1;
}
map<string, string> tracing = 2; // Tracing data: used for passing span contexts between applications
// Tenant-specific fields are ONLY allowed in the 500-1000 range
}
The following fields are relevant:
- The `payload` field contains the actual message payload:
    - Thanks to the `oneof` construct, you can distinguish between a data envelope with an empty payload and a data envelope with no payload at all. See Empty payload for more information.
    - This difference is important because it defines how the Latest Value Store and the Messaging API interpret the message.
- The `tracing` field transports span contexts between different platform components. See Tracing for more information.
Implementing message envelopes¶
You can find several examples of the implementation of message envelopes in repositories of the KPN-DSH organization. If you use message envelopes, then you should also take extra care when using empty payloads.
Resources¶
You can use several resources to get started with Protobuf:
Empty payload¶
To delete a message from the Latest Value Store, encode it as a Kafka message with a correctly enveloped key and a data envelope value with an empty `kind`. When the Latest Value Store receives this message, it fully removes the corresponding key from its state store.
However, take the following into account:
- Don't encode it as a Kafka message with a correctly enveloped key and a `null` value, because messages on public DSH streams must always have a correctly enveloped value.
- Don't encode it as a Kafka message with a correctly enveloped key and a data envelope value that has a zero-length binary payload. If such a message is marked as retained, the DSH stores it in the Latest Value Store. As a consequence, the Latest Value Store contains a message with a zero-length binary payload, which is problematic.
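The wire-level difference between these cases is simply whether field 1 of the `DataEnvelope` is present at all. The byte literals below are derived from the standard Protobuf wire format and are shown for illustration; with `protoc`-generated classes you would simply leave the `kind` oneof unset:

```python
# Wire-level view of the delete marker vs. a zero-length payload.

# DataEnvelope with the 'kind' oneof unset: field 1 is absent entirely.
# With no tracing entries either, the envelope serializes to zero bytes.
# This (as the message VALUE, with a valid enveloped key) is the DELETE marker.
delete_marker = b""

# DataEnvelope with a zero-length binary payload: field 1 IS present,
# with a length prefix of 0. This is NOT a delete; if retained, the
# Latest Value Store keeps this problematic empty message.
zero_length_payload = b"\x0a\x00"  # tag (field 1, wire type 2) + length 0

# A null Kafka value (no DataEnvelope at all) is invalid on public DSH streams.
```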
nullvalue, because messages on public DSH streams must always have a correctly enveloped value. - Don’t encode it as a Kafka message with a correctly enveloped key and a data envelope value that has a zero-length binary payload. If such a message is marked as retained, the DSH will store it in the Latest Value Store. As a consequence, the Latest Value Store contains a message with a zero-length binary payload, which is problematic.