line buffer: cleanup

This commit is contained in:
Raphael Robatsch 2021-10-29 20:03:26 +02:00
parent 9dffc074ff
commit ebf06f932f
1 changed files with 54 additions and 43 deletions

View File

@ -3,58 +3,69 @@
#pragma once #pragma once
#include <array> #include <array>
#include <string.h> #include <algorithm>
#include <sys/types.h>
// reads data from Reader, and passes complete lines to Handler. // reads data from Reader, and passes complete lines to Consumer.
template<size_t N> template<size_t BufSize>
class LineBuffer { class LineBuffer {
std::array<char, N> _buffer; using Iterator = typename std::array<char, BufSize>::iterator;
size_t _bytesBuffered {0}; std::array<char, BufSize> _buffer;
size_t _bytesConsumed {0}; Iterator _bufferedTo;
bool _discardLine {0}; Iterator _consumedTo;
bool _discardLine {false};
public: public:
template<typename Reader, typename Handler> LineBuffer()
size_t readLines(const Reader& reader, const Handler& handler) : _bufferedTo {_buffer.begin()}
, _consumedTo {_buffer.begin()}
{
}
template<typename Reader, typename Consumer>
ssize_t readLines(const Reader& reader, const Consumer& consumer)
{ {
auto d = _buffer.data();
while (true) { while (true) {
auto bytesRead = reader(d + _bytesBuffered, _buffer.size() - _bytesBuffered); auto bytesRead = reader(_bufferedTo, _buffer.end() - _bufferedTo);
if (bytesRead <= 0) { if (bytesRead <= 0) {
return bytesRead; return bytesRead;
} }
_bufferedTo += bytesRead;
_bytesBuffered += bytesRead; dispatchLines(consumer);
char* linePosition = nullptr; resetBuffer();
do {
char* lineStart = d + _bytesConsumed;
linePosition = static_cast<char*>(memchr(
lineStart,
'\n',
_bytesBuffered - _bytesConsumed));
if (linePosition) {
int lineLength = linePosition - lineStart;
if (!_discardLine) {
handler(lineStart, lineLength);
} }
_bytesConsumed += lineLength + 1; }
private:
template<typename Consumer>
void dispatchLines(const Consumer& consumer)
{
while (true) {
auto separator = std::find(_consumedTo, _bufferedTo, '\n');
if (separator == _bufferedTo) {
break;
}
size_t lineLength = separator - _consumedTo;
if (!_discardLine) {
consumer(_consumedTo, lineLength);
}
_consumedTo = separator + 1;
_discardLine = false; _discardLine = false;
} }
} while (linePosition); }
size_t bytesRemaining = _bytesBuffered - _bytesConsumed; void resetBuffer()
{
size_t bytesRemaining = _bufferedTo - _consumedTo;
if (bytesRemaining == _buffer.size()) { if (bytesRemaining == _buffer.size()) {
// line too long // line too long
_discardLine = true; _discardLine = true;
_bytesBuffered = 0; _consumedTo = _buffer.begin();
_bytesConsumed = 0; _bufferedTo = _buffer.begin();
} else if (bytesRemaining > 0 && _bytesConsumed > 0) { } else if (bytesRemaining > 0 && _consumedTo > _buffer.begin()) {
// move the last partial message to the front of the buffer, so a full-sized // move the last partial message to the front of the buffer, so a full-sized
// message will fit // message will fit
memmove(d, d+_bytesConsumed, bytesRemaining); std::copy(_consumedTo, _bufferedTo, _buffer.begin());
_bytesBuffered = bytesRemaining; _consumedTo = _buffer.begin();
_bytesConsumed = 0; _bufferedTo = _consumedTo + bytesRemaining;
}
} }
} }
}; };