diff options
author | Claudius Holeksa <mail@keldu.de> | 2023-05-03 20:34:02 +0200 |
---|---|---|
committer | Claudius Holeksa <mail@keldu.de> | 2023-05-03 20:34:02 +0200 |
commit | 2aa2af0007b7e969845642027c635cd3fd9c8aea (patch) | |
tree | e72a05a3c2bfe58442b160c0c8e98ce1d095f36f /src/async/async.tmpl.h | |
parent | 9b81a2585142260f89d47cbe1e592cec9e1f778f (diff) |
Moved dirs and added codec-json dir
Diffstat (limited to 'src/async/async.tmpl.h')
-rw-r--r-- | src/async/async.tmpl.h | 769 |
1 files changed, 769 insertions, 0 deletions
diff --git a/src/async/async.tmpl.h b/src/async/async.tmpl.h new file mode 100644 index 0000000..d081fa9 --- /dev/null +++ b/src/async/async.tmpl.h @@ -0,0 +1,769 @@ +#pragma once + +#include <forstio/core/common.h> +#include <forstio/core/error.h> + +#include <cassert> +// Template inlining + +#include <iostream> + +namespace saw { + +template <typename Func> conveyor_result<Func, void> execLater(Func &&func) { + conveyor<void> conveyor{fix_void<void>{}}; + return conveyor.then(std::move(func)); +} + +template <typename T> +conveyor<T>::conveyor(fix_void<T> value) : conveyor_base(nullptr) { + // Is there any way to do this? + // @todo new conveyor_base constructor for Immediate values + + own<immediate_conveyor_node<fix_void<T>>> immediate = + heap<immediate_conveyor_node<fix_void<T>>>(std::move(value)); + + if (!immediate) { + return; + } + + node_ = std::move(immediate); +} + +template <typename T> +conveyor<T>::conveyor(error &&err) : conveyor_base(nullptr) { + own<immediate_conveyor_node<fix_void<T>>> immediate = + heap<immediate_conveyor_node<fix_void<T>>>(std::move(err)); + + if (!immediate) { + return; + } + + node_ = std::move(immediate); +} + +template <typename T> +conveyor<T>::conveyor(own<conveyor_node> node_p) + : conveyor_base{std::move(node_p)} {} + +template <typename T> +template <typename Func, typename ErrorFunc> +conveyor_result<Func, T> conveyor<T>::then(Func &&func, + ErrorFunc &&error_func) { + own<conveyor_node> conversion_node = + heap<convert_conveyor_node<fix_void<return_type<Func, T>>, fix_void<T>, + Func, ErrorFunc>>( + std::move(node_), std::move(func), std::move(error_func)); + + return conveyor<remove_error_or<return_type<Func, T>>>::to_conveyor( + std::move(conversion_node)); +} + +template <typename T> conveyor<T> conveyor<T>::buffer(size_t size) { + SAW_ASSERT(node_) { return conveyor<T>{own<conveyor_node>{nullptr}}; } + conveyor_storage *storage = node_->next_storage(); + SAW_ASSERT(storage) { return conveyor<T>{own<conveyor_node>{nullptr}}; } + + own<queue_buffer_conveyor_node<fix_void<T>>> storage_node = + heap<queue_buffer_conveyor_node<fix_void<T>>>(std::move(node_), size); + + conveyor_storage *storage_ptr = + static_cast<conveyor_storage *>(storage_node.get()); + + storage->set_parent(storage_ptr); + return conveyor<T>{std::move(storage_node)}; +} + +template <typename T> +template <typename... Args> +conveyor<T> conveyor<T>::attach(Args &&...args) { + own<attach_conveyor_node<Args...>> attach_node = + heap<attach_conveyor_node<Args...>>(std::move(node_), + std::move(args...)); + return conveyor<T>{std::move(attach_node)}; +} + +template <typename T> +std::pair<conveyor<T>, merge_conveyor<T>> conveyor<T>::merge() { + our<merge_conveyor_node_data<T>> data = + share<merge_conveyor_node_data<T>>(); + + own<merge_conveyor_node<T>> merge_node = heap<merge_conveyor_node<T>>(data); + + SAW_ASSERT(node_) { + return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}}, + merge_conveyor<T>{}); + } + conveyor_storage *storage = node_->next_storage(); + SAW_ASSERT(storage) { + return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}}, + merge_conveyor<T>{}); + } + + data->attach(conveyor<T>::to_conveyor(std::move(node_))); + + merge_conveyor<T> node_ref{data}; + + return std::make_pair(conveyor<T>{std::move(merge_node)}, + std::move(node_ref)); +} + +template <> +template <typename ErrorFunc> +conveyor_sink conveyor<void>::sink(ErrorFunc &&error_func) { + conveyor_storage *storage = node_->next_storage(); + SAW_ASSERT(storage) { return conveyor_sink{}; } + + own<sink_conveyor_node> sink_node = + heap<sink_conveyor_node>(std::move(node_)); + conveyor_storage *storage_ptr = + static_cast<conveyor_storage *>(sink_node.get()); + + storage->set_parent(storage_ptr); + + return conveyor_sink{std::move(sink_node)}; +} + +void detach_conveyor(conveyor<void> &&conveyor); + +template <typename T> +template <typename ErrorFunc> +void conveyor<T>::detach(ErrorFunc &&func) { + detach_conveyor(std::move(then([](T &&) {}, std::move(func)))); +} + +template <> +template <typename ErrorFunc> +void conveyor<void>::detach(ErrorFunc &&func) { + detach_conveyor(std::move(then([]() {}, std::move(func)))); +} + +template <typename T> +conveyor<T> conveyor<T>::to_conveyor(own<conveyor_node> node) { + return conveyor<T>{std::move(node)}; +} + +template <typename T> +own<conveyor_node> conveyor<T>::from_conveyor(conveyor<T> conveyor) { + return std::move(conveyor.node_); +} + +template <typename T> error_or<fix_void<T>> conveyor<T>::take() { + SAW_ASSERT(node_) { + return error_or<fix_void<T>>{ + make_error<err::invalid_state>("conveyor in invalid state")}; + } + conveyor_storage *storage = node_->next_storage(); + if (storage) { + if (storage->queued() > 0) { + error_or<fix_void<T>> result; + node_->get_result(result); + return result; + } else { + return error_or<fix_void<T>>{ + make_error<err::buffer_exhausted>("conveyor buffer has no elements")}; + } + } else { + return error_or<fix_void<T>>{ + make_error<err::invalid_state>("conveyor node has no child storage")}; + } +} + +template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder() { + own<adapt_conveyor_feeder<fix_void<T>>> feeder = + heap<adapt_conveyor_feeder<fix_void<T>>>(); + own<adapt_conveyor_node<fix_void<T>>> node = + heap<adapt_conveyor_node<fix_void<T>>>(); + + feeder->set_feedee(node.get()); + node->set_feeder(feeder.get()); + + return conveyor_and_feeder<T>{std::move(feeder), + conveyor<T>::to_conveyor(std::move(node))}; +} + +// QueueBuffer +template <typename T> void queue_buffer_conveyor_node<T>::fire() { + if (child_mixin_.child) { + if (!storage_.empty()) { + if (storage_.front().is_error()) { + if (storage_.front().get_error().is_critical()) { + child_mixin_.child = nullptr; + } + } + } + } + + bool has_space_before_fire = space() > 0; + + if (parent_) { + parent_->child_has_fired(); + if (!storage_.empty() && parent_->space() > 0) { + arm_later(); + } + } + + if (!child_mixin_.child) { + while (!storage_.empty()) { + storage_.pop(); + } + return; + } + + conveyor_storage *ch_storage = child_mixin_.child->next_storage(); + if (ch_storage && !has_space_before_fire) { + ch_storage->parent_has_fired(); + } +} + +template <typename T> +void queue_buffer_conveyor_node<T>::get_result(error_or_value &eov) noexcept { + error_or<T> &err_or_val = eov.as<T>(); + err_or_val = std::move(storage_.front()); + storage_.pop(); +} + +template <typename T> size_t queue_buffer_conveyor_node<T>::space() const { + return max_store_ - storage_.size(); +} + +template <typename T> size_t queue_buffer_conveyor_node<T>::queued() const { + return storage_.size(); +} + +template <typename T> void queue_buffer_conveyor_node<T>::child_has_fired() { + if (child_mixin_.child && storage_.size() < max_store_) { + error_or<T> eov; + child_mixin_.child->get_result(eov); + + if (eov.is_error()) { + if (eov.get_error().is_critical()) { + } + } + + storage_.push(std::move(eov)); + if (!is_armed()) { + arm_later(); + } + } +} + +template <typename T> void queue_buffer_conveyor_node<T>::parent_has_fired() { + SAW_ASSERT(parent_) { return; } + + if (parent_->space() == 0) { + return; + } + + if (queued() > 0) { + arm_later(); + } +} + +template <typename T> +immediate_conveyor_node<T>::immediate_conveyor_node(fix_void<T> &&val) + : value_{std::move(val)}, retrieved_{0} {} + +template <typename T> +immediate_conveyor_node<T>::immediate_conveyor_node(error &&error) + : value_{std::move(error)}, retrieved_{0} {} + +template <typename T> size_t immediate_conveyor_node<T>::space() const { + return 0; +} + +template <typename T> size_t immediate_conveyor_node<T>::queued() const { + return retrieved_ > 1 ? 0 : 1; +} + +template <typename T> void immediate_conveyor_node<T>::child_has_fired() { + // Impossible case + assert(false); +} + +template <typename T> void immediate_conveyor_node<T>::parent_has_fired() { + SAW_ASSERT(parent_) { return; } + assert(parent_->space() > 0); + + if (queued() > 0) { + arm_next(); + } +} + +template <typename T> void immediate_conveyor_node<T>::fire() { + + if (parent_) { + parent_->child_has_fired(); + if (queued() > 0 && parent_->space() > 0) { + arm_last(); + } + } +} + +template <typename T> +merge_conveyor<T>::merge_conveyor(lent<merge_conveyor_node_data<T>> d) + : data_{std::move(d)} {} + +template <typename T> merge_conveyor<T>::~merge_conveyor() {} + +template <typename T> void merge_conveyor<T>::attach(conveyor<T> conveyor) { + auto sp = data_.lock(); + SAW_ASSERT(sp) { return; } + + sp->attach(std::move(conveyor)); +} + +template <typename T> +merge_conveyor_node<T>::merge_conveyor_node(our<merge_conveyor_node_data<T>> d) + : data_{d} { + SAW_ASSERT(data_) { return; } + + data_->merger = this; +} + +template <typename T> merge_conveyor_node<T>::~merge_conveyor_node() {} + +template <typename T> +error_or<own<conveyor_node>> +merge_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee_) noexcept { + (void)swapee_; + return make_error<err::invalid_state>( + "merge_conveyor_node<T>::appendage should block calls to this class"); +} + +template <typename T> +void merge_conveyor_node<T>::get_result(error_or_value &eov) noexcept { + error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>(); + + SAW_ASSERT(data_) { return; } + + /// @todo search appendages for result + + auto &appendages = data_->appendages; + next_appendage_ = std::min(appendages.size(), next_appendage_); + + for (size_t i = next_appendage_; i < appendages.size(); ++i) { + if (appendages[i]->queued() > 0) { + err_or_val = std::move(appendages[i]->error_or_value_.value()); + appendages[i]->error_or_value_ = std::nullopt; + next_appendage_ = i + 1; + return; + } + } + for (size_t i = 0; i < next_appendage_; ++i) { + if (appendages[i]->queued() > 0) { + err_or_val = std::move(appendages[i]->error_or_value_.value()); + appendages[i]->error_or_value_ = std::nullopt; + next_appendage_ = i + 1; + return; + } + } + + err_or_val = make_error<err::invalid_state>("No value in Merge appendages"); +} + +template <typename T> void merge_conveyor_node<T>::fire() { + SAW_ASSERT(queued() > 0) { return; } + + if (parent_) { + parent_->child_has_fired(); + + if (queued() > 0 && parent_->space() > 0) { + arm_later(); + } + } +} + +template <typename T> size_t merge_conveyor_node<T>::space() const { return 0; } + +template <typename T> size_t merge_conveyor_node<T>::queued() const { + SAW_ASSERT(data_) { return 0; } + + size_t queue_count = 0; + + for (auto &iter : data_->appendages) { + queue_count += iter->queued(); + } + + return queue_count; +} + +template <typename T> void merge_conveyor_node<T>::child_has_fired() { + /// This can never happen + assert(false); +} + +template <typename T> void merge_conveyor_node<T>::parent_has_fired() { + SAW_ASSERT(parent_) { return; } + if (queued() > 0) { + if (parent_->space() > 0) { + arm_later(); + } + } +} + +/** + * merge_conveyor_node<T>::Apendage + */ + +template <typename T> +error_or<own<conveyor_node>> +merge_conveyor_node<T>::appendage::swap_child(own<conveyor_node> &&swapee_) { + own<conveyor_node> old_child = std::move(child); + + child = std::move(swapee_); + + // This case should never happen + SAW_ASSERT(old_child) { return make_error<err::invalid_state>("No child exists"); } + + return old_child; +} + +template <typename T> +void merge_conveyor_node<T>::appendage::get_result(error_or_value &eov) { + error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>(); + + SAW_ASSERT(queued() > 0) { + err_or_val = + make_error<err::invalid_state>("No element queued in Merge appendage Node"); + return; + } + + err_or_val = std::move(error_or_value_.value()); + error_or_value_ = std::nullopt; +} + +template <typename T> size_t merge_conveyor_node<T>::appendage::space() const { + SAW_ASSERT(merger) { return 0; } + + if (error_or_value_.has_value()) { + return 0; + } + + return 1; +} + +template <typename T> size_t merge_conveyor_node<T>::appendage::queued() const { + SAW_ASSERT(merger) { return 0; } + + if (error_or_value_.has_value()) { + return 1; + } + + return 0; +} + +/// @todo delete this function. Replaced by the regular get_result +template <typename T> +void merge_conveyor_node<T>::appendage::get_appendage_result( + error_or_value &eov) { + error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>(); + + SAW_ASSERT(queued() > 0) { + err_or_val = + make_error<err::invalid_state>("No element queued in Merge appendage Node"); + return; + } + + err_or_val = std::move(error_or_value_.value()); + error_or_value_ = std::nullopt; +} + +template <typename T> +void merge_conveyor_node<T>::appendage::child_has_fired() { + SAW_ASSERT(!error_or_value_.has_value()) { return; } + error_or<fix_void<T>> eov; + child->get_result(eov); + + error_or_value_ = std::move(eov); + + if (!merger->is_armed()) { + merger->arm_later(); + } +} + +template <typename T> +void merge_conveyor_node<T>::appendage::parent_has_fired() { + conveyor_storage *child_storage = child->next_storage(); + if (child_storage) { + child_storage->parent_has_fired(); + } +} + +template <typename T> +void merge_conveyor_node<T>::appendage::set_parent(conveyor_storage *par) { + SAW_ASSERT(merger) { return; } + + SAW_ASSERT(child) { return; } + + parent_ = par; +} + +template <typename T> +void merge_conveyor_node_data<T>::attach(conveyor<T> conv) { + auto nas = conveyor<T>::from_conveyor(std::move(conv)); + SAW_ASSERT(nas) { return; } + conveyor_storage *storage = nas->next_storage(); + SAW_ASSERT(storage) { return; } + + auto merge_node_appendage = + heap<typename merge_conveyor_node<T>::appendage>(std::move(nas), + *merger); + auto merge_node_appendage_ptr = merge_node_appendage.get(); + + storage->set_parent(merge_node_appendage.get()); + + SAW_ASSERT(merger) { return; } + + conveyor_storage *mrg_storage = merger->next_storage(); + SAW_ASSERT(mrg_storage) { return; } + + merge_node_appendage->set_parent(mrg_storage); + + appendages.push_back(std::move(merge_node_appendage)); + + /// @todo return this. necessary? maybe for the weird linking setup + /// maybe not + // return merge_node_appendage_ptr; +} + +template <typename T> +void merge_conveyor_node_data<T>::governing_node_destroyed() { + appendages.clear(); + merger = nullptr; +} + +template <typename T> adapt_conveyor_feeder<T>::~adapt_conveyor_feeder() { + if (feedee_) { + feedee_->set_feeder(nullptr); + feedee_ = nullptr; + } +} + +template <typename T> +void adapt_conveyor_feeder<T>::set_feedee(adapt_conveyor_node<T> *feedee_p) { + feedee_ = feedee_p; +} + +template <typename T> void adapt_conveyor_feeder<T>::feed(T &&value) { + if (feedee_) { + feedee_->feed(std::move(value)); + } +} + +template <typename T> void adapt_conveyor_feeder<T>::fail(error &&error) { + if (feedee_) { + feedee_->fail(std::move(error)); + } +} + +template <typename T> size_t adapt_conveyor_feeder<T>::queued() const { + if (feedee_) { + return feedee_->queued(); + } + return 0; +} + +template <typename T> size_t adapt_conveyor_feeder<T>::space() const { + if (feedee_) { + return feedee_->space(); + } + return 0; +} + +template <typename T> +error adapt_conveyor_feeder<T>::swap(conveyor<T> &&conv) noexcept { + SAW_ASSERT(feedee_) { return make_error<err::invalid_state>("No feedee connected"); } + + auto node = conveyor<T>::from_conveyor(std::move(conv)); + + feedee_->swap_child(std::move(node)); + + return no_error(); +} + +template <typename T> +adapt_conveyor_node<T>::adapt_conveyor_node() : conveyor_event_storage{} {} + +template <typename T> adapt_conveyor_node<T>::~adapt_conveyor_node() { + if (feeder_) { + feeder_->set_feedee(nullptr); + feeder_ = nullptr; + } +} + +template <typename T> +error_or<own<conveyor_node>> +adapt_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee) noexcept { + // This should return the owning pointer of this instance + auto myself_err = parent_node_.swap_child_of_parent(std::move(swapee)); + + if (myself_err.is_error()) { + return myself_err; + } + + auto &myself = myself_err.get_value(); + + assert(myself.get() == this); + + return myself_err; +} + +template <typename T> +conveyor_storage *adapt_conveyor_node<T>::next_storage() noexcept { + return static_cast<conveyor_storage *>(this); +} + +template <typename T> +void adapt_conveyor_node<T>::notify_parent_attached( + conveyor_node &par) noexcept { + parent_node_.change_parent(&par); +} + +template <typename T> +void adapt_conveyor_node<T>::set_feeder(adapt_conveyor_feeder<T> *feeder_p) { + feeder_ = feeder_p; +} + +template <typename T> void adapt_conveyor_node<T>::feed(T &&value) { + storage_.push(std::move(value)); + arm_next(); +} + +template <typename T> void adapt_conveyor_node<T>::fail(error &&error) { + storage_.push(std::move(error)); + arm_next(); +} + +template <typename T> size_t adapt_conveyor_node<T>::queued() const { + return storage_.size(); +} + +template <typename T> size_t adapt_conveyor_node<T>::space() const { + return std::numeric_limits<size_t>::max() - storage_.size(); +} + +template <typename T> +void adapt_conveyor_node<T>::get_result(error_or_value &err_or_val) { + if (!storage_.empty()) { + err_or_val.as<T>() = std::move(storage_.front()); + storage_.pop(); + } else { + err_or_val.as<T>() = make_error<err::invalid_state>( + "Signal for retrieval of storage sent even though no " + "data is present"); + } +} + +template <typename T> void adapt_conveyor_node<T>::child_has_fired() { + // Adapt node has no children + assert(false); +} + +template <typename T> void adapt_conveyor_node<T>::parent_has_fired() { + SAW_ASSERT(parent_) { return; } + + if (parent_->space() == 0) { + return; + } +} + +template <typename T> void adapt_conveyor_node<T>::fire() { + if (parent_) { + parent_->child_has_fired(); + + if (storage_.size() > 0) { + arm_later(); + } + } +} + +template <typename T> one_time_conveyor_feeder<T>::~one_time_conveyor_feeder() { + if (feedee_) { + feedee_->set_feeder(nullptr); + feedee_ = nullptr; + } +} + +template <typename T> +void one_time_conveyor_feeder<T>::set_feedee( + one_time_conveyor_node<T> *feedee_p) { + feedee_ = feedee_p; +} + +template <typename T> void one_time_conveyor_feeder<T>::feed(T &&value) { + if (feedee_) { + feedee_->feed(std::move(value)); + } +} + +template <typename T> void one_time_conveyor_feeder<T>::fail(error &&error) { + if (feedee_) { + feedee_->fail(std::move(error)); + } +} + +template <typename T> size_t one_time_conveyor_feeder<T>::queued() const { + if (feedee_) { + return feedee_->queued(); + } + return 0; +} + +template <typename T> size_t one_time_conveyor_feeder<T>::space() const { + if (feedee_) { + return feedee_->space(); + } + return 0; +} + +template <typename T> one_time_conveyor_node<T>::~one_time_conveyor_node() { + if (feeder_) { + feeder_->set_feedee(nullptr); + feeder_ = nullptr; + } +} + +template <typename T> +void one_time_conveyor_node<T>::set_feeder( + one_time_conveyor_feeder<T> *feeder_p) { + feeder_ = feeder_p; +} + +template <typename T> void one_time_conveyor_node<T>::feed(T &&value) { + storage_ = std::move(value); + arm_next(); +} + +template <typename T> void one_time_conveyor_node<T>::fail(error &&error) { + storage_ = std::move(error); + arm_next(); +} + +template <typename T> size_t one_time_conveyor_node<T>::queued() const { + return storage_.has_value() ? 1 : 0; +} + +template <typename T> size_t one_time_conveyor_node<T>::space() const { + return passed_ ? 0 : 1; +} + +template <typename T> +void one_time_conveyor_node<T>::get_result(error_or_value &err_or_val) { + if (storage_.has_value()) { + err_or_val.as<T>() = std::move(storage_.value()); + storage_ = std::nullopt; + } else { + err_or_val.as<T>() = make_error<err::invalid_state>( + "Signal for retrieval of storage sent even though no " + "data is present"); + } +} + +template <typename T> void one_time_conveyor_node<T>::fire() { + if (parent_) { + parent_->child_has_fired(); + } +} + +} // namespace saw |