From 5ba3446ad4836af7ea5676722c967127a74dd828 Mon Sep 17 00:00:00 2001 From: Bernhard Kirchen Date: Sat, 21 Jun 2025 21:44:47 +0200 Subject: [PATCH] Fix: make logging more leightweight using large shared buffer --- include/MessageOutput.h | 13 +++- src/MessageOutput.cpp | 128 ++++++++++++++++++++++++---------------- 2 files changed, 86 insertions(+), 55 deletions(-) diff --git a/include/MessageOutput.h b/include/MessageOutput.h index 6eca0f7b..0030711f 100644 --- a/include/MessageOutput.h +++ b/include/MessageOutput.h @@ -15,7 +15,6 @@ class MessageOutputClass { public: MessageOutputClass(); void init(Scheduler& scheduler); - size_t write(const uint8_t* buffer, size_t size); void register_ws_output(AsyncWebSocket* output); static int log_vprintf(const char *fmt, va_list arguments); @@ -25,6 +24,13 @@ private: Task _loopTask; + int log_vprintf_locked(const char *fmt, va_list arguments); + + static constexpr size_t BUFFER_SIZE = 8192; + char _buffer[BUFFER_SIZE]; + size_t _buffer_in = 0; + size_t _buffer_out = 0; + using message_t = std::vector; // we keep a buffer for every task and only write complete lines to the @@ -42,9 +48,10 @@ private: // "motivate" the client to send out ACKs immediately as the TCP packets are // "large", or we will wait long enough for the TCP stack to send out the // ACK anyways. - void send_ws_chunk(message_t&& line); + void send_ws_chunk(const uint8_t* buffer, size_t size); static constexpr size_t WS_CHUNK_SIZE_BYTES = 512; static constexpr uint32_t WS_CHUNK_INTERVAL_MS = 250; + static constexpr size_t TYPICAL_LINE_LENGTH = 150; std::shared_ptr _ws_chunk = nullptr; uint32_t _last_ws_chunk_sent = 0; @@ -52,7 +59,7 @@ private: std::mutex _msgLock; - void serialWrite(message_t const& m); + void serialWrite(const uint8_t* buffer, size_t size); }; extern MessageOutputClass MessageOutput; diff --git a/src/MessageOutput.cpp b/src/MessageOutput.cpp index dcb34888..bac9c53d 100644 --- a/src/MessageOutput.cpp +++ b/src/MessageOutput.cpp @@ -16,6 +16,7 @@ void MessageOutputClass::init(Scheduler& scheduler) { scheduler.addTask(_loopTask); _loopTask.enable(); + memset(_buffer, 0, sizeof(_buffer)); esp_log_set_vprintf(log_vprintf); } @@ -28,12 +29,65 @@ void MessageOutputClass::register_ws_output(AsyncWebSocket* output) int MessageOutputClass::log_vprintf(const char* fmt, va_list arguments) { - char log_buffer[WS_CHUNK_SIZE_BYTES]; - auto written = vsnprintf(log_buffer, sizeof(log_buffer), fmt, arguments); - return MessageOutput.write(reinterpret_cast(log_buffer), written); + std::lock_guard lock(MessageOutput._msgLock); + return MessageOutput.log_vprintf_locked(fmt, arguments); } -void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m) +int MessageOutputClass::log_vprintf_locked(const char* fmt, va_list arguments) +{ + // we can only write up to the end of the buffer + auto len = sizeof(_buffer) - _buffer_in; + + // do not overwrite messages that still need processing. be careful not to + // overwrite the length prefix of the next message waiting to be processed, + // i.e., the byte that _buffer_out points to. + if (_buffer_out > _buffer_in) { + len = _buffer_out - _buffer_in; + } + + len -= 1; // leave room for length prefix + + auto prefix = &_buffer[_buffer_in]; + auto start = prefix + 1; + auto written = vsnprintf(start, len, fmt, arguments); + + if (written < 0) { + if (Serial) { + static char const err_msg[] = "ERROR: vsnprintf failed: %d\n"; + char err_buf[sizeof(err_msg) + 10]; + auto err_written = snprintf(err_buf, sizeof(err_buf), err_msg, written); + Serial.write(err_buf, err_written); + } + + *prefix = 0; + return written; + } + + if (written >= len) { + // buffer was too small, at least at the current location + *prefix = 0; + + if (_buffer_in > 0 && _buffer_out <= _buffer_in) { + // we can safely reset the buffer as there is no data ahead that + // still needs processing, so we try again from the beginning. + _buffer_in = 0; + return log_vprintf_locked(fmt, arguments); + } + + if (Serial) { + Serial.write("ERROR: log buffer overrun\n"); + } + + return -1; + } + + written = std::min(written, 255); // effectively truncate to 255 bytes + *prefix = static_cast(written); + _buffer_in += 1 + written; // now points to terminating null byte of message we just wrote + return written; +} + +void MessageOutputClass::serialWrite(const uint8_t* buffer, size_t size) { // operator bool() of HWCDC returns false if the device is not attached to // a USB host. in general it makes sense to skip writing entirely if the @@ -43,55 +97,24 @@ void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m) } size_t written = 0; - while (written < m.size()) { - written += Serial.write(m.data() + written, m.size() - written); + while (written < size) { + written += Serial.write(buffer + written, size - written); } Serial.flush(); } -size_t MessageOutputClass::write(const uint8_t* buffer, size_t size) -{ - std::lock_guard lock(_msgLock); - - auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t()); - auto iter = res.first; - auto& message = iter->second; - - message.reserve(message.size() + size); - - for (size_t idx = 0; idx < size; ++idx) { - uint8_t c = buffer[idx]; - - message.push_back(c); - - if (c == '\n') { - serialWrite(message); - _lines.emplace(std::move(message)); - message.clear(); - message.reserve(size - idx - 1); - } - } - - if (message.empty()) { - _task_messages.erase(iter); - } - - return size; -} - -void MessageOutputClass::send_ws_chunk(message_t&& line) +void MessageOutputClass::send_ws_chunk(const uint8_t* buffer, size_t size) { if (!_ws) { return; } if (nullptr == _ws_chunk) { - _ws_chunk = std::make_shared(std::move(line)); - _ws_chunk->reserve(WS_CHUNK_SIZE_BYTES + 128); // add room for one more line - } else { - _ws_chunk->insert(_ws_chunk->end(), line.begin(), line.end()); + _ws_chunk = std::make_shared(); + _ws_chunk->reserve(WS_CHUNK_SIZE_BYTES + TYPICAL_LINE_LENGTH); // add room for one more line } + _ws_chunk->insert(_ws_chunk->end(), buffer, buffer + size); bool small = _ws_chunk->size() < WS_CHUNK_SIZE_BYTES; bool recent = (millis() - _last_ws_chunk_sent) < WS_CHUNK_INTERVAL_MS; @@ -126,20 +149,21 @@ void MessageOutputClass::loop() { std::lock_guard lock(_msgLock); - // clean up (possibly filled) buffers of deleted tasks - auto map_iter = _task_messages.begin(); - while (map_iter != _task_messages.end()) { - if (eTaskGetState(map_iter->first) == eDeleted) { - map_iter = _task_messages.erase(map_iter); + while (_buffer_out != _buffer_in) { + uint8_t msg_len = static_cast(_buffer[_buffer_out]); + if (msg_len == 0) { + // we only read a null byte while we still have messages to process + // if we are at the end of the buffer and the next message is at the + // front. wrap around. + _buffer_out = 0; continue; } - ++map_iter; - } + auto msg_start = reinterpret_cast(&_buffer[_buffer_out]) + 1; + serialWrite(msg_start, msg_len); + Syslog.write(msg_start, msg_len); + send_ws_chunk(msg_start, msg_len); - while (!_lines.empty()) { - Syslog.write(_lines.front().data(), _lines.front().size()); - send_ws_chunk(std::move(_lines.front())); - _lines.pop(); + _buffer_out += 1 + msg_len; } }