forstio/source/forstio/async.h

1024 lines
24 KiB
C++

#pragma once
#include "common.h"
#include "error.h"
#include "timer.h"
#include <functional>
#include <limits>
#include <list>
#include <queue>
#include <type_traits>
namespace saw {
class conveyor_storage;

/**
 * Abstract base class of every node inside a conveyor chain.
 */
class conveyor_node {
public:
    conveyor_node();
    virtual ~conveyor_node() = default;

    /**
     * Internal hook which retrieves results from the children.
     * The outcome is handed back through err_or_val.
     */
    virtual void get_result(error_or_value &err_or_val) = 0;

    /**
     * Exchanges the child of this node with swapee_.
     */
    virtual error_or<own<conveyor_node>>
    swap_child(own<conveyor_node> &&swapee_) = 0;

    /**
     * Hands out the next storage node.
     */
    virtual conveyor_storage *next_storage() = 0;

    /**
     * Tells the node that a new parent got attached.
     * Only relevant for the feeding nodes, hence the empty default.
     */
    virtual void notify_parent_attached(conveyor_node &) {}
};
/**
 * Small helper which owns the child of a node. Nodes embed it as a member
 * and forward their swap_child() calls to it.
 */
class conveyor_node_with_child_mixin final {
public:
    // Owned child node; empty until one is supplied.
    own<conveyor_node> child = nullptr;

    conveyor_node_with_child_mixin(own<conveyor_node> &&child_,
                                   conveyor_node &owner_);
    ~conveyor_node_with_child_mixin() = default;

    /**
     * Swaps the held child against swapee and returns the previous child.
     * The caller is the child itself, and the stack has to be cleared before
     * that child gets destroyed, which is why the swapped out node is handed
     * back instead of being dropped here.
     */
    error_or<own<conveyor_node>> swap_child(own<conveyor_node> &&swapee);
};
/**
 * Small helper which remembers the (non-owned) parent of a node.
 */
class conveyor_node_with_parent_mixin final {
public:
    // Parent node, or nullptr while no parent is known.
    conveyor_node *parent = nullptr;

    /**
     * Points this mixin at a different parent. Passing nullptr clears it.
     */
    void change_parent(conveyor_node *p) { parent = p; }

    /**
     * Asks the parent to replace its child with swapee and forwards the
     * parent's answer. Yields a critical error if there is no parent.
     */
    error_or<own<conveyor_node>>
    swap_child_of_parent(own<conveyor_node> &&swapee) {
        SAW_ASSERT(parent) {
            return critical_error(
                "Can't swap child, because parent doesn't exist");
        }

        return parent->swap_child(std::move(swapee));
    }
};
class event_loop;
class wait_scope;

/*
 * Event class similar to the one found in Cap'n Proto.
 * https://github.com/capnproto/capnproto
 */
class event {
private:
    // Loop this event belongs to.
    event_loop &loop_;

    // Intrusive list links; both are empty by default.
    event **prev_ = nullptr;
    event *next_ = nullptr;

    friend class event_loop;

public:
    event();
    event(event_loop &loop);
    virtual ~event();

    /**
     * Callback which has to be provided by the concrete event.
     */
    virtual void fire() = 0;

    // Arming variants. Presumably they differ in the queue position at which
    // the event gets inserted (compare the insert points kept by event_loop)
    // - TODO confirm against the implementation.
    void arm_next();
    void arm_later();
    void arm_last();

    void disarm();

    bool is_armed() const;
};
/**
 * Interface of the chain elements which can store elements.
 * Each storage keeps a non-owning pointer to its parent storage.
 */
class conveyor_storage {
protected:
    // Parent storage; nullptr while none is set.
    conveyor_storage *parent_ = nullptr;

public:
    conveyor_storage();
    virtual ~conveyor_storage();

    // Fill level queries.
    virtual size_t space() const = 0;
    virtual size_t queued() const = 0;

    // Notifications coming from the neighbouring storages.
    virtual void child_has_fired() = 0;
    virtual void parent_has_fired() = 0;

    // Parent handling.
    virtual void set_parent(conveyor_storage *parent) = 0;
    conveyor_storage *get_parent() const;
};
/**
 * A storage which is an event at the same time. It supplies the set_parent()
 * implementation shared by the storage nodes deriving from it.
 */
class conveyor_event_storage : public conveyor_storage, public event {
public:
    conveyor_event_storage();
    virtual ~conveyor_event_storage() = default;

