mirror of
https://github.com/tbnobody/OpenDTU.git
synced 2025-12-14 10:51:26 +01:00
Fix: handle MQTT message fragmentation
MQTT messages might arrive in parts if their payload is too big. for that reason, we need to be prepared to re-assemble fragmented messages on a topic before handing them over to the subscriber.
This commit is contained in:
@@ -6,6 +6,8 @@
|
|||||||
#include <Ticker.h>
|
#include <Ticker.h>
|
||||||
#include <espMqttClient.h>
|
#include <espMqttClient.h>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <map>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
class MqttSettingsClass {
|
class MqttSettingsClass {
|
||||||
public:
|
public:
|
||||||
@@ -36,6 +38,7 @@ private:
|
|||||||
|
|
||||||
MqttClient* _mqttClient = nullptr;
|
MqttClient* _mqttClient = nullptr;
|
||||||
Ticker _mqttReconnectTimer;
|
Ticker _mqttReconnectTimer;
|
||||||
|
std::map<String, std::vector<uint8_t>> _fragments;
|
||||||
MqttSubscribeParser _mqttSubscribeParser;
|
MqttSubscribeParser _mqttSubscribeParser;
|
||||||
std::mutex _clientLock;
|
std::mutex _clientLock;
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -86,9 +86,33 @@ void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason re
|
|||||||
|
|
||||||
void MqttSettingsClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, const size_t len, const size_t index, const size_t total)
|
void MqttSettingsClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, const size_t len, const size_t index, const size_t total)
|
||||||
{
|
{
|
||||||
ESP_LOGD(TAG, "Received MQTT message on topic: %s", topic);
|
ESP_LOGD(TAG, "Received MQTT message on topic '%s' (Bytes %zu-%zu/%zu)",
|
||||||
|
topic, index + 1, (index + len), total);
|
||||||
|
|
||||||
_mqttSubscribeParser.handle_message(properties, topic, payload, len);
|
// shortcut for most MQTT messages, which are not fragmented
|
||||||
|
if (index == 0 && len == total) {
|
||||||
|
return _mqttSubscribeParser.handle_message(properties, topic, payload, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& fragment = _fragments[String(topic)];
|
||||||
|
|
||||||
|
// first fragment of a new message
|
||||||
|
if (index == 0) {
|
||||||
|
fragment.clear();
|
||||||
|
fragment.reserve(total);
|
||||||
|
}
|
||||||
|
|
||||||
|
fragment.insert(fragment.end(), payload, payload + len);
|
||||||
|
|
||||||
|
if (fragment.size() < total) {
|
||||||
|
return;
|
||||||
|
} // wait for last fragment
|
||||||
|
|
||||||
|
ESP_LOGD(TAG, "Fragmented MQTT message reassembled for topic '%s'", topic);
|
||||||
|
|
||||||
|
_mqttSubscribeParser.handle_message(properties, topic, fragment.data(), total);
|
||||||
|
|
||||||
|
_fragments.erase(String(topic));
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttSettingsClass::performConnect()
|
void MqttSettingsClass::performConnect()
|
||||||
|
|||||||
Reference in New Issue
Block a user