logging: centralize rate limiting

This commit is contained in:
Bernhard Kirchen
2025-07-22 20:28:40 +02:00
committed by Thomas Basler
parent 20003a0f92
commit c106224052
4 changed files with 68 additions and 58 deletions

View File

@@ -24,8 +24,6 @@ private:
Task _loopTask;
int log_vprintf_locked(const char *fmt, va_list arguments);
static constexpr size_t BUFFER_SIZE = 8192;
char _buffer[BUFFER_SIZE];
size_t _buffer_in = 0;
@@ -60,6 +58,18 @@ private:
std::mutex _msgLock;
void serialWrite(const uint8_t* buffer, size_t size);
static constexpr uint32_t RATE_LIMIT_WINDOW_MS = 1000;
static constexpr size_t RATE_LIMIT_MAX_TOKENS = 128;
size_t _available_tokens = RATE_LIMIT_MAX_TOKENS;
uint32_t _last_token_refill_millis = 0;
size_t _rate_limited_packets = 0;
uint32_t _last_rate_limit_warning_millis = 0;
static constexpr uint32_t RATE_LIMIT_WARNING_INTERVAL_MS = 1000;
bool consumeToken();
int log_self(const char* fmt, ...);
int log_vprintf_rate_limited(const char* fmt, va_list arguments);
int log_vprintf_recursive(const char* fmt, va_list arguments);
};
extern MessageOutputClass MessageOutput;

View File

@@ -20,7 +20,6 @@ private:
return _address != INADDR_NONE;
}
static uint8_t calculatePrival(uint8_t facility, char errorCode);
bool consumeToken();
Task _loopTask;
std::mutex _mutex;
@@ -31,14 +30,6 @@ private:
String _header;
uint16_t _port;
bool _enabled;
static constexpr uint32_t RATE_LIMIT_WINDOW_MS = 1000;
static constexpr size_t RATE_LIMIT_MAX_TOKENS = 100;
size_t _available_tokens = RATE_LIMIT_MAX_TOKENS;
uint32_t _last_token_refill_millis = 0;
size_t _rate_limited_packets = 0;
uint32_t _last_rate_limit_warning_millis = 0;
static constexpr uint32_t RATE_LIMIT_WARNING_INTERVAL_MS = 1000;
};
extern SyslogLogger Syslog;

View File

@@ -30,10 +30,63 @@ void MessageOutputClass::register_ws_output(AsyncWebSocket* output)
int MessageOutputClass::log_vprintf(const char* fmt, va_list arguments)
{
std::lock_guard<std::mutex> lock(MessageOutput._msgLock);
return MessageOutput.log_vprintf_locked(fmt, arguments);
return MessageOutput.log_vprintf_rate_limited(fmt, arguments);
}
int MessageOutputClass::log_vprintf_locked(const char* fmt, va_list arguments)
bool MessageOutputClass::consumeToken()
{
uint32_t now = millis();
uint32_t elapsed = now - _last_token_refill_millis;
size_t new_tokens = RATE_LIMIT_MAX_TOKENS * elapsed / RATE_LIMIT_WINDOW_MS;
if (new_tokens > 0) {
_available_tokens = std::min(_available_tokens + new_tokens, RATE_LIMIT_MAX_TOKENS);
_last_token_refill_millis = now;
}
if (_available_tokens > 0) {
--_available_tokens;
return true;
}
return false;
}
int MessageOutputClass::log_self(const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
int ret = log_vprintf_recursive(fmt, args);
va_end(args);
return ret;
}
int MessageOutputClass::log_vprintf_rate_limited(const char* fmt, va_list arguments)
{
if (!consumeToken()) {
if (_rate_limited_packets == 0) {
_last_rate_limit_warning_millis = millis();
}
++_rate_limited_packets;
return 0;
}
if (_rate_limited_packets > 0) {
uint32_t elapsed = millis() - _last_rate_limit_warning_millis;
if (elapsed > RATE_LIMIT_WARNING_INTERVAL_MS) {
log_self("W (%d) logging: Rate limited %d message%s in the last %d ms\n",
millis(), _rate_limited_packets,
(_rate_limited_packets > 1 ? "s" : ""), elapsed);
_rate_limited_packets = 0;
_last_rate_limit_warning_millis = millis();
}
}
return log_vprintf_recursive(fmt, arguments);
}
int MessageOutputClass::log_vprintf_recursive(const char* fmt, va_list arguments)
{
// we can only write up to the end of the buffer
auto len = sizeof(_buffer) - _buffer_in;
@@ -71,7 +124,7 @@ int MessageOutputClass::log_vprintf_locked(const char* fmt, va_list arguments)
// we can safely reset the buffer as there is no data ahead that
// still needs processing, so we try again from the beginning.
_buffer_in = 0;
return log_vprintf_locked(fmt, arguments);
return log_vprintf_recursive(fmt, arguments);
}
if (Serial) {

View File

@@ -8,7 +8,6 @@
#include "defaults.h"
#include <ESPmDNS.h>
#include <HardwareSerial.h>
#include <algorithm>
#undef TAG
static const char* TAG = "syslog";
@@ -66,27 +65,6 @@ void SyslogLogger::write(const uint8_t* buffer, size_t size)
return;
}
// Check rate limiting using token bucket
if (!consumeToken()) {
if (_rate_limited_packets == 0) {
_last_rate_limit_warning_millis = millis();
}
++_rate_limited_packets;
return;
}
if (_rate_limited_packets > 0) {
uint32_t elapsed = (millis() - _last_rate_limit_warning_millis);
if (elapsed > RATE_LIMIT_WARNING_INTERVAL_MS) {
char buffer[128];
snprintf(buffer, sizeof(buffer), "Rate limited %d packets in the last %dms",
_rate_limited_packets, elapsed);
Serial.println(buffer);
_rate_limited_packets = 0;
_last_rate_limit_warning_millis = millis();
}
}
String header = "<";
header += String(calculatePrival(1, buffer[0]));
@@ -104,26 +82,6 @@ void SyslogLogger::write(const uint8_t* buffer, size_t size)
_udp.endPacket();
}
bool SyslogLogger::consumeToken()
{
uint32_t now = millis();
uint32_t elapsed = now - _last_token_refill_millis;
size_t new_tokens = RATE_LIMIT_MAX_TOKENS * elapsed / RATE_LIMIT_WINDOW_MS;
if (new_tokens > 0) {
_available_tokens = std::min(_available_tokens + new_tokens, RATE_LIMIT_MAX_TOKENS);
_last_token_refill_millis = now;
}
if (_available_tokens > 0) {
--_available_tokens;
return true;
}
return false;
}
void SyslogLogger::disable()
{
ESP_LOGI(TAG, "Disable");
@@ -145,8 +103,6 @@ void SyslogLogger::enable()
std::lock_guard<std::mutex> lock(_mutex);
_enabled = true;
_available_tokens = RATE_LIMIT_MAX_TOKENS;
_last_token_refill_millis = millis();
}
bool SyslogLogger::resolveAndStart()