From c106224052a04ec7bc4e3fa86f687e044cd68039 Mon Sep 17 00:00:00 2001 From: Bernhard Kirchen Date: Tue, 22 Jul 2025 20:28:40 +0200 Subject: [PATCH] logging: centralize rate limiting --- include/MessageOutput.h | 14 ++++++++-- include/SyslogLogger.h | 9 ------- src/MessageOutput.cpp | 59 ++++++++++++++++++++++++++++++++++++++--- src/SyslogLogger.cpp | 44 ------------------------------ 4 files changed, 68 insertions(+), 58 deletions(-) diff --git a/include/MessageOutput.h b/include/MessageOutput.h index 0030711f..2c1cc68f 100644 --- a/include/MessageOutput.h +++ b/include/MessageOutput.h @@ -24,8 +24,6 @@ private: Task _loopTask; - int log_vprintf_locked(const char *fmt, va_list arguments); - static constexpr size_t BUFFER_SIZE = 8192; char _buffer[BUFFER_SIZE]; size_t _buffer_in = 0; @@ -60,6 +58,18 @@ private: std::mutex _msgLock; void serialWrite(const uint8_t* buffer, size_t size); + + static constexpr uint32_t RATE_LIMIT_WINDOW_MS = 1000; + static constexpr size_t RATE_LIMIT_MAX_TOKENS = 128; + size_t _available_tokens = RATE_LIMIT_MAX_TOKENS; + uint32_t _last_token_refill_millis = 0; + size_t _rate_limited_packets = 0; + uint32_t _last_rate_limit_warning_millis = 0; + static constexpr uint32_t RATE_LIMIT_WARNING_INTERVAL_MS = 1000; + bool consumeToken(); + int log_self(const char* fmt, ...); + int log_vprintf_rate_limited(const char* fmt, va_list arguments); + int log_vprintf_recursive(const char* fmt, va_list arguments); }; extern MessageOutputClass MessageOutput; diff --git a/include/SyslogLogger.h b/include/SyslogLogger.h index 6b6472ff..f38f3730 100644 --- a/include/SyslogLogger.h +++ b/include/SyslogLogger.h @@ -20,7 +20,6 @@ private: return _address != INADDR_NONE; } static uint8_t calculatePrival(uint8_t facility, char errorCode); - bool consumeToken(); Task _loopTask; std::mutex _mutex; @@ -31,14 +30,6 @@ private: String _header; uint16_t _port; bool _enabled; - - static constexpr uint32_t RATE_LIMIT_WINDOW_MS = 1000; - static constexpr size_t RATE_LIMIT_MAX_TOKENS = 100; - size_t _available_tokens = RATE_LIMIT_MAX_TOKENS; - uint32_t _last_token_refill_millis = 0; - size_t _rate_limited_packets = 0; - uint32_t _last_rate_limit_warning_millis = 0; - static constexpr uint32_t RATE_LIMIT_WARNING_INTERVAL_MS = 1000; }; extern SyslogLogger Syslog; diff --git a/src/MessageOutput.cpp b/src/MessageOutput.cpp index bac9c53d..b0519d13 100644 --- a/src/MessageOutput.cpp +++ b/src/MessageOutput.cpp @@ -30,10 +30,63 @@ void MessageOutputClass::register_ws_output(AsyncWebSocket* output) int MessageOutputClass::log_vprintf(const char* fmt, va_list arguments) { std::lock_guard lock(MessageOutput._msgLock); - return MessageOutput.log_vprintf_locked(fmt, arguments); + return MessageOutput.log_vprintf_rate_limited(fmt, arguments); } -int MessageOutputClass::log_vprintf_locked(const char* fmt, va_list arguments) +bool MessageOutputClass::consumeToken() +{ + uint32_t now = millis(); + + uint32_t elapsed = now - _last_token_refill_millis; + size_t new_tokens = RATE_LIMIT_MAX_TOKENS * elapsed / RATE_LIMIT_WINDOW_MS; + + if (new_tokens > 0) { + _available_tokens = std::min(_available_tokens + new_tokens, RATE_LIMIT_MAX_TOKENS); + _last_token_refill_millis = now; + } + + if (_available_tokens > 0) { + --_available_tokens; + return true; + } + + return false; +} + +int MessageOutputClass::log_self(const char* fmt, ...) +{ + va_list args; + va_start(args, fmt); + int ret = log_vprintf_recursive(fmt, args); + va_end(args); + return ret; +} + +int MessageOutputClass::log_vprintf_rate_limited(const char* fmt, va_list arguments) +{ + if (!consumeToken()) { + if (_rate_limited_packets == 0) { + _last_rate_limit_warning_millis = millis(); + } + ++_rate_limited_packets; + return 0; + } + + if (_rate_limited_packets > 0) { + uint32_t elapsed = millis() - _last_rate_limit_warning_millis; + if (elapsed > RATE_LIMIT_WARNING_INTERVAL_MS) { + log_self("W (%d) logging: Rate limited %d message%s in the last %d ms\n", + millis(), _rate_limited_packets, + (_rate_limited_packets > 1 ? "s" : ""), elapsed); + _rate_limited_packets = 0; + _last_rate_limit_warning_millis = millis(); + } + } + + return log_vprintf_recursive(fmt, arguments); +} + +int MessageOutputClass::log_vprintf_recursive(const char* fmt, va_list arguments) { // we can only write up to the end of the buffer auto len = sizeof(_buffer) - _buffer_in; @@ -71,7 +124,7 @@ int MessageOutputClass::log_vprintf_locked(const char* fmt, va_list arguments) // we can safely reset the buffer as there is no data ahead that // still needs processing, so we try again from the beginning. _buffer_in = 0; - return log_vprintf_locked(fmt, arguments); + return log_vprintf_recursive(fmt, arguments); } if (Serial) { diff --git a/src/SyslogLogger.cpp b/src/SyslogLogger.cpp index 5866b5c0..9b0a49e2 100644 --- a/src/SyslogLogger.cpp +++ b/src/SyslogLogger.cpp @@ -8,7 +8,6 @@ #include "defaults.h" #include #include -#include #undef TAG static const char* TAG = "syslog"; @@ -66,27 +65,6 @@ void SyslogLogger::write(const uint8_t* buffer, size_t size) return; } - // Check rate limiting using token bucket - if (!consumeToken()) { - if (_rate_limited_packets == 0) { - _last_rate_limit_warning_millis = millis(); - } - ++_rate_limited_packets; - return; - } - - if (_rate_limited_packets > 0) { - uint32_t elapsed = (millis() - _last_rate_limit_warning_millis); - if (elapsed > RATE_LIMIT_WARNING_INTERVAL_MS) { - char buffer[128]; - snprintf(buffer, sizeof(buffer), "Rate limited %d packets in the last %dms", - _rate_limited_packets, elapsed); - Serial.println(buffer); - _rate_limited_packets = 0; - _last_rate_limit_warning_millis = millis(); - } - } - String header = "<"; header += String(calculatePrival(1, buffer[0])); @@ -104,26 +82,6 @@ void SyslogLogger::write(const uint8_t* buffer, size_t size) _udp.endPacket(); } -bool SyslogLogger::consumeToken() -{ - uint32_t now = millis(); - - uint32_t elapsed = now - _last_token_refill_millis; - size_t new_tokens = RATE_LIMIT_MAX_TOKENS * elapsed / RATE_LIMIT_WINDOW_MS; - - if (new_tokens > 0) { - _available_tokens = std::min(_available_tokens + new_tokens, RATE_LIMIT_MAX_TOKENS); - _last_token_refill_millis = now; - } - - if (_available_tokens > 0) { - --_available_tokens; - return true; - } - - return false; -} - void SyslogLogger::disable() { ESP_LOGI(TAG, "Disable"); @@ -145,8 +103,6 @@ void SyslogLogger::enable() std::lock_guard lock(_mutex); _enabled = true; - _available_tokens = RATE_LIMIT_MAX_TOKENS; - _last_token_refill_millis = millis(); } bool SyslogLogger::resolveAndStart()