#include "async.h"

// NOTE(review): when this file reached review, the text between every pair of
// angle brackets was missing: the targets of the four includes below and all
// template argument lists (for example `own &&child_`, `error_or>`, `heap()`
// and `make_error(...)`). Those tokens are reproduced exactly as found and
// must be restored from version control before this file can compile.
// Usage in this file suggests the child pointers are `own<conveyor_node>` and
// the event-loop port is `own<event_port>` -- TODO confirm against async.h.
#include
#include
#include
#include

namespace saw {
namespace {
/// Event loop entered on the calling thread, or nullptr when none is active.
thread_local event_loop *local_loop = nullptr;

/// Returns the event loop entered on this thread; asserts that one exists.
event_loop &current_event_loop() {
    event_loop *loop = local_loop;
    assert(loop);
    return *loop;
}
} // namespace

conveyor_node::conveyor_node() {}

/// Takes ownership of `child_` and notifies it that `owner` is now its parent.
/// The child must be non-null (asserted).
conveyor_node_with_child_mixin::conveyor_node_with_child_mixin(
    own &&child_, conveyor_node &owner)
    : child{std::move(child_)} {
    assert(child);

    child->notify_parent_attached(owner);
}

/// Replaces the current child with `swapee` and returns the previous child.
///
/// The storage parent recorded on the old child's next storage is carried
/// over to the new child's next storage. If `swapee` is null the mixin is
/// left without a child.
///
/// @return the previous child, or an error if there was no child to swap out.
error_or> conveyor_node_with_child_mixin::swap_child(own &&swapee) {
    SAW_ASSERT(child) {
        return make_error("Child should exist if this function is called");
    }
    own old_child = std::move(child);

    /**
     * We need the parent of the old_child's next storage
     */
    conveyor_storage *old_storage = old_child->next_storage();
    conveyor_storage *old_storage_parent =
        old_storage ? old_storage->get_parent() : nullptr;

    /**
     * Swap in the new child
     */
    if (swapee) {
        child = std::move(swapee);

        /**
         * The new child's next storage still has a nullptr parent, so hand it
         * the old storage's parent. If old_storage_parent is itself a nullptr
         * nothing changes, so it is not checked.
         */
        conveyor_storage *swapee_storage = child->next_storage();
        if (swapee_storage) {
            swapee_storage->set_parent(old_storage_parent);
        }
    }

    return old_child;
}

conveyor_storage::conveyor_storage() {}

conveyor_storage::~conveyor_storage() {}

/// Returns the parent storage pointer; may be nullptr.
conveyor_storage *conveyor_storage::get_parent() const { return parent_; }

/// Sets the parent storage to `p`.
///
/// If `p` is non-null, this storage is not armed and has queued elements, it
/// is armed via arm_later() when `p` reports free space.
void conveyor_event_storage::set_parent(conveyor_storage *p) {
    /*
     * parent check isn't needed, but is used
     * for the assert, because the storage should
     * be armed if there was an element present
     * and a valid parent
     */
    if (/*!parent && */ p && !is_armed() && queued() > 0) {
        assert(!parent_);
        if (p->space() > 0) {
            arm_later();
        }
    }

    parent_ = p;
}

conveyor_event_storage::conveyor_event_storage() : conveyor_storage{} {}

conveyor_base::conveyor_base(own &&node_p) : node_{std::move(node_p)} {}

/// Propagates a const error by returning a copy of it.
error propagate_error::operator()(const error &error) const {
    return error.copy_error();
}

/// Propagates an rvalue error by moving it through.
error propagate_error::operator()(error &&err) { return std::move(err); }

/// Binds the event to the loop currently entered on this thread.
event::event() : event(current_event_loop()) {}

event::event(event_loop &loop) : loop_{loop} {}

/// Unlinks the event from its loop's queue, if it is queued.
event::~event() { disarm(); }

/// Queues this event at the loop's `next_insert_point_` and moves that
/// insertion point to just after this event. No-op if already armed.
void event::arm_next() {
    assert(&loop_ == local_loop);
    if (prev_ == nullptr) {
        // Push the next_insert_point back by one
        // and insert ourselves before that
        next_ = *loop_.next_insert_point_;
        prev_ = loop_.next_insert_point_;
        *prev_ = this;
        if (next_) {
            next_->prev_ = &next_;
        }

        // Set the new insertion ptr location to next
        loop_.next_insert_point_ = &next_;

        // Push back the later insert point if it was pointing at the
        // previous event
        if (loop_.later_insert_point_ == prev_) {
            loop_.later_insert_point_ = &next_;
        }

        // If tail_ points at the same location then
        // we are at the end and have to update tail_ then.
        // Technically should be possible by checking if
        // next is a `nullptr`
        if (loop_.tail_ == prev_) {
            loop_.tail_ = &next_;
        }

        loop_.set_runnable(true);
    }
}

/// Queues this event at the loop's `later_insert_point_` and moves that
/// insertion point to just after this event. No-op if already armed.
void event::arm_later() {
    assert(&loop_ == local_loop);

    if (prev_ == nullptr) {
        next_ = *loop_.later_insert_point_;
        prev_ = loop_.later_insert_point_;
        *prev_ = this;
        if (next_) {
            next_->prev_ = &next_;
        }

        loop_.later_insert_point_ = &next_;
        if (loop_.tail_ == prev_) {
            loop_.tail_ = &next_;
        }

        loop_.set_runnable(true);
    }
}

/// Queues this event at the loop's `later_insert_point_` without advancing
/// that insertion point, so events armed afterwards with arm_later() are
/// linked in ahead of this one. No-op if already armed.
void event::arm_last() {
    assert(&loop_ == local_loop);

    if (prev_ == nullptr) {
        next_ = *loop_.later_insert_point_;
        prev_ = loop_.later_insert_point_;
        *prev_ = this;
        if (next_) {
            next_->prev_ = &next_;
        }

        if (loop_.tail_ == prev_) {
            loop_.tail_ = &next_;
        }

        loop_.set_runnable(true);
    }
}

/// Unlinks this event from the loop's queue. No-op if not armed.
///
/// Every loop-side pointer that may reference this event's `next_` slot
/// (`tail_`, `next_insert_point_`, `later_insert_point_`) is moved back to the
/// preceding slot, so none of them is left pointing into an unlinked event.
void event::disarm() {
    if (prev_ == nullptr) {
        return;
    }

    if (loop_.tail_ == &next_) {
        loop_.tail_ = prev_;
    }

    if (loop_.next_insert_point_ == &next_) {
        loop_.next_insert_point_ = prev_;
    }

    // arm_later() and arm_next() can leave later_insert_point_ pointing at our
    // next_ slot; without this it would dangle once we are unlinked.
    if (loop_.later_insert_point_ == &next_) {
        loop_.later_insert_point_ = prev_;
    }

    *prev_ = next_;
    if (next_) {
        next_->prev_ = prev_;
    }

    prev_ = nullptr;
    next_ = nullptr;
}

/// Returns true while the event is linked into its loop's queue.
bool event::is_armed() const { return prev_ != nullptr; }

conveyor_sink::conveyor_sink() : node_{nullptr} {}

conveyor_sink::conveyor_sink(own &&node_p) : node_{std::move(node_p)} {}

void event_loop::set_runnable(bool runnable) { is_runnable_ = runnable; }

event_loop::event_loop() {}

event_loop::event_loop(own &&ep) : event_port_{std::move(ep)} {}

/// The loop must not be the thread's entered loop when it is destroyed.
event_loop::~event_loop() { assert(local_loop != this); }

/// Makes this loop the calling thread's current loop; none may be entered yet.
void event_loop::enter_scope() {
    assert(!local_loop);
    local_loop = this;
}

/// Clears the calling thread's current loop, which must be this loop.
void event_loop::leave_scope() {
    assert(local_loop == this);
    local_loop = nullptr;
}

/// Fires queued events until the queue is empty or a per-call cap is reached.
///
/// @return false if a turn() reported nothing to fire, true otherwise.
bool event_loop::turn_loop() {
    // Upper bound on events fired per call.
    constexpr size_t max_turn_steps = 65536;

    size_t turn_step = 0;
    while (head_ && turn_step < max_turn_steps) {
        if (!turn()) {
            return false;
        }
        ++turn_step;
    }
    return true;
}

/// Pops the event at the head of the queue and fires it.
///
/// Before firing, the popped event is fully unlinked and the loop's insertion
/// pointers are repaired: `next_insert_point_` is reset to the head, and
/// `later_insert_point_` / `tail_` are reset to the head if they pointed into
/// the popped event.
///
/// @return false if the queue was empty, true if an event was fired.
bool event_loop::turn() {
    event *event = head_;

    if (!event) {
        return false;
    }

    head_ = event->next_;
    if (head_) {
        head_->prev_ = &head_;
    }

    next_insert_point_ = &head_;
    if (later_insert_point_ == &event->next_) {
        later_insert_point_ = &head_;
    }
    if (tail_ == &event->next_) {
        tail_ = &head_;
    }

    event->next_ = nullptr;
    event->prev_ = nullptr;

    event->fire();

    return true;
}

/// Waits on the event port (if any) for `duration`, then runs turn_loop().
bool event_loop::wait(const std::chrono::steady_clock::duration &duration) {
    if (event_port_) {
        event_port_->wait(duration);
    }

    return turn_loop();
}

/// Waits on the event port (if any) until `time_point`, then runs turn_loop().
bool event_loop::wait(const std::chrono::steady_clock::time_point &time_point) {
    if (event_port_) {
        event_port_->wait(time_point);
    }

    return turn_loop();
}

/// Waits on the event port (if any), then runs turn_loop().
bool event_loop::wait() {
    if (event_port_) {
        event_port_->wait();
    }

    return turn_loop();
}

/// Polls the event port (if any), then runs turn_loop().
bool event_loop::poll() {
    if (event_port_) {
        event_port_->poll();
    }

    return turn_loop();
}

/// Returns the loop's event port; nullptr if the loop has none.
event_port *event_loop::event_port() { return event_port_.get(); }

/// Returns the loop's daemon sink set, creating it on first use.
conveyor_sink_set &event_loop::daemon() {
    if (!daemon_sink_) {
        daemon_sink_ = heap();
    }
    return *daemon_sink_;
}

/// Enters `loop` on the calling thread for the lifetime of this scope.
wait_scope::wait_scope(event_loop &loop) : loop_{loop} { loop_.enter_scope(); }

wait_scope::~wait_scope() { loop_.leave_scope(); }

void wait_scope::wait() { loop_.wait(); }

void wait_scope::wait(const std::chrono::steady_clock::duration &duration) {
    loop_.wait(duration);
}

void wait_scope::wait(const std::chrono::steady_clock::time_point &time_point) {
    loop_.wait(time_point);
}

void wait_scope::poll() { loop_.poll(); }

/// Forwards to the child mixin's swap_child().
error_or> convert_conveyor_node_base::swap_child(own &&swapee) noexcept {
    return child_mixin_.swap_child(std::move(swapee));
}

/// Returns the child's next storage, or nullptr if there is no child.
conveyor_storage *convert_conveyor_node_base::next_storage() noexcept {
    if (!child_mixin_.child) {
        return nullptr;
    }
    return child_mixin_.child->next_storage();
}

immediate_conveyor_node_base::immediate_conveyor_node_base()
    : conveyor_event_storage{} {}

merge_conveyor_node_base::merge_conveyor_node_base()
    : conveyor_event_storage{} {}

/// Forwards to the child mixin's swap_child().
error_or> queue_buffer_conveyor_node_base::swap_child(
    own &&swapee_) noexcept {
    return child_mixin_.swap_child(std::move(swapee_));
}

/// Queues `node` for removal and arms the set (via arm_last()) if it is not
/// already armed; the removal itself happens in fire().
void conveyor_sink_set::destroy_sink_conveyor_node(conveyor_node &node) {
    if (!is_armed()) {
        arm_last();
    }

    delete_nodes_.push(&node);
}

void conveyor_sink_set::fail(error &&error) {
    /// @todo call error_handler
}

conveyor_sink_set::conveyor_sink_set(event_loop &event_loop)
    : event{event_loop} {}

/// Wraps `sink` in a sink node owned by this set.
///
/// The conveyor's next storage, if any, gets the new sink node as its parent.
/// If allocating the sink node throws std::bad_alloc the conveyor is dropped
/// and nothing is added.
void conveyor_sink_set::add(conveyor &&sink) {
    auto nas = conveyor::from_conveyor(std::move(sink));
    SAW_ASSERT(nas) { return; }
    conveyor_storage *storage = nas->next_storage();

    own sink_node = nullptr;
    try {
        sink_node = heap(std::move(nas), *this);
    } catch (const std::bad_alloc &) {
        return;
    }
    if (storage) {
        storage->set_parent(sink_node.get());
    }

    sink_nodes_.emplace_back(std::move(sink_node));
}

/// Removes every node queued by destroy_sink_conveyor_node() from
/// `sink_nodes_`.
void conveyor_sink_set::fire() {
    while (!delete_nodes_.empty()) {
        conveyor_node *node = delete_nodes_.front();
        // std::remove_if only shifts the surviving elements forward; the
        // erase() call is what actually drops the matching entries.
        sink_nodes_.erase(
            std::remove_if(sink_nodes_.begin(), sink_nodes_.end(),
                           [node](own &element) {
                               return node == element.get();
                           }),
            sink_nodes_.end());
        delete_nodes_.pop();
    }
}

convert_conveyor_node_base::convert_conveyor_node_base(own &&dep)
    : child_mixin_{std::move(dep), *this} {}

/// Forwards to get_impl().
void convert_conveyor_node_base::get_result(error_or_value &err_or_val) {
    get_impl(err_or_val);
}

/// Forwards to the child's get_result(); does nothing if there is no child.
void attach_conveyor_node_base::get_result(
    error_or_value &err_or_val) noexcept {
    if (child_mixin_.child) {
        child_mixin_.child->get_result(err_or_val);
    }
}

/// Forwards to the child mixin's swap_child().
error_or> attach_conveyor_node_base::swap_child(own &&swapee_) noexcept {
    return child_mixin_.swap_child(std::move(swapee_));
}

/// Returns the child's next storage, or nullptr if there is no child.
conveyor_storage *attach_conveyor_node_base::next_storage() noexcept {
    if (!child_mixin_.child) {
        return nullptr;
    }

    return child_mixin_.child->next_storage();
}

/// Hands `conveyor` to the daemon sink set of the calling thread's current
/// event loop.
void detach_conveyor(conveyor &&conveyor) {
    event_loop &loop = current_event_loop();
    conveyor_sink_set &sink = loop.daemon();
    sink.add(std::move(conveyor));
}
} // namespace saw