Add Sparkplug B Industrial IoT Example#461
Conversation
New example demonstrating the Sparkplug B industrial IoT protocol:
- Edge Node client publishes sensor data and responds to commands
- Host Application client subscribes to data and sends commands
- Implements Sparkplug topic namespace (spBv1.0/{group}/{type}/{node}[/{device}])
- Demonstrates NBIRTH, NDEATH, DDATA, and DCMD message types
- Includes simplified payload encoding (not full protobuf)
- Supports both single-threaded and multi-threaded builds
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
README covers: - Protocol overview and architecture - Message types and simulated metrics - Build instructions (Autotools and CMake) - Command-line options and example output - Configuration and payload format notes Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Link to examples/sparkplug/README.md for detailed documentation. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Use WOLF_CRYPT_TYPES_H include guard to skip the word64 typedef when wolfSSL has already defined it, matching the pattern used in wolfmqtt/mqtt_types.h. Fixes build error on macOS where wolfSSL defines word64 as unsigned long vs uint64_t (unsigned long long). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR adds a new Sparkplug B Industrial IoT example to wolfMQTT, demonstrating Sparkplug-style topic namespaces and message flows using two MQTT clients (Edge Node + Host Application), plus build-system and documentation integration.
Changes:
- Added new Sparkplug example implementation (
examples/sparkplug/sparkplug.c/.h) with simplified (non-protobuf) payload encoding/decoding. - Integrated the new example into Autotools and CMake builds, and updated
.gitignore. - Added user documentation for the example and linked it from the project README.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| examples/sparkplug/sparkplug.h | Defines Sparkplug message/topic helpers and simplified payload encode/decode utilities. |
| examples/sparkplug/sparkplug.c | Implements the Edge Node + Host example clients, message callback handling, and publish/subscribe loops. |
| examples/sparkplug/README.md | Documents Sparkplug concepts, build/run instructions, and example output. |
| examples/include.am | Adds the new example to Autotools build targets and distribution lists. |
| README.md | Adds a top-level section pointing users to the Sparkplug example. |
| CMakeLists.txt | Registers the Sparkplug example target under WOLFMQTT_EXAMPLES. |
| .gitignore | Ignores the built Sparkplug example binary. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| static INLINE int SparkplugTopic_Build(char* buf, int buf_len, | ||
| const char* group_id, SparkplugMsgType msg_type, | ||
| const char* edge_node_id, const char* device_id) | ||
| { | ||
| int len; | ||
| const char* type_str = SparkplugMsgType_ToString(msg_type); | ||
|
|
||
| if (device_id != NULL) { | ||
| len = XSNPRINTF(buf, buf_len, "%s/%s/%s/%s/%s", | ||
| SPARKPLUG_NAMESPACE, group_id, type_str, edge_node_id, device_id); | ||
| } | ||
| else { | ||
| len = XSNPRINTF(buf, buf_len, "%s/%s/%s/%s", | ||
| SPARKPLUG_NAMESPACE, group_id, type_str, edge_node_id); | ||
| } | ||
|
|
||
| return len; | ||
| } |
There was a problem hiding this comment.
SparkplugTopic_Build returns the XSNPRINTF result, but callers don’t check for truncation (return >= buf_len) or errors (negative). If the topic is truncated, publishes/subscribes can silently target the wrong topic. Consider returning an error code on truncation and checking the return value at call sites.
| /* Encode each metric */ | ||
| for (i = 0; i < payload->metric_count; i++) { | ||
| const SparkplugMetric* m = &payload->metrics[i]; | ||
|
|
||
| /* Name length and name */ | ||
| name_len = (word16)XSTRLEN(m->name); | ||
| if (pos + 2 + name_len + 17 > buf_len) { | ||
| return MQTT_CODE_ERROR_OUT_OF_BUFFER; | ||
| } |
There was a problem hiding this comment.
SparkplugPayload_Encode trusts payload->metric_count and iterates that many metrics without bounding it to SPARKPLUG_MAX_METRICS. If a caller sets a larger count, this will read past the metrics array. Clamp/validate metric_count up-front and return BAD_ARG when out of range.
| /* Decode each metric */ | ||
| for (i = 0; i < payload->metric_count && pos < buf_len; i++) { | ||
| SparkplugMetric* m = &payload->metrics[i]; | ||
|
|
||
| /* Name length and name */ | ||
| if (pos + 2 > buf_len) break; | ||
| name_len = ((word16)buf[pos] << 8) | buf[pos+1]; | ||
| pos += 2; | ||
| if (pos + name_len > buf_len || name_len >= sizeof(name_bufs[0])) break; | ||
| XMEMCPY(name_bufs[i], &buf[pos], name_len); |
There was a problem hiding this comment.
SparkplugPayload_Decode can break out of the metric loop on malformed/truncated input, but it does not update payload->metric_count to the number of successfully decoded metrics. Callers then iterate payload.metric_count and may dereference NULL m->name / uninitialized metrics, leading to crashes (e.g., XSTRCMP(m->name, ...)). Update metric_count to i on early exit and consider returning MQTT_CODE_ERROR_MALFORMED_DATA when decoding fails.
| MQTTCtx* mqttCtx = &spCtx->mqttCtx; | ||
| static MqttTopic topics[1]; | ||
|
|
||
| XMEMSET(&mqttCtx->subscribe, 0, sizeof(mqttCtx->subscribe)); | ||
| mqttCtx->subscribe.packet_id = mqtt_get_packetid(); | ||
| mqttCtx->subscribe.topic_count = 1; | ||
| topics[0].topic_filter = topic_filter; | ||
| topics[0].qos = mqttCtx->qos; | ||
| mqttCtx->subscribe.topics = topics; | ||
|
|
There was a problem hiding this comment.
sparkplug_subscribe uses a static MqttTopic topics[1], which is shared across threads. In multi-thread mode the edge and host threads can call subscribe concurrently, causing races/corruption of the subscribe request. Use a stack MqttTopic (or mqttCtx->topics like other examples) instead of a static shared array.
| if (msg_new) { | ||
| /* Parse topic */ | ||
| char topic_str[SPARKPLUG_TOPIC_MAX_LEN]; | ||
| int topic_len = msg->topic_name_len; | ||
| if (topic_len >= (int)sizeof(topic_str)) { | ||
| topic_len = sizeof(topic_str) - 1; | ||
| } | ||
| XMEMCPY(topic_str, msg->topic_name, topic_len); | ||
| topic_str[topic_len] = '\0'; | ||
|
|
||
| rc = SparkplugTopic_Parse(topic_str, group_id, sizeof(group_id), | ||
| &msg_type, node_id, sizeof(node_id), | ||
| device_id, sizeof(device_id)); | ||
|
|
||
| if (rc == MQTT_CODE_SUCCESS) { | ||
| PRINTF("Sparkplug [%s]: Received %s from %s/%s%s%s", | ||
| spCtx->mqttCtx.client_id, | ||
| SparkplugMsgType_ToString(msg_type), | ||
| group_id, node_id, | ||
| device_id[0] ? "/" : "", device_id); | ||
|
|
||
| /* Decode payload if complete */ | ||
| if (msg_done && msg->buffer_len > 0) { | ||
| rc = SparkplugPayload_Decode(msg->buffer, msg->buffer_len, &payload); | ||
| if (rc > 0) { | ||
| #ifdef SPARKPLUG_DEBUG |
There was a problem hiding this comment.
mqtt_message_cb only processes messages inside if (msg_new) { ... }, but wolfMQTT can invoke the callback multiple times per PUBLISH (msg_new=1 first chunk, then msg_new=0 for subsequent chunks, with msg_done=1 on the final chunk). With the current structure, multi-chunk payloads will never be decoded/handled because the msg_done path is skipped when msg_new==0. Consider caching the parsed topic/msg_type on msg_new and performing decode/handling when msg_done is true (even if msg_new is false), or accumulate payload until complete.
| /* Cleanup */ | ||
| MqttClientNet_DeInit(&mqttCtx->net); | ||
|
|
||
| if (mqttCtx->tx_buf) { | ||
| WOLFMQTT_FREE(mqttCtx->tx_buf); | ||
| mqttCtx->tx_buf = NULL; | ||
| } | ||
| if (mqttCtx->rx_buf) { | ||
| WOLFMQTT_FREE(mqttCtx->rx_buf); | ||
| mqttCtx->rx_buf = NULL; | ||
| } | ||
|
|
||
| PRINTF("Sparkplug: Disconnected %s", mqttCtx->client_id); | ||
| return rc; |
There was a problem hiding this comment.
sparkplug_disconnect frees buffers and deinitializes the network, but it never calls MqttClient_DeInit(&mqttCtx->client). In multi-thread builds MqttClient_Init allocates/initializes semaphores inside the client, so skipping MqttClient_DeInit can leak OS resources. Call MqttClient_DeInit during disconnect (and on connect/init error paths).
| | Type | Description | | ||
| |------|-------------| | ||
| | NBIRTH | Node Birth Certificate - Edge node announces itself | | ||
| | NDEATH | Node Death Certificate - Edge node goes offline (LWT) | | ||
| | DBIRTH | Device Birth Certificate - Device announces itself | | ||
| | DDEATH | Device Death Certificate - Device goes offline | | ||
| | NDATA | Node Data - Metrics from the edge node | | ||
| | DDATA | Device Data - Metrics from a device | | ||
| | NCMD | Node Command - Command to the edge node | | ||
| | DCMD | Device Command - Command to a specific device | | ||
| | STATE | Host Application state | | ||
|
|
There was a problem hiding this comment.
The markdown tables in this README use || instead of standard table | delimiters (e.g., the “Message Types” table). As written, these won’t render as tables on GitHub. Convert them to proper GitHub-flavored markdown tables (single | with a header separator row).
| dist_example_DATA+= examples/mqttnet.c \ | ||
| examples/mqttexample.c \ | ||
| examples/mqttport.c \ | ||
| examples/mqttclient/mqttclient.c \ | ||
| examples/mqttsimple/mqttsimple.c \ | ||
| examples/firmware/fwpush.c \ | ||
| examples/firmware/fwclient.c \ | ||
| examples/azure/azureiothub.c \ | ||
| examples/aws/awsiot.c \ | ||
| examples/wiot/wiot.c | ||
| dist_example_DATA+= examples/nbclient/nbclient.c | ||
| dist_example_DATA+= examples/multithread/multithread.c | ||
| if BUILD_SN | ||
| dist_example_DATA+= examples/sn-client/sn-client.c | ||
| dist_example_DATA+= examples/sn-client/sn-client_qos-1.c | ||
| dist_example_DATA+= examples/sn-client/sn-multithread.c | ||
| endif | ||
| dist_example_DATA+= examples/pub-sub/mqtt-pub.c | ||
| dist_example_DATA+= examples/pub-sub/mqtt-sub.c | ||
| dist_example_DATA+= examples/sparkplug/sparkplug.c | ||
| if BUILD_WEBSOCKET |
There was a problem hiding this comment.
examples/include.am adds the Sparkplug example source to dist_example_DATA, but does not include the new examples/sparkplug/README.md (and possibly sparkplug.h) anywhere for make dist packaging. If the project distributes example docs/headers, add these files to the appropriate dist_*/EXTRA_DIST lists so they’re included in release tarballs.
| /* Initialize outputs */ | ||
| if (group_id) group_id[0] = '\0'; | ||
| if (edge_node_id) edge_node_id[0] = '\0'; | ||
| if (device_id) device_id[0] = '\0'; | ||
|
|
||
| /* Parse topic: spBv1.0/group/type/node[/device] */ | ||
| matched = XSSCANF(topic, "%15[^/]/%63[^/]/%15[^/]/%63[^/]/%63s", | ||
| namespace_buf, group_id, type_buf, edge_node_id, device_id); | ||
|
|
There was a problem hiding this comment.
SparkplugTopic_Parse initializes outputs only when the pointers are non-NULL, but then unconditionally passes group_id, edge_node_id, and device_id to XSSCANF. This will crash if any of those pointers are NULL, despite the signature implying they’re optional. Make the parameters required (and assert/return BAD_ARG when NULL) or scan into local temp buffers and copy out conditionally.
| /* Parse topic: spBv1.0/group/type/node[/device] */ | ||
| matched = XSSCANF(topic, "%15[^/]/%63[^/]/%15[^/]/%63[^/]/%63s", | ||
| namespace_buf, group_id, type_buf, edge_node_id, device_id); | ||
|
|
||
| if (matched < 4) { | ||
| return MQTT_CODE_ERROR_BAD_ARG; | ||
| } | ||
|
|
||
| /* Verify namespace */ | ||
| if (XSTRCMP(namespace_buf, SPARKPLUG_NAMESPACE) != 0) { | ||
| return MQTT_CODE_ERROR_BAD_ARG; | ||
| } | ||
|
|
There was a problem hiding this comment.
SparkplugTopic_Parse ignores the *_len parameters and uses hard-coded %63 field widths when scanning into group_id/edge_node_id/device_id. If a caller passes smaller buffers, this can overflow. Use widths derived from the provided lengths (len-1) and/or parse manually using delimiters.
Add Sparkplug B Industrial IoT Example
Summary
This PR adds a new example demonstrating the https://sparkplug.eclipse.org/
industrial IoT protocol specification using wolfMQTT. The example creates two
MQTT clients that communicate using the Sparkplug topic namespace and message
types.
Changes
(CMakeLists.txt)
Features
Two Communicating Clients:
--enable-mt)
Sparkplug Protocol Support:
spBv1.0/{group_id}/{message_type}/{edge_node_id}[/{device_id}]
protobuf dependency)
Simulated Metrics:
Build & Test
Autotools - single-threaded (Edge Node only)
./configure --disable-tls
make
Autotools - multi-threaded (both clients)
./configure --enable-mt --disable-tls
make
CMake - multi-threaded
mkdir build && cd build
cmake -DWOLFMQTT_TLS=no -DWOLFMQTT_MT=yes ..
make sparkplug
Run
./examples/sparkplug/sparkplug -h test.mosquitto.org -p 1883
Example Output
Sparkplug B Example
Starting Edge Node and Host Application threads...
Sparkplug: Connected! (client_id=WolfMQTT_Sparkplug_Edge)
Sparkplug: Published NBIRTH to spBv1.0/WolfMQTT/NBIRTH/EdgeNode1
Sparkplug [WolfMQTT_Sparkplug_Host]: Received NBIRTH from WolfMQTT/EdgeNode1
-> Edge Node came online (bdSeq=0)
Sparkplug: Published DDATA to spBv1.0/WolfMQTT/DDATA/EdgeNode1/Device1
Sparkplug [WolfMQTT_Sparkplug_Host]: Received DDATA from
WolfMQTT/EdgeNode1/Device1
-> Device data received:
Temperature = 22.83
Humidity = 45.36
LED = OFF
Sparkplug [Host]: Sending DCMD to spBv1.0/WolfMQTT/DCMD/EdgeNode1/Device1
(LED=ON)
Sparkplug [WolfMQTT_Sparkplug_Edge]: Received DCMD from
WolfMQTT/EdgeNode1/Device1
-> Command received:
LED set to ON
Sparkplug example completed!
Test Plan