mirror of
https://github.com/tbnobody/OpenDTU.git
synced 2025-12-10 16:59:52 +01:00
Use more thread save Message Output
Thanks to OnBattery!
This commit is contained in:
@@ -2,12 +2,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <AsyncWebSocket.h>
|
||||
#include <HardwareSerial.h>
|
||||
#include <Stream.h>
|
||||
#include <TaskSchedulerDeclarations.h>
|
||||
#include <Print.h>
|
||||
#include <freertos/task.h>
|
||||
#include <mutex>
|
||||
|
||||
#define BUFFER_SIZE 500
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
#include <queue>
|
||||
#include <memory>
|
||||
|
||||
class MessageOutputClass : public Print {
|
||||
public:
|
||||
@@ -22,13 +24,34 @@ private:
|
||||
|
||||
Task _loopTask;
|
||||
|
||||
using message_t = std::vector<uint8_t>;
|
||||
|
||||
// we keep a buffer for every task and only write complete lines to the
|
||||
// serial output and then move them to be pushed through the websocket.
|
||||
// this way we prevent mangling of messages from different contexts.
|
||||
std::unordered_map<TaskHandle_t, message_t> _task_messages;
|
||||
std::queue<message_t> _lines;
|
||||
|
||||
// we chunk the websocket output to circumvent issues with TCP delayed ACKs:
|
||||
// if the websocket client (Windows in particular) is using delayed ACKs,
|
||||
// and since we wait for an ACK before sending the next chunk, we will
|
||||
// accumulate way too many messages and we won't be able to send them out
|
||||
// fast enough as the rate of produced messages is higher than the rate of
|
||||
// ACKs received. by chunking and waiting in between chunks, we either
|
||||
// "motivate" the client to send out ACKs immediately as the TCP packets are
|
||||
// "large", or we will wait long enough for the TCP stack to send out the
|
||||
// ACK anyways.
|
||||
void send_ws_chunk(message_t&& line);
|
||||
static constexpr size_t WS_CHUNK_SIZE_BYTES = 512;
|
||||
static constexpr uint32_t WS_CHUNK_INTERVAL_MS = 250;
|
||||
std::shared_ptr<message_t> _ws_chunk = nullptr;
|
||||
uint32_t _last_ws_chunk_sent = 0;
|
||||
|
||||
AsyncWebSocket* _ws = nullptr;
|
||||
char _buffer[BUFFER_SIZE];
|
||||
uint16_t _buff_pos = 0;
|
||||
uint32_t _lastSend = 0;
|
||||
bool _forceSend = false;
|
||||
|
||||
std::mutex _msgLock;
|
||||
|
||||
void serialWrite(message_t const& m);
|
||||
};
|
||||
|
||||
extern MessageOutputClass MessageOutput;
|
||||
|
||||
@@ -3,11 +3,13 @@
|
||||
* Copyright (C) 2022-2024 Thomas Basler and others
|
||||
*/
|
||||
#include "MessageOutput.h"
|
||||
|
||||
#include <Arduino.h>
|
||||
#include <HardwareSerial.h>
|
||||
|
||||
MessageOutputClass MessageOutput;
|
||||
|
||||
#undef TAG
|
||||
#define TAG "MessageOutput"
|
||||
|
||||
MessageOutputClass::MessageOutputClass()
|
||||
: _loopTask(TASK_IMMEDIATE, TASK_FOREVER, std::bind(&MessageOutputClass::loop, this))
|
||||
{
|
||||
@@ -21,46 +23,136 @@ void MessageOutputClass::init(Scheduler& scheduler)
|
||||
|
||||
void MessageOutputClass::register_ws_output(AsyncWebSocket* output)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_msgLock);
|
||||
|
||||
_ws = output;
|
||||
}
|
||||
|
||||
void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m)
|
||||
{
|
||||
// operator bool() of HWCDC returns false if the device is not attached to
|
||||
// a USB host. in general it makes sense to skip writing entirely if the
|
||||
// default serial port is not ready.
|
||||
if (!Serial) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t written = 0;
|
||||
while (written < m.size()) {
|
||||
written += Serial.write(m.data() + written, m.size() - written);
|
||||
}
|
||||
|
||||
Serial.flush();
|
||||
}
|
||||
|
||||
size_t MessageOutputClass::write(uint8_t c)
|
||||
{
|
||||
if (_buff_pos < BUFFER_SIZE) {
|
||||
std::lock_guard<std::mutex> lock(_msgLock);
|
||||
_buffer[_buff_pos] = c;
|
||||
_buff_pos++;
|
||||
} else {
|
||||
_forceSend = true;
|
||||
std::lock_guard<std::mutex> lock(_msgLock);
|
||||
|
||||
auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t());
|
||||
auto iter = res.first;
|
||||
auto& message = iter->second;
|
||||
|
||||
message.push_back(c);
|
||||
|
||||
if (c == '\n') {
|
||||
serialWrite(message);
|
||||
_lines.emplace(std::move(message));
|
||||
_task_messages.erase(iter);
|
||||
}
|
||||
|
||||
return Serial.write(c);
|
||||
return 1;
|
||||
}
|
||||
|
||||
size_t MessageOutputClass::write(const uint8_t* buffer, size_t size)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_msgLock);
|
||||
if (_buff_pos + size < BUFFER_SIZE) {
|
||||
memcpy(&_buffer[_buff_pos], buffer, size);
|
||||
_buff_pos += size;
|
||||
}
|
||||
_forceSend = true;
|
||||
|
||||
return Serial.write(buffer, size);
|
||||
auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t());
|
||||
auto iter = res.first;
|
||||
auto& message = iter->second;
|
||||
|
||||
message.reserve(message.size() + size);
|
||||
|
||||
for (size_t idx = 0; idx < size; ++idx) {
|
||||
uint8_t c = buffer[idx];
|
||||
|
||||
message.push_back(c);
|
||||
|
||||
if (c == '\n') {
|
||||
serialWrite(message);
|
||||
_lines.emplace(std::move(message));
|
||||
message.clear();
|
||||
message.reserve(size - idx - 1);
|
||||
}
|
||||
}
|
||||
|
||||
if (message.empty()) {
|
||||
_task_messages.erase(iter);
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
void MessageOutputClass::send_ws_chunk(message_t&& line)
|
||||
{
|
||||
if (!_ws) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (nullptr == _ws_chunk) {
|
||||
_ws_chunk = std::make_shared<message_t>(std::move(line));
|
||||
_ws_chunk->reserve(WS_CHUNK_SIZE_BYTES + 128); // add room for one more line
|
||||
} else {
|
||||
_ws_chunk->insert(_ws_chunk->end(), line.begin(), line.end());
|
||||
}
|
||||
|
||||
bool small = _ws_chunk->size() < WS_CHUNK_SIZE_BYTES;
|
||||
bool recent = (millis() - _last_ws_chunk_sent) < WS_CHUNK_INTERVAL_MS;
|
||||
if (small && recent) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool added_warning = false;
|
||||
for (auto& client : _ws->getClients()) {
|
||||
if (client.queueIsFull()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
client.text(_ws_chunk);
|
||||
|
||||
// note that all clients will see the warning, even if only one
|
||||
// client is struggeling. however, this should be rare. we
|
||||
// won't be copying chunks around to avoid this. we do however,
|
||||
// avoid adding the warning multiple times.
|
||||
if (client.queueIsFull() && !added_warning) {
|
||||
static char const warningStr[] = "\r\nWARNING: websocket client's queue is full, expect log lines missing\r\n";
|
||||
_ws_chunk->insert(_ws_chunk->end(), warningStr, warningStr + sizeof(warningStr) - 1);
|
||||
added_warning = true;
|
||||
}
|
||||
}
|
||||
|
||||
_ws_chunk = nullptr;
|
||||
_last_ws_chunk_sent = millis();
|
||||
}
|
||||
|
||||
void MessageOutputClass::loop()
|
||||
{
|
||||
// Send data via websocket if either time is over or buffer is full
|
||||
if (_forceSend || (millis() - _lastSend > 1000)) {
|
||||
std::lock_guard<std::mutex> lock(_msgLock);
|
||||
if (_ws && _buff_pos > 0) {
|
||||
_ws->textAll(_buffer, _buff_pos);
|
||||
_buff_pos = 0;
|
||||
std::lock_guard<std::mutex> lock(_msgLock);
|
||||
|
||||
// clean up (possibly filled) buffers of deleted tasks
|
||||
auto map_iter = _task_messages.begin();
|
||||
while (map_iter != _task_messages.end()) {
|
||||
if (eTaskGetState(map_iter->first) == eDeleted) {
|
||||
map_iter = _task_messages.erase(map_iter);
|
||||
continue;
|
||||
}
|
||||
if (_forceSend) {
|
||||
_buff_pos = 0;
|
||||
}
|
||||
_forceSend = false;
|
||||
|
||||
++map_iter;
|
||||
}
|
||||
|
||||
while (!_lines.empty()) {
|
||||
send_ws_chunk(std::move(_lines.front()));
|
||||
_lines.pop();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user