#pragma once #include #include #include #include #include #include #include #include namespace saw { class conveyor_storage; class conveyor_node { public: conveyor_node(); virtual ~conveyor_node() = default; /** * Internal method to retrieve results from children */ virtual void get_result(error_or_value &err_or_val) = 0; /** * Swap out child with another one */ virtual error_or> swap_child(own &&swapee_) = 0; /** * Retrieve the next storage node */ virtual conveyor_storage *next_storage() = 0; /** * Notify that a new parent was attached * Only relevant for the feeding nodes */ virtual void notify_parent_attached(conveyor_node &){}; }; class conveyor_node_with_child_mixin final { public: own child = nullptr; conveyor_node_with_child_mixin(own &&child_, conveyor_node &owner_); ~conveyor_node_with_child_mixin() = default; /** * Swap out children and return the child ptr, since the caller is the child * itself. Stack needs to be cleared before the child is destroyed, so the * swapped out node is returned as well. */ error_or> swap_child(own &&swapee); }; class conveyor_node_with_parent_mixin final { public: conveyor_node *parent = nullptr; error_or> swap_child_of_parent(own &&swapee) { SAW_ASSERT(parent) { return critical_error( "Can't swap child, because parent doesn't exist"); } return parent->swap_child(std::move(swapee)); } void change_parent(conveyor_node *p) { parent = p; } }; class event_loop; class wait_scope; /* * Event class similar to capn'proto. * https://github.com/capnproto/capnproto */ class event { private: event_loop &loop_; event **prev_ = nullptr; event *next_ = nullptr; friend class event_loop; public: event(); event(event_loop &loop); virtual ~event(); virtual void fire() = 0; void arm_next(); void arm_later(); void arm_last(); void disarm(); bool is_armed() const; }; class conveyor_storage { protected: conveyor_storage *parent_ = nullptr; public: conveyor_storage(); virtual ~conveyor_storage(); virtual size_t space() const = 0; virtual size_t queued() const = 0; virtual void child_has_fired() = 0; virtual void parent_has_fired() = 0; virtual void set_parent(conveyor_storage *parent) = 0; conveyor_storage *get_parent() const; }; class conveyor_event_storage : public conveyor_storage, public event { public: conveyor_event_storage(); virtual ~conveyor_event_storage() = default; void set_parent(conveyor_storage *parent) override; }; class conveyor_base { protected: own node_; public: conveyor_base(own &&node_p); virtual ~conveyor_base() = default; conveyor_base(conveyor_base &&) = default; conveyor_base &operator=(conveyor_base &&) = default; void get(error_or_value &err_or_val); }; template class conveyor; template conveyor chained_conveyor_type(T *); // template Conveyor chainedConveyorType(Conveyor *); template T remove_error_or_type(T *); template T remove_error_or_type(error_or *); template using remove_error_or = decltype(remove_error_or_type((T *)nullptr)); template using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr)); template using conveyor_result = chained_conveyors>>; struct propagate_error { public: error operator()(const error &err) const; error operator()(error &&err); }; class conveyor_sink { private: own node_; public: conveyor_sink(); conveyor_sink(own &&node); conveyor_sink(conveyor_sink &&) = default; conveyor_sink &operator=(conveyor_sink &&) = default; }; template class merge_conveyor_node_data; template class merge_conveyor { private: lent> data_; public: merge_conveyor() = default; merge_conveyor(lent> d); ~merge_conveyor(); void attach(conveyor conv); }; /** * Main interface for async operations. */ template class conveyor final : public conveyor_base { public: /** * Construct an immediately fulfilled node */ conveyor(fix_void value); /** * Construct an immediately failed node */ conveyor(error &&err); /** * Construct a conveyor with a child node */ conveyor(own node_p); conveyor(conveyor &&) = default; conveyor &operator=(conveyor &&) = default; /** * This method converts values or errors from children */ template [[nodiscard]] conveyor_result then(Func &&func, ErrorFunc &&error_func = propagate_error()); /** * This method adds a buffer node in the conveyor chains which acts as a * scheduler interrupt point and collects elements up to the supplied limit. */ [[nodiscard]] conveyor buffer(size_t limit = std::numeric_limits::max()); /** * This method just takes ownership of any supplied types, * which are destroyed when the chain gets destroyed. * Useful for resource lifetime control. */ template [[nodiscard]] conveyor attach(Args &&...args); /** @todo implement * This method limits the total amount of passed elements * Be careful where you place this node into the chain. * If you meant to fork it and destroy paths you shouldn't place * an interrupt point between the fork and this limiter */ [[nodiscard]] conveyor limit(size_t val = 1); /** * */ [[nodiscard]] std::pair, merge_conveyor> merge(); /** * Moves the conveyor chain into a thread local storage point which drops * every element. Use sink() if you want to control the lifetime of a * conveyor chain */ template void detach(ErrorFunc &&err_func = propagate_error()); /** * Creates a local sink which drops elements, but lifetime control remains * in your hand. */ template [[nodiscard]] conveyor_sink sink(ErrorFunc &&error_func = propagate_error()); /** * If no sink() or detach() is used you have to take elements out of the * chain yourself. */ error_or> take(); /** @todo implement * Specifically pump elements through this chain with the provided * wait_scope */ void poll(wait_scope &wait_scope); // helper static conveyor to_conveyor(own node); // helper static own from_conveyor(conveyor conveyor); }; template conveyor_result exec_later(Func &&func); /* * Join Conveyors into a single one */ template conveyor> join_conveyors(std::tuple...> &conveyors); template class conveyor_feeder { public: virtual ~conveyor_feeder() = default; virtual void feed(T &&data) = 0; virtual void fail(error &&error) = 0; virtual size_t space() const = 0; virtual size_t queued() const = 0; virtual error swap(conveyor &&conveyor) noexcept = 0; }; template <> class conveyor_feeder { public: virtual ~conveyor_feeder() = default; virtual void feed(void_t &&value = void_t{}) = 0; virtual void fail(error &&error) = 0; virtual size_t space() const = 0; virtual size_t queued() const = 0; virtual error swap(conveyor &&conveyor) noexcept = 0; }; template struct conveyor_and_feeder { own> feeder; conveyor conveyor; }; template conveyor_and_feeder new_conveyor_and_feeder(); template conveyor_and_feeder one_time_conveyor_and_feeder(); enum class Signal : uint8_t { Terminate, User1 }; /** * Class which acts as a correspondent between the running framework and outside * events which may be signals from the operating system or just other threads. * Default EventPorts are supplied by setupAsyncIo() in io.h */ class event_port { public: virtual ~event_port() = default; virtual conveyor on_signal(Signal signal) = 0; virtual void poll() = 0; virtual void wait() = 0; virtual void wait(const std::chrono::steady_clock::duration &) = 0; virtual void wait(const std::chrono::steady_clock::time_point &) = 0; virtual void wake() = 0; }; class sink_conveyor_node; class conveyor_sink_set final : public event { private: /* class Helper final : public Event { private: void destroySinkConveyorNode(ConveyorNode& sink); void fail(Error&& error); std::vector> sink_nodes; std::queue delete_nodes; std::function error_handler; public: ConveyorSinks() = default; ConveyorSinks(EventLoop& event_loop); void add(Conveyor node); void fire() override {} }; gin::Own helper; */ friend class sink_conveyor_node; void destroy_sink_conveyor_node(conveyor_node &sink_node); void fail(error &&err); std::list> sink_nodes_; std::queue delete_nodes_; std::function error_handler_; public: // ConveyorSinks(); // ConveyorSinks(EventLoop& event_loop); conveyor_sink_set() = default; conveyor_sink_set(event_loop &event_loop); void add(conveyor &&node); void fire() override; }; /* * EventLoop class similar to capn'proto. * https://github.com/capnproto/capnproto */ class event_loop { private: friend class event; event *head_ = nullptr; event **tail_ = &head_; event **next_insert_point_ = &head_; event **later_insert_point_ = &head_; bool is_runnable_ = false; own event_port_ = nullptr; own daemon_sink_ = nullptr; // functions void set_runnable(bool runnable); friend class wait_scope; void enter_scope(); void leave_scope(); bool turn_loop(); bool turn(); public: event_loop(); event_loop(own &&port); ~event_loop(); event_loop(event_loop &&) = default; event_loop &operator=(event_loop &&) = default; bool wait(); bool wait(const std::chrono::steady_clock::duration &); bool wait(const std::chrono::steady_clock::time_point &); bool poll(); event_port *event_port(); conveyor_sink_set &daemon(); }; /* * WaitScope class similar to capn'proto. * https://github.com/capnproto/capnproto */ class wait_scope { private: event_loop &loop_; public: wait_scope(event_loop &loop); ~wait_scope(); void wait(); void wait(const std::chrono::steady_clock::duration &); void wait(const std::chrono::steady_clock::time_point &); void poll(); }; template conveyor_result yield_next(Func &&func); template conveyor_result yield_later(Func &&func); template conveyor_result yield_last(Func &&func); } // namespace saw // Secret stuff // Aka private semi hidden classes namespace saw { template struct fix_void_caller { template static Out apply(Func &func, In &&in) { return func(std::move(in)); } }; template struct fix_void_caller { template static Out apply(Func &func, void_t &&in) { (void)in; return func(); } }; template struct fix_void_caller { template static void_t apply(Func &func, In &&in) { func(std::move(in)); return void_t{}; } }; template <> struct fix_void_caller { template static void_t apply(Func &func, void_t &&in) { (void)in; func(); return void_t{}; } }; template class adapt_conveyor_node; template class adapt_conveyor_feeder final : public conveyor_feeder> { private: adapt_conveyor_node *feedee_ = nullptr; public: ~adapt_conveyor_feeder(); void set_feedee(adapt_conveyor_node *feedee); void feed(T &&value) override; void fail(error &&error) override; size_t space() const override; size_t queued() const override; error swap(conveyor &&conv) noexcept override; }; template class adapt_conveyor_node final : public conveyor_node, public conveyor_event_storage { private: adapt_conveyor_feeder *feeder_ = nullptr; std::queue>> storage_; conveyor_node_with_parent_mixin parent_node_; public: adapt_conveyor_node(); ~adapt_conveyor_node(); void set_feeder(adapt_conveyor_feeder *feeder); void feed(T &&value); void fail(error &&error); // ConveyorNode void get_result(error_or_value &err_or_val) override; error_or> swap_child(own &&swapee) noexcept override; conveyor_storage *next_storage() noexcept override; void notify_parent_attached(conveyor_node &) noexcept override; // ConveyorStorage size_t space() const override; size_t queued() const override; void child_has_fired() override; void parent_has_fired() override; // Event void fire() override; }; template class one_time_conveyor_node; template class one_time_conveyor_feeder final : public conveyor_feeder> { private: one_time_conveyor_node *feedee_ = nullptr; public: ~one_time_conveyor_feeder(); void set_feedee(one_time_conveyor_node *feedee); void feed(T &&value) override; void fail(error &&error) override; size_t space() const override; size_t queued() const override; }; template class one_time_conveyor_node final : public conveyor_node, public conveyor_storage, public event { private: one_time_conveyor_feeder *feeder_ = nullptr; bool passed_ = false; maybe> storage_ = std::nullopt; public: ~one_time_conveyor_node(); void set_feeder(one_time_conveyor_feeder *feeder); void feed(T &&value); void fail(error &&error); // ConveyorNode void get_result(error_or_value &err_or_val) override; error_or> swap_child(own &&swapee) override; // ConveyorStorage size_t space() const override; size_t queued() const override; void child_has_fired() override {} void parent_has_fired() override; // Event void fire() override; }; /** * This class buffers and saves incoming elements and acts as an interrupt node * for processing calls */ class queue_buffer_conveyor_node_base : public conveyor_node, public conveyor_event_storage { protected: conveyor_node_with_child_mixin child_mixin_; public: queue_buffer_conveyor_node_base(own child_) : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {} virtual ~queue_buffer_conveyor_node_base() = default; /** * Use mixin */ error_or> swap_child(own &&swapee_) noexcept override; conveyor_storage *next_storage() noexcept override { return static_cast(this); } }; template class queue_buffer_conveyor_node final : public queue_buffer_conveyor_node_base { private: std::queue> storage_; size_t max_store_; public: queue_buffer_conveyor_node(own dep, size_t max_size) : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{ max_size} {} // Event void fire() override; // ConveyorNode void get_result(error_or_value &eov) noexcept override; // ConveyorStorage size_t space() const override; size_t queued() const override; void child_has_fired() override; void parent_has_fired() override; }; class attach_conveyor_node_base : public conveyor_node { protected: conveyor_node_with_child_mixin child_mixin_; public: attach_conveyor_node_base(own &&child_) : child_mixin_{std::move(child_), *this} {} virtual ~attach_conveyor_node_base() = default; void get_result(error_or_value &err_or_val) noexcept override; /** * Use mixin */ error_or> swap_child(own &&swapee_) noexcept override; conveyor_storage *next_storage() noexcept override; }; template class attach_conveyor_node final : public attach_conveyor_node_base { public: attach_conveyor_node(own &&dep, Args &&...args) : attach_conveyor_node_base(std::move(dep)), attached_data_{ std::move(args...)} {} private: std::tuple attached_data_; }; class convert_conveyor_node_base : public conveyor_node { public: convert_conveyor_node_base(own &&dep); virtual ~convert_conveyor_node_base() = default; void get_result(error_or_value &err_or_val) override; virtual void get_impl(error_or_value &err_or_val) = 0; error_or> swap_child(own &&swapee) noexcept override; conveyor_storage *next_storage() noexcept override; protected: conveyor_node_with_child_mixin child_mixin_; }; template class convert_conveyor_node final : public convert_conveyor_node_base { private: Func func_; ErrorFunc error_func_; static_assert(std::is_same>::value, "Should never be of type ErrorOr"); public: convert_conveyor_node(own &&dep, Func &&func, ErrorFunc &&error_func) : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)}, error_func_{std::move(error_func)} {} void get_impl(error_or_value &err_or_val) noexcept override { error_or> dep_eov; error_or>> &eov = err_or_val.as>>(); if (child_mixin_.child) { child_mixin_.child->get_result(dep_eov); if (dep_eov.is_value()) { try { eov = fix_void_caller::apply( func_, std::move(dep_eov.value())); } catch (const std::bad_alloc &) { eov = critical_error("Out of memory"); } catch (const std::exception &) { eov = critical_error( "Exception in chain occured. Return ErrorOr if you " "want to handle errors which are recoverable"); } } else if (dep_eov.is_error()) { eov = error_func_(std::move(dep_eov.error())); } else { eov = critical_error("No value set in dependency"); } } else { eov = critical_error("Conveyor doesn't have child"); } } }; class sink_conveyor_node final : public conveyor_node, public conveyor_event_storage { private: conveyor_node_with_child_mixin child_mixin_; conveyor_sink_set *conveyor_sink_; public: sink_conveyor_node(own node, conveyor_sink_set &conv_sink) : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, conveyor_sink_{&conv_sink} {} sink_conveyor_node(own node) : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, conveyor_sink_{nullptr} {} // Event only queued if a critical error occured void fire() override { // Queued for destruction of children, because this acts as a sink and // no other event should be here child_mixin_.child = nullptr; if (conveyor_sink_) { conveyor_sink_->destroy_sink_conveyor_node(*this); conveyor_sink_ = nullptr; } } // ConveyorStorage size_t space() const override { return 1; } size_t queued() const override { return 0; } // ConveyorNode void get_result(error_or_value &err_or_val) noexcept override { err_or_val.as() = critical_error("In a sink node no result can be returned"); } error_or> swap_child(own &&swapee) noexcept override { return child_mixin_.swap_child(std::move(swapee)); } // ConveyorStorage void child_has_fired() override { if (child_mixin_.child) { error_or dep_eov; child_mixin_.child->get_result(dep_eov); if (dep_eov.is_error()) { if (dep_eov.error().is_critical()) { if (!is_armed()) { arm_last(); } } if (conveyor_sink_) { conveyor_sink_->fail(std::move(dep_eov.error())); } } } } /* * No parent needs to be fired since we always have space */ void parent_has_fired() override {} conveyor_storage *next_storage() override { // Should never happen though assert(false); return nullptr; // return static_cast(this); } }; class immediate_conveyor_node_base : public conveyor_node, public conveyor_event_storage { private: public: immediate_conveyor_node_base(); virtual ~immediate_conveyor_node_base() = default; error_or> swap_child(own &&swapee) noexcept override { (void)swapee; return recoverable_error("Node doesn't support swapping"); } conveyor_storage *next_storage() noexcept override { return static_cast(this); } }; template class immediate_conveyor_node final : public immediate_conveyor_node_base { private: error_or> value_; uint8_t retrieved_; public: immediate_conveyor_node(fix_void &&val); immediate_conveyor_node(error &&error); // ConveyorStorage size_t space() const override; size_t queued() const override; void child_has_fired() override; void parent_has_fired() override; // ConveyorNode void get_result(error_or_value &err_or_val) noexcept override { if (retrieved_ > 0) { err_or_val.as>() = make_error("Already taken value", error::code::Exhausted); } else { err_or_val.as>() = std::move(value_); } if (queued() > 0) { ++retrieved_; } } // Event void fire() override; }; /* * Collects every incoming value and throws it in one lane */ class merge_conveyor_node_base : public conveyor_node, public conveyor_event_storage { public: merge_conveyor_node_base(); virtual ~merge_conveyor_node_base() = default; conveyor_storage *next_storage() noexcept override { return static_cast(this); } }; template class merge_conveyor_node : public merge_conveyor_node_base { private: class appendage final : public conveyor_node, public conveyor_storage { public: own child; merge_conveyor_node *merger; maybe>> error_or_value_; public: appendage(own n, merge_conveyor_node &m) : conveyor_storage{}, child{std::move(n)}, merger{&m}, error_or_value_{std::nullopt} {} bool child_storage_has_element_queued() const { if (!child) { return false; } conveyor_storage *storage = child->next_storage(); if (storage) { return storage->queued() > 0; } return false; } void get_appendage_result(error_or_value &eov); /** * ConveyorNode */ error_or> swap_child(own &&swapee_) override; conveyor_storage *next_storage() noexcept override { return static_cast(this); } void get_result(error_or_value &err_or_val) override; /** * ConveyorStorage */ size_t space() const override; size_t queued() const override; void child_has_fired() override; void parent_has_fired() override; void set_parent(conveyor_storage *par) override; }; friend class merge_conveyor_node_data; friend class appendage; our> data_; size_t next_appendage_ = 0; public: merge_conveyor_node(our> data); ~merge_conveyor_node(); // ConveyorNode error_or> swap_child(own &&c) noexcept override; // Event void get_result(error_or_value &err_or_val) noexcept override; void fire() override; // ConveyorStorage size_t space() const override; size_t queued() const override; void child_has_fired() override; void parent_has_fired() override; }; template class merge_conveyor_node_data { public: std::vector::appendage>> appendages; merge_conveyor_node *merger = nullptr; public: void attach(conveyor conv); void governing_node_destroyed(); }; /* class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage { private: public: }; template class JoinConveyorNode final : public JoinConveyorNodeBase { private: template class Appendage : public ConveyorEventStorage { private: Maybe data = std::nullopt; public: size_t space() const override; size_t queued() const override; void fire() override; void get_result(ErrorOrValue& eov) override; }; std::tuple...> appendages; public: }; */ } // namespace saw #include "async.tmpl.h"