#pragma once #include #include #include // Template inlining #include namespace saw { template conveyor_result execLater(Func &&func) { conveyor conveyor{fix_void{}}; return conveyor.then(std::move(func)); } template conveyor::conveyor(fix_void value) : conveyor_base(nullptr) { // Is there any way to do this? // @todo new conveyor_base constructor for Immediate values own>> immediate = heap>>(std::move(value)); if (!immediate) { return; } node_ = std::move(immediate); } template conveyor::conveyor(error &&err) : conveyor_base(nullptr) { own>> immediate = heap>>(std::move(err)); if (!immediate) { return; } node_ = std::move(immediate); } template conveyor::conveyor(own node_p) : conveyor_base{std::move(node_p)} {} template template conveyor_result conveyor::then(Func &&func, ErrorFunc &&error_func) { own conversion_node = heap>, fix_void, Func, ErrorFunc>>( std::move(node_), std::move(func), std::move(error_func)); return conveyor>>::to_conveyor( std::move(conversion_node)); } template conveyor conveyor::buffer(size_t size) { SAW_ASSERT(node_) { return conveyor{own{nullptr}}; } conveyor_storage *storage = node_->next_storage(); SAW_ASSERT(storage) { return conveyor{own{nullptr}}; } own>> storage_node = heap>>(std::move(node_), size); conveyor_storage *storage_ptr = static_cast(storage_node.get()); storage->set_parent(storage_ptr); return conveyor{std::move(storage_node)}; } template template conveyor conveyor::attach(Args &&...args) { own> attach_node = heap>(std::move(node_), std::move(args...)); return conveyor{std::move(attach_node)}; } template std::pair, merge_conveyor> conveyor::merge() { our> data = share>(); own> merge_node = heap>(data); SAW_ASSERT(node_) { return std::make_pair(conveyor{own{nullptr}}, merge_conveyor{}); } conveyor_storage *storage = node_->next_storage(); SAW_ASSERT(storage) { return std::make_pair(conveyor{own{nullptr}}, merge_conveyor{}); } data->attach(conveyor::to_conveyor(std::move(node_))); merge_conveyor node_ref{data}; return std::make_pair(conveyor{std::move(merge_node)}, std::move(node_ref)); } template <> template conveyor_sink conveyor::sink(ErrorFunc &&error_func) { conveyor_storage *storage = node_->next_storage(); SAW_ASSERT(storage) { return conveyor_sink{}; } own sink_node = heap(std::move(node_)); conveyor_storage *storage_ptr = static_cast(sink_node.get()); storage->set_parent(storage_ptr); return conveyor_sink{std::move(sink_node)}; } void detach_conveyor(conveyor &&conveyor); template template void conveyor::detach(ErrorFunc &&func) { detach_conveyor(std::move(then([](T &&) {}, std::move(func)))); } template <> template void conveyor::detach(ErrorFunc &&func) { detach_conveyor(std::move(then([]() {}, std::move(func)))); } template conveyor conveyor::to_conveyor(own node) { return conveyor{std::move(node)}; } template own conveyor::from_conveyor(conveyor conveyor) { return std::move(conveyor.node_); } template error_or> conveyor::take() { SAW_ASSERT(node_) { return error_or>{ critical_error("conveyor in invalid state")}; } conveyor_storage *storage = node_->next_storage(); if (storage) { if (storage->queued() > 0) { error_or> result; node_->get_result(result); return result; } else { return error_or>{ recoverable_error("conveyor buffer has no elements")}; } } else { return error_or>{ critical_error("conveyor node has no child storage")}; } } template conveyor_and_feeder new_conveyor_and_feeder() { own>> feeder = heap>>(); own>> node = heap>>(); feeder->set_feedee(node.get()); node->set_feeder(feeder.get()); return conveyor_and_feeder{std::move(feeder), conveyor::to_conveyor(std::move(node))}; } // QueueBuffer template void queue_buffer_conveyor_node::fire() { if (child_mixin_.child) { if (!storage_.empty()) { if (storage_.front().is_error()) { if (storage_.front().error().is_critical()) { child_mixin_.child = nullptr; } } } } bool has_space_before_fire = space() > 0; if (parent_) { parent_->child_has_fired(); if (!storage_.empty() && parent_->space() > 0) { arm_later(); } } if (!child_mixin_.child) { while (!storage_.empty()) { storage_.pop(); } return; } conveyor_storage *ch_storage = child_mixin_.child->next_storage(); if (ch_storage && !has_space_before_fire) { ch_storage->parent_has_fired(); } } template void queue_buffer_conveyor_node::get_result(error_or_value &eov) noexcept { error_or &err_or_val = eov.as(); err_or_val = std::move(storage_.front()); storage_.pop(); } template size_t queue_buffer_conveyor_node::space() const { return max_store_ - storage_.size(); } template size_t queue_buffer_conveyor_node::queued() const { return storage_.size(); } template void queue_buffer_conveyor_node::child_has_fired() { if (child_mixin_.child && storage_.size() < max_store_) { error_or eov; child_mixin_.child->get_result(eov); if (eov.is_error()) { if (eov.error().is_critical()) { } } storage_.push(std::move(eov)); if (!is_armed()) { arm_later(); } } } template void queue_buffer_conveyor_node::parent_has_fired() { SAW_ASSERT(parent_) { return; } if (parent_->space() == 0) { return; } if (queued() > 0) { arm_later(); } } template immediate_conveyor_node::immediate_conveyor_node(fix_void &&val) : value_{std::move(val)}, retrieved_{0} {} template immediate_conveyor_node::immediate_conveyor_node(error &&error) : value_{std::move(error)}, retrieved_{0} {} template size_t immediate_conveyor_node::space() const { return 0; } template size_t immediate_conveyor_node::queued() const { return retrieved_ > 1 ? 0 : 1; } template void immediate_conveyor_node::child_has_fired() { // Impossible case assert(false); } template void immediate_conveyor_node::parent_has_fired() { SAW_ASSERT(parent_) { return; } assert(parent_->space() > 0); if (queued() > 0) { arm_next(); } } template void immediate_conveyor_node::fire() { if (parent_) { parent_->child_has_fired(); if (queued() > 0 && parent_->space() > 0) { arm_last(); } } } template merge_conveyor::merge_conveyor(lent> d) : data_{std::move(d)} {} template merge_conveyor::~merge_conveyor() {} template void merge_conveyor::attach(conveyor conveyor) { auto sp = data_.lock(); SAW_ASSERT(sp) { return; } sp->attach(std::move(conveyor)); } template merge_conveyor_node::merge_conveyor_node(our> d) : data_{d} { SAW_ASSERT(data_) { return; } data_->merger = this; } template merge_conveyor_node::~merge_conveyor_node() {} template error_or> merge_conveyor_node::swap_child(own &&swapee_) noexcept { (void)swapee_; return critical_error( "merge_conveyor_node::appendage should block calls to this class"); } template void merge_conveyor_node::get_result(error_or_value &eov) noexcept { error_or> &err_or_val = eov.as>(); SAW_ASSERT(data_) { return; } /// @todo search appendages for result auto &appendages = data_->appendages; next_appendage_ = std::min(appendages.size(), next_appendage_); for (size_t i = next_appendage_; i < appendages.size(); ++i) { if (appendages[i]->queued() > 0) { err_or_val = std::move(appendages[i]->error_or_value_.value()); appendages[i]->error_or_value_ = std::nullopt; next_appendage_ = i + 1; return; } } for (size_t i = 0; i < next_appendage_; ++i) { if (appendages[i]->queued() > 0) { err_or_val = std::move(appendages[i]->error_or_value_.value()); appendages[i]->error_or_value_ = std::nullopt; next_appendage_ = i + 1; return; } } err_or_val = critical_error("No value in Merge appendages"); } template void merge_conveyor_node::fire() { SAW_ASSERT(queued() > 0) { return; } if (parent_) { parent_->child_has_fired(); if (queued() > 0 && parent_->space() > 0) { arm_later(); } } } template size_t merge_conveyor_node::space() const { return 0; } template size_t merge_conveyor_node::queued() const { SAW_ASSERT(data_) { return 0; } size_t queue_count = 0; for (auto &iter : data_->appendages) { queue_count += iter->queued(); } return queue_count; } template void merge_conveyor_node::child_has_fired() { /// This can never happen assert(false); } template void merge_conveyor_node::parent_has_fired() { SAW_ASSERT(parent_) { return; } if (queued() > 0) { if (parent_->space() > 0) { arm_later(); } } } /** * merge_conveyor_node::Apendage */ template error_or> merge_conveyor_node::appendage::swap_child(own &&swapee_) { own old_child = std::move(child); child = std::move(swapee_); // This case should never happen SAW_ASSERT(old_child) { return critical_error("No child exists"); } return old_child; } template void merge_conveyor_node::appendage::get_result(error_or_value &eov) { error_or> &err_or_val = eov.as>(); SAW_ASSERT(queued() > 0) { err_or_val = critical_error("No element queued in Merge appendage Node"); return; } err_or_val = std::move(error_or_value_.value()); error_or_value_ = std::nullopt; } template size_t merge_conveyor_node::appendage::space() const { SAW_ASSERT(merger) { return 0; } if (error_or_value_.has_value()) { return 0; } return 1; } template size_t merge_conveyor_node::appendage::queued() const { SAW_ASSERT(merger) { return 0; } if (error_or_value_.has_value()) { return 1; } return 0; } /// @todo delete this function. Replaced by the regular get_result template void merge_conveyor_node::appendage::get_appendage_result( error_or_value &eov) { error_or> &err_or_val = eov.as>(); SAW_ASSERT(queued() > 0) { err_or_val = critical_error("No element queued in Merge appendage Node"); return; } err_or_val = std::move(error_or_value_.value()); error_or_value_ = std::nullopt; } template void merge_conveyor_node::appendage::child_has_fired() { SAW_ASSERT(!error_or_value_.has_value()) { return; } error_or> eov; child->get_result(eov); error_or_value_ = std::move(eov); if (!merger->is_armed()) { merger->arm_later(); } } template void merge_conveyor_node::appendage::parent_has_fired() { conveyor_storage *child_storage = child->next_storage(); if (child_storage) { child_storage->parent_has_fired(); } } template void merge_conveyor_node::appendage::set_parent(conveyor_storage *par) { SAW_ASSERT(merger) { return; } SAW_ASSERT(child) { return; } parent_ = par; } template void merge_conveyor_node_data::attach(conveyor conv) { auto nas = conveyor::from_conveyor(std::move(conv)); SAW_ASSERT(nas) { return; } conveyor_storage *storage = nas->next_storage(); SAW_ASSERT(storage) { return; } auto merge_node_appendage = heap::appendage>(std::move(nas), *merger); auto merge_node_appendage_ptr = merge_node_appendage.get(); storage->set_parent(merge_node_appendage.get()); SAW_ASSERT(merger) { return; } conveyor_storage *mrg_storage = merger->next_storage(); SAW_ASSERT(mrg_storage) { return; } merge_node_appendage->set_parent(mrg_storage); appendages.push_back(std::move(merge_node_appendage)); /// @todo return this. necessary? maybe for the weird linking setup /// maybe not // return merge_node_appendage_ptr; } template void merge_conveyor_node_data::governing_node_destroyed() { appendages.clear(); merger = nullptr; } template adapt_conveyor_feeder::~adapt_conveyor_feeder() { if (feedee_) { feedee_->set_feeder(nullptr); feedee_ = nullptr; } } template void adapt_conveyor_feeder::set_feedee(adapt_conveyor_node *feedee_p) { feedee_ = feedee_p; } template void adapt_conveyor_feeder::feed(T &&value) { if (feedee_) { feedee_->feed(std::move(value)); } } template void adapt_conveyor_feeder::fail(error &&error) { if (feedee_) { feedee_->fail(std::move(error)); } } template size_t adapt_conveyor_feeder::queued() const { if (feedee_) { return feedee_->queued(); } return 0; } template size_t adapt_conveyor_feeder::space() const { if (feedee_) { return feedee_->space(); } return 0; } template error adapt_conveyor_feeder::swap(conveyor &&conv) noexcept { SAW_ASSERT(feedee_) { return critical_error("No feedee connected"); } auto node = conveyor::from_conveyor(std::move(conv)); feedee_->swap_child(std::move(node)); return no_error(); } template adapt_conveyor_node::adapt_conveyor_node() : conveyor_event_storage{} {} template adapt_conveyor_node::~adapt_conveyor_node() { if (feeder_) { feeder_->set_feedee(nullptr); feeder_ = nullptr; } } template error_or> adapt_conveyor_node::swap_child(own &&swapee) noexcept { // This should return the owning pointer of this instance auto myself_err = parent_node_.swap_child_of_parent(std::move(swapee)); if (myself_err.is_error()) { return myself_err; } auto &myself = myself_err.value(); assert(myself.get() == this); return myself_err; } template conveyor_storage *adapt_conveyor_node::next_storage() noexcept { return static_cast(this); } template void adapt_conveyor_node::notify_parent_attached( conveyor_node &par) noexcept { parent_node_.change_parent(&par); } template void adapt_conveyor_node::set_feeder(adapt_conveyor_feeder *feeder_p) { feeder_ = feeder_p; } template void adapt_conveyor_node::feed(T &&value) { storage_.push(std::move(value)); arm_next(); } template void adapt_conveyor_node::fail(error &&error) { storage_.push(std::move(error)); arm_next(); } template size_t adapt_conveyor_node::queued() const { return storage_.size(); } template size_t adapt_conveyor_node::space() const { return std::numeric_limits::max() - storage_.size(); } template void adapt_conveyor_node::get_result(error_or_value &err_or_val) { if (!storage_.empty()) { err_or_val.as() = std::move(storage_.front()); storage_.pop(); } else { err_or_val.as() = critical_error( "Signal for retrieval of storage sent even though no " "data is present"); } } template void adapt_conveyor_node::child_has_fired() { // Adapt node has no children assert(false); } template void adapt_conveyor_node::parent_has_fired() { SAW_ASSERT(parent_) { return; } if (parent_->space() == 0) { return; } } template void adapt_conveyor_node::fire() { if (parent_) { parent_->child_has_fired(); if (storage_.size() > 0) { arm_later(); } } } template one_time_conveyor_feeder::~one_time_conveyor_feeder() { if (feedee_) { feedee_->set_feeder(nullptr); feedee_ = nullptr; } } template void one_time_conveyor_feeder::set_feedee( one_time_conveyor_node *feedee_p) { feedee_ = feedee_p; } template void one_time_conveyor_feeder::feed(T &&value) { if (feedee_) { feedee_->feed(std::move(value)); } } template void one_time_conveyor_feeder::fail(error &&error) { if (feedee_) { feedee_->fail(std::move(error)); } } template size_t one_time_conveyor_feeder::queued() const { if (feedee_) { return feedee_->queued(); } return 0; } template size_t one_time_conveyor_feeder::space() const { if (feedee_) { return feedee_->space(); } return 0; } template one_time_conveyor_node::~one_time_conveyor_node() { if (feeder_) { feeder_->set_feedee(nullptr); feeder_ = nullptr; } } template void one_time_conveyor_node::set_feeder( one_time_conveyor_feeder *feeder_p) { feeder_ = feeder_p; } template void one_time_conveyor_node::feed(T &&value) { storage_ = std::move(value); arm_next(); } template void one_time_conveyor_node::fail(error &&error) { storage_ = std::move(error); arm_next(); } template size_t one_time_conveyor_node::queued() const { return storage_.has_value() ? 1 : 0; } template size_t one_time_conveyor_node::space() const { return passed_ ? 0 : 1; } template void one_time_conveyor_node::get_result(error_or_value &err_or_val) { if (storage_.has_value()) { err_or_val.as() = std::move(storage_.value()); storage_ = std::nullopt; } else { err_or_val.as() = critical_error( "Signal for retrieval of storage sent even though no " "data is present"); } } template void one_time_conveyor_node::fire() { if (parent_) { parent_->child_has_fired(); } } } // namespace saw