summaryrefslogtreecommitdiff
path: root/src/async/async.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/async/async.cpp')
-rw-r--r--src/async/async.cpp419
1 files changed, 419 insertions, 0 deletions
diff --git a/src/async/async.cpp b/src/async/async.cpp
new file mode 100644
index 0000000..c53ffa6
--- /dev/null
+++ b/src/async/async.cpp
@@ -0,0 +1,419 @@
+#include "async.h"
+#include <forstio/core/common.h>
+#include <forstio/core/error.h>
+
+#include <algorithm>
+#include <cassert>
+
+namespace saw {
+namespace {
+thread_local event_loop *local_loop = nullptr;
+
+event_loop &current_event_loop() {
+ event_loop *loop = local_loop;
+ assert(loop);
+ return *loop;
+}
+} // namespace
+
+conveyor_node::conveyor_node() {}
+
+conveyor_node_with_child_mixin::conveyor_node_with_child_mixin(
+ own<conveyor_node> &&child_, conveyor_node &owner)
+ : child{std::move(child_)} {
+ assert(child);
+
+ child->notify_parent_attached(owner);
+}
+
+error_or<own<conveyor_node>>
+conveyor_node_with_child_mixin::swap_child(own<conveyor_node> &&swapee) {
+ SAW_ASSERT(child) {
+ return make_error<err::invalid_state>("Child should exist if this function is called");
+ }
+ own<conveyor_node> old_child = std::move(child);
+
+ /**
+ * We need the parent of the old_child's next storage
+ */
+ conveyor_storage *old_storage = old_child->next_storage();
+ conveyor_storage *old_storage_parent =
+ old_storage ? old_storage->get_parent() : nullptr;
+
+ /**
+ * Swap in the new child
+ */
+ if (swapee) {
+ child = std::move(swapee);
+
+ /**
+ * Then we need to set the new child's storage parent since the next
+ * storage has a nullptr set And if the old_storage_parent is a nullptr,
+ * then it doesn't matter. So we don't check for it
+ */
+ conveyor_storage *swapee_storage = child->next_storage();
+ if (swapee_storage) {
+ swapee_storage->set_parent(old_storage_parent);
+ }
+ }
+
+ return old_child;
+}
+
+conveyor_storage::conveyor_storage() {}
+
+conveyor_storage::~conveyor_storage() {}
+
+conveyor_storage *conveyor_storage::get_parent() const { return parent_; }
+
+void conveyor_event_storage::set_parent(conveyor_storage *p) {
+ /*
+ * parent check isn't needed, but is used
+ * for the assert, because the storage should
+ * be armed if there was an element present
+ * and a valid parent
+ */
+ if (/*!parent && */ p && !is_armed() && queued() > 0) {
+ assert(!parent_);
+ if (p->space() > 0) {
+ arm_later();
+ }
+ }
+
+ parent_ = p;
+}
+
+conveyor_event_storage::conveyor_event_storage() : conveyor_storage{} {}
+
+conveyor_base::conveyor_base(own<conveyor_node> &&node_p)
+ : node_{std::move(node_p)} {}
+
+error propagate_error::operator()(const error &error) const {
+ return error.copy_error();
+}
+
+error propagate_error::operator()(error &&err) { return std::move(err); }
+
+event::event() : event(current_event_loop()) {}
+
+event::event(event_loop &loop) : loop_{loop} {}
+
+event::~event() { disarm(); }
+
+void event::arm_next() {
+ assert(&loop_ == local_loop);
+ if (prev_ == nullptr) {
+ // Push the next_insert_point back by one
+ // and inserts itself before that
+ next_ = *loop_.next_insert_point_;
+ prev_ = loop_.next_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ // Set the new insertion ptr location to next
+ loop_.next_insert_point_ = &next_;
+
+ // Pushes back the later insert point if it was pointing at the
+ // previous event
+ if (loop_.later_insert_point_ == prev_) {
+ loop_.later_insert_point_ = &next_;
+ }
+
+ // If tail_ points at the same location then
+ // we are at the end and have to update tail_ then.
+ // Technically should be possible by checking if
+ // next is a `nullptr`
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::arm_later() {
+ assert(&loop_ == local_loop);
+
+ if (prev_ == nullptr) {
+ next_ = *loop_.later_insert_point_;
+ prev_ = loop_.later_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ loop_.later_insert_point_ = &next_;
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::arm_last() {
+ assert(&loop_ == local_loop);
+
+ if (prev_ == nullptr) {
+ next_ = *loop_.later_insert_point_;
+ prev_ = loop_.later_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::disarm() {
+ if (prev_ != nullptr) {
+ if (loop_.tail_ == &next_) {
+ loop_.tail_ = prev_;
+ }
+
+ if (loop_.next_insert_point_ == &next_) {
+ loop_.next_insert_point_ = prev_;
+ }
+
+ *prev_ = next_;
+ if (next_) {
+ next_->prev_ = prev_;
+ }
+
+ prev_ = nullptr;
+ next_ = nullptr;
+ }
+}
+
+bool event::is_armed() const { return prev_ != nullptr; }
+
+conveyor_sink::conveyor_sink() : node_{nullptr} {}
+
+conveyor_sink::conveyor_sink(own<conveyor_node> &&node_p)
+ : node_{std::move(node_p)} {}
+
+void event_loop::set_runnable(bool runnable) { is_runnable_ = runnable; }
+
+event_loop::event_loop() {}
+
+event_loop::event_loop(own<class event_port> &&ep)
+ : event_port_{std::move(ep)} {}
+
+event_loop::~event_loop() { assert(local_loop != this); }
+
+void event_loop::enter_scope() {
+ assert(!local_loop);
+ local_loop = this;
+}
+
+void event_loop::leave_scope() {
+ assert(local_loop == this);
+ local_loop = nullptr;
+}
+
+bool event_loop::turn_loop() {
+ size_t turn_step = 0;
+ while (head_ && turn_step < 65536) {
+ if (!turn()) {
+ return false;
+ }
+ ++turn_step;
+ }
+ return true;
+}
+
+bool event_loop::turn() {
+ event *event = head_;
+
+ if (!event) {
+ return false;
+ }
+
+ head_ = event->next_;
+ if (head_) {
+ head_->prev_ = &head_;
+ }
+
+ next_insert_point_ = &head_;
+ if (later_insert_point_ == &event->next_) {
+ later_insert_point_ = &head_;
+ }
+ if (tail_ == &event->next_) {
+ tail_ = &head_;
+ }
+
+ event->next_ = nullptr;
+ event->prev_ = nullptr;
+
+ next_insert_point_ = &head_;
+
+ event->fire();
+
+ return true;
+}
+
+bool event_loop::wait(const std::chrono::steady_clock::duration &duration) {
+ if (event_port_) {
+ event_port_->wait(duration);
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::wait(const std::chrono::steady_clock::time_point &time_point) {
+ if (event_port_) {
+ event_port_->wait(time_point);
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::wait() {
+ if (event_port_) {
+ event_port_->wait();
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::poll() {
+ if (event_port_) {
+ event_port_->poll();
+ }
+
+ return turn_loop();
+}
+
+event_port *event_loop::event_port() { return event_port_.get(); }
+
+conveyor_sink_set &event_loop::daemon() {
+ if (!daemon_sink_) {
+ daemon_sink_ = heap<conveyor_sink_set>();
+ }
+ return *daemon_sink_;
+}
+
+wait_scope::wait_scope(event_loop &loop) : loop_{loop} { loop_.enter_scope(); }
+
+wait_scope::~wait_scope() { loop_.leave_scope(); }
+
+void wait_scope::wait() { loop_.wait(); }
+
+void wait_scope::wait(const std::chrono::steady_clock::duration &duration) {
+ loop_.wait(duration);
+}
+
+void wait_scope::wait(const std::chrono::steady_clock::time_point &time_point) {
+ loop_.wait(time_point);
+}
+
+void wait_scope::poll() { loop_.poll(); }
+
+error_or<own<conveyor_node>>
+convert_conveyor_node_base::swap_child(own<conveyor_node> &&swapee) noexcept {
+ return child_mixin_.swap_child(std::move(swapee));
+}
+
+conveyor_storage *convert_conveyor_node_base::next_storage() noexcept {
+ if (!child_mixin_.child) {
+ return nullptr;
+ }
+ return child_mixin_.child->next_storage();
+}
+
+immediate_conveyor_node_base::immediate_conveyor_node_base()
+ : conveyor_event_storage{} {}
+
+merge_conveyor_node_base::merge_conveyor_node_base()
+ : conveyor_event_storage{} {}
+
+error_or<own<conveyor_node>> queue_buffer_conveyor_node_base::swap_child(
+ own<conveyor_node> &&swapee_) noexcept {
+ return child_mixin_.swap_child(std::move(swapee_));
+}
+
+void conveyor_sink_set::destroy_sink_conveyor_node(conveyor_node &node) {
+ if (!is_armed()) {
+ arm_last();
+ }
+
+ delete_nodes_.push(&node);
+}
+
+void conveyor_sink_set::fail(error &&error) {
+ /// @todo call error_handler
+}
+
+conveyor_sink_set::conveyor_sink_set(event_loop &event_loop)
+ : event{event_loop} {}
+
+void conveyor_sink_set::add(conveyor<void> &&sink) {
+ auto nas = conveyor<void>::from_conveyor(std::move(sink));
+ SAW_ASSERT(nas) { return; }
+ conveyor_storage *storage = nas->next_storage();
+
+ own<sink_conveyor_node> sink_node = nullptr;
+ try {
+ sink_node = heap<sink_conveyor_node>(std::move(nas), *this);
+ } catch (std::bad_alloc &) {
+ return;
+ }
+ if (storage) {
+ storage->set_parent(sink_node.get());
+ }
+
+ sink_nodes_.emplace_back(std::move(sink_node));
+}
+
+void conveyor_sink_set::fire() {
+ while (!delete_nodes_.empty()) {
+ conveyor_node *node = delete_nodes_.front();
+ /*auto erased = */ std::remove_if(sink_nodes_.begin(),
+ sink_nodes_.end(),
+ [node](own<conveyor_node> &element) {
+ return node == element.get();
+ });
+ delete_nodes_.pop();
+ }
+}
+
+convert_conveyor_node_base::convert_conveyor_node_base(own<conveyor_node> &&dep)
+ : child_mixin_{std::move(dep), *this} {}
+
+void convert_conveyor_node_base::get_result(error_or_value &err_or_val) {
+ get_impl(err_or_val);
+}
+
+void attach_conveyor_node_base::get_result(
+ error_or_value &err_or_val) noexcept {
+ if (child_mixin_.child) {
+ child_mixin_.child->get_result(err_or_val);
+ }
+}
+
+error_or<own<conveyor_node>>
+attach_conveyor_node_base::swap_child(own<conveyor_node> &&swapee_) noexcept {
+ return child_mixin_.swap_child(std::move(swapee_));
+}
+
+conveyor_storage *attach_conveyor_node_base::next_storage() noexcept {
+ if (!child_mixin_.child) {
+ return nullptr;
+ }
+
+ return child_mixin_.child->next_storage();
+}
+
+void detach_conveyor(conveyor<void> &&conveyor) {
+ event_loop &loop = current_event_loop();
+ conveyor_sink_set &sink = loop.daemon();
+ sink.add(std::move(conveyor));
+}
+} // namespace saw