adding merging and children notifications

This commit is contained in:
keldu.magnus 2021-09-23 20:04:03 +02:00
parent 61d71c4937
commit 5c6a8c6c9e
5 changed files with 159 additions and 18 deletions

View File

@ -16,7 +16,11 @@ EventLoop &currentEventLoop() {
ConveyorNode::ConveyorNode() {}
void ConveyorStorage::setParent(ConveyorStorage *p) {
ConveyorStorage::ConveyorStorage() : child{nullptr} {}
ConveyorStorage::ConveyorStorage(ConveyorStorage &c) : child{&c} {}
void ConveyorEventStorage::setParent(ConveyorStorage *p) {
/*
* parent check isn't needed, but is used
* for the assert, because the storage should
@ -31,6 +35,13 @@ void ConveyorStorage::setParent(ConveyorStorage *p) {
parent = p;
}
void ConveyorEventStorage::setChild(ConveyorStorage *c) {
if (c) {
}
child = c;
}
ConveyorBase::ConveyorBase(Own<ConveyorNode> &&node_p,
ConveyorStorage *storage_p)
: node{std::move(node_p)}, storage{storage_p} {}

View File

@ -47,18 +47,30 @@ public:
bool isArmed() const;
};
class ConveyorStorage : public Event {
class ConveyorStorage {
protected:
ConveyorStorage *parent = nullptr;
ConveyorStorage *child = nullptr;
public:
ConveyorStorage();
ConveyorStorage(ConveyorStorage &child);
virtual ~ConveyorStorage() = default;
virtual size_t space() const = 0;
virtual size_t queued() const = 0;
virtual void childFired() = 0;
virtual void childHasFired() = 0;
void setParent(ConveyorStorage *parent);
virtual void setParent(ConveyorStorage *parent) = 0;
};
class ConveyorEventStorage : public ConveyorStorage, public Event {
public:
ConveyorEventStorage();
ConveyorEventStorage(ConveyorStorage &child);
virtual ~ConveyorEventStorage() = default;
void setParent(ConveyorStorage *parent) override;
};
class ConveyorBase {
@ -115,6 +127,19 @@ public:
SinkConveyor &operator=(SinkConveyor &&) = default;
};
template <typename T> class MergeConveyorNodeData;
template <typename T> class MergeConveyor {
private:
Our<MergeConveyorNodeData<T>> data;
public:
MergeConveyor(Our<MergeConveyorNodeData<T>> d);
~MergeConveyor();
void attach(Conveyor<T> conveyor);
};
/**
* Main interface for async operations.
*/
@ -133,7 +158,7 @@ public:
/**
* Construct a conveyor with a child node and the next storage point
*/
Conveyor(Own<ConveyorNode> &&node_p, ConveyorStorage *storage_p);
Conveyor(Own<ConveyorNode> node_p, ConveyorStorage *storage_p);
Conveyor(Conveyor<T> &&) = default;
Conveyor<T> &operator=(Conveyor<T> &&) = default;
@ -174,6 +199,11 @@ public:
template <typename ErrorFunc = PropagateError>
void detach(ErrorFunc &&err_func = PropagateError());
/**
*
*/
std::pair<Conveyor<T>, MergeConveyor<T>> merge();
/**
* Creates a local sink which drops elements, but lifetime control remains
* in your hand.
@ -432,7 +462,8 @@ public:
};
template <typename T>
class AdaptConveyorNode final : public ConveyorNode, public ConveyorStorage {
class AdaptConveyorNode final : public ConveyorNode,
public ConveyorEventStorage {
protected:
Own<ConveyorNode> child;
@ -456,7 +487,7 @@ public:
size_t space() const override;
size_t queued() const override;
void childFired() override {}
void childHasFired() override {}
// Event
void fire() override;
@ -482,7 +513,9 @@ public:
};
template <typename T>
class OneTimeConveyorNode final : public ConveyorNode, public ConveyorStorage {
class OneTimeConveyorNode final : public ConveyorNode,
public ConveyorStorage,
public Event {
protected:
Own<ConveyorNode> child;
@ -514,7 +547,7 @@ public:
};
class QueueBufferConveyorNodeBase : public ConveyorNode,
public ConveyorStorage {
public ConveyorEventStorage {
protected:
Own<ConveyorNode> child;
@ -654,7 +687,8 @@ public:
}
};
class SinkConveyorNode final : public ConveyorNode, public ConveyorStorage {
class SinkConveyorNode final : public ConveyorNode,
public ConveyorEventStorage {
private:
Own<ConveyorNode> child;
ConveyorSinks *conveyor_sink;
@ -707,7 +741,8 @@ public:
}
};
class ImmediateConveyorNodeBase : public ConveyorNode, public ConveyorStorage {
class ImmediateConveyorNodeBase : public ConveyorNode,
public ConveyorEventStorage {
private:
public:
virtual ~ImmediateConveyorNodeBase() = default;
@ -746,6 +781,82 @@ public:
void fire() override;
};
/*
* Collects every incoming value and throws it in one lane
*/
class MergeConveyorNodeBase : public ConveyorNode, public Event {
public:
virtual ~MergeConveyorNodeBase() = default;
};
template <typename T> class MergeConveyorNode : public MergeConveyorNodeBase {
private:
class Appendage final : public ConveyorStorage {
private:
Own<ConveyorNode> child;
MergeConveyorNode *merger;
public:
Appendage(Own<ConveyorNode> n, MergeConveyorNode &m)
: child{std::move(n)}, merger{&m} {}
size_t space() const override {
GIN_ASSERT(merger) { return 0; }
if (merger->error_or_value.has_value()) {
return 0;
}
return 1;
}
size_t queued() const override {
GIN_ASSERT(merger) { return 0; }
if (merger->error_or_value.has_value()) {
return 1;
}
return 0;
}
void childFired() override {
GIN_ASSERT(!merger->error_or_value.has_value()) { return; }
ErrorOr<FixVoid<T>> eov;
child->getResult(eov);
merger->error_or_value = std::move(eov);
}
void setParent(ConveyorStorage *par) override {
GIN_ASSERT(merger && merger->error_or_value.has_value()) { return; }
if (par && !merger->isArmed()) {
merger->armNext();
}
parent = par;
}
};
friend class MergeConveyorNodeData<T>;
friend class Appendage;
Our<MergeConveyorNodeData<T>> data;
Maybe<ErrorOr<FixVoid<T>>> error_or_value;
public:
MergeConveyorNode(Our<MergeConveyorNodeData<T>> data);
~MergeConveyorNode();
void getResult(ErrorOrValue &err_or_val) override;
};
template <typename T> class MergeConveyorNodeData {
private:
std::vector<typename MergeConveyorNode<T>::Appendage> appendages;
void attach(Conveyor<T> conv);
};
/*
class JoinConveyorNodeBase : public ConveyorStorage {
public:

View File

@ -69,8 +69,8 @@ Conveyor<T>::Conveyor(Error &&error) : ConveyorBase(nullptr, nullptr) {
}
template <typename T>
Conveyor<T>::Conveyor(Own<ConveyorNode> &&node_p, ConveyorStorage *storage_p)
: ConveyorBase(std::move(node_p), storage_p) {}
Conveyor<T>::Conveyor(Own<ConveyorNode> node_p, ConveyorStorage *storage_p)
: ConveyorBase{std::move(node_p), storage_p} {}
template <typename T>
template <typename Func, typename ErrorFunc>
@ -101,6 +101,15 @@ Conveyor<T> Conveyor<T>::attach(Args &&...args) {
return Conveyor<T>{std::move(attach_node), storage};
}
template <typename T>
std::pair<Conveyor<T>, MergeConveyor<T>> Conveyor<T>::merge() {
Own<MergeConveyorNode<T>> node = heap<MergeConveyorNode<T>>();
MergeConveyor<T> node_ref = node.get();
return std::make_pair(Conveyor<T>{std::move(node), storage}, *node_ref);
}
template <>
template <typename ErrorFunc>
SinkConveyor Conveyor<void>::sink(ErrorFunc &&error_func) {
@ -170,6 +179,17 @@ template <typename T> ConveyorAndFeeder<T> newConveyorAndFeeder() {
Conveyor<T>::toConveyor(std::move(node), storage_ptr)};
}
template <typename T>
MergeConveyor<T>::MergeConveyor(Our<MergeConveyorNodeData<T>> d)
: data{std::move(d)}, error_or_value{std::nullopt} {}
template <typename T> MergeConveyor<T>::~MergeConveyor() {}
template <typename T> void MergeConveyor<T>::attach(Conveyor<T> conveyor) {
/// @unimplemented
assert(false);
}
template <typename T> AdaptConveyorFeeder<T>::~AdaptConveyorFeeder() {
if (feedee) {
feedee->setFeeder(nullptr);

View File

@ -40,7 +40,7 @@ template <typename T> using Our = std::shared_ptr<T>;
template <typename T> using Lent = std::weak_ptr<T>;
template <typename T, class... Args> Own<T> heap(Args &&...args) {
return Own<T>(new (std::nothrow) T(std::forward<Args>(args)...));
return Own<T>(new T(std::forward<Args>(args)...));
}
template <typename T, class... Args> Our<T> share(Args &&...args) {

View File

@ -5,7 +5,7 @@
namespace gin {
template<typename Codec, typename Incoming, typename Outgoing>
template <typename Codec, typename Incoming, typename Outgoing>
class StreamingIoPeer {
private:
Codec codec;
@ -13,6 +13,7 @@ private:
Own<AsyncIoStream> io_stream;
Own<ConveyorFeeder<Incoming>> incoming_feeder = nullptr;
public:
StreamingIoPeer(Own<AsyncIoStream> stream);
@ -21,6 +22,4 @@ public:
Conveyor<Incoming> startReadPump();
};
}
} // namespace gin