    void set_parent(conveyor_storage *parent) override;
};
/**
 * Type independent part of conveyor<T>. Owns the top node of the chain.
 * Movable only via the defaulted move operations.
 */
class conveyor_base {
protected:
    own<conveyor_node> node_;

public:
    conveyor_base(own<conveyor_node> &&node_p);
    virtual ~conveyor_base() = default;

    conveyor_base(conveyor_base &&) = default;
    conveyor_base &operator=(conveyor_base &&) = default;

    /**
     * Fetches a result; the outcome is handed back through err_or_val.
     */
    void get(error_or_value &err_or_val);
};
template <typename T> class conveyor;

// Declaration-only helpers. They are never defined and only serve as
// operands of decltype() in the aliases below.

// Maps T to conveyor<T>.
template <typename T> conveyor<T> chained_conveyor_type(T *);

// template <typename T> Conveyor<T> chainedConveyorType(Conveyor<T> *);

// Maps error_or<T> to T and leaves every other T untouched.
template <typename T> T remove_error_or_type(T *);
template <typename T> T remove_error_or_type(error_or<T> *);

/**
 * T with an outer error_or<> stripped, if there is one.
 */
template <typename T>
using remove_error_or =
    decltype(remove_error_or_type(static_cast<T *>(nullptr)));

/**
 * The conveyor type which transports a T.
 */
template <typename T>
using chained_conveyors =
    decltype(chained_conveyor_type(static_cast<T *>(nullptr)));

/**
 * Conveyor type produced by chaining Func behind a conveyor<T>: the return
 * type of Func, freed of error_or<>, wrapped into a conveyor.
 */
template <typename Func, typename T>
using conveyor_result =
    chained_conveyors<remove_error_or<return_type<Func, T>>>;
/**
 * Default error handler used by conveyor<T>::then(), detach() and sink().
 * Takes an error and returns an error.
 */
struct propagate_error {
    error operator()(const error &err) const;
    error operator()(error &&err);
};
/**
 * Handle returned by conveyor<T>::sink(). It owns a node, so the lifetime of
 * whatever hangs below that node is bound to this object. Move-only.
 */
class conveyor_sink {
private:
    own<conveyor_node> node_;

public:
    conveyor_sink();
    conveyor_sink(own<conveyor_node> &&node);

