mirror of
https://github.com/tbnobody/OpenDTU.git
synced 2025-12-11 17:30:37 +01:00
Fix: make logging more leightweight using large shared buffer
This commit is contained in:
committed by
Thomas Basler
parent
3e3879f9af
commit
5ba3446ad4
@@ -15,7 +15,6 @@ class MessageOutputClass {
|
|||||||
public:
|
public:
|
||||||
MessageOutputClass();
|
MessageOutputClass();
|
||||||
void init(Scheduler& scheduler);
|
void init(Scheduler& scheduler);
|
||||||
size_t write(const uint8_t* buffer, size_t size);
|
|
||||||
void register_ws_output(AsyncWebSocket* output);
|
void register_ws_output(AsyncWebSocket* output);
|
||||||
|
|
||||||
static int log_vprintf(const char *fmt, va_list arguments);
|
static int log_vprintf(const char *fmt, va_list arguments);
|
||||||
@@ -25,6 +24,13 @@ private:
|
|||||||
|
|
||||||
Task _loopTask;
|
Task _loopTask;
|
||||||
|
|
||||||
|
int log_vprintf_locked(const char *fmt, va_list arguments);
|
||||||
|
|
||||||
|
static constexpr size_t BUFFER_SIZE = 8192;
|
||||||
|
char _buffer[BUFFER_SIZE];
|
||||||
|
size_t _buffer_in = 0;
|
||||||
|
size_t _buffer_out = 0;
|
||||||
|
|
||||||
using message_t = std::vector<uint8_t>;
|
using message_t = std::vector<uint8_t>;
|
||||||
|
|
||||||
// we keep a buffer for every task and only write complete lines to the
|
// we keep a buffer for every task and only write complete lines to the
|
||||||
@@ -42,9 +48,10 @@ private:
|
|||||||
// "motivate" the client to send out ACKs immediately as the TCP packets are
|
// "motivate" the client to send out ACKs immediately as the TCP packets are
|
||||||
// "large", or we will wait long enough for the TCP stack to send out the
|
// "large", or we will wait long enough for the TCP stack to send out the
|
||||||
// ACK anyways.
|
// ACK anyways.
|
||||||
void send_ws_chunk(message_t&& line);
|
void send_ws_chunk(const uint8_t* buffer, size_t size);
|
||||||
static constexpr size_t WS_CHUNK_SIZE_BYTES = 512;
|
static constexpr size_t WS_CHUNK_SIZE_BYTES = 512;
|
||||||
static constexpr uint32_t WS_CHUNK_INTERVAL_MS = 250;
|
static constexpr uint32_t WS_CHUNK_INTERVAL_MS = 250;
|
||||||
|
static constexpr size_t TYPICAL_LINE_LENGTH = 150;
|
||||||
std::shared_ptr<message_t> _ws_chunk = nullptr;
|
std::shared_ptr<message_t> _ws_chunk = nullptr;
|
||||||
uint32_t _last_ws_chunk_sent = 0;
|
uint32_t _last_ws_chunk_sent = 0;
|
||||||
|
|
||||||
@@ -52,7 +59,7 @@ private:
|
|||||||
|
|
||||||
std::mutex _msgLock;
|
std::mutex _msgLock;
|
||||||
|
|
||||||
void serialWrite(message_t const& m);
|
void serialWrite(const uint8_t* buffer, size_t size);
|
||||||
};
|
};
|
||||||
|
|
||||||
extern MessageOutputClass MessageOutput;
|
extern MessageOutputClass MessageOutput;
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ void MessageOutputClass::init(Scheduler& scheduler)
|
|||||||
{
|
{
|
||||||
scheduler.addTask(_loopTask);
|
scheduler.addTask(_loopTask);
|
||||||
_loopTask.enable();
|
_loopTask.enable();
|
||||||
|
memset(_buffer, 0, sizeof(_buffer));
|
||||||
esp_log_set_vprintf(log_vprintf);
|
esp_log_set_vprintf(log_vprintf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -28,12 +29,65 @@ void MessageOutputClass::register_ws_output(AsyncWebSocket* output)
|
|||||||
|
|
||||||
int MessageOutputClass::log_vprintf(const char* fmt, va_list arguments)
|
int MessageOutputClass::log_vprintf(const char* fmt, va_list arguments)
|
||||||
{
|
{
|
||||||
char log_buffer[WS_CHUNK_SIZE_BYTES];
|
std::lock_guard<std::mutex> lock(MessageOutput._msgLock);
|
||||||
auto written = vsnprintf(log_buffer, sizeof(log_buffer), fmt, arguments);
|
return MessageOutput.log_vprintf_locked(fmt, arguments);
|
||||||
return MessageOutput.write(reinterpret_cast<uint8_t*>(log_buffer), written);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m)
|
int MessageOutputClass::log_vprintf_locked(const char* fmt, va_list arguments)
|
||||||
|
{
|
||||||
|
// we can only write up to the end of the buffer
|
||||||
|
auto len = sizeof(_buffer) - _buffer_in;
|
||||||
|
|
||||||
|
// do not overwrite messages that still need processing. be careful not to
|
||||||
|
// overwrite the length prefix of the next message waiting to be processed,
|
||||||
|
// i.e., the byte that _buffer_out points to.
|
||||||
|
if (_buffer_out > _buffer_in) {
|
||||||
|
len = _buffer_out - _buffer_in;
|
||||||
|
}
|
||||||
|
|
||||||
|
len -= 1; // leave room for length prefix
|
||||||
|
|
||||||
|
auto prefix = &_buffer[_buffer_in];
|
||||||
|
auto start = prefix + 1;
|
||||||
|
auto written = vsnprintf(start, len, fmt, arguments);
|
||||||
|
|
||||||
|
if (written < 0) {
|
||||||
|
if (Serial) {
|
||||||
|
static char const err_msg[] = "ERROR: vsnprintf failed: %d\n";
|
||||||
|
char err_buf[sizeof(err_msg) + 10];
|
||||||
|
auto err_written = snprintf(err_buf, sizeof(err_buf), err_msg, written);
|
||||||
|
Serial.write(err_buf, err_written);
|
||||||
|
}
|
||||||
|
|
||||||
|
*prefix = 0;
|
||||||
|
return written;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (written >= len) {
|
||||||
|
// buffer was too small, at least at the current location
|
||||||
|
*prefix = 0;
|
||||||
|
|
||||||
|
if (_buffer_in > 0 && _buffer_out <= _buffer_in) {
|
||||||
|
// we can safely reset the buffer as there is no data ahead that
|
||||||
|
// still needs processing, so we try again from the beginning.
|
||||||
|
_buffer_in = 0;
|
||||||
|
return log_vprintf_locked(fmt, arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Serial) {
|
||||||
|
Serial.write("ERROR: log buffer overrun\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
written = std::min(written, 255); // effectively truncate to 255 bytes
|
||||||
|
*prefix = static_cast<uint8_t>(written);
|
||||||
|
_buffer_in += 1 + written; // now points to terminating null byte of message we just wrote
|
||||||
|
return written;
|
||||||
|
}
|
||||||
|
|
||||||
|
void MessageOutputClass::serialWrite(const uint8_t* buffer, size_t size)
|
||||||
{
|
{
|
||||||
// operator bool() of HWCDC returns false if the device is not attached to
|
// operator bool() of HWCDC returns false if the device is not attached to
|
||||||
// a USB host. in general it makes sense to skip writing entirely if the
|
// a USB host. in general it makes sense to skip writing entirely if the
|
||||||
@@ -43,55 +97,24 @@ void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m)
|
|||||||
}
|
}
|
||||||
|
|
||||||
size_t written = 0;
|
size_t written = 0;
|
||||||
while (written < m.size()) {
|
while (written < size) {
|
||||||
written += Serial.write(m.data() + written, m.size() - written);
|
written += Serial.write(buffer + written, size - written);
|
||||||
}
|
}
|
||||||
|
|
||||||
Serial.flush();
|
Serial.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t MessageOutputClass::write(const uint8_t* buffer, size_t size)
|
void MessageOutputClass::send_ws_chunk(const uint8_t* buffer, size_t size)
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(_msgLock);
|
|
||||||
|
|
||||||
auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t());
|
|
||||||
auto iter = res.first;
|
|
||||||
auto& message = iter->second;
|
|
||||||
|
|
||||||
message.reserve(message.size() + size);
|
|
||||||
|
|
||||||
for (size_t idx = 0; idx < size; ++idx) {
|
|
||||||
uint8_t c = buffer[idx];
|
|
||||||
|
|
||||||
message.push_back(c);
|
|
||||||
|
|
||||||
if (c == '\n') {
|
|
||||||
serialWrite(message);
|
|
||||||
_lines.emplace(std::move(message));
|
|
||||||
message.clear();
|
|
||||||
message.reserve(size - idx - 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (message.empty()) {
|
|
||||||
_task_messages.erase(iter);
|
|
||||||
}
|
|
||||||
|
|
||||||
return size;
|
|
||||||
}
|
|
||||||
|
|
||||||
void MessageOutputClass::send_ws_chunk(message_t&& line)
|
|
||||||
{
|
{
|
||||||
if (!_ws) {
|
if (!_ws) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nullptr == _ws_chunk) {
|
if (nullptr == _ws_chunk) {
|
||||||
_ws_chunk = std::make_shared<message_t>(std::move(line));
|
_ws_chunk = std::make_shared<message_t>();
|
||||||
_ws_chunk->reserve(WS_CHUNK_SIZE_BYTES + 128); // add room for one more line
|
_ws_chunk->reserve(WS_CHUNK_SIZE_BYTES + TYPICAL_LINE_LENGTH); // add room for one more line
|
||||||
} else {
|
|
||||||
_ws_chunk->insert(_ws_chunk->end(), line.begin(), line.end());
|
|
||||||
}
|
}
|
||||||
|
_ws_chunk->insert(_ws_chunk->end(), buffer, buffer + size);
|
||||||
|
|
||||||
bool small = _ws_chunk->size() < WS_CHUNK_SIZE_BYTES;
|
bool small = _ws_chunk->size() < WS_CHUNK_SIZE_BYTES;
|
||||||
bool recent = (millis() - _last_ws_chunk_sent) < WS_CHUNK_INTERVAL_MS;
|
bool recent = (millis() - _last_ws_chunk_sent) < WS_CHUNK_INTERVAL_MS;
|
||||||
@@ -126,20 +149,21 @@ void MessageOutputClass::loop()
|
|||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(_msgLock);
|
std::lock_guard<std::mutex> lock(_msgLock);
|
||||||
|
|
||||||
// clean up (possibly filled) buffers of deleted tasks
|
while (_buffer_out != _buffer_in) {
|
||||||
auto map_iter = _task_messages.begin();
|
uint8_t msg_len = static_cast<uint8_t>(_buffer[_buffer_out]);
|
||||||
while (map_iter != _task_messages.end()) {
|
if (msg_len == 0) {
|
||||||
if (eTaskGetState(map_iter->first) == eDeleted) {
|
// we only read a null byte while we still have messages to process
|
||||||
map_iter = _task_messages.erase(map_iter);
|
// if we are at the end of the buffer and the next message is at the
|
||||||
|
// front. wrap around.
|
||||||
|
_buffer_out = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
++map_iter;
|
auto msg_start = reinterpret_cast<const uint8_t*>(&_buffer[_buffer_out]) + 1;
|
||||||
}
|
serialWrite(msg_start, msg_len);
|
||||||
|
Syslog.write(msg_start, msg_len);
|
||||||
|
send_ws_chunk(msg_start, msg_len);
|
||||||
|
|
||||||
while (!_lines.empty()) {
|
_buffer_out += 1 + msg_len;
|
||||||
Syslog.write(_lines.front().data(), _lines.front().size());
|
|
||||||
send_ws_chunk(std::move(_lines.front()));
|
|
||||||
_lines.pop();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user