diff options
Diffstat (limited to 'src/async/async.cpp')
-rw-r--r-- | src/async/async.cpp | 419 |
1 files changed, 419 insertions, 0 deletions
diff --git a/src/async/async.cpp b/src/async/async.cpp new file mode 100644 index 0000000..c53ffa6 --- /dev/null +++ b/src/async/async.cpp @@ -0,0 +1,419 @@ +#include "async.h" +#include <forstio/core/common.h> +#include <forstio/core/error.h> + +#include <algorithm> +#include <cassert> + +namespace saw { +namespace { +thread_local event_loop *local_loop = nullptr; + +event_loop ¤t_event_loop() { + event_loop *loop = local_loop; + assert(loop); + return *loop; +} +} // namespace + +conveyor_node::conveyor_node() {} + +conveyor_node_with_child_mixin::conveyor_node_with_child_mixin( + own<conveyor_node> &&child_, conveyor_node &owner) + : child{std::move(child_)} { + assert(child); + + child->notify_parent_attached(owner); +} + +error_or<own<conveyor_node>> +conveyor_node_with_child_mixin::swap_child(own<conveyor_node> &&swapee) { + SAW_ASSERT(child) { + return make_error<err::invalid_state>("Child should exist if this function is called"); + } + own<conveyor_node> old_child = std::move(child); + + /** + * We need the parent of the old_child's next storage + */ + conveyor_storage *old_storage = old_child->next_storage(); + conveyor_storage *old_storage_parent = + old_storage ? old_storage->get_parent() : nullptr; + + /** + * Swap in the new child + */ + if (swapee) { + child = std::move(swapee); + + /** + * Then we need to set the new child's storage parent since the next + * storage has a nullptr set And if the old_storage_parent is a nullptr, + * then it doesn't matter. So we don't check for it + */ + conveyor_storage *swapee_storage = child->next_storage(); + if (swapee_storage) { + swapee_storage->set_parent(old_storage_parent); + } + } + + return old_child; +} + +conveyor_storage::conveyor_storage() {} + +conveyor_storage::~conveyor_storage() {} + +conveyor_storage *conveyor_storage::get_parent() const { return parent_; } + +void conveyor_event_storage::set_parent(conveyor_storage *p) { + /* + * parent check isn't needed, but is used + * for the assert, because the storage should + * be armed if there was an element present + * and a valid parent + */ + if (/*!parent && */ p && !is_armed() && queued() > 0) { + assert(!parent_); + if (p->space() > 0) { + arm_later(); + } + } + + parent_ = p; +} + +conveyor_event_storage::conveyor_event_storage() : conveyor_storage{} {} + +conveyor_base::conveyor_base(own<conveyor_node> &&node_p) + : node_{std::move(node_p)} {} + +error propagate_error::operator()(const error &error) const { + return error.copy_error(); +} + +error propagate_error::operator()(error &&err) { return std::move(err); } + +event::event() : event(current_event_loop()) {} + +event::event(event_loop &loop) : loop_{loop} {} + +event::~event() { disarm(); } + +void event::arm_next() { + assert(&loop_ == local_loop); + if (prev_ == nullptr) { + // Push the next_insert_point back by one + // and inserts itself before that + next_ = *loop_.next_insert_point_; + prev_ = loop_.next_insert_point_; + *prev_ = this; + if (next_) { + next_->prev_ = &next_; + } + + // Set the new insertion ptr location to next + loop_.next_insert_point_ = &next_; + + // Pushes back the later insert point if it was pointing at the + // previous event + if (loop_.later_insert_point_ == prev_) { + loop_.later_insert_point_ = &next_; + } + + // If tail_ points at the same location then + // we are at the end and have to update tail_ then. + // Technically should be possible by checking if + // next is a `nullptr` + if (loop_.tail_ == prev_) { + loop_.tail_ = &next_; + } + + loop_.set_runnable(true); + } +} + +void event::arm_later() { + assert(&loop_ == local_loop); + + if (prev_ == nullptr) { + next_ = *loop_.later_insert_point_; + prev_ = loop_.later_insert_point_; + *prev_ = this; + if (next_) { + next_->prev_ = &next_; + } + + loop_.later_insert_point_ = &next_; + if (loop_.tail_ == prev_) { + loop_.tail_ = &next_; + } + + loop_.set_runnable(true); + } +} + +void event::arm_last() { + assert(&loop_ == local_loop); + + if (prev_ == nullptr) { + next_ = *loop_.later_insert_point_; + prev_ = loop_.later_insert_point_; + *prev_ = this; + if (next_) { + next_->prev_ = &next_; + } + + if (loop_.tail_ == prev_) { + loop_.tail_ = &next_; + } + + loop_.set_runnable(true); + } +} + +void event::disarm() { + if (prev_ != nullptr) { + if (loop_.tail_ == &next_) { + loop_.tail_ = prev_; + } + + if (loop_.next_insert_point_ == &next_) { + loop_.next_insert_point_ = prev_; + } + + *prev_ = next_; + if (next_) { + next_->prev_ = prev_; + } + + prev_ = nullptr; + next_ = nullptr; + } +} + +bool event::is_armed() const { return prev_ != nullptr; } + +conveyor_sink::conveyor_sink() : node_{nullptr} {} + +conveyor_sink::conveyor_sink(own<conveyor_node> &&node_p) + : node_{std::move(node_p)} {} + +void event_loop::set_runnable(bool runnable) { is_runnable_ = runnable; } + +event_loop::event_loop() {} + +event_loop::event_loop(own<class event_port> &&ep) + : event_port_{std::move(ep)} {} + +event_loop::~event_loop() { assert(local_loop != this); } + +void event_loop::enter_scope() { + assert(!local_loop); + local_loop = this; +} + +void event_loop::leave_scope() { + assert(local_loop == this); + local_loop = nullptr; +} + +bool event_loop::turn_loop() { + size_t turn_step = 0; + while (head_ && turn_step < 65536) { + if (!turn()) { + return false; + } + ++turn_step; + } + return true; +} + +bool event_loop::turn() { + event *event = head_; + + if (!event) { + return false; + } + + head_ = event->next_; + if (head_) { + head_->prev_ = &head_; + } + + next_insert_point_ = &head_; + if (later_insert_point_ == &event->next_) { + later_insert_point_ = &head_; + } + if (tail_ == &event->next_) { + tail_ = &head_; + } + + event->next_ = nullptr; + event->prev_ = nullptr; + + next_insert_point_ = &head_; + + event->fire(); + + return true; +} + +bool event_loop::wait(const std::chrono::steady_clock::duration &duration) { + if (event_port_) { + event_port_->wait(duration); + } + + return turn_loop(); +} + +bool event_loop::wait(const std::chrono::steady_clock::time_point &time_point) { + if (event_port_) { + event_port_->wait(time_point); + } + + return turn_loop(); +} + +bool event_loop::wait() { + if (event_port_) { + event_port_->wait(); + } + + return turn_loop(); +} + +bool event_loop::poll() { + if (event_port_) { + event_port_->poll(); + } + + return turn_loop(); +} + +event_port *event_loop::event_port() { return event_port_.get(); } + +conveyor_sink_set &event_loop::daemon() { + if (!daemon_sink_) { + daemon_sink_ = heap<conveyor_sink_set>(); + } + return *daemon_sink_; +} + +wait_scope::wait_scope(event_loop &loop) : loop_{loop} { loop_.enter_scope(); } + +wait_scope::~wait_scope() { loop_.leave_scope(); } + +void wait_scope::wait() { loop_.wait(); } + +void wait_scope::wait(const std::chrono::steady_clock::duration &duration) { + loop_.wait(duration); +} + +void wait_scope::wait(const std::chrono::steady_clock::time_point &time_point) { + loop_.wait(time_point); +} + +void wait_scope::poll() { loop_.poll(); } + +error_or<own<conveyor_node>> +convert_conveyor_node_base::swap_child(own<conveyor_node> &&swapee) noexcept { + return child_mixin_.swap_child(std::move(swapee)); +} + +conveyor_storage *convert_conveyor_node_base::next_storage() noexcept { + if (!child_mixin_.child) { + return nullptr; + } + return child_mixin_.child->next_storage(); +} + +immediate_conveyor_node_base::immediate_conveyor_node_base() + : conveyor_event_storage{} {} + +merge_conveyor_node_base::merge_conveyor_node_base() + : conveyor_event_storage{} {} + +error_or<own<conveyor_node>> queue_buffer_conveyor_node_base::swap_child( + own<conveyor_node> &&swapee_) noexcept { + return child_mixin_.swap_child(std::move(swapee_)); +} + +void conveyor_sink_set::destroy_sink_conveyor_node(conveyor_node &node) { + if (!is_armed()) { + arm_last(); + } + + delete_nodes_.push(&node); +} + +void conveyor_sink_set::fail(error &&error) { + /// @todo call error_handler +} + +conveyor_sink_set::conveyor_sink_set(event_loop &event_loop) + : event{event_loop} {} + +void conveyor_sink_set::add(conveyor<void> &&sink) { + auto nas = conveyor<void>::from_conveyor(std::move(sink)); + SAW_ASSERT(nas) { return; } + conveyor_storage *storage = nas->next_storage(); + + own<sink_conveyor_node> sink_node = nullptr; + try { + sink_node = heap<sink_conveyor_node>(std::move(nas), *this); + } catch (std::bad_alloc &) { + return; + } + if (storage) { + storage->set_parent(sink_node.get()); + } + + sink_nodes_.emplace_back(std::move(sink_node)); +} + +void conveyor_sink_set::fire() { + while (!delete_nodes_.empty()) { + conveyor_node *node = delete_nodes_.front(); + /*auto erased = */ std::remove_if(sink_nodes_.begin(), + sink_nodes_.end(), + [node](own<conveyor_node> &element) { + return node == element.get(); + }); + delete_nodes_.pop(); + } +} + +convert_conveyor_node_base::convert_conveyor_node_base(own<conveyor_node> &&dep) + : child_mixin_{std::move(dep), *this} {} + +void convert_conveyor_node_base::get_result(error_or_value &err_or_val) { + get_impl(err_or_val); +} + +void attach_conveyor_node_base::get_result( + error_or_value &err_or_val) noexcept { + if (child_mixin_.child) { + child_mixin_.child->get_result(err_or_val); + } +} + +error_or<own<conveyor_node>> +attach_conveyor_node_base::swap_child(own<conveyor_node> &&swapee_) noexcept { + return child_mixin_.swap_child(std::move(swapee_)); +} + +conveyor_storage *attach_conveyor_node_base::next_storage() noexcept { + if (!child_mixin_.child) { + return nullptr; + } + + return child_mixin_.child->next_storage(); +} + +void detach_conveyor(conveyor<void> &&conveyor) { + event_loop &loop = current_event_loop(); + conveyor_sink_set &sink = loop.daemon(); + sink.add(std::move(conveyor)); +} +} // namespace saw |