    conveyor_sink(conveyor_sink &&) = default;
    conveyor_sink &operator=(conveyor_sink &&) = default;
};
template <typename T> class merge_conveyor_node_data;

/**
 * Handle returned next to the conveyor by conveyor<T>::merge().
 * It refers to the shared merge data through a lent<> handle and allows
 * attaching further conveyors.
 */
template <typename T> class merge_conveyor {
private:
    lent<merge_conveyor_node_data<T>> data_;

public:
    merge_conveyor() = default;
    merge_conveyor(lent<merge_conveyor_node_data<T>> d);
    ~merge_conveyor();

    /**
     * Attaches another conveyor to the merge point.
     */
    void attach(conveyor<T> conv);
};
/**
 * Main interface for async operations.
 */
template <typename T> class conveyor final : public conveyor_base {
public:
    /**
     * Builds a node which is fulfilled immediately with value.
     */
    conveyor(fix_void<T> value);

    /**
     * Builds a node which has failed immediately with err.
     */
    conveyor(error &&err);

    /**
     * Builds a conveyor on top of the given child node.
     */
    conveyor(own<conveyor_node> node_p);

    conveyor(conveyor<T> &&) = default;
    conveyor<T> &operator=(conveyor<T> &&) = default;

    /**
     * Converts the values or errors coming from the children.
     * func handles values, error_func handles errors; by default errors are
     * handled by propagate_error.
     */
    template <typename Func, typename ErrorFunc = propagate_error>
    [[nodiscard]] conveyor_result<Func, T>
    then(Func &&func, ErrorFunc &&error_func = propagate_error());

    /**
     * Inserts a buffer node into the conveyor chain. The buffer acts as a
     * scheduler interrupt point and collects elements up to the supplied
     * limit.
     */
    [[nodiscard]] conveyor<T>
    buffer(size_t limit = std::numeric_limits<size_t>::max());

    /**
     * Takes ownership of the supplied objects. They are destroyed together
     * with the chain, which makes this useful for resource lifetime control.
     */
    template <typename... Args>
    [[nodiscard]] conveyor<T> attach(Args &&...args);

    /** @todo implement
     * Limits the total amount of passed elements.
     * Be careful where you place this node into the chain.
     * If you meant to fork it and destroy paths you shouldn't place
     * an interrupt point between the fork and this limiter
     */
    [[nodiscard]] conveyor<T> limit(size_t val = 1);

    /**
     * Returns a conveyor together with a merge_conveyor handle, through
     * which further conveyors can be attached.
     */
    [[nodiscard]] std::pair<conveyor<T>, merge_conveyor<T>> merge();

    /**
     * Moves the conveyor chain into a thread local storage point which drops
     * every element. Use sink() if you want to control the lifetime of a
     * conveyor chain
     */
    template <typename ErrorFunc = propagate_error>
    void detach(ErrorFunc &&err_func = propagate_error());

    /**
     * Creates a local sink which drops elements, while the lifetime control
     * stays with the caller through the returned conveyor_sink.
     */
    template <typename ErrorFunc = propagate_error>
    [[nodiscard]] conveyor_sink
    sink(ErrorFunc &&error_func = propagate_error());

    /**
     * Without sink() or detach() the elements have to be taken out of the
     * chain manually, which is what this method is for.
     */
    error_or<fix_void<T>> take();

    /** @todo implement
     * Specifically pump elements through this chain with the provided
     * wait_scope
     */
    void poll(wait_scope &wait_scope);

    // helper: wraps a node into a conveyor
    static conveyor<T> to_conveyor(own<conveyor_node> node);

    // helper: extracts the node out of a conveyor
    static own<conveyor_node> from_conveyor(conveyor<T> conveyor);
};
/**
 * Wraps func into a conveyor; the resulting type follows from the return
 * type of func when called without a value.
 */
template <typename Func>
conveyor_result<Func, void> exec_later(Func &&func);
/*
 * Joins several conveyors into a single one which transports a tuple of
 * the individual element types.
 */
template <typename... Args>
conveyor<std::tuple<Args...>>
joinConveyors(std::tuple<conveyor<Args>...> &conveyors);
/**
 * Producing side which belongs to a conveyor (see conveyor_and_feeder).
 */
template <typename T> class conveyor_feeder {
public:
    virtual ~conveyor_feeder() = default;

    // Hand over a value or an error.
    virtual void feed(T &&data) = 0;
    virtual void fail(error &&error) = 0;

    // Fill level queries.
    virtual size_t space() const = 0;
    virtual size_t queued() const = 0;

    // Swap operation taking another conveyor; reports failure as an error.
    virtual error swap(conveyor<T> &&conveyor) noexcept = 0;
};
/**
 * Feeder specialisation for void. Values are represented by Void, and
 * feed() can be called without any argument.
 */
template <> class conveyor_feeder<void> {
public:
    virtual ~conveyor_feeder() = default;

    // Hand over a (placeholder) value or an error.
    virtual void feed(Void &&value = Void{}) = 0;
    virtual void fail(error &&error) = 0;

    // Fill level queries.
    virtual size_t space() const = 0;
    virtual size_t queued() const = 0;

    // Swap operation taking another conveyor; reports failure as an error.
    virtual error swap(conveyor<Void> &&conveyor) noexcept = 0;
};
/**
 * Bundle of a conveyor and the feeder which supplies it.
 *
 * NOTE(review): the member `conveyor` carries the same name as its type.
 * Re-using a name that was already looked up in class scope is something
 * compilers are allowed to reject - confirm on all target toolchains.
 */
template <typename T> struct conveyor_and_feeder {
    own<conveyor_feeder<T>> feeder;
    conveyor<T> conveyor;
};
/**
 * Creates a connected conveyor / feeder pair.
 */
template <typename T>
conveyor_and_feeder<T> new_conveyor_and_feeder();

/**
 * Creates a connected conveyor / feeder pair of the one-time flavour.
 */
template <typename T>
conveyor_and_feeder<T> one_time_conveyor_and_feeder();
/**
 * Signals which can be requested from an event_port via on_signal().
 */
enum class Signal : uint8_t {
    Terminate,
    User1
};
/**
 * Class which acts as a correspondent between the running framework and
 * outside events which may be signals from the operating system or just
 * other threads.
 * Default EventPorts are supplied by setupAsyncIo() in io.h
 */
class event_port {
public:
    virtual ~event_port() = default;

    /**
     * Returns a conveyor tied to the given signal.
     */
    virtual conveyor<void> on_signal(Signal signal) = 0;

    virtual void poll() = 0;

    // wait() comes in three flavours: without argument, with a duration and
    // with a point in time.
    virtual void wait() = 0;
    virtual void wait(const std::chrono::steady_clock::duration &) = 0;
    virtual void wait(const std::chrono::steady_clock::time_point &) = 0;

