// Source: forstio/source/forstio/async.h

#pragma once
2020-10-29 02:35:43 +01:00
#include "common.h"
#include "error.h"
#include "timer.h"
2020-08-28 16:59:25 +02:00
#include <functional>
#include <limits>
2020-08-20 00:36:21 +02:00
#include <list>
#include <queue>
2020-10-29 02:36:01 +01:00
#include <type_traits>
2021-12-29 19:26:22 +01:00
namespace saw {
2020-08-06 19:28:33 +02:00
class ConveyorNode {
public:
ConveyorNode();
virtual ~ConveyorNode() = default;
2020-08-20 00:36:21 +02:00
virtual void getResult(ErrorOrValue &err_or_val) = 0;
2020-08-06 19:28:33 +02:00
};
2020-08-06 02:19:05 +02:00
class EventLoop;
2020-11-17 20:07:14 +01:00
/*
 * Event class similar to the one found in Cap'n Proto.
 * https://github.com/capnproto/capnproto
 *
 * An Event references the EventLoop it belongs to and carries the list
 * links (prev/next) which EventLoop, being a friend, may manipulate.
 */
class Event {
private:
    EventLoop &loop;
    // List links, both unset until the event is queued.
    Event **prev = nullptr;
    Event *next = nullptr;

    friend class EventLoop;

public:
    Event();
    Event(EventLoop &loop);
    virtual ~Event();

    // Callback implemented by every concrete event.
    virtual void fire() = 0;

    // NOTE(review): presumably these queue the event at the loop's
    // next / later / last insert points - confirm in async.cpp.
    void armNext();
    void armLater();
    void armLast();

    // NOTE(review): presumably removes the event from the loop's queue.
    void disarm();

    bool isArmed() const;
};
/**
 * Interface of a storage point inside a conveyor chain.
 * Every storage knows its parent storage and its child storage (both may be
 * null) and reports how many elements it can still take and how many it holds.
 */
class ConveyorStorage {
protected:
    ConveyorStorage *parent = nullptr;
    ConveyorStorage *child_storage = nullptr;

public:
    ConveyorStorage(ConveyorStorage *child);
    virtual ~ConveyorStorage();

    // Remaining capacity of this storage.
    virtual size_t space() const = 0;
    // Number of elements currently held by this storage.
    virtual size_t queued() const = 0;

    // Notification hooks invoked when the child / parent storage has fired.
    virtual void childHasFired() = 0;
    virtual void parentHasFired() = 0;

    virtual void setParent(ConveyorStorage *parent) = 0;

    void unlinkChild();
};
/**
 * A ConveyorStorage which is an Event at the same time.
 * It supplies the setParent() implementation; the remaining pure virtual
 * functions of both bases are left to the deriving classes.
 */
class ConveyorEventStorage : public ConveyorStorage, public Event {
public:
    ConveyorEventStorage(ConveyorStorage *child);
    virtual ~ConveyorEventStorage() = default;

    // ConveyorStorage
    void setParent(ConveyorStorage *parent) override;
};
2020-08-18 19:59:59 +02:00
class ConveyorBase {
protected:
Own<ConveyorNode> node;
2020-08-20 00:36:21 +02:00
ConveyorStorage *storage;
2020-08-18 19:59:59 +02:00
public:
2020-08-20 00:36:21 +02:00
ConveyorBase(Own<ConveyorNode> &&node_p,
ConveyorStorage *storage_p = nullptr);
2020-08-18 19:59:59 +02:00
virtual ~ConveyorBase() = default;
2020-08-18 22:38:36 +02:00
ConveyorBase(ConveyorBase &&) = default;
ConveyorBase &operator=(ConveyorBase &&) = default;
2020-09-19 21:12:16 +02:00
2020-08-20 00:36:21 +02:00
void get(ErrorOrValue &err_or_val);
2020-08-18 19:59:59 +02:00
};
2020-08-09 02:04:48 +02:00
template <typename T> class Conveyor;
2020-08-09 02:03:09 +02:00
2020-08-09 02:04:48 +02:00
template <typename T> Conveyor<T> chainedConveyorType(T *);
2020-08-09 02:03:09 +02:00
2021-06-18 17:32:31 +02:00
// template <typename T> Conveyor<T> chainedConveyorType(Conveyor<T> *);
2020-08-09 02:03:09 +02:00
2021-06-18 17:32:31 +02:00
template <typename T> T removeErrorOrType(T *);
2021-06-18 17:32:31 +02:00
template <typename T> T removeErrorOrType(ErrorOr<T> *);
template <typename T>
2021-06-18 17:32:31 +02:00
using RemoveErrorOr = decltype(removeErrorOrType((T *)nullptr));
2020-08-09 02:03:09 +02:00
template <typename T>
2020-08-09 02:04:48 +02:00
using ChainedConveyors = decltype(chainedConveyorType((T *)nullptr));
2020-08-09 02:03:09 +02:00
template <typename Func, typename T>
2021-06-18 17:32:31 +02:00
using ConveyorResult = ChainedConveyors<RemoveErrorOr<ReturnType<Func, T>>>;
2020-08-09 02:03:09 +02:00
2020-08-09 02:32:21 +02:00
struct PropagateError {
public:
2020-08-25 20:52:32 +02:00
Error operator()(const Error &error) const;
Error operator()(Error &&error);
2020-08-09 02:32:21 +02:00
};
2020-10-29 02:35:43 +01:00
class SinkConveyor {
private:
Own<ConveyorNode> node;
2020-10-29 02:36:01 +01:00
2020-10-29 02:35:43 +01:00
public:
2021-05-01 15:26:45 +02:00
SinkConveyor();
2020-10-29 02:36:01 +01:00
SinkConveyor(Own<ConveyorNode> &&node);
2020-10-29 02:35:43 +01:00
2020-10-29 02:36:01 +01:00
SinkConveyor(SinkConveyor &&) = default;
SinkConveyor &operator=(SinkConveyor &&) = default;
2020-10-29 02:35:43 +01:00
};
template <typename T> class MergeConveyorNodeData;
template <typename T> class MergeConveyor {
private:
2021-10-02 14:34:49 +02:00
Lent<MergeConveyorNodeData<T>> data;
public:
2021-10-02 14:34:49 +02:00
MergeConveyor(Lent<MergeConveyorNodeData<T>> d);
~MergeConveyor();
void attach(Conveyor<T> conveyor);
};
2020-12-04 22:13:31 +01:00
/**
* Main interface for async operations.
*/
2021-04-25 15:44:29 +02:00
template <typename T> class Conveyor final : public ConveyorBase {
public:
2020-12-04 22:13:31 +01:00
/**
2021-04-20 15:26:30 +02:00
* Construct an immediately fulfilled node
*/
2020-09-29 02:30:26 +02:00
Conveyor(FixVoid<T> value);
2020-12-04 22:13:31 +01:00
2021-04-20 15:26:30 +02:00
/**
* Construct an immediately failed node
*/
Conveyor(Error &&error);
2020-12-04 22:13:31 +01:00
/**
* Construct a conveyor with a child node and the next storage point
*/
Conveyor(Own<ConveyorNode> node_p, ConveyorStorage *storage_p);
2020-08-18 19:59:59 +02:00
Conveyor(Conveyor<T> &&) = default;
Conveyor<T> &operator=(Conveyor<T> &&) = default;
2020-09-19 21:12:16 +02:00
2020-12-04 22:13:31 +01:00
/**
* This method converts values or errors from children
2020-08-20 00:36:21 +02:00
*/
template <typename Func, typename ErrorFunc = PropagateError>
2021-10-02 14:34:49 +02:00
[[nodiscard]] ConveyorResult<Func, T>
then(Func &&func, ErrorFunc &&error_func = PropagateError());
2020-08-17 20:19:42 +02:00
2020-12-04 22:13:31 +01:00
/**
2021-03-12 02:30:11 +01:00
* This method adds a buffer node in the conveyor chains which acts as a
* scheduler interrupt point and collects elements up to the supplied limit.
2020-08-20 00:36:21 +02:00
*/
2021-10-02 14:34:49 +02:00
[[nodiscard]] Conveyor<T>
buffer(size_t limit = std::numeric_limits<size_t>::max());
2020-08-20 00:36:21 +02:00
2020-12-04 22:13:31 +01:00
/**
2021-03-12 02:30:11 +01:00
* This method just takes ownership of any supplied types,
* which are destroyed when the chain gets destroyed.
2021-04-10 20:11:01 +02:00
* Useful for resource lifetime control.
2020-08-24 12:47:18 +02:00
*/
2021-10-02 14:34:49 +02:00
template <typename... Args>
[[nodiscard]] Conveyor<T> attach(Args &&...args);
2020-08-23 15:47:14 +02:00
2020-12-04 22:13:31 +01:00
/** @todo implement
* This method limits the total amount of passed elements
* Be careful where you place this node into the chain.
2021-04-10 20:11:01 +02:00
* If you meant to fork it and destroy paths you shouldn't place
* an interrupt point between the fork and this limiter
*/
2021-10-02 14:34:49 +02:00
[[nodiscard]] Conveyor<T> limit(size_t val = 1);
2021-10-07 18:09:05 +02:00
/**
*
*/
[[nodiscard]] std::pair<Conveyor<T>, MergeConveyor<T>> merge();
2020-12-04 22:13:31 +01:00
/**
* Moves the conveyor chain into a thread local storage point which drops
2021-03-12 02:30:11 +01:00
* every element. Use sink() if you want to control the lifetime of a
* conveyor chain
2020-08-28 16:59:25 +02:00
*/
2021-03-12 02:30:11 +01:00
template <typename ErrorFunc = PropagateError>
2020-12-04 22:13:31 +01:00
void detach(ErrorFunc &&err_func = PropagateError());
/**
* Creates a local sink which drops elements, but lifetime control remains
* in your hand.
2020-10-29 02:36:01 +01:00
*/
2021-03-12 02:30:11 +01:00
template <typename ErrorFunc = PropagateError>
2021-10-02 14:34:49 +02:00
[[nodiscard]] SinkConveyor sink(ErrorFunc &&error_func = PropagateError());
2020-10-29 02:35:43 +01:00
2020-12-04 22:13:31 +01:00
/**
* If no sink() or detach() is used you have to take elements out of the
* chain yourself.
*/
2020-08-25 20:52:32 +02:00
ErrorOr<FixVoid<T>> take();
2020-08-17 20:19:42 +02:00
2020-12-04 22:13:31 +01:00
/** @todo implement
* Specifically pump elements through this chain
*/
2020-08-20 00:36:21 +02:00
void poll();
// helper
2021-10-02 14:34:49 +02:00
static Conveyor<T> toConveyor(Own<ConveyorNode> node,
2020-08-20 00:36:21 +02:00
ConveyorStorage *is_storage = nullptr);
// helper
2020-08-28 16:59:25 +02:00
static std::pair<Own<ConveyorNode>, ConveyorStorage *>
2021-10-02 14:34:49 +02:00
fromConveyor(Conveyor<T> conveyor);
};
/*
 * Builds a conveyor out of a functor taking no value.
 * NOTE(review): judging by the name func is executed at a later point by
 * the event loop - confirm in async.tmpl.h.
 */
template <typename Func>
ConveyorResult<Func, void> execLater(Func &&func);
2021-04-23 14:40:06 +02:00
/*
 * Joins a tuple of conveyors into one single conveyor
 * which carries a tuple of the individual element types.
 */
template <typename... Args>
Conveyor<std::tuple<Args...>>
joinConveyors(std::tuple<Conveyor<Args>...> &conveyors);
2021-04-23 14:40:06 +02:00
/**
 * Producer side interface of a conveyor.
 * Allows feeding values or errors and querying the fill state.
 */
template <typename T> class ConveyorFeeder {
public:
    virtual ~ConveyorFeeder() = default;

    // Hand a value / an error over.
    virtual void feed(T &&data) = 0;
    virtual void fail(Error &&error) = 0;

    // Fill state
    virtual size_t space() const = 0;
    virtual size_t queued() const = 0;
};
2020-08-25 20:52:32 +02:00
template <> class ConveyorFeeder<void> {
public:
virtual ~ConveyorFeeder() = default;
2020-08-28 16:59:25 +02:00
virtual void feed(Void &&value = Void{}) = 0;
virtual void fail(Error &&error) = 0;
2020-08-25 20:52:32 +02:00
virtual size_t space() const = 0;
2020-08-28 16:59:25 +02:00
virtual size_t queued() const = 0;
2020-08-25 20:52:32 +02:00
};
/**
 * Bundles a conveyor with the feeder supplying it.
 * The member order is part of the interface (aggregate initialization).
 */
template <typename T> struct ConveyorAndFeeder {
    // Owning handle of the producer side
    Own<ConveyorFeeder<T>> feeder;
    // Consumer side
    Conveyor<T> conveyor;
};
2020-08-18 16:40:49 +02:00
// Creates a connected feeder / conveyor pair.
template <typename T> ConveyorAndFeeder<T> newConveyorAndFeeder();

// Creates a feeder / conveyor pair backed by the one time node classes.
// NOTE(review): presumably only a single element can pass - confirm.
template <typename T> ConveyorAndFeeder<T> oneTimeConveyorAndFeeder();
2020-09-23 21:25:59 +02:00
// Signals which can be observed through EventPort::onSignal().
enum class Signal : uint8_t {
    Terminate,
    User1
};
2020-08-25 19:51:45 +02:00
2020-12-04 22:13:31 +01:00
/**
* Class which acts as a correspondent between the running framework and outside
* events which may be signals from the operating system or just other threads.
* Default EventPorts are supplied by setupAsyncIo() in io.h
*/
2020-08-25 19:51:45 +02:00
class EventPort {
public:
virtual ~EventPort() = default;
2020-08-25 20:52:32 +02:00
virtual Conveyor<void> onSignal(Signal signal) = 0;
virtual void poll() = 0;
2020-09-02 16:50:42 +02:00
virtual void wait() = 0;
2020-10-11 22:21:06 +02:00
virtual void wait(const std::chrono::steady_clock::duration &) = 0;
virtual void wait(const std::chrono::steady_clock::time_point &) = 0;
2020-11-17 20:22:49 +01:00
virtual void wake() = 0;
2020-08-25 19:51:45 +02:00
};
class SinkConveyorNode;
2021-04-25 15:44:29 +02:00
class ConveyorSinks final : public Event {
2020-08-25 19:51:45 +02:00
private:
/*
class Helper final : public Event {
private:
void destroySinkConveyorNode(ConveyorNode& sink);
void fail(Error&& error);
std::vector<Own<ConveyorNode>> sink_nodes;
std::queue<ConveyorNode*> delete_nodes;
std::function<void(Error&& error)> error_handler;
public:
ConveyorSinks() = default;
ConveyorSinks(EventLoop& event_loop);
void add(Conveyor<void> node);
void fire() override {}
};
gin::Own<Helper> helper;
*/
friend class SinkConveyorNode;
2020-08-28 16:59:25 +02:00
void destroySinkConveyorNode(ConveyorNode &sink_node);
void fail(Error &&error);
std::list<Own<ConveyorNode>> sink_nodes;
2020-08-28 16:59:25 +02:00
std::queue<ConveyorNode *> delete_nodes;
std::function<void(Error &&error)> error_handler;
2020-08-25 19:51:45 +02:00
public:
// ConveyorSinks();
// ConveyorSinks(EventLoop& event_loop);
2020-10-29 02:35:43 +01:00
ConveyorSinks() = default;
ConveyorSinks(EventLoop &event_loop);
2020-08-28 16:59:25 +02:00
void add(Conveyor<void> &&node);
void fire() override;
2020-08-25 19:51:45 +02:00
};
2020-11-17 20:07:14 +01:00
/*
2021-04-25 15:44:29 +02:00
* EventLoop class similar to capn'proto.
* https://github.com/capnproto/capnproto
*/
class EventLoop {
private:
friend class Event;
Event *head = nullptr;
Event **tail = &head;
Event **next_insert_point = &head;
Event **later_insert_point = &head;
bool is_runnable = false;
2020-08-25 19:51:45 +02:00
Own<EventPort> event_port = nullptr;
2020-10-29 02:35:43 +01:00
Own<ConveyorSinks> daemon_sink = nullptr;
2020-08-25 19:51:45 +02:00
// functions
void setRunnable(bool runnable);
friend class WaitScope;
void enterScope();
void leaveScope();
2020-11-03 13:27:00 +01:00
bool turnLoop();
2020-08-20 00:50:36 +02:00
bool turn();
2020-08-20 00:52:51 +02:00
public:
EventLoop();
2020-08-28 16:59:25 +02:00
EventLoop(Own<EventPort> &&port);
~EventLoop();
EventLoop(EventLoop &&) = default;
EventLoop &operator=(EventLoop &&) = default;
bool wait();
bool wait(const std::chrono::steady_clock::duration &);
bool wait(const std::chrono::steady_clock::time_point &);
bool poll();
2020-08-25 19:51:45 +02:00
2020-08-28 16:59:25 +02:00
EventPort *eventPort();
2020-10-29 02:35:43 +01:00
ConveyorSinks &daemon();
};
2020-11-17 20:07:14 +01:00
/*
2021-04-25 15:44:29 +02:00
* WaitScope class similar to capn'proto.
* https://github.com/capnproto/capnproto
*/
class WaitScope {
private:
EventLoop &loop;
2020-08-20 00:36:21 +02:00
public:
WaitScope(EventLoop &loop);
~WaitScope();
2020-08-09 02:03:09 +02:00
void wait();
void wait(const std::chrono::steady_clock::duration &);
void wait(const std::chrono::steady_clock::time_point &);
void poll();
};
2020-09-28 19:36:35 +02:00
/*
 * Conveyor factories taking a functor without arguments.
 * NOTE(review): the names match Event::armNext() / armLater() / armLast(),
 * so they presumably differ in the queue position - confirm in async.tmpl.h.
 */
template <typename Func> ConveyorResult<Func, void> yieldNext(Func &&func);

template <typename Func> ConveyorResult<Func, void> yieldLater(Func &&func);

template <typename Func> ConveyorResult<Func, void> yieldLast(Func &&func);
2021-12-29 19:26:22 +01:00
} // namespace saw
2020-08-09 02:03:09 +02:00
// Implementation details.
// The classes below are semi-private helpers backing the public interface.
2021-12-29 19:26:22 +01:00
namespace saw {
2020-08-28 16:59:25 +02:00
/**
 * Calls func with the moved input and returns its result.
 * Generic case: the functor takes a value and returns a value.
 */
template <typename Out, typename In> struct FixVoidCaller {
    template <typename Func> static Out apply(Func &func, In &&in) {
        return func(std::move(in));
    }
};
2020-08-28 16:59:25 +02:00
/**
 * Input is the placeholder Void: the functor is called without arguments
 * and its result is returned.
 */
template <typename Out> struct FixVoidCaller<Out, Void> {
    template <typename Func> static Out apply(Func &func, Void &&in) {
        (void)in; // placeholder only, nothing to pass on
        return func();
    }
};
2020-08-28 16:59:25 +02:00
/**
 * Output is the placeholder Void: the functor is called with the moved input,
 * whatever it returns is dropped and Void{} is returned instead.
 */
template <typename In> struct FixVoidCaller<Void, In> {
    template <typename Func> static Void apply(Func &func, In &&in) {
        func(std::move(in));
        return Void{};
    }
};
2020-08-28 16:59:25 +02:00
/**
 * Input and output are the placeholder Void: the functor is called without
 * arguments and Void{} is returned.
 */
template <> struct FixVoidCaller<Void, Void> {
    template <typename Func> static Void apply(Func &func, Void &&in) {
        (void)in; // placeholder only, nothing to pass on
        func();
        return Void{};
    }
};
template <typename T> class AdaptConveyorNode;
2020-08-28 16:59:25 +02:00
template <typename T>
2021-04-25 15:44:29 +02:00
class AdaptConveyorFeeder final : public ConveyorFeeder<UnfixVoid<T>> {
private:
AdaptConveyorNode<T> *feedee = nullptr;
2020-08-20 00:36:21 +02:00
public:
~AdaptConveyorFeeder();
void setFeedee(AdaptConveyorNode<T> *feedee);
void feed(T &&value) override;
void fail(Error &&error) override;
size_t space() const override;
size_t queued() const override;
};
template <typename T>
class AdaptConveyorNode final : public ConveyorNode,
public ConveyorEventStorage {
private:
AdaptConveyorFeeder<T> *feeder = nullptr;
std::queue<ErrorOr<UnfixVoid<T>>> storage;
2020-08-20 00:36:21 +02:00
public:
AdaptConveyorNode();
~AdaptConveyorNode();
void setFeeder(AdaptConveyorFeeder<T> *feeder);
void feed(T &&value);
void fail(Error &&error);
2020-08-20 00:36:21 +02:00
// ConveyorNode
void getResult(ErrorOrValue &err_or_val) override;
// ConveyorStorage
size_t space() const override;
size_t queued() const override;
2020-08-18 22:38:36 +02:00
void childHasFired() override;
void parentHasFired() override;
2020-08-20 00:36:21 +02:00
// Event
void fire() override;
};
2020-08-09 01:35:15 +02:00
2020-08-24 12:47:18 +02:00
template <typename T> class OneTimeConveyorNode;
2020-08-28 16:59:25 +02:00
template <typename T>
2021-04-25 15:44:29 +02:00
class OneTimeConveyorFeeder final : public ConveyorFeeder<UnfixVoid<T>> {
2020-08-24 12:47:18 +02:00
private:
OneTimeConveyorNode<T> *feedee = nullptr;
public:
~OneTimeConveyorFeeder();
void setFeedee(OneTimeConveyorNode<T> *feedee);
void feed(T &&value) override;
void fail(Error &&error) override;
size_t space() const override;
2020-09-29 02:30:26 +02:00
size_t queued() const override;
2020-08-24 12:47:18 +02:00
};
template <typename T>
class OneTimeConveyorNode final : public ConveyorNode,
public ConveyorStorage,
public Event {
protected:
Own<ConveyorNode> child;
2020-08-24 12:47:18 +02:00
private:
OneTimeConveyorFeeder<T> *feeder = nullptr;
bool passed = false;
Maybe<ErrorOr<T>> storage = std::nullopt;
2020-08-28 16:59:25 +02:00
public:
~OneTimeConveyorNode();
2020-08-24 12:47:18 +02:00
void setFeeder(OneTimeConveyorFeeder<T> *feeder);
2020-09-29 02:30:26 +02:00
void feed(T &&value);
void fail(Error &&error);
2020-08-24 12:47:18 +02:00
// ConveyorNode
void getResult(ErrorOrValue &err_or_val) override;
// ConveyorStorage
size_t space() const override;
size_t queued() const override;
void childHasFired() override {}
void parentHasFired() override;
2020-08-24 12:47:18 +02:00
// Event
void fire() override;
};
/**
 * Type independent part of the buffer node.
 * Takes ownership of the child node and registers the child storage
 * with the ConveyorEventStorage base.
 */
class QueueBufferConveyorNodeBase : public ConveyorNode,
                                    public ConveyorEventStorage {
protected:
    Own<ConveyorNode> child;

public:
    QueueBufferConveyorNodeBase(ConveyorStorage *child_store,
                                Own<ConveyorNode> dep)
        : ConveyorEventStorage{child_store}, child(std::move(dep)) {
    }

    virtual ~QueueBufferConveyorNodeBase() = default;
};
2020-08-20 00:36:21 +02:00
template <typename T>
2021-04-25 15:44:29 +02:00
class QueueBufferConveyorNode final : public QueueBufferConveyorNodeBase {
private:
2020-08-20 00:36:21 +02:00
std::queue<ErrorOr<T>> storage;
size_t max_store;
public:
QueueBufferConveyorNode(ConveyorStorage *child_store, Own<ConveyorNode> dep,
size_t max_size)
: QueueBufferConveyorNodeBase{child_store, std::move(dep)},
max_store{max_size} {}
2020-08-20 00:36:21 +02:00
// Event
void fire() override;
2020-08-20 00:36:21 +02:00
// ConveyorNode
void getResult(ErrorOrValue &eov) noexcept override;
2020-08-20 00:36:21 +02:00
// ConveyorStorage
size_t space() const override;
size_t queued() const override;
void childHasFired() override;
void parentHasFired() override;
};
2020-08-09 02:03:09 +02:00
2020-08-23 15:47:14 +02:00
class AttachConveyorNodeBase : public ConveyorNode {
protected:
Own<ConveyorNode> child;
2020-08-23 15:47:14 +02:00
public:
AttachConveyorNodeBase(Own<ConveyorNode> &&dep) : child(std::move(dep)) {}
2020-08-23 15:47:14 +02:00
2021-04-25 15:44:29 +02:00
virtual ~AttachConveyorNodeBase() = default;
void getResult(ErrorOrValue &err_or_val) noexcept override;
2020-08-23 15:47:14 +02:00
};
2020-08-24 12:47:18 +02:00
template <typename... Args>
2021-04-25 15:44:29 +02:00
class AttachConveyorNode final : public AttachConveyorNodeBase {
2020-08-23 15:47:14 +02:00
private:
std::tuple<Args...> attached_data;
2020-08-24 12:47:18 +02:00
2020-08-23 15:47:14 +02:00
public:
2020-12-04 22:13:31 +01:00
AttachConveyorNode(Own<ConveyorNode> &&dep, Args &&...args)
2020-08-24 12:47:18 +02:00
: AttachConveyorNodeBase(std::move(dep)), attached_data{
std::move(args...)} {}
2020-08-23 15:47:14 +02:00
};
2020-08-20 00:36:21 +02:00
class ConvertConveyorNodeBase : public ConveyorNode {
protected:
Own<ConveyorNode> child;
2020-08-20 00:36:21 +02:00
public:
ConvertConveyorNodeBase(Own<ConveyorNode> &&dep);
2021-04-25 15:44:29 +02:00
virtual ~ConvertConveyorNodeBase() = default;
2020-08-20 00:36:21 +02:00
void getResult(ErrorOrValue &err_or_val) override;
virtual void getImpl(ErrorOrValue &err_or_val) = 0;
};
template <typename T, typename DepT, typename Func, typename ErrorFunc>
2021-04-25 15:44:29 +02:00
class ConvertConveyorNode final : public ConvertConveyorNodeBase {
private:
2020-08-20 00:36:21 +02:00
Func func;
ErrorFunc error_func;
2021-06-18 17:32:31 +02:00
static_assert(std::is_same<DepT, RemoveErrorOr<DepT>>::value,
"Should never be of type ErrorOr");
public:
2020-08-20 00:36:21 +02:00
ConvertConveyorNode(Own<ConveyorNode> &&dep, Func &&func,
ErrorFunc &&error_func)
: ConvertConveyorNodeBase(std::move(dep)), func{std::move(func)},
error_func{std::move(error_func)} {}
void getImpl(ErrorOrValue &err_or_val) noexcept override {
ErrorOr<UnfixVoid<DepT>> dep_eov;
ErrorOr<UnfixVoid<RemoveErrorOr<T>>> &eov =
err_or_val.as<UnfixVoid<RemoveErrorOr<T>>>();
2020-08-20 00:36:21 +02:00
if (child) {
child->getResult(dep_eov);
if (dep_eov.isValue()) {
try {
2021-06-18 17:32:31 +02:00
eov = FixVoidCaller<T, DepT>::apply(
func, std::move(dep_eov.value()));
} catch (const std::bad_alloc &) {
eov = criticalError("Out of memory");
} catch (const std::exception &) {
eov = criticalError(
"Exception in chain occured. Return ErrorOr<T> if you "
"want to handle errors which are recoverable");
}
2020-08-20 00:36:21 +02:00
} else if (dep_eov.isError()) {
eov = error_func(std::move(dep_eov.error()));
2020-08-20 00:36:21 +02:00
} else {
eov = criticalError("No value set in dependency");
}
} else {
eov = criticalError("Conveyor doesn't have child");
}
}
2020-08-09 01:35:15 +02:00
};
2020-08-20 00:36:21 +02:00
class SinkConveyorNode final : public ConveyorNode,
public ConveyorEventStorage {
2020-08-25 20:52:32 +02:00
private:
Own<ConveyorNode> child;
2020-10-29 02:35:43 +01:00
ConveyorSinks *conveyor_sink;
2020-08-28 16:59:25 +02:00
2020-08-25 20:52:32 +02:00
public:
SinkConveyorNode(ConveyorStorage *child_store, Own<ConveyorNode> node,
ConveyorSinks &conv_sink)
: ConveyorEventStorage{child_store}, child{std::move(node)},
conveyor_sink{&conv_sink} {}
2020-08-25 20:52:32 +02:00
SinkConveyorNode(ConveyorStorage *child_store, Own<ConveyorNode> node)
: ConveyorEventStorage{child_store}, child{std::move(node)},
conveyor_sink{nullptr} {}
2020-10-29 02:35:43 +01:00
2020-09-23 21:24:34 +02:00
// Event only queued if a critical error occured
2020-08-25 20:52:32 +02:00
void fire() override {
2020-08-28 16:59:25 +02:00
// Queued for destruction of children, because this acts as a sink and
// no other event should be here
child = nullptr;
2020-08-28 16:59:25 +02:00
if (conveyor_sink) {
conveyor_sink->destroySinkConveyorNode(*this);
conveyor_sink = nullptr;
}
2020-08-25 20:52:32 +02:00
}
// ConveyorStorage
2020-08-28 16:59:25 +02:00
size_t space() const override { return 1; }
size_t queued() const override { return 0; }
2020-08-25 20:52:32 +02:00
// ConveyorNode
void getResult(ErrorOrValue &err_or_val) noexcept override {
2020-08-28 16:59:25 +02:00
err_or_val.as<Void>() =
criticalError("In a sink node no result can be returned");
}
// ConveyorStorage
void childHasFired() override {
2020-08-28 16:59:25 +02:00
if (child) {
ErrorOr<void> dep_eov;
2020-08-25 20:52:32 +02:00
child->getResult(dep_eov);
2020-08-28 16:59:25 +02:00
if (dep_eov.isError()) {
if (dep_eov.error().isCritical()) {
if (!isArmed()) {
armLast();
}
}
2020-08-28 16:59:25 +02:00
if (conveyor_sink) {
conveyor_sink->fail(std::move(dep_eov.error()));
}
2020-08-25 20:52:32 +02:00
}
}
}
/*
* No parent needs to be fired since we always have space
*/
void parentHasFired() override {}
2020-08-25 20:52:32 +02:00
};
/**
 * Type independent part of the immediate node,
 * which is the node behind the value and error constructors of Conveyor.
 */
class ImmediateConveyorNodeBase : public ConveyorNode,
                                  public ConveyorEventStorage {
public:
    ImmediateConveyorNodeBase();

    virtual ~ImmediateConveyorNodeBase() = default;
};
template <typename T>
2021-04-25 15:44:29 +02:00
class ImmediateConveyorNode final : public ImmediateConveyorNodeBase {
2020-09-29 02:30:26 +02:00
private:
2021-04-20 15:26:30 +02:00
ErrorOr<FixVoid<T>> value;
2021-04-25 15:44:29 +02:00
uint8_t retrieved;
2020-09-29 02:30:26 +02:00
public:
ImmediateConveyorNode(FixVoid<T> &&val);
2021-04-20 15:26:30 +02:00
ImmediateConveyorNode(Error &&error);
2020-09-29 02:30:26 +02:00
// ConveyorStorage
size_t space() const override;
size_t queued() const override;
void childHasFired() override;
void parentHasFired() override;
2020-09-29 02:30:26 +02:00
// ConveyorNode
void getResult(ErrorOrValue &err_or_val) noexcept override {
2021-06-18 00:17:18 +02:00
if (retrieved > 0) {
err_or_val.as<FixVoid<T>>() =
makeError("Already taken value", Error::Code::Exhausted);
} else {
2020-09-29 02:30:26 +02:00
err_or_val.as<FixVoid<T>>() = std::move(value);
}
2021-06-18 00:46:06 +02:00
if (queued() > 0) {
2021-06-18 00:17:18 +02:00
++retrieved;
}
2020-09-29 02:30:26 +02:00
}
// Event
void fire() override;
2020-09-29 02:30:26 +02:00
};
2021-06-18 17:32:31 +02:00
/*
* Collects every incoming value and throws it in one lane
*/
2021-10-02 14:34:49 +02:00
class MergeConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage {
public:
2021-10-02 14:34:49 +02:00
MergeConveyorNodeBase();
virtual ~MergeConveyorNodeBase() = default;
};
template <typename T> class MergeConveyorNode : public MergeConveyorNodeBase {
private:
class Appendage final : public ConveyorStorage {
2021-10-02 14:34:49 +02:00
public:
Own<ConveyorNode> child;
MergeConveyorNode *merger;
Maybe<ErrorOr<FixVoid<T>>> error_or_value;
public:
Appendage(ConveyorStorage *child_store, Own<ConveyorNode> n,
MergeConveyorNode &m)
: ConveyorStorage{child_store}, child{std::move(n)}, merger{&m},
error_or_value{std::nullopt} {}
2021-10-02 14:34:49 +02:00
bool childStorageHasElementQueued() const {
if (child_storage) {
return child_storage->queued() > 0;
}
return false;
}
2021-10-03 18:27:54 +02:00
void getAppendageResult(ErrorOrValue &eov);
2021-09-29 21:35:46 +02:00
size_t space() const override;
2021-09-29 21:35:46 +02:00
size_t queued() const override;
2021-09-29 21:35:46 +02:00
void childHasFired() override;
2021-10-02 14:34:49 +02:00
void parentHasFired() override;
2021-09-29 21:35:46 +02:00
void setParent(ConveyorStorage *par) override;
};
friend class MergeConveyorNodeData<T>;
friend class Appendage;
Our<MergeConveyorNodeData<T>> data;
2021-10-03 18:27:54 +02:00
size_t next_appendage = 0;
public:
MergeConveyorNode(Our<MergeConveyorNodeData<T>> data);
~MergeConveyorNode();
2021-10-02 14:34:49 +02:00
// Event
2021-10-07 18:09:05 +02:00
void getResult(ErrorOrValue &err_or_val) noexcept override;
2021-09-29 21:35:46 +02:00
void fire() override;
2021-10-02 14:34:49 +02:00
// ConveyorStorage
size_t space() const override;
size_t queued() const override;
void childHasFired() override;
void parentHasFired() override;
};
template <typename T> class MergeConveyorNodeData {
2021-10-02 14:34:49 +02:00
public:
std::vector<Own<typename MergeConveyorNode<T>::Appendage>> appendages;
2021-10-02 14:34:49 +02:00
MergeConveyorNode<T> *merger = nullptr;
public:
void attach(Conveyor<T> conv);
2021-10-02 14:34:49 +02:00
void governingNodeDestroyed();
};
2021-06-18 00:19:04 +02:00
/*
2021-10-07 18:09:05 +02:00
class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage {
2021-06-18 17:32:31 +02:00
private:
2021-06-18 00:17:18 +02:00
public:
};
2021-06-18 17:32:31 +02:00
template <typename... Args>
2021-10-07 18:09:05 +02:00
class JoinConveyorNode final : public JoinConveyorNodeBase {
2021-06-18 17:32:31 +02:00
private:
2021-10-07 18:09:05 +02:00
template<typename T>
class Appendage : public ConveyorEventStorage {
private:
Maybe<T> data = std::nullopt;
2021-06-18 00:17:18 +02:00
2021-10-07 18:09:05 +02:00
public:
size_t space() const override;
size_t queued() const override;
2021-06-18 00:17:18 +02:00
2021-10-07 18:09:05 +02:00
void fire() override;
void getResult(ErrorOrValue& eov) override;
};
2021-06-18 00:17:18 +02:00
2021-10-07 18:09:05 +02:00
std::tuple<Appendage<Args>...> appendages;
2021-06-18 00:17:18 +02:00
public:
};
2021-06-18 00:19:04 +02:00
*/
2021-06-18 17:32:31 +02:00
2021-12-29 19:26:22 +01:00
} // namespace saw
2020-09-29 02:30:26 +02:00
2021-03-17 11:24:18 +01:00
#include "async.tmpl.h"