apparently the small move succeded.

fb-mini-snake
Claudius Holeksa 2022-12-18 20:04:32 +01:00
parent 32aa36a43d
commit 3be68973ef
12 changed files with 345 additions and 339 deletions

View File

@ -6,15 +6,15 @@ namespace saw {
namespace unix {
IFdOwner::IFdOwner(UnixEventPort &event_port, int file_descriptor, int fd_flags,
uint32_t event_mask)
: event_port{event_port}, file_descriptor{file_descriptor},
fd_flags{fd_flags}, event_mask{event_mask} {
event_port.subscribe(*this, file_descriptor, event_mask);
: event_port_{event_port}, file_descriptor_{file_descriptor},
fd_flags_{fd_flags}, event_mask_{event_mask} {
event_port_.subscribe(*this, file_descriptor, event_mask);
}
IFdOwner::~IFdOwner() {
if (file_descriptor >= 0) {
event_port.unsubscribe(file_descriptor);
::close(file_descriptor);
if (file_descriptor_ >= 0) {
event_port_.unsubscribe(file_descriptor_);
::close(file_descriptor_);
}
}
@ -44,13 +44,13 @@ ErrorOr<size_t> UnixIoStream::read(void *buffer, size_t length) {
Conveyor<void> UnixIoStream::readReady() {
auto caf = newConveyorAndFeeder<void>();
read_ready = std::move(caf.feeder);
read_ready_ = std::move(caf.feeder);
return std::move(caf.conveyor);
}
Conveyor<void> UnixIoStream::onReadDisconnected() {
auto caf = newConveyorAndFeeder<void>();
on_read_disconnect = std::move(caf.feeder);
on_read_disconnect_ = std::move(caf.feeder);
return std::move(caf.conveyor);
}
@ -71,26 +71,26 @@ ErrorOr<size_t> UnixIoStream::write(const void *buffer, size_t length) {
Conveyor<void> UnixIoStream::writeReady() {
auto caf = newConveyorAndFeeder<void>();
write_ready = std::move(caf.feeder);
write_ready_ = std::move(caf.feeder);
return std::move(caf.conveyor);
}
void UnixIoStream::notify(uint32_t mask) {
if (mask & EPOLLOUT) {
if (write_ready) {
write_ready->feed();
if (write_ready_) {
write_ready_->feed();
}
}
if (mask & EPOLLIN) {
if (read_ready) {
read_ready->feed();
if (read_ready_) {
read_ready_->feed();
}
}
if (mask & EPOLLRDHUP) {
if (on_read_disconnect) {
on_read_disconnect->feed();
if (on_read_disconnect_) {
on_read_disconnect_->feed();
}
}
}
@ -101,13 +101,13 @@ UnixServer::UnixServer(UnixEventPort &event_port, int file_descriptor,
Conveyor<Own<IoStream>> UnixServer::accept() {
auto caf = newConveyorAndFeeder<Own<IoStream>>();
accept_feeder = std::move(caf.feeder);
accept_feeder_ = std::move(caf.feeder);
return std::move(caf.conveyor);
}
void UnixServer::notify(uint32_t mask) {
if (mask & EPOLLIN) {
if (accept_feeder) {
if (accept_feeder_) {
struct ::sockaddr_storage address;
socklen_t address_length = sizeof(address);
@ -117,9 +117,9 @@ void UnixServer::notify(uint32_t mask) {
if (accept_fd < 0) {
return;
}
auto fd_stream = heap<UnixIoStream>(event_port, accept_fd, 0,
auto fd_stream = heap<UnixIoStream>(event_port_, accept_fd, 0,
EPOLLIN | EPOLLOUT);
accept_feeder->feed(std::move(fd_stream));
accept_feeder_->feed(std::move(fd_stream));
}
}
}
@ -154,7 +154,7 @@ ErrorOr<size_t> UnixDatagram::read(void *buffer, size_t length) {
Conveyor<void> UnixDatagram::readReady() {
auto caf = newConveyorAndFeeder<void>();
read_ready = std::move(caf.feeder);
read_ready_ = std::move(caf.feeder);
return std::move(caf.conveyor);
}
@ -173,20 +173,20 @@ ErrorOr<size_t> UnixDatagram::write(const void *buffer, size_t length,
Conveyor<void> UnixDatagram::writeReady() {
auto caf = newConveyorAndFeeder<void>();
write_ready = std::move(caf.feeder);
write_ready_ = std::move(caf.feeder);
return std::move(caf.conveyor);
}
void UnixDatagram::notify(uint32_t mask) {
if (mask & EPOLLOUT) {
if (write_ready) {
write_ready->feed();
if (write_ready_) {
write_ready_->feed();
}
}
if (mask & EPOLLIN) {
if (read_ready) {
read_ready->feed();
if (read_ready_) {
read_ready_->feed();
}
}
}
@ -268,7 +268,7 @@ Own<Server> UnixNetwork::listen(NetworkAddress &addr) {
::listen(fd, SOMAXCONN);
return heap<UnixServer>(event_port, fd, 0);
return heap<UnixServer>(event_port_, fd, 0);
}
Conveyor<Own<IoStream>> UnixNetwork::connect(NetworkAddress &addr) {
@ -286,7 +286,7 @@ Conveyor<Own<IoStream>> UnixNetwork::connect(NetworkAddress &addr) {
}
Own<UnixIoStream> io_stream =
heap<UnixIoStream>(event_port, fd, 0, EPOLLIN | EPOLLOUT);
heap<UnixIoStream>(event_port_, fd, 0, EPOLLIN | EPOLLOUT);
bool success = false;
for (size_t i = 0; i < address.unixAddressSize(); ++i) {
@ -353,22 +353,22 @@ Own<Datagram> UnixNetwork::datagram(NetworkAddress &addr) {
return nullptr;
}
/// @todo
return heap<UnixDatagram>(event_port, fd, 0);
return heap<UnixDatagram>(event_port_, fd, 0);
}
const std::string &UnixNetworkAddress::address() const { return path; }
const std::string &UnixNetworkAddress::address() const { return path_; }
uint16_t UnixNetworkAddress::port() const { return port_hint; }
uint16_t UnixNetworkAddress::port() const { return port_hint_; }
SocketAddress &UnixNetworkAddress::unixAddress(size_t i) {
assert(i < addresses.size());
assert(i < addresses_.size());
/// @todo change from list to vector?
return addresses.at(i);
return addresses_.at(i);
}
size_t UnixNetworkAddress::unixAddressSize() const { return addresses.size(); }
size_t UnixNetworkAddress::unixAddressSize() const { return addresses_.size(); }
UnixNetwork::UnixNetwork(UnixEventPort &event) : event_port{event} {}
UnixNetwork::UnixNetwork(UnixEventPort &event) : event_port_{event} {}
Conveyor<Own<NetworkAddress>>
UnixNetwork::resolveAddress(const std::string &path, uint16_t port_hint) {
@ -388,18 +388,18 @@ UnixNetwork::resolveAddress(const std::string &path, uint16_t port_hint) {
}
UnixIoProvider::UnixIoProvider(UnixEventPort &port_ref, Own<EventPort> port)
: event_port{port_ref}, event_loop{std::move(port)}, unix_network{
port_ref} {}
: event_port_{port_ref}, event_loop_{std::move(port)}, unix_network_{
port_ref} {}
Own<InputStream> UnixIoProvider::wrapInputFd(int fd) {
return heap<UnixIoStream>(event_port, fd, 0, EPOLLIN);
return heap<UnixIoStream>(event_port_, fd, 0, EPOLLIN);
}
Network &UnixIoProvider::network() {
return static_cast<Network &>(unix_network);
return static_cast<Network &>(unix_network_);
}
EventLoop &UnixIoProvider::eventLoop() { return event_loop; }
EventLoop &UnixIoProvider::eventLoop() { return event_loop_; }
} // namespace unix

View File

@ -60,7 +60,8 @@ private:
sigset_t signal_fd_set_;
std::unordered_multimap<Signal, Own<ConveyorFeeder<void>>> signal_conveyors_;
std::unordered_multimap<Signal, Own<ConveyorFeeder<void>>>
signal_conveyors_;
int pipefds_[2];
@ -159,7 +160,8 @@ public:
}
::sigemptyset(&signal_fd_set_);
signal_fd_ = ::signalfd(-1, &signal_fd_set_, SFD_NONBLOCK | SFD_CLOEXEC);
signal_fd_ =
::signalfd(-1, &signal_fd_set_, SFD_NONBLOCK | SFD_CLOEXEC);
if (signal_fd_ < 0) {
return;
}

View File

@ -64,7 +64,7 @@ ConveyorStorage::ConveyorStorage() {}
ConveyorStorage::~ConveyorStorage() {}
ConveyorStorage *ConveyorStorage::getParent() const { return parent; }
ConveyorStorage *ConveyorStorage::getParent() const { return parent_; }
void ConveyorEventStorage::setParent(ConveyorStorage *p) {
/*
@ -74,19 +74,19 @@ void ConveyorEventStorage::setParent(ConveyorStorage *p) {
* and a valid parent
*/
if (/*!parent && */ p && !isArmed() && queued() > 0) {
assert(!parent);
assert(!parent_);
if (p->space() > 0) {
armLater();
}
}
parent = p;
parent_ = p;
}
ConveyorEventStorage::ConveyorEventStorage() : ConveyorStorage{} {}
ConveyorBase::ConveyorBase(Own<ConveyorNode> &&node_p)
: node{std::move(node_p)} {}
: node_{std::move(node_p)} {}
Error PropagateError::operator()(const Error &error) const {
return error.copyError();
@ -96,115 +96,115 @@ Error PropagateError::operator()(Error &&error) { return std::move(error); }
Event::Event() : Event(currentEventLoop()) {}
Event::Event(EventLoop &loop) : loop{loop} {}
Event::Event(EventLoop &loop) : loop_{loop} {}
Event::~Event() { disarm(); }
void Event::armNext() {
assert(&loop == local_loop);
if (prev == nullptr) {
assert(&loop_ == local_loop);
if (prev_ == nullptr) {
// Push the next_insert_point back by one
// and inserts itself before that
next = *loop.next_insert_point;
prev = loop.next_insert_point;
*prev = this;
if (next) {
next->prev = &next;
next_ = *loop_.next_insert_point_;
prev_ = loop_.next_insert_point_;
*prev_ = this;
if (next_) {
next_->prev_ = &next_;
}
// Set the new insertion ptr location to next
loop.next_insert_point = &next;
loop_.next_insert_point_ = &next_;
// Pushes back the later insert point if it was pointing at the
// previous event
if (loop.later_insert_point == prev) {
loop.later_insert_point = &next;
if (loop_.later_insert_point_ == prev_) {
loop_.later_insert_point_ = &next_;
}
// If tail points at the same location then
// we are at the end and have to update tail then.
// If tail_ points at the same location then
// we are at the end and have to update tail_ then.
// Technically should be possible by checking if
// next is a `nullptr`
if (loop.tail == prev) {
loop.tail = &next;
if (loop_.tail_ == prev_) {
loop_.tail_ = &next_;
}
loop.setRunnable(true);
loop_.setRunnable(true);
}
}
void Event::armLater() {
assert(&loop == local_loop);
assert(&loop_ == local_loop);
if (prev == nullptr) {
next = *loop.later_insert_point;
prev = loop.later_insert_point;
*prev = this;
if (next) {
next->prev = &next;
if (prev_ == nullptr) {
next_ = *loop_.later_insert_point_;
prev_ = loop_.later_insert_point_;
*prev_ = this;
if (next_) {
next_->prev_ = &next_;
}
loop.later_insert_point = &next;
if (loop.tail == prev) {
loop.tail = &next;
loop_.later_insert_point_ = &next_;
if (loop_.tail_ == prev_) {
loop_.tail_ = &next_;
}
loop.setRunnable(true);
loop_.setRunnable(true);
}
}
void Event::armLast() {
assert(&loop == local_loop);
assert(&loop_ == local_loop);
if (prev == nullptr) {
next = *loop.later_insert_point;
prev = loop.later_insert_point;
*prev = this;
if (next) {
next->prev = &next;
if (prev_ == nullptr) {
next_ = *loop_.later_insert_point_;
prev_ = loop_.later_insert_point_;
*prev_ = this;
if (next_) {
next_->prev_ = &next_;
}
if (loop.tail == prev) {
loop.tail = &next;
if (loop_.tail_ == prev_) {
loop_.tail_ = &next_;
}
loop.setRunnable(true);
loop_.setRunnable(true);
}
}
void Event::disarm() {
if (prev != nullptr) {
if (loop.tail == &next) {
loop.tail = prev;
if (prev_ != nullptr) {
if (loop_.tail_ == &next_) {
loop_.tail_ = prev_;
}
if (loop.next_insert_point == &next) {
loop.next_insert_point = prev;
if (loop_.next_insert_point_ == &next_) {
loop_.next_insert_point_ = prev_;
}
*prev = next;
if (next) {
next->prev = prev;
*prev_ = next_;
if (next_) {
next_->prev_ = prev_;
}
prev = nullptr;
next = nullptr;
prev_ = nullptr;
next_ = nullptr;
}
}
bool Event::isArmed() const { return prev != nullptr; }
bool Event::isArmed() const { return prev_ != nullptr; }
ConveyorSink::ConveyorSink() : node{nullptr} {}
ConveyorSink::ConveyorSink() : node_{nullptr} {}
ConveyorSink::ConveyorSink(Own<ConveyorNode> &&node_p)
: node{std::move(node_p)} {}
: node_{std::move(node_p)} {}
void EventLoop::setRunnable(bool runnable) { is_runnable = runnable; }
void EventLoop::setRunnable(bool runnable) { is_runnable_ = runnable; }
EventLoop::EventLoop() {}
EventLoop::EventLoop(Own<EventPort> &&event_port)
: event_port{std::move(event_port)} {}
: event_port_{std::move(event_port)} {}
EventLoop::~EventLoop() { assert(local_loop != this); }
@ -220,7 +220,7 @@ void EventLoop::leaveScope() {
bool EventLoop::turnLoop() {
size_t turn_step = 0;
while (head && turn_step < 65536) {
while (head_ && turn_step < 65536) {
if (!turn()) {
return false;
}
@ -230,29 +230,29 @@ bool EventLoop::turnLoop() {
}
bool EventLoop::turn() {
Event *event = head;
Event *event = head_;
if (!event) {
return false;
}
head = event->next;
if (head) {
head->prev = &head;
head_ = event->next_;
if (head_) {
head_->prev_ = &head_;
}
next_insert_point = &head;
if (later_insert_point == &event->next) {
later_insert_point = &head;
next_insert_point_ = &head_;
if (later_insert_point_ == &event->next_) {
later_insert_point_ = &head_;
}
if (tail == &event->next) {
tail = &head;
if (tail_ == &event->next_) {
tail_ = &head_;
}
event->next = nullptr;
event->prev = nullptr;
event->next_ = nullptr;
event->prev_ = nullptr;
next_insert_point = &head;
next_insert_point_ = &head_;
event->fire();
@ -260,72 +260,72 @@ bool EventLoop::turn() {
}
bool EventLoop::wait(const std::chrono::steady_clock::duration &duration) {
if (event_port) {
event_port->wait(duration);
if (event_port_) {
event_port_->wait(duration);
}
return turnLoop();
}
bool EventLoop::wait(const std::chrono::steady_clock::time_point &time_point) {
if (event_port) {
event_port->wait(time_point);
if (event_port_) {
event_port_->wait(time_point);
}
return turnLoop();
}
bool EventLoop::wait() {
if (event_port) {
event_port->wait();
if (event_port_) {
event_port_->wait();
}
return turnLoop();
}
bool EventLoop::poll() {
if (event_port) {
event_port->poll();
if (event_port_) {
event_port_->poll();
}
return turnLoop();
}
EventPort *EventLoop::eventPort() { return event_port.get(); }
EventPort *EventLoop::eventPort() { return event_port_.get(); }
ConveyorSinkSet &EventLoop::daemon() {
if (!daemon_sink) {
daemon_sink = heap<ConveyorSinkSet>();
if (!daemon_sink_) {
daemon_sink_ = heap<ConveyorSinkSet>();
}
return *daemon_sink;
return *daemon_sink_;
}
WaitScope::WaitScope(EventLoop &loop) : loop{loop} { loop.enterScope(); }
WaitScope::WaitScope(EventLoop &loop) : loop_{loop} { loop_.enterScope(); }
WaitScope::~WaitScope() { loop.leaveScope(); }
WaitScope::~WaitScope() { loop_.leaveScope(); }
void WaitScope::wait() { loop.wait(); }
void WaitScope::wait() { loop_.wait(); }
void WaitScope::wait(const std::chrono::steady_clock::duration &duration) {
loop.wait(duration);
loop_.wait(duration);
}
void WaitScope::wait(const std::chrono::steady_clock::time_point &time_point) {
loop.wait(time_point);
loop_.wait(time_point);
}
void WaitScope::poll() { loop.poll(); }
void WaitScope::poll() { loop_.poll(); }
ErrorOr<Own<ConveyorNode>>
ConvertConveyorNodeBase::swapChild(Own<ConveyorNode> &&swapee) noexcept {
return child_mixin.swapChild(std::move(swapee));
return child_mixin_.swapChild(std::move(swapee));
}
ConveyorStorage *ConvertConveyorNodeBase::nextStorage() noexcept {
if (!child_mixin.child) {
if (!child_mixin_.child) {
return nullptr;
}
return child_mixin.child->nextStorage();
return child_mixin_.child->nextStorage();
}
ImmediateConveyorNodeBase::ImmediateConveyorNodeBase()
@ -335,7 +335,7 @@ MergeConveyorNodeBase::MergeConveyorNodeBase() : ConveyorEventStorage{} {}
ErrorOr<Own<ConveyorNode>>
QueueBufferConveyorNodeBase::swapChild(Own<ConveyorNode> &&swapee_) noexcept {
return child_mixin.swapChild(std::move(swapee_));
return child_mixin_.swapChild(std::move(swapee_));
}
void ConveyorSinkSet::destroySinkConveyorNode(ConveyorNode &node) {
@ -343,7 +343,7 @@ void ConveyorSinkSet::destroySinkConveyorNode(ConveyorNode &node) {
armLast();
}
delete_nodes.push(&node);
delete_nodes_.push(&node);
}
void ConveyorSinkSet::fail(Error &&error) {
@ -367,44 +367,45 @@ void ConveyorSinkSet::add(Conveyor<void> &&sink) {
storage->setParent(sink_node.get());
}
sink_nodes.emplace_back(std::move(sink_node));
sink_nodes_.emplace_back(std::move(sink_node));
}
void ConveyorSinkSet::fire() {
while (!delete_nodes.empty()) {
ConveyorNode *node = delete_nodes.front();
/*auto erased = */ std::remove_if(sink_nodes.begin(), sink_nodes.end(),
while (!delete_nodes_.empty()) {
ConveyorNode *node = delete_nodes_.front();
/*auto erased = */ std::remove_if(sink_nodes_.begin(),
sink_nodes_.end(),
[node](Own<ConveyorNode> &element) {
return node == element.get();
});
delete_nodes.pop();
delete_nodes_.pop();
}
}
ConvertConveyorNodeBase::ConvertConveyorNodeBase(Own<ConveyorNode> &&dep)
: child_mixin{std::move(dep), *this} {}
: child_mixin_{std::move(dep), *this} {}
void ConvertConveyorNodeBase::getResult(ErrorOrValue &err_or_val) {
getImpl(err_or_val);
}
void AttachConveyorNodeBase::getResult(ErrorOrValue &err_or_val) noexcept {
if (child_mixin.child) {
child_mixin.child->getResult(err_or_val);
if (child_mixin_.child) {
child_mixin_.child->getResult(err_or_val);
}
}
ErrorOr<Own<ConveyorNode>>
AttachConveyorNodeBase::swapChild(Own<ConveyorNode> &&swapee_) noexcept {
return child_mixin.swapChild(std::move(swapee_));
return child_mixin_.swapChild(std::move(swapee_));
}
ConveyorStorage *AttachConveyorNodeBase::nextStorage() noexcept {
if (!child_mixin.child) {
if (!child_mixin_.child) {
return nullptr;
}
return child_mixin.child->nextStorage();
return child_mixin_.child->nextStorage();
}
void detachConveyor(Conveyor<void> &&conveyor) {

View File

@ -81,125 +81,125 @@ std::string Buffer::toHex() const {
}
BufferView::BufferView(Buffer &buffer)
: buffer{buffer}, read_offset{0}, write_offset{0} {}
: buffer_{buffer}, read_offset_{0}, write_offset_{0} {}
size_t BufferView::readPosition() const {
return read_offset + buffer.readPosition();
return read_offset_ + buffer_.readPosition();
}
size_t BufferView::readCompositeLength() const {
assert(read_offset <= buffer.readCompositeLength());
if (read_offset > buffer.readCompositeLength()) {
assert(read_offset_ <= buffer_.readCompositeLength());
if (read_offset_ > buffer_.readCompositeLength()) {
return 0;
}
return buffer.readCompositeLength() - read_offset;
return buffer_.readCompositeLength() - read_offset_;
}
size_t BufferView::readSegmentLength(size_t offset) const {
size_t off = offset + read_offset;
assert(off <= buffer.readCompositeLength());
if (off > buffer.readCompositeLength()) {
size_t off = offset + read_offset_;
assert(off <= buffer_.readCompositeLength());
if (off > buffer_.readCompositeLength()) {
return 0;
}
return buffer.readSegmentLength(off);
return buffer_.readSegmentLength(off);
}
void BufferView::readAdvance(size_t bytes) {
size_t offset = bytes + read_offset;
assert(offset <= buffer.readCompositeLength());
if (offset > buffer.readCompositeLength()) {
read_offset += buffer.readCompositeLength();
size_t offset = bytes + read_offset_;
assert(offset <= buffer_.readCompositeLength());
if (offset > buffer_.readCompositeLength()) {
read_offset_ += buffer_.readCompositeLength();
return;
}
read_offset += bytes;
read_offset_ += bytes;
}
uint8_t &BufferView::read(size_t i) {
size_t pos = i + read_offset;
size_t pos = i + read_offset_;
assert(pos < buffer.readCompositeLength());
assert(pos < buffer_.readCompositeLength());
return buffer.read(pos);
return buffer_.read(pos);
}
const uint8_t &BufferView::read(size_t i) const {
size_t pos = i + read_offset;
size_t pos = i + read_offset_;
assert(pos < buffer.readCompositeLength());
assert(pos < buffer_.readCompositeLength());
return buffer.read(pos);
return buffer_.read(pos);
}
size_t BufferView::writePosition() const {
return write_offset + buffer.writePosition();
return write_offset_ + buffer_.writePosition();
}
size_t BufferView::writeCompositeLength() const {
assert(write_offset <= buffer.writeCompositeLength());
if (write_offset > buffer.writeCompositeLength()) {
assert(write_offset_ <= buffer_.writeCompositeLength());
if (write_offset_ > buffer_.writeCompositeLength()) {
return 0;
}
return buffer.writeCompositeLength() - write_offset;
return buffer_.writeCompositeLength() - write_offset_;
}
size_t BufferView::writeSegmentLength(size_t offset) const {
size_t off = offset + write_offset;
assert(off <= buffer.writeCompositeLength());
if (off > buffer.writeCompositeLength()) {
size_t off = offset + write_offset_;
assert(off <= buffer_.writeCompositeLength());
if (off > buffer_.writeCompositeLength()) {
return 0;
}
return buffer.writeSegmentLength(off);
return buffer_.writeSegmentLength(off);
}
void BufferView::writeAdvance(size_t bytes) {
size_t offset = bytes + write_offset;
assert(offset <= buffer.writeCompositeLength());
if (offset > buffer.writeCompositeLength()) {
write_offset += buffer.writeCompositeLength();
size_t offset = bytes + write_offset_;
assert(offset <= buffer_.writeCompositeLength());
if (offset > buffer_.writeCompositeLength()) {
write_offset_ += buffer_.writeCompositeLength();
return;
}
write_offset += bytes;
write_offset_ += bytes;
}
uint8_t &BufferView::write(size_t i) {
size_t pos = i + write_offset;
size_t pos = i + write_offset_;
assert(pos < buffer.writeCompositeLength());
assert(pos < buffer_.writeCompositeLength());
return buffer.write(pos);
return buffer_.write(pos);
}
const uint8_t &BufferView::write(size_t i) const {
size_t pos = i + write_offset;
size_t pos = i + write_offset_;
assert(pos < buffer.writeCompositeLength());
assert(pos < buffer_.writeCompositeLength());
return buffer.write(pos);
return buffer_.write(pos);
}
Error BufferView::writeRequireLength(size_t bytes) {
return buffer.writeRequireLength(bytes + write_offset);
return buffer_.writeRequireLength(bytes + write_offset_);
}
size_t BufferView::readOffset() const { return read_offset; }
size_t BufferView::readOffset() const { return read_offset_; }
size_t BufferView::writeOffset() const { return write_offset; }
size_t BufferView::writeOffset() const { return write_offset_; }
RingBuffer::RingBuffer() : read_position{0}, write_position{0} {
buffer.resize(RING_BUFFER_MAX_SIZE);
RingBuffer::RingBuffer() : read_position_{0}, write_position_{0} {
buffer_.resize(RING_BUFFER_MAX_SIZE);
}
RingBuffer::RingBuffer(size_t size) : read_position{0}, write_position{0} {
buffer.resize(size);
RingBuffer::RingBuffer(size_t size) : read_position_{0}, write_position_{0} {
buffer_.resize(size);
}
size_t RingBuffer::readPosition() const { return read_position; }
size_t RingBuffer::readPosition() const { return read_position_; }
/*
* If write is ahead of read it is a simple distance, but if read ist ahead of
@ -208,9 +208,9 @@ size_t RingBuffer::readPosition() const { return read_position; }
*/
size_t RingBuffer::readCompositeLength() const {
return writePosition() < readPosition()
? buffer.size() - (readPosition() - writePosition())
: (write_reached_read ? buffer.size()
: writePosition() - readPosition());
? buffer_.size() - (readPosition() - writePosition())
: (write_reached_read_ ? buffer_.size()
: writePosition() - readPosition());
}
/*
@ -224,8 +224,8 @@ size_t RingBuffer::readSegmentLength(size_t offset) const {
size_t remaining = read_composite - offset;
size_t read_offset = readPosition() + offset;
read_offset = read_offset >= buffer.size() ? read_offset - buffer.size()
: read_offset;
read_offset = read_offset >= buffer_.size() ? read_offset - buffer_.size()
: read_offset;
// case 1 write is located before read and reached read
// then offset can be used normally
@ -234,12 +234,12 @@ size_t RingBuffer::readSegmentLength(size_t offset) const {
// case 3 write is located after read
// since std::min you can use simple subtraction
if (writePosition() < read_offset) {
return buffer.size() - read_offset;
return buffer_.size() - read_offset;
}
if (writePosition() == read_offset) {
if (remaining > 0) {
return buffer.size() - read_offset;
return buffer_.size() - read_offset;
} else {
return 0;
}
@ -253,34 +253,34 @@ void RingBuffer::readAdvance(size_t bytes) {
assert(bytes <= read_composite);
bytes = std::min(bytes, read_composite);
size_t advanced = read_position + bytes;
read_position = advanced >= buffer.size() ? advanced - buffer.size()
: advanced;
write_reached_read = bytes > 0 ? false : write_reached_read;
size_t advanced = read_position_ + bytes;
read_position_ = advanced >= buffer_.size() ? advanced - buffer_.size()
: advanced;
write_reached_read_ = bytes > 0 ? false : write_reached_read_;
}
uint8_t &RingBuffer::read(size_t i) {
assert(i < readCompositeLength());
size_t pos = read_position + i;
pos = pos >= buffer.size() ? pos - buffer.size() : pos;
return buffer[pos];
size_t pos = read_position_ + i;
pos = pos >= buffer_.size() ? pos - buffer_.size() : pos;
return buffer_[pos];
}
const uint8_t &RingBuffer::read(size_t i) const {
assert(i < readCompositeLength());
size_t pos = read_position + i;
pos = pos >= buffer.size() ? pos - buffer.size() : pos;
return buffer[pos];
size_t pos = read_position_ + i;
pos = pos >= buffer_.size() ? pos - buffer_.size() : pos;
return buffer_[pos];
}
size_t RingBuffer::writePosition() const { return write_position; }
size_t RingBuffer::writePosition() const { return write_position_; }
size_t RingBuffer::writeCompositeLength() const {
return readPosition() > writePosition()
? (readPosition() - writePosition())
: (write_reached_read
: (write_reached_read_
? 0
: buffer.size() - (writePosition() - readPosition()));
: buffer_.size() - (writePosition() - readPosition()));
}
size_t RingBuffer::writeSegmentLength(size_t offset) const {
@ -289,42 +289,43 @@ size_t RingBuffer::writeSegmentLength(size_t offset) const {
offset = std::min(offset, write_composite);
size_t write_offset = writePosition() + offset;
write_offset = write_offset >= buffer.size() ? write_offset - buffer.size()
: write_offset;
write_offset = write_offset >= buffer_.size()
? write_offset - buffer_.size()
: write_offset;
if (read_position > write_offset) {
return read_position - write_offset;
if (read_position_ > write_offset) {
return read_position_ - write_offset;
}
if (write_reached_read) {
if (write_reached_read_) {
return 0;
}
return buffer.size() - write_offset;
return buffer_.size() - write_offset;
}
void RingBuffer::writeAdvance(size_t bytes) {
assert(bytes <= writeCompositeLength());
size_t advanced = write_position + bytes;
write_position = advanced >= buffer.size() ? advanced - buffer.size()
: advanced;
size_t advanced = write_position_ + bytes;
write_position_ = advanced >= buffer_.size() ? advanced - buffer_.size()
: advanced;
write_reached_read =
(write_position == read_position && bytes > 0 ? true : false);
write_reached_read_ =
(write_position_ == read_position_ && bytes > 0 ? true : false);
}
uint8_t &RingBuffer::write(size_t i) {
assert(i < writeCompositeLength());
size_t pos = write_position + i;
pos = pos >= buffer.size() ? pos - buffer.size() : pos;
return buffer[pos];
size_t pos = write_position_ + i;
pos = pos >= buffer_.size() ? pos - buffer_.size() : pos;
return buffer_[pos];
}
const uint8_t &RingBuffer::write(size_t i) const {
assert(i < writeCompositeLength());
size_t pos = write_position + i;
pos = pos >= buffer.size() ? pos - buffer.size() : pos;
return buffer[pos];
size_t pos = write_position_ + i;
pos = pos >= buffer_.size() ? pos - buffer_.size() : pos;
return buffer_[pos];
}
/*
Error RingBuffer::increaseSize(size_t size){
@ -352,14 +353,14 @@ Error RingBuffer::writeRequireLength(size_t bytes) {
return noError();
}
ArrayBuffer::ArrayBuffer(size_t size) : read_position{0}, write_position{0} {
buffer.resize(size);
ArrayBuffer::ArrayBuffer(size_t size) : read_position_{0}, write_position_{0} {
buffer_.resize(size);
}
size_t ArrayBuffer::readPosition() const { return read_position; }
size_t ArrayBuffer::readPosition() const { return read_position_; }
size_t ArrayBuffer::readCompositeLength() const {
return write_position - read_position;
return write_position_ - read_position_;
}
size_t ArrayBuffer::readSegmentLength(size_t offset) const {
@ -367,59 +368,59 @@ size_t ArrayBuffer::readSegmentLength(size_t offset) const {
assert(offset <= read_composite);
offset = std::min(read_composite, offset);
size_t read_offset = read_position + offset;
size_t read_offset = read_position_ + offset;
return write_position - read_offset;
return write_position_ - read_offset;
}
void ArrayBuffer::readAdvance(size_t bytes) {
assert(bytes <= readCompositeLength());
read_position += bytes;
read_position_ += bytes;
}
uint8_t &ArrayBuffer::read(size_t i) {
assert(i < readCompositeLength());
return buffer[i + read_position];
return buffer_[i + read_position_];
}
const uint8_t &ArrayBuffer::read(size_t i) const {
assert(i + read_position < buffer.size());
assert(i + read_position_ < buffer_.size());
return buffer[i + read_position];
return buffer_[i + read_position_];
}
size_t ArrayBuffer::writePosition() const { return write_position; }
size_t ArrayBuffer::writePosition() const { return write_position_; }
size_t ArrayBuffer::writeCompositeLength() const {
assert(write_position <= buffer.size());
return buffer.size() - write_position;
assert(write_position_ <= buffer_.size());
return buffer_.size() - write_position_;
}
size_t ArrayBuffer::writeSegmentLength(size_t offset) const {
assert(write_position <= buffer.size());
assert(write_position_ <= buffer_.size());
size_t write_composite = writeCompositeLength();
assert(offset <= write_composite);
offset = std::min(write_composite, offset);
size_t write_offset = write_position + offset;
size_t write_offset = write_position_ + offset;
return buffer.size() - write_offset;
return buffer_.size() - write_offset;
}
void ArrayBuffer::writeAdvance(size_t bytes) {
assert(bytes <= writeCompositeLength());
write_position += bytes;
write_position_ += bytes;
}
uint8_t &ArrayBuffer::write(size_t i) {
assert(i < writeCompositeLength());
return buffer[i + write_position];
return buffer_[i + write_position_];
}
const uint8_t &ArrayBuffer::write(size_t i) const {
assert(i < writeCompositeLength());
return buffer[i + write_position];
return buffer_[i + write_position_];
}
Error ArrayBuffer::writeRequireLength(size_t bytes) {
size_t write_remain = writeCompositeLength();

View File

@ -4,14 +4,14 @@ namespace saw {
Error::Error() : error_{static_cast<Error::Code>(0)} {}
Error::Error(const std::string_view &msg, Error::Code code)
: error_message{msg}, error_{static_cast<Error::Code>(code)} {}
: error_message_{msg}, error_{static_cast<Error::Code>(code)} {}
Error::Error(std::string &&msg, Error::Code code)
: error_message{std::move(msg)}, error_{static_cast<Error::Code>(code)} {}
: error_message_{std::move(msg)}, error_{static_cast<Error::Code>(code)} {}
Error::Error(Error &&error)
: error_message{std::move(error.error_message)}, error_{std::move(
error.error_)} {}
: error_message_{std::move(error.error_message_)}, error_{std::move(
error.error_)} {}
const std::string_view Error::message() const {
@ -27,7 +27,7 @@ const std::string_view Error::message() const {
return "Error in class Error. Good luck :)";
}
},
error_message);
error_message_);
}
bool Error::failed() const {
@ -46,9 +46,9 @@ Error Error::copyError() const {
Error error;
error.error_ = error_;
try {
error.error_message = error_message;
error.error_message_ = error_message_;
} catch (const std::bad_alloc &) {
error.error_message =
error.error_message_ =
std::string_view{"Error while copying Error string. Out of memory"};
}
return error;

View File

@ -5,67 +5,66 @@
namespace saw {
AsyncIoStream::AsyncIoStream(Own<IoStream> str)
: stream{std::move(str)}, read_ready{stream->readReady()
.then([this]() {
read_stepper.readStep(*stream);
})
.sink()},
write_ready{stream->writeReady()
.then([this]() { write_stepper.writeStep(*stream); })
: stream_{std::move(str)},
read_ready_{stream_->readReady()
.then([this]() { read_stepper_.readStep(*stream_); })
.sink()},
read_disconnected{stream->onReadDisconnected()
.then([this]() {
if (read_stepper.on_read_disconnect) {
read_stepper.on_read_disconnect->feed();
}
})
.sink()} {}
write_ready_{stream_->writeReady()
.then([this]() { write_stepper_.writeStep(*stream_); })
.sink()},
read_disconnected_{stream_->onReadDisconnected()
.then([this]() {
if (read_stepper_.on_read_disconnect) {
read_stepper_.on_read_disconnect->feed();
}
})
.sink()} {}
void AsyncIoStream::read(void *buffer, size_t min_length, size_t max_length) {
SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; }
SAW_ASSERT(!read_stepper.read_task.has_value()) { return; }
SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; }
read_stepper.read_task =
read_stepper_.read_task =
ReadTaskAndStepHelper::ReadIoTask{buffer, min_length, max_length, 0};
read_stepper.readStep(*stream);
read_stepper_.readStep(*stream_);
}
Conveyor<size_t> AsyncIoStream::readDone() {
auto caf = newConveyorAndFeeder<size_t>();
read_stepper.read_done = std::move(caf.feeder);
read_stepper_.read_done = std::move(caf.feeder);
return std::move(caf.conveyor);
}
Conveyor<void> AsyncIoStream::onReadDisconnected() {
auto caf = newConveyorAndFeeder<void>();
read_stepper.on_read_disconnect = std::move(caf.feeder);
read_stepper_.on_read_disconnect = std::move(caf.feeder);
return std::move(caf.conveyor);
}
void AsyncIoStream::write(const void *buffer, size_t length) {
SAW_ASSERT(buffer && length > 0) { return; }
SAW_ASSERT(!write_stepper.write_task.has_value()) { return; }
SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; }
write_stepper.write_task =
write_stepper_.write_task =
WriteTaskAndStepHelper::WriteIoTask{buffer, length, 0};
write_stepper.writeStep(*stream);
write_stepper_.writeStep(*stream_);
}
Conveyor<size_t> AsyncIoStream::writeDone() {
auto caf = newConveyorAndFeeder<size_t>();
write_stepper.write_done = std::move(caf.feeder);
write_stepper_.write_done = std::move(caf.feeder);
return std::move(caf.conveyor);
}
StringNetworkAddress::StringNetworkAddress(const std::string &address,
uint16_t port)
: address_value{address}, port_value{port} {}
: address_value_{address}, port_value_{port} {}
const std::string &StringNetworkAddress::address() const {
return address_value;
return address_value_;
}
uint16_t StringNetworkAddress::port() const { return port_value; }
uint16_t StringNetworkAddress::port() const { return port_value_; }
} // namespace saw

View File

@ -1,8 +1,8 @@
#include "io_auth.h"
namespace saw {
Peer::Peer(const std::string &identity_) : identity_value{identity_} {}
Peer::Peer(std::string &&identity_) : identity_value{std::move(identity_)} {}
Peer::Peer(const std::string &identity_) : identity_value_{identity_} {}
Peer::Peer(std::string &&identity_) : identity_value_{std::move(identity_)} {}
const std::string &Peer::identity() const { return identity_value; }
const std::string &Peer::identity() const { return identity_value_; }
} // namespace saw

View File

@ -1,9 +1,9 @@
#pragma once
#include "async.h"
#include "buffer.h"
#include "io.h"
#include "message.h"
#include "buffer.h"
namespace saw {
@ -54,7 +54,9 @@ private:
OutContainer, BufferT> &peer_)
: peer_{peer_} {}
void feed(HeapMessageRoot<Outgoing, OutContainer> &&data) override { (void)data; }
void feed(HeapMessageRoot<Outgoing, OutContainer> &&data) override {
(void)data;
}
void fail(Error &&error) override { (void)error; }

View File

@ -12,67 +12,68 @@ template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer, typename OutContainer, typename BufferT>
StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
StreamingIoPeer(
Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed_,
Own<AsyncIoStream> stream_, Codec codec_, BufferT in_, BufferT out_)
: incoming_feeder_{std::move(feed_)}, io_stream_{std::move(stream_)},
codec_{std::move(codec_)}, in_buffer_{std::move(in_)}, out_buffer_{std::move(
out_)},
Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed,
Own<AsyncIoStream> stream, Codec codec, BufferT in, BufferT out)
: incoming_feeder_{std::move(feed)},
io_stream_{std::move(stream)}, codec_{std::move(codec)},
in_buffer_{std::move(in)}, out_buffer_{std::move(out)},
sink_read_{io_stream_->readDone()
.then([this](size_t bytes) -> ErrorOr<void> {
in_buffer_.writeAdvance(bytes);
if (in_buffer_.writeSegmentLength() == 0) {
return criticalError("Message too long");
}
io_stream_->read(&in_buffer_.write(), 1,
in_buffer_.writeSegmentLength());
while (true) {
auto root =
heapMessageRoot<Incoming, InContainer>();
auto builder = root.build();
Error error =
codec_.template decode<Incoming, InContainer>(
builder, in_buffer_);
if (error.isCritical()) {
return error;
}
if (!error.failed()) {
incoming_feeder_->feed(std::move(root));
} else {
break;
}
}
return Void{};
})
.sink([this](Error error) {
incoming_feeder_->fail(error.copyError());
return error;
})},
sink_write_{io_stream_->writeDone()
.then([this](size_t bytes) -> ErrorOr<void> {
out_buffer_.readAdvance(bytes);
if (out_buffer_.readCompositeLength() > 0) {
io_stream_->write(&out_buffer_.read(),
out_buffer_.readSegmentLength());
in_buffer_.writeAdvance(bytes);
if (in_buffer_.writeSegmentLength() == 0) {
return criticalError("Message too long");
}
io_stream_->read(&in_buffer_.write(), 1,
in_buffer_.writeSegmentLength());
while (true) {
auto root =
heapMessageRoot<Incoming, InContainer>();
auto builder = root.build();
Error error =
codec_.template decode<Incoming, InContainer>(
builder, in_buffer_);
if (error.isCritical()) {
return error;
}
if (!error.failed()) {
incoming_feeder_->feed(std::move(root));
} else {
break;
}
}
return Void{};
})
.sink()} {
.sink([this](Error error) {
incoming_feeder_->fail(error.copyError());
return error;
})},
sink_write_{io_stream_->writeDone()
.then([this](size_t bytes) -> ErrorOr<void> {
out_buffer_.readAdvance(bytes);
if (out_buffer_.readCompositeLength() > 0) {
io_stream_->write(
&out_buffer_.read(),
out_buffer_.readSegmentLength());
}
return Void{};
})
.sink()} {
io_stream_->read(&in_buffer_.write(), 1, in_buffer_.writeSegmentLength());
}
template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer, typename OutContainer, typename BufferT>
Error StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer,
BufferT>::send(HeapMessageRoot<Outgoing, OutContainer>
msg) {
BufferT>::send(HeapMessageRoot<Outgoing, OutContainer>
msg) {
bool restart_write = out_buffer_.readSegmentLength() == 0;
Error error =

View File

@ -1,7 +1,7 @@
#include "log.h"
namespace saw {
LogIo::LogIo(EventLoop &loop) : loop{loop} {}
LogIo::LogIo(EventLoop &loop) : loop_{loop} {}
Log::Log(LogIo &central, EventLoop &loop) : central{central}, loop{loop} {}
} // namespace saw
Log::Log(LogIo &central, EventLoop &loop) : central_{central}, loop_{loop} {}
} // namespace saw

View File

@ -23,4 +23,4 @@ private:
public:
LogIo(EventLoop &loop);
};
} // namespace saw
} // namespace saw

View File

@ -1,7 +1,7 @@
#pragma once
#include "string_literal.h"
#include "common.h"
#include "string_literal.h"
namespace saw {
namespace schema {