    virtual void wake() = 0;
};
class sink_conveyor_node;

/**
 * Event which owns a collection of sink nodes. Conveyors are handed over
 * with add(); sink_conveyor_node is a friend and uses the private
 * destroy_sink_conveyor_node() and fail() hooks.
 */
class conveyor_sink_set final : public event {
private:
    /*
    Previous design, kept for reference:

    class Helper final : public Event {
    private:
        void destroySinkConveyorNode(ConveyorNode& sink);
        void fail(Error&& error);
        std::vector<Own<ConveyorNode>> sink_nodes;
        std::queue<ConveyorNode*> delete_nodes;
        std::function<void(Error&& error)> error_handler;
    public:
        ConveyorSinks() = default;
        ConveyorSinks(EventLoop& event_loop);
        void add(Conveyor<void> node);
        void fire() override {}
    };
    gin::Own<Helper> helper;
    */
    friend class sink_conveyor_node;

    void destroy_sink_conveyor_node(conveyor_node &sink_node);
    void fail(error &&err);

    // Owned sink nodes.
    std::list<own<conveyor_node>> sink_nodes_;

    // Non-owning pointers to nodes which are queued for deletion.
    std::queue<conveyor_node *> delete_nodes_;

    std::function<void(error &&)> error_handler_;

public:
    // ConveyorSinks();
    // ConveyorSinks(EventLoop& event_loop);
    conveyor_sink_set() = default;
    conveyor_sink_set(event_loop &event_loop);

    /**
     * Hands a conveyor over to this set.
     */
    void add(conveyor<void> &&node);

    void fire() override;
};
/*
 * EventLoop class similar to the one found in Cap'n Proto.
 * https://github.com/capnproto/capnproto
 *
 * NOTE(review): tail_, next_insert_point_ and later_insert_point_ start out
 * pointing at this object's own head_. The defaulted move operations copy
 * those pointers as they are, so after a move they can still refer to the
 * moved-from object - confirm that this is handled, or that loops are never
 * moved.
 *
 * NOTE(review): the accessor event_port() has the same name as the type
 * event_port, which is used earlier in this class. Compilers are allowed to
 * reject such a re-use - confirm on all target toolchains.
 */
class event_loop {
private:
    friend class event;

    // Intrusive list of events plus the insertion points into it.
    event *head_ = nullptr;
    event **tail_ = &head_;
    event **next_insert_point_ = &head_;
    event **later_insert_point_ = &head_;

    bool is_runnable_ = false;

    own<event_port> event_port_ = nullptr;
    own<conveyor_sink_set> daemon_sink_ = nullptr;

    // functions
    void set_runnable(bool runnable);

    // Scope handling and loop turning are reserved for wait_scope.
    friend class wait_scope;
    void enter_scope();
    void leave_scope();
    bool turn_loop();
    bool turn();

public:
    event_loop();
    event_loop(own<event_port> &&port);
    ~event_loop();

    event_loop(event_loop &&) = default;
    event_loop &operator=(event_loop &&) = default;

    // wait() comes in three flavours: without argument, with a duration and
    // with a point in time.
    bool wait();
    bool wait(const std::chrono::steady_clock::duration &);
    bool wait(const std::chrono::steady_clock::time_point &);

    bool poll();

    // Accessor returning an event_port pointer.
    event_port *event_port();

    // Accessor returning a conveyor_sink_set.
    conveyor_sink_set &daemon();
};
/*
 * WaitScope class similar to the one found in Cap'n Proto.
 * https://github.com/capnproto/capnproto
 */
class wait_scope {
private:
    // Loop this scope operates on.
    event_loop &loop_;

public:
    wait_scope(event_loop &loop);
    ~wait_scope();

    // wait() comes in three flavours: without argument, with a duration and
    // with a point in time.
    void wait();
    void wait(const std::chrono::steady_clock::duration &);
    void wait(const std::chrono::steady_clock::time_point &);

