From d039455b82ef3ba4f941b8c91feb4a5c87f761b0 Mon Sep 17 00:00:00 2001 From: benzman Date: Tue, 15 Apr 2025 20:03:11 +0200 Subject: [PATCH] Fix: handle MQTT message fragmentation MQTT messages might arrive in parts if their payload is too big. for that reason, we need to be prepared to re-assemble fragmented messages on a topic before handing them over to the subscriber. --- include/MqttSettings.h | 3 +++ src/MqttSettings.cpp | 28 ++++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/include/MqttSettings.h b/include/MqttSettings.h index 67983333..e6a30ac4 100644 --- a/include/MqttSettings.h +++ b/include/MqttSettings.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include class MqttSettingsClass { public: @@ -36,6 +38,7 @@ private: MqttClient* _mqttClient = nullptr; Ticker _mqttReconnectTimer; + std::map> _fragments; MqttSubscribeParser _mqttSubscribeParser; std::mutex _clientLock; }; diff --git a/src/MqttSettings.cpp b/src/MqttSettings.cpp index 79316b08..723064ce 100644 --- a/src/MqttSettings.cpp +++ b/src/MqttSettings.cpp @@ -86,9 +86,33 @@ void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason re void MqttSettingsClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, const size_t len, const size_t index, const size_t total) { - ESP_LOGD(TAG, "Received MQTT message on topic: %s", topic); + ESP_LOGD(TAG, "Received MQTT message on topic '%s' (Bytes %zu-%zu/%zu)", + topic, index + 1, (index + len), total); - _mqttSubscribeParser.handle_message(properties, topic, payload, len); + // shortcut for most MQTT messages, which are not fragmented + if (index == 0 && len == total) { + return _mqttSubscribeParser.handle_message(properties, topic, payload, len); + } + + auto& fragment = _fragments[String(topic)]; + + // first fragment of a new message + if (index == 0) { + fragment.clear(); + fragment.reserve(total); + } + + fragment.insert(fragment.end(), payload, payload + len); + + if (fragment.size() < total) { + return; + } // wait for last fragment + + ESP_LOGD(TAG, "Fragmented MQTT message reassembled for topic '%s'", topic); + + _mqttSubscribeParser.handle_message(properties, topic, fragment.data(), total); + + _fragments.erase(String(topic)); } void MqttSettingsClass::performConnect()