merge conveyor impl

This commit is contained in:
keldu.magnus 2021-09-29 21:35:46 +02:00
parent c60157ed89
commit 21950cab93
3 changed files with 67 additions and 31 deletions

View File

@ -796,40 +796,15 @@ private:
MergeConveyorNode &m)
: ConveyorStorage{child_store}, child{std::move(n)}, merger{&m} {}
size_t space() const override {
GIN_ASSERT(merger) { return 0; }
size_t space() const override;
if (merger->error_or_value.has_value()) {
return 0;
}
size_t queued() const override;
return 1;
}
size_t queued() const override {
GIN_ASSERT(merger) { return 0; }
GIN_ASSERT(!merger->error_or_value.has_value()) { return 1; }
return 0;
}
void childHasFired() override {
GIN_ASSERT(!merger->error_or_value.has_value()) { return; }
ErrorOr<FixVoid<T>> eov;
child->getResult(eov);
merger->error_or_value = std::move(eov);
}
void childHasFired() override;
void parentHasFired() override {}
void setParent(ConveyorStorage *par) override {
GIN_ASSERT(merger && merger->error_or_value.has_value()) { return; }
if (par && !merger->isArmed()) {
merger->armNext();
}
parent = par;
}
void setParent(ConveyorStorage *par) override;
};
friend class MergeConveyorNodeData<T>;
@ -844,6 +819,8 @@ public:
~MergeConveyorNode();
void getResult(ErrorOrValue &err_or_val) override;
void fire() override;
};
template <typename T> class MergeConveyorNodeData {

View File

@ -1,5 +1,7 @@
#pragma once
#include "common.h"
#include <cassert>
// Template inlining
namespace gin {
@ -74,9 +76,11 @@ Conveyor<T> Conveyor<T>::attach(Args &&...args) {
template <typename T>
std::pair<Conveyor<T>, MergeConveyor<T>> Conveyor<T>::merge() {
Own<MergeConveyorNode<T>> node = heap<MergeConveyorNode<T>>();
Our<MergeConveyorNodeData<T>> data = share<MergeConveyorNodeData<T>>();
MergeConveyor<T> node_ref = node.get();
Own<MergeConveyorNode<T>> node = heap<MergeConveyorNode<T>>(data);
MergeConveyor<T> node_ref{data};
return std::make_pair(Conveyor<T>{std::move(node), storage}, *node_ref);
}
@ -247,6 +251,52 @@ template <typename T> void MergeConveyor<T>::attach(Conveyor<T> conveyor) {
assert(false);
}
template <typename T>
MergeConveyorNode<T>::MergeConveyorNode(Our<MergeConveyorNodeData<T>> d)
: data{d} {}
template <typename T> MergeConveyorNode<T>::~MergeConveyorNode() {}
template <typename T> size_t MergeConveyorNode<T>::Appendage::space() const {
GIN_ASSERT(merger) { return 0; }
if (merger->error_or_value.has_value()) {
return 0;
}
return 1;
}
template <typename T> void MergeConveyorNode<T>::fire() {
/// @unimplemented
}
template <typename T> size_t MergeConveyorNode<T>::Appendage::queued() const {
GIN_ASSERT(merger) { return 0; }
GIN_ASSERT(!merger->error_or_value.has_value()) { return 1; }
return 0;
}
template <typename T> void MergeConveyorNode<T>::Appendage::childHasFired() {
GIN_ASSERT(!merger->error_or_value.has_value()) { return; }
ErrorOr<FixVoid<T>> eov;
child->getResult(eov);
merger->error_or_value = std::move(eov);
}
template <typename T>
void MergeConveyorNode<T>::Appendage::setParent(ConveyorStorage *par) {
GIN_ASSERT(merger && merger->error_or_value.has_value()) { return; }
if (par && !merger->isArmed()) {
merger->armNext();
}
parent = par;
}
template <typename T> AdaptConveyorFeeder<T>::~AdaptConveyorFeeder() {
if (feedee) {
feedee->setFeeder(nullptr);

View File

@ -198,4 +198,13 @@ GIN_TEST("Async detach"){
GIN_EXPECT(num == 10, std::string{"Bad value: Expected 10, but got "} + std::to_string(num));
}
GIN_TEST("Async Merge"){
using namespace gin;
EventLoop event_loop;
WaitScope wait_scope{event_loop};
auto cam = Conveyor<int>{10}.merge();
}
}