    void poll();
};
// Yield helpers. Each one takes a callable and returns the conveyor type
// resulting from it. Going by the names they correspond to the
// arm_next / arm_later / arm_last variants of event - TODO confirm.
template <typename Func>
conveyor_result<Func, void> yieldNext(Func &&func);

template <typename Func>
conveyor_result<Func, void> yieldLater(Func &&func);

template <typename Func>
conveyor_result<Func, void> yieldLast(Func &&func);
} // namespace saw
// Secret stuff
// Aka private semi hidden classes
namespace saw {
/**
 * Calls a callable with an input value and returns what the callable
 * returned. General case: both the input and the output are real values.
 */
template <typename Out, typename In> struct fix_void_caller {
    template <typename Callable>
    static auto apply(Callable &callable, In &&input) -> Out {
        return callable(std::move(input));
    }
};
/**
 * Input is Void: the callable is invoked without arguments and its result
 * is returned.
 */
template <typename Out> struct fix_void_caller<Out, Void> {
    template <typename Callable>
    static auto apply(Callable &callable, Void &&) -> Out {
        return callable();
    }
};
/**
 * Output is Void: the callable receives the input, whatever it returns is
 * ignored and a Void placeholder is returned instead.
 */
template <typename In> struct fix_void_caller<Void, In> {
    template <typename Callable>
    static auto apply(Callable &callable, In &&input) -> Void {
        callable(std::move(input));
        return Void{};
    }
};
template <> struct fix_void_caller<Void, Void> {
template <typename Func> static Void apply(Func &func, Void &&in) {
(void)in;
func();
return Void{};
}
};
template <typename T> class adapt_conveyor_node;

/**
 * Feeder implementation which talks to an adapt_conveyor_node.
 * The node is referenced through a non-owning pointer.
 */
template <typename T>
class adapt_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
private:
    // Node receiving the elements; nullptr while not connected.
    adapt_conveyor_node<T> *feedee_ = nullptr;

public:
    ~adapt_conveyor_feeder();

    void set_feedee(adapt_conveyor_node<T> *feedee);

    // conveyor_feeder
    void feed(T &&value) override;
    void fail(error &&error) override;

    size_t space() const override;
    size_t queued() const override;

    error swap(conveyor<T> &&conv) noexcept override;
};
/**
 * Node counterpart of adapt_conveyor_feeder. It keeps a queue of values or
 * errors and references its feeder through a non-owning pointer.
 */
template <typename T>
class adapt_conveyor_node final : public conveyor_node,
                                  public conveyor_event_storage {
private:
    // Feeder supplying this node; nullptr while not connected.
    adapt_conveyor_feeder<T> *feeder_ = nullptr;

    std::queue<error_or<unfix_void<T>>> storage_;

    conveyor_node_with_parent_mixin parent_node_;

public:
    adapt_conveyor_node();
    ~adapt_conveyor_node();

    void set_feeder(adapt_conveyor_feeder<T> *feeder);

    void feed(T &&value);
    void fail(error &&error);

    // ConveyorNode
    void get_result(error_or_value &err_or_val) override;

    error_or<own<conveyor_node>>
    swap_child(own<conveyor_node> &&swapee) noexcept override;

    conveyor_storage *next_storage() noexcept override;
    void notify_parent_attached(conveyor_node &) noexcept override;

    // ConveyorStorage
    size_t space() const override;
    size_t queued() const override;

    void child_has_fired() override;
    void parent_has_fired() override;

    // Event
    void fire() override;
};
template <typename T> class one_time_conveyor_node;

/**
 * Feeder implementation which talks to a one_time_conveyor_node.
 * The node is referenced through a non-owning pointer.
 *
 * NOTE(review): conveyor_feeder declares swap() as pure virtual and this
 * class does not override it, so as declared here the class is abstract
 * even though it is final. Confirm whether the one-time feeder is meant to
 * be instantiated; if so it needs a swap() override.
 */
template <typename T>
class one_time_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
private:
    // Node receiving the element; nullptr while not connected.
    one_time_conveyor_node<T> *feedee_ = nullptr;

public:
    ~one_time_conveyor_feeder();

    void set_feedee(one_time_conveyor_node<T> *feedee);

    // conveyor_feeder
    void feed(T &&value) override;
    void fail(error &&error) override;

    size_t space() const override;
    size_t queued() const override;
};
/**
 * Node counterpart of one_time_conveyor_feeder. It stores at most one value
 * or error and references its feeder through a non-owning pointer.
 *
 * NOTE(review): this class overrides neither conveyor_node::next_storage()
 * nor conveyor_storage::set_parent(), both of which are pure virtual. As
 * declared here it is therefore abstract even though it is final. The other
 * storage nodes in this file derive from conveyor_event_storage, which
 * supplies set_parent() - confirm whether that was intended here as well.
 */
template <typename T>
class one_time_conveyor_node final : public conveyor_node,
                                     public conveyor_storage,
                                     public event {
private:
    // Feeder supplying this node; nullptr while not connected.
    one_time_conveyor_feeder<T> *feeder_ = nullptr;

    bool passed_ = false;

    // Empty until something got stored.
    maybe<error_or<T>> storage_ = std::nullopt;

public:
    ~one_time_conveyor_node();

    void set_feeder(one_time_conveyor_feeder<T> *feeder);

    void feed(T &&value);
    void fail(error &&error);

    // ConveyorNode
    void get_result(error_or_value &err_or_val) override;

    error_or<own<conveyor_node>>
    swap_child(own<conveyor_node> &&swapee) override;

    // ConveyorStorage
    size_t space() const override;
    size_t queued() const override;

    // Nothing to do when a child fires.
    void child_has_fired() override {}
    void parent_has_fired() override;

    // Event
    void fire() override;
};
/**
 * Type independent part of the buffer node. The buffer node stores incoming
 * elements and acts as an interrupt node for processing calls.
 */
class queue_buffer_conveyor_node_base : public conveyor_node,
                                        public conveyor_event_storage {
protected:
    // Holds the owned child of this node.
    conveyor_node_with_child_mixin child_mixin_;

public:
    queue_buffer_conveyor_node_base(own<conveyor_node> child_)
        : conveyor_event_storage{},
          child_mixin_{std::move(child_), *this} {}

    virtual ~queue_buffer_conveyor_node_base() = default;

    /**
     * Use mixin
     */
    error_or<own<conveyor_node>>
    swap_child(own<conveyor_node> &&swapee_) noexcept override;

    /**
     * The buffer is a storage itself, so it returns itself.
     */
    conveyor_storage *next_storage() noexcept override {
        return static_cast<conveyor_storage *>(this);
    }
};
/**
 * Typed buffer node: a queue of values or errors plus a size limit which is
 * supplied at construction.
 */
template <typename T>
class queue_buffer_conveyor_node final
    : public queue_buffer_conveyor_node_base {
private:
    std::queue<error_or<T>> storage_;
    size_t max_store_;

public:
    queue_buffer_conveyor_node(own<conveyor_node> dep, size_t max_size)
        : queue_buffer_conveyor_node_base{std::move(dep)},
          max_store_{max_size} {}

    // Event
    void fire() override;

    // ConveyorNode
    void get_result(error_or_value &eov) noexcept override;

    // ConveyorStorage
    size_t space() const override;
    size_t queued() const override;

    void child_has_fired() override;
    void parent_has_fired() override;
};
/**
 * Type independent part of the attach node, see conveyor<T>::attach().
 */
class attach_conveyor_node_base : public conveyor_node {
protected:
    // Holds the owned child of this node.
    conveyor_node_with_child_mixin child_mixin_;

public:
    attach_conveyor_node_base(own<conveyor_node> &&child_)
        : child_mixin_{std::move(child_), *this} {}

    virtual ~attach_conveyor_node_base() = default;

    // ConveyorNode
    void get_result(error_or_value &err_or_val) noexcept override;

    /**
     * Use mixin
     */
    error_or<own<conveyor_node>>
    swap_child(own<conveyor_node> &&swapee_) noexcept override;

    conveyor_storage *next_storage() noexcept override;
};
/**
 * Attach node which stores the supplied objects in a tuple, so that they
 * live exactly as long as this node.
 */
template <typename... Args>
class attach_conveyor_node final : public attach_conveyor_node_base {
public:
    /**
     * @param dep  child node of this node
     * @param args objects to keep alive; each one is moved into the tuple
     */
    attach_conveyor_node(own<conveyor_node> &&dep, Args &&...args)
        // The pack has to be expanded outside of std::move, one call per
        // element. Expanding it inside (std::move(args...)) is a single call
        // with all elements as arguments, which is only correct for exactly
        // one attached object.
        : attach_conveyor_node_base(std::move(dep)),
          attached_data_{std::move(args)...} {}

private:
    std::tuple<Args...> attached_data_;
};
/**
 * Type independent part of the convert node, see conveyor<T>::then().
 * The typed subclass supplies the actual conversion through get_impl().
 */
class convert_conveyor_node_base : public conveyor_node {
public:
    convert_conveyor_node_base(own<conveyor_node> &&dep);
    virtual ~convert_conveyor_node_base() = default;

    // ConveyorNode
    void get_result(error_or_value &err_or_val) override;

    /**
     * Conversion hook implemented by the typed node.
     */
    virtual void get_impl(error_or_value &err_or_val) = 0;

    error_or<own<conveyor_node>>
    swap_child(own<conveyor_node> &&swapee) noexcept override;

    conveyor_storage *next_storage() noexcept override;

protected:
    // Holds the owned child of this node.
    conveyor_node_with_child_mixin child_mixin_;
};
/**
 * Typed convert node. It pulls a result out of its child and either passes
 * the value through func_ or the error through error_func_.
 *
 * T    - return type of Func (may still be wrapped in error_or)
 * DepT - element type delivered by the child; never an error_or
 */
template <typename T, typename DepT, typename Func, typename ErrorFunc>
class convert_conveyor_node final : public convert_conveyor_node_base {
private:
    Func func_;
    ErrorFunc error_func_;

    static_assert(std::is_same<DepT, remove_error_or<DepT>>::value,
                  "Should never be of type ErrorOr");

public:
    convert_conveyor_node(own<conveyor_node> &&dep, Func &&func,
                          ErrorFunc &&error_func)
        : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)},
          error_func_{std::move(error_func)} {}

    /**
     * Fetches the child's result and writes the converted outcome into
     * err_or_val:
     *  - no child               -> critical error
     *  - child delivered value  -> result of func_
     *  - child delivered error  -> result of error_func_
     *  - child delivered neither-> critical error
     * Exceptions thrown by the callbacks are turned into critical errors.
     */
    void get_impl(error_or_value &err_or_val) noexcept override {
        error_or<unfix_void<DepT>> dep_eov;
        error_or<unfix_void<remove_error_or<T>>> &eov =
            err_or_val.as<unfix_void<remove_error_or<T>>>();

        if (!child_mixin_.child) {
            eov = critical_error("Conveyor doesn't have child");
            return;
        }

        child_mixin_.child->get_result(dep_eov);

        // This function is noexcept, so anything escaping from the user
        // supplied callbacks would terminate the process. Both callbacks are
        // therefore guarded, and the last handler catches exception types
        // which do not derive from std::exception as well.
        try {
            if (dep_eov.is_value()) {
                eov = fix_void_caller<T, DepT>::apply(
                    func_, std::move(dep_eov.value()));
            } else if (dep_eov.is_error()) {
                eov = error_func_(std::move(dep_eov.error()));
            } else {
                eov = critical_error("No value set in dependency");
            }
        } catch (const std::bad_alloc &) {
            eov = critical_error("Out of memory");
        } catch (...) {
            eov = critical_error(
                "Exception in chain occured. Return ErrorOr<T> if you "
                "want to handle errors which are recoverable");
        }
    }
};
class sink_conveyor_node final : public conveyor_node,
public conveyor_event_storage {
private:
conveyor_node_with_child_mixin child_mixin_;
conveyor_sink_set *conveyor_sink_;
public:
sink_conveyor_node(own<conveyor_node> node, conveyor_sink_set &conv_sink)
: conveyor_event_storage{}, child_mixin_{std::move(node), *this},
conveyor_sink_{&conv_sink} {}
sink_conveyor_node(own<conveyor_node> node)
: conveyor_event_storage{}, child_mixin_{std::move(node), *this},
conveyor_sink_{nullptr} {}
// Event only queued if a critical error occured
void fire() override {
// Queued for destruction of children, because this acts as a sink and
// no other event should be here
child_mixin_.child = nullptr;
if (conveyor_sink_) {
conveyor_sink_->destroy_sink_conveyor_node(*this);
conveyor_sink_ = nullptr;
}
}
// ConveyorStorage
size_t space() const override { return 1; }
size_t queued() const override { return 0; }
// ConveyorNode
void get_result(error_or_value &err_or_val) noexcept override {
err_or_val.as<Void>() =
critical_error("In a sink node no result can be returned");
}
error_or<own<conveyor_node>>
swap_child(own<conveyor_node> &&swapee) noexcept override {
return child_mixin_.swap_child(std::move(swapee));
}
// ConveyorStorage
void child_has_fired() override {
if (child_mixin_.child) {
error_or<void> dep_eov;
child_mixin_.child->get_result(dep_eov);
if (dep_eov.is_error()) {
if (dep_eov.error().is_critical()) {
if (!is_armed()) {
arm_last();
}
}
if (conveyor_sink_) {
conveyor_sink_->fail(std::move(dep_eov.error()));
}
}
}
}
/*
* No parent needs to be fired since we always have space
*/
void parent_has_fired() override {}
conveyor_storage *next_storage() override {
// Should never happen though
assert(false);
return nullptr;
// return static_cast<ConveyorStorage*>(this);
}
};
/**
 * Type independent part of the immediate node. It has no child, so swapping
 * is rejected, and it is its own storage.
 */
class immediate_conveyor_node_base : public conveyor_node,
                                     public conveyor_event_storage {
public:
    immediate_conveyor_node_base();

    virtual ~immediate_conveyor_node_base() = default;

    /**
     * Always fails with a recoverable error; the argument is not touched.
     */
    error_or<own<conveyor_node>>
    swap_child(own<conveyor_node> &&) noexcept override {
        return recoverable_error("Node doesn't support swapping");
    }

    /**
     * Returns this node as storage.
     */
    conveyor_storage *next_storage() noexcept override {
        return static_cast<conveyor_storage *>(this);
    }
};
/**
 * Node which carries a value or an error right from its construction.
 */
template <typename T>
class immediate_conveyor_node final : public immediate_conveyor_node_base {
private:
    error_or<fix_void<T>> value_;

    // Counts retrievals, see get_result().
    uint8_t retrieved_;

public:
    immediate_conveyor_node(fix_void<T> &&val);
    immediate_conveyor_node(error &&error);

    // ConveyorStorage
    size_t space() const override;
    size_t queued() const override;

    void child_has_fired() override;
    void parent_has_fired() override;

    // ConveyorNode
    /**
     * Moves the stored value or error out as long as nothing has been
     * counted as retrieved yet; afterwards an Exhausted error is delivered.
     * The retrieval counter only advances while queued() reports elements.
     */
    void get_result(error_or_value &err_or_val) noexcept override {
        if (retrieved_ == 0) {
            err_or_val.as<fix_void<T>>() = std::move(value_);
        } else {
            err_or_val.as<fix_void<T>>() =
                make_error("Already taken value", error::code::Exhausted);
        }

        if (queued() > 0) {
            ++retrieved_;
        }
    }

    // Event
    void fire() override;
};
/*
 * Type independent part of the merge node. The merge node collects every
 * incoming value and throws it in one lane.
 */
class merge_conveyor_node_base : public conveyor_node,
                                 public conveyor_event_storage {
public:
    merge_conveyor_node_base();

    virtual ~merge_conveyor_node_base() = default;

    /**
     * Returns this node as storage.
     */
    conveyor_storage *next_storage() noexcept override {
        return static_cast<conveyor_storage *>(this);
    }
};
template <typename T>
class merge_conveyor_node : public merge_conveyor_node_base {
private:
class appendage final : public conveyor_node, public conveyor_storage {
public:
own<conveyor_node> child;
merge_conveyor_node *merger;
maybe<error_or<fix_void<T>>> error_or_value_;
public:
appendage(own<conveyor_node> n, merge_conveyor_node &m)
: conveyor_storage{}, child{std::move(n)}, merger{&m},
error_or_value_{std::nullopt} {}
bool child_storage_has_element_queued() const {
if (!child) {
return false;
}
conveyor_storage *storage = child->next_storage();
if (storage) {
return storage->queued() > 0;
}
return false;
}
void get_appendage_result(error_or_value &eov);
/**
* ConveyorNode
*/
error_or<own<conveyor_node>>
swap_child(own<conveyor_node> &&swapee_) override;
conveyor_storage *next_storage() noexcept override {
return static_cast<conveyor_storage *>(this);
}
void get_result(error_or_value &err_or_val) override;
/**
* ConveyorStorage
*/
size_t space() const override;
size_t queued() const override;
void child_has_fired() override;
void parent_has_fired() override;
void set_parent(conveyor_storage *par) override;
};
friend class merge_conveyor_node_data<T>;
friend class appendage;
our<merge_conveyor_node_data<T>> data_;
size_t next_appendage_ = 0;
public:
merge_conveyor_node(our<merge_conveyor_node_data<T>> data);
~merge_conveyor_node();
// ConveyorNode
error_or<own<conveyor_node>>
swap_child(own<conveyor_node> &&c) noexcept override;
// Event
void get_result(error_or_value &err_or_val) noexcept override;
void fire() override;
// ConveyorStorage
size_t space() const override;
size_t queued() const override;
void child_has_fired() override;
void parent_has_fired() override;
};
/**
 * Data shared between a merge_conveyor_node and the merge_conveyor handles.
 * It owns the appendages and points back at the merge node.
 */
template <typename T> class merge_conveyor_node_data {
public:
    std::vector<own<typename merge_conveyor_node<T>::appendage>> appendages;

    // Merge node using this data; nullptr by default.
    merge_conveyor_node<T> *merger = nullptr;

public:
    /**
     * Attaches another conveyor to the merge point.
     */
    void attach(conveyor<T> conv);

    /**
     * Notification hook; going by its name it is called when the merge node
     * gets destroyed - TODO confirm against the implementation.
     */
    void governing_node_destroyed();
};
/*
class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage {
private:
public:
};
template <typename... Args>
class JoinConveyorNode final : public JoinConveyorNodeBase {
private:
template<typename T>
class Appendage : public ConveyorEventStorage {
private:
Maybe<T> data = std::nullopt;
public:
size_t space() const override;
size_t queued() const override;
void fire() override;
void get_result(ErrorOrValue& eov) override;
};
std::tuple<Appendage<Args>...> appendages;
public:
};
*/
} // namespace saw
#include "async.tmpl.h"