// forstio/source/forstio/async.cpp
//
// 420 lines
// 9.1 KiB
// C++

#include "async.h"
#include "common.h"
#include "error.h"
#include <algorithm>
#include <cassert>
namespace saw {
namespace {
/// Per-thread pointer to an event loop; set and cleared by
/// event_loop::enter_scope() / leave_scope(). nullptr when none is entered.
thread_local event_loop *local_loop = nullptr;

/// Returns the event loop stored in `local_loop` for the calling thread.
/// Asserts that a loop is set.
event_loop &current_event_loop() {
    assert(local_loop);
    return *local_loop;
}
} // namespace
/// Default constructor; performs no explicit initialization.
conveyor_node::conveyor_node() = default;
/**
 * Takes ownership of `child_` and calls notify_parent_attached(owner) on it.
 * The child is asserted to be non-null.
 */
conveyor_node_with_child_mixin::conveyor_node_with_child_mixin(
    own<conveyor_node> &&child_, conveyor_node &owner)
    : child(std::move(child_)) {
    assert(child);
    child->notify_parent_attached(owner);
}
/**
 * Replaces the current child with `swapee` and returns the previous child.
 *
 * The storage parent that was registered on the previous child's next
 * storage is carried over to the next storage of the new child.
 * If `swapee` is null the mixin is left without a child.
 *
 * @return the previous child, or a critical error if no child was present.
 */
error_or<own<conveyor_node>>
conveyor_node_with_child_mixin::swap_child(own<conveyor_node> &&swapee) {
    SAW_ASSERT(child) {
        return critical_error("Child should exist if this function is called");
    }

    own<conveyor_node> previous = std::move(child);

    // Remember which storage was the parent of the previous child's storage.
    conveyor_storage *inherited_parent = nullptr;
    if (conveyor_storage *previous_storage = previous->next_storage()) {
        inherited_parent = previous_storage->get_parent();
    }

    if (!swapee) {
        return previous;
    }

    // Install the new child and hand the remembered parent to its storage.
    // A null parent is passed through unchanged, so no extra check is made.
    child = std::move(swapee);
    if (conveyor_storage *incoming_storage = child->next_storage()) {
        incoming_storage->set_parent(inherited_parent);
    }
    return previous;
}
/// Default constructor; performs no explicit initialization.
conveyor_storage::conveyor_storage() = default;
/// Destructor; nothing to release explicitly.
conveyor_storage::~conveyor_storage() = default;
/// Returns the stored parent storage pointer (may be nullptr).
conveyor_storage *conveyor_storage::get_parent() const {
    return parent_;
}
/**
 * Stores `p` as the parent storage.
 *
 * If a non-null parent is supplied while this storage is not armed but has
 * queued elements, the storage arms itself (via arm_later) provided the
 * parent reports free space.
 */
void conveyor_event_storage::set_parent(conveyor_storage *p) {
    const bool has_pending_unarmed = p && !is_armed() && queued() > 0;
    if (has_pending_unarmed) {
        // With an element present and a valid parent the storage would
        // already have been armed, so no parent is expected to be set here.
        assert(!parent_);
        if (p->space() > 0) {
            arm_later();
        }
    }
    parent_ = p;
}
/// Default constructor; default-constructs the conveyor_storage base.
conveyor_event_storage::conveyor_event_storage() : conveyor_storage() {}
/// Takes ownership of the given node.
conveyor_base::conveyor_base(own<conveyor_node> &&node_p)
    : node_(std::move(node_p)) {}
/// Forwards an error unchanged by returning a copy made with copy_error().
error propagate_error::operator()(const error &error) const {
    auto duplicate = error.copy_error();
    return duplicate;
}
error propagate_error::operator()(error &&err) { return std::move(err); }
event::event() : event(current_event_loop()) {}
/// Constructs an event bound to the given loop.
event::event(event_loop &loop) : loop_(loop) {}
/// Unlinks the event from its loop's queue (if queued) before destruction.
event::~event() {
    disarm();
}
void event::arm_next() {
assert(&loop_ == local_loop);
if (prev_ == nullptr) {
// Push the next_insert_point back by one
// and inserts itself before that
next_ = *loop_.next_insert_point_;
prev_ = loop_.next_insert_point_;
*prev_ = this;
if (next_) {
next_->prev_ = &next_;
}
// Set the new insertion ptr location to next
loop_.next_insert_point_ = &next_;
// Pushes back the later insert point if it was pointing at the
// previous event
if (loop_.later_insert_point_ == prev_) {
loop_.later_insert_point_ = &next_;
}
// If tail_ points at the same location then
// we are at the end and have to update tail_ then.
// Technically should be possible by checking if
// next is a `nullptr`
if (loop_.tail_ == prev_) {
loop_.tail_ = &next_;
}
loop_.set_runnable(true);
}
}
void event::arm_later() {
assert(&loop_ == local_loop);
if (prev_ == nullptr) {
next_ = *loop_.later_insert_point_;
prev_ = loop_.later_insert_point_;
*prev_ = this;
if (next_) {
next_->prev_ = &next_;
}
loop_.later_insert_point_ = &next_;
if (loop_.tail_ == prev_) {
loop_.tail_ = &next_;
}
loop_.set_runnable(true);
}
}
void event::arm_last() {
assert(&loop_ == local_loop);
if (prev_ == nullptr) {
next_ = *loop_.later_insert_point_;
prev_ = loop_.later_insert_point_;
*prev_ = this;
if (next_) {
next_->prev_ = &next_;
}
if (loop_.tail_ == prev_) {
loop_.tail_ = &next_;
}
loop_.set_runnable(true);
}
}
/**
 * Removes this event from its loop's queue. Does nothing if it is not armed.
 *
 * Every loop cursor that currently points at this event's `next_` slot
 * (`tail_`, `next_insert_point_`, `later_insert_point_`) is moved back to the
 * preceding slot first, so that none of them refers to an unlinked (and
 * possibly soon destroyed) event afterwards.
 */
void event::disarm() {
    if (prev_ == nullptr) {
        return; // not queued
    }

    if (loop_.tail_ == &next_) {
        loop_.tail_ = prev_;
    }
    if (loop_.next_insert_point_ == &next_) {
        loop_.next_insert_point_ = prev_;
    }
    // arm_later() leaves later_insert_point_ pointing at our next_ slot;
    // without this fix-up it would dangle once the event is unlinked.
    if (loop_.later_insert_point_ == &next_) {
        loop_.later_insert_point_ = prev_;
    }

    // Splice ourselves out of the intrusive list.
    *prev_ = next_;
    if (next_ != nullptr) {
        next_->prev_ = prev_;
    }

    prev_ = nullptr;
    next_ = nullptr;
}
/// Returns true while the event is linked into a queue (prev_ is set).
bool event::is_armed() const {
    const bool linked = (prev_ != nullptr);
    return linked;
}
/// Constructs a sink that holds no node.
conveyor_sink::conveyor_sink() : node_(nullptr) {}
/// Constructs a sink that takes ownership of the given node.
conveyor_sink::conveyor_sink(own<conveyor_node> &&node_p)
    : node_(std::move(node_p)) {}
/// Records whether the loop currently has work to run.
void event_loop::set_runnable(bool runnable) {
    is_runnable_ = runnable;
}
/// Constructs an event loop without an event port.
event_loop::event_loop() = default;
/// Constructs an event loop that takes ownership of the given event port.
event_loop::event_loop(own<class event_port> &&ep)
    : event_port_(std::move(ep)) {}
/// Destructor. The loop must not be the thread's current loop any more.
event_loop::~event_loop() {
    assert(this != local_loop);
}
/// Makes this loop the calling thread's current loop.
/// Asserts that no loop is currently entered on this thread.
void event_loop::enter_scope() {
    assert(local_loop == nullptr);
    local_loop = this;
}
/// Clears the calling thread's current loop.
/// Asserts that this loop is the one currently entered.
void event_loop::leave_scope() {
    assert(this == local_loop);
    local_loop = nullptr;
}
/**
 * Fires queued events until the queue is empty or 65536 events have been
 * processed in this call.
 *
 * @return false if a turn() reported failure, true otherwise.
 */
bool event_loop::turn_loop() {
    for (size_t fired = 0; head_ && fired < 65536; ++fired) {
        const bool progressed = turn();
        if (!progressed) {
            return false;
        }
    }
    return true;
}
bool event_loop::turn() {
event *event = head_;
if (!event) {
return false;
}
head_ = event->next_;
if (head_) {
head_->prev_ = &head_;
}
next_insert_point_ = &head_;
if (later_insert_point_ == &event->next_) {
later_insert_point_ = &head_;
}
if (tail_ == &event->next_) {
tail_ = &head_;
}
event->next_ = nullptr;
event->prev_ = nullptr;
next_insert_point_ = &head_;
event->fire();
return true;
}
/// Waits on the event port (if any) for at most `duration`, then runs the
/// queued events. Returns the result of turn_loop().
bool event_loop::wait(const std::chrono::steady_clock::duration &duration) {
    if (auto *port = event_port_.get()) {
        port->wait(duration);
    }
    return turn_loop();
}
/// Waits on the event port (if any) with the given `time_point`, then runs
/// the queued events. Returns the result of turn_loop().
bool event_loop::wait(const std::chrono::steady_clock::time_point &time_point) {
    if (auto *port = event_port_.get()) {
        port->wait(time_point);
    }
    return turn_loop();
}
/// Waits on the event port (if any), then runs the queued events.
/// Returns the result of turn_loop().
bool event_loop::wait() {
    if (auto *port = event_port_.get()) {
        port->wait();
    }
    return turn_loop();
}
/// Polls the event port (if any), then runs the queued events.
/// Returns the result of turn_loop().
bool event_loop::poll() {
    if (auto *port = event_port_.get()) {
        port->poll();
    }
    return turn_loop();
}
/// Returns a non-owning pointer to the event port, or nullptr if none is set.
event_port *event_loop::event_port() {
    return event_port_.get();
}
/// Returns the loop's daemon sink set, creating it on first use.
conveyor_sink_set &event_loop::daemon() {
    if (daemon_sink_) {
        return *daemon_sink_;
    }
    daemon_sink_ = heap<conveyor_sink_set>();
    return *daemon_sink_;
}
/// Binds to `loop` and enters its scope on the calling thread.
wait_scope::wait_scope(event_loop &loop) : loop_(loop) {
    loop_.enter_scope();
}
/// Leaves the scope of the bound loop.
wait_scope::~wait_scope() {
    loop_.leave_scope();
}
/// Forwards to event_loop::wait(); its result is discarded.
void wait_scope::wait() {
    static_cast<void>(loop_.wait());
}
/// Forwards to event_loop::wait(duration); its result is discarded.
void wait_scope::wait(const std::chrono::steady_clock::duration &duration) {
    static_cast<void>(loop_.wait(duration));
}
/// Forwards to event_loop::wait(time_point); its result is discarded.
void wait_scope::wait(const std::chrono::steady_clock::time_point &time_point) {
    static_cast<void>(loop_.wait(time_point));
}
/// Forwards to event_loop::poll(); its result is discarded.
void wait_scope::poll() {
    static_cast<void>(loop_.poll());
}
/// Replaces the child node by delegating to the child mixin.
/// Returns whatever the mixin's swap_child() returns.
error_or<own<conveyor_node>>
convert_conveyor_node_base::swap_child(own<conveyor_node> &&swapee) noexcept {
    auto &mixin = child_mixin_;
    return mixin.swap_child(std::move(swapee));
}
/// Returns the child's next storage, or nullptr when there is no child.
conveyor_storage *convert_conveyor_node_base::next_storage() noexcept {
    auto &child = child_mixin_.child;
    return child ? child->next_storage() : nullptr;
}
/// Default constructor; default-constructs the conveyor_event_storage base.
immediate_conveyor_node_base::immediate_conveyor_node_base()
    : conveyor_event_storage() {}
/// Default constructor; default-constructs the conveyor_event_storage base.
merge_conveyor_node_base::merge_conveyor_node_base()
    : conveyor_event_storage() {}
/// Replaces the child node by delegating to the child mixin.
/// Returns whatever the mixin's swap_child() returns.
error_or<own<conveyor_node>> queue_buffer_conveyor_node_base::swap_child(
    own<conveyor_node> &&swapee_) noexcept {
    auto &mixin = child_mixin_;
    return mixin.swap_child(std::move(swapee_));
}
/**
 * Schedules `node` for removal: the set arms itself (arm_last) if it is not
 * armed yet and records the node in the deletion queue, which fire() drains.
 */
void conveyor_sink_set::destroy_sink_conveyor_node(conveyor_node &node) {
    const bool already_scheduled = is_armed();
    if (!already_scheduled) {
        arm_last();
    }
    delete_nodes_.push(&node);
}
/// Error hook of the sink set. Currently a no-op: the error is ignored.
void conveyor_sink_set::fail(error &&error) {
    /// @todo call error_handler
    static_cast<void>(error);
}
/// Constructs a sink set whose event base is bound to the given loop.
conveyor_sink_set::conveyor_sink_set(event_loop &event_loop)
    : event(event_loop) {}
/**
 * Adds a conveyor to the set.
 *
 * The conveyor's node is wrapped in a sink_conveyor_node owned by this set,
 * and the wrapper is registered as parent of the node's next storage (if it
 * has one). Returns silently if the conveyor carries no node or if creating
 * the wrapper throws std::bad_alloc.
 */
void conveyor_sink_set::add(conveyor<void> &&sink) {
    auto source_node = conveyor<void>::from_conveyor(std::move(sink));
    SAW_ASSERT(source_node) { return; }

    // Fetch the storage before ownership of the node moves into the wrapper.
    conveyor_storage *const source_storage = source_node->next_storage();

    own<sink_conveyor_node> wrapper = nullptr;
    try {
        wrapper = heap<sink_conveyor_node>(std::move(source_node), *this);
    } catch (const std::bad_alloc &) {
        return;
    }

    if (source_storage != nullptr) {
        source_storage->set_parent(wrapper.get());
    }
    sink_nodes_.emplace_back(std::move(wrapper));
}
void conveyor_sink_set::fire() {
while (!delete_nodes_.empty()) {
conveyor_node *node = delete_nodes_.front();
/*auto erased = */ std::remove_if(sink_nodes_.begin(),
sink_nodes_.end(),
[node](own<conveyor_node> &element) {
return node == element.get();
});
delete_nodes_.pop();
}
}
/// Takes ownership of `dep` as the child node, with this node as its owner.
convert_conveyor_node_base::convert_conveyor_node_base(own<conveyor_node> &&dep)
    : child_mixin_(std::move(dep), *this) {}
/// Produces the node's result by delegating to get_impl().
void convert_conveyor_node_base::get_result(error_or_value &err_or_val) {
    this->get_impl(err_or_val);
}
/// Forwards the result request to the child; does nothing without a child.
void attach_conveyor_node_base::get_result(
    error_or_value &err_or_val) noexcept {
    auto &child = child_mixin_.child;
    if (!child) {
        return;
    }
    child->get_result(err_or_val);
}
/// Replaces the child node by delegating to the child mixin.
/// Returns whatever the mixin's swap_child() returns.
error_or<own<conveyor_node>>
attach_conveyor_node_base::swap_child(own<conveyor_node> &&swapee_) noexcept {
    auto &mixin = child_mixin_;
    return mixin.swap_child(std::move(swapee_));
}
/// Returns the child's next storage, or nullptr when there is no child.
conveyor_storage *attach_conveyor_node_base::next_storage() noexcept {
    auto &child = child_mixin_.child;
    return child ? child->next_storage() : nullptr;
}
/// Hands the conveyor over to the daemon sink set of the calling thread's
/// current event loop, which then owns it.
void detach_conveyor(conveyor<void> &&conveyor) {
    current_event_loop().daemon().add(std::move(conveyor));
}
} // namespace saw