#pragma once #include "common.h" #include // Template inlining #include namespace saw { template ConveyorResult execLater(Func &&func) { Conveyor conveyor{FixVoid{}}; return conveyor.then(std::move(func)); } template Conveyor::Conveyor(FixVoid value) : ConveyorBase(nullptr, nullptr) { // Is there any way to do this? // @todo new ConveyorBase constructor for Immediate values Own>> immediate = heap>>(std::move(value)); if (!immediate) { return; } storage = static_cast(immediate.get()); node = std::move(immediate); } template Conveyor::Conveyor(Error &&error) : ConveyorBase(nullptr, nullptr) { Own>> immediate = heap>>(std::move(error)); if (!immediate) { return; } storage = static_cast(immediate.get()); node = std::move(immediate); } template Conveyor::Conveyor(Own node_p, ConveyorStorage *storage_p) : ConveyorBase{std::move(node_p), storage_p} {} template template ConveyorResult Conveyor::then(Func &&func, ErrorFunc &&error_func) { Own conversion_node = heap>, FixVoid, Func, ErrorFunc>>(std::move(node), std::move(func), std::move(error_func)); return Conveyor>>::toConveyor( std::move(conversion_node), storage); } template Conveyor Conveyor::buffer(size_t size) { Own>> storage_node = heap>>(storage, std::move(node), size); ConveyorStorage *storage_ptr = static_cast(storage_node.get()); SAW_ASSERT(storage) { return Conveyor{nullptr, nullptr}; } storage->setParent(storage_ptr); return Conveyor{std::move(storage_node), storage_ptr}; } template template Conveyor Conveyor::attach(Args &&...args) { Own> attach_node = heap>(std::move(node), std::move(args...)); return Conveyor{std::move(attach_node), storage}; } template std::pair, MergeConveyor> Conveyor::merge() { Our> data = share>(); Own> merge_node = heap>(data); data->attach(Conveyor::toConveyor(std::move(node), storage)); MergeConveyor node_ref{data}; ConveyorStorage *merge_storage = static_cast(merge_node.get()); return std::make_pair(Conveyor{std::move(merge_node), merge_storage}, std::move(node_ref)); } template <> template SinkConveyor Conveyor::sink(ErrorFunc &&error_func) { Own sink_node = heap(storage, std::move(node)); ConveyorStorage *storage_ptr = static_cast(sink_node.get()); SAW_ASSERT(storage) { return SinkConveyor{}; } storage->setParent(storage_ptr); return SinkConveyor{std::move(sink_node)}; } void detachConveyor(Conveyor &&conveyor); template template void Conveyor::detach(ErrorFunc &&func) { detachConveyor(std::move(then([](T &&) {}, std::move(func)))); } template <> template void Conveyor::detach(ErrorFunc &&func) { detachConveyor(std::move(then([]() {}, std::move(func)))); } template Conveyor Conveyor::toConveyor(Own node, ConveyorStorage *storage) { return Conveyor{std::move(node), storage}; } template std::pair, ConveyorStorage *> Conveyor::fromConveyor(Conveyor conveyor) { return std::make_pair(std::move(conveyor.node), conveyor.storage); } template ErrorOr> Conveyor::take() { if (storage) { if (storage->queued() > 0) { ErrorOr> result; node->getResult(result); return result; } else { return ErrorOr>{ recoverableError("Conveyor buffer has no elements")}; } } else { return ErrorOr>{criticalError("Conveyor in invalid state")}; } } template ConveyorAndFeeder newConveyorAndFeeder() { Own>> feeder = heap>>(); Own>> node = heap>>(); feeder->setFeedee(node.get()); node->setFeeder(feeder.get()); ConveyorStorage *storage_ptr = static_cast(node.get()); return ConveyorAndFeeder{ std::move(feeder), Conveyor::toConveyor(std::move(node), storage_ptr)}; } // QueueBuffer template void QueueBufferConveyorNode::fire() { if (child) { if (!storage.empty()) { if (storage.front().isError()) { if (storage.front().error().isCritical()) { child = nullptr; child_storage = nullptr; } } } } bool has_space_before_fire = space() > 0; if (parent) { parent->childHasFired(); if (!storage.empty() && parent->space() > 0) { armLater(); } } if (child_storage && !has_space_before_fire) { child_storage->parentHasFired(); } } template void QueueBufferConveyorNode::getResult(ErrorOrValue &eov) noexcept { ErrorOr &err_or_val = eov.as(); err_or_val = std::move(storage.front()); storage.pop(); } template size_t QueueBufferConveyorNode::space() const { return max_store - storage.size(); } template size_t QueueBufferConveyorNode::queued() const { return storage.size(); } template void QueueBufferConveyorNode::childHasFired() { if (child && storage.size() < max_store) { ErrorOr eov; child->getResult(eov); if (eov.isError()) { if (eov.error().isCritical()) { child_storage = nullptr; } } storage.push(std::move(eov)); if (!isArmed()) { armLater(); } } } template void QueueBufferConveyorNode::parentHasFired() { SAW_ASSERT(parent) { return; } if (parent->space() == 0) { return; } if (queued() > 0) { armLater(); } } template ImmediateConveyorNode::ImmediateConveyorNode(FixVoid &&val) : value{std::move(val)}, retrieved{0} {} template ImmediateConveyorNode::ImmediateConveyorNode(Error &&error) : value{std::move(error)}, retrieved{0} {} template size_t ImmediateConveyorNode::space() const { return 0; } template size_t ImmediateConveyorNode::queued() const { return retrieved > 1 ? 0 : 1; } template void ImmediateConveyorNode::childHasFired() { // Impossible case assert(false); } template void ImmediateConveyorNode::parentHasFired() { SAW_ASSERT(parent) { return; } assert(parent->space() > 0); if (queued() > 0) { armNext(); } } template void ImmediateConveyorNode::fire() { if (parent) { parent->childHasFired(); if (queued() > 0 && parent->space() > 0) { armLast(); } } } template MergeConveyor::MergeConveyor(Lent> d) : data{std::move(d)} {} template MergeConveyor::~MergeConveyor() {} template void MergeConveyor::attach(Conveyor conveyor) { auto sp = data.lock(); SAW_ASSERT(sp) { return; } sp->attach(std::move(conveyor)); } template MergeConveyorNode::MergeConveyorNode(Our> d) : data{d} { SAW_ASSERT(data) { return; } data->merger = this; } template MergeConveyorNode::~MergeConveyorNode() {} template void MergeConveyorNode::getResult(ErrorOrValue &eov) noexcept { ErrorOr> &err_or_val = eov.as>(); SAW_ASSERT(data) { return; } /// @todo search appendages for result auto &appendages = data->appendages; next_appendage = std::min(appendages.size(), next_appendage); for (size_t i = next_appendage; i < appendages.size(); ++i) { if (appendages[i]->queued() > 0) { err_or_val = std::move(appendages[i]->error_or_value.value()); appendages[i]->error_or_value = std::nullopt; next_appendage = i + 1; return; } } for (size_t i = 0; i < next_appendage; ++i) { if (appendages[i]->queued() > 0) { err_or_val = std::move(appendages[i]->error_or_value.value()); appendages[i]->error_or_value = std::nullopt; next_appendage = i + 1; return; } } err_or_val = criticalError("No value in Merge Appendages"); } template void MergeConveyorNode::fire() { SAW_ASSERT(queued() > 0) { return; } if (parent) { parent->childHasFired(); if (queued() > 0 && parent->space() > 0) { armLater(); } } } template size_t MergeConveyorNode::space() const { return 0; } template size_t MergeConveyorNode::queued() const { SAW_ASSERT(data) { return 0; } size_t queue_count = 0; for (auto &iter : data->appendages) { queue_count += iter->queued(); } return queue_count; } template void MergeConveyorNode::childHasFired() { /// This can never happen assert(false); } template void MergeConveyorNode::parentHasFired() { SAW_ASSERT(parent) { return; } if (queued() > 0) { if (parent->space() > 0) { armLater(); } } } template size_t MergeConveyorNode::Appendage::space() const { SAW_ASSERT(merger) { return 0; } if (error_or_value.has_value()) { return 0; } return 1; } template size_t MergeConveyorNode::Appendage::queued() const { SAW_ASSERT(merger) { return 0; } if (error_or_value.has_value()) { return 1; } return 0; } template void MergeConveyorNode::Appendage::getAppendageResult(ErrorOrValue &eov) { ErrorOr> &err_or_val = eov.as>(); SAW_ASSERT(queued() > 0) { err_or_val = criticalError("No element queued in Merge Appendage Node"); return; } err_or_val = std::move(error_or_value.value()); error_or_value = std::nullopt; } template void MergeConveyorNode::Appendage::childHasFired() { SAW_ASSERT(!error_or_value.has_value()) { return; } ErrorOr> eov; child->getResult(eov); error_or_value = std::move(eov); if (!merger->isArmed()) { merger->armLater(); } } template void MergeConveyorNode::Appendage::parentHasFired() { if (child_storage) { child_storage->parentHasFired(); } } template void MergeConveyorNode::Appendage::setParent(ConveyorStorage *par) { SAW_ASSERT(merger) { return; } SAW_ASSERT(child) { return; } parent = par; } template void MergeConveyorNodeData::attach(Conveyor conveyor) { auto nas = Conveyor::fromConveyor(std::move(conveyor)); auto merge_node_appendage = heap::Appendage>( nas.second, std::move(nas.first), *merger); if (nas.second) { nas.second->setParent(merge_node_appendage.get()); } appendages.push_back(std::move(merge_node_appendage)); } template void MergeConveyorNodeData::governingNodeDestroyed() { appendages.clear(); merger = nullptr; } template AdaptConveyorFeeder::~AdaptConveyorFeeder() { if (feedee) { feedee->setFeeder(nullptr); feedee = nullptr; } } template void AdaptConveyorFeeder::setFeedee(AdaptConveyorNode *feedee_p) { feedee = feedee_p; } template void AdaptConveyorFeeder::feed(T &&value) { if (feedee) { feedee->feed(std::move(value)); } } template void AdaptConveyorFeeder::fail(Error &&error) { if (feedee) { feedee->fail(std::move(error)); } } template size_t AdaptConveyorFeeder::queued() const { if (feedee) { return feedee->queued(); } return 0; } template size_t AdaptConveyorFeeder::space() const { if (feedee) { return feedee->space(); } return 0; } template AdaptConveyorNode::AdaptConveyorNode() : ConveyorEventStorage{nullptr} {} template AdaptConveyorNode::~AdaptConveyorNode() { if (feeder) { feeder->setFeedee(nullptr); feeder = nullptr; } } template void AdaptConveyorNode::setFeeder(AdaptConveyorFeeder *feeder_p) { feeder = feeder_p; } template void AdaptConveyorNode::feed(T &&value) { storage.push(std::move(value)); armNext(); } template void AdaptConveyorNode::fail(Error &&error) { storage.push(std::move(error)); armNext(); } template size_t AdaptConveyorNode::queued() const { return storage.size(); } template size_t AdaptConveyorNode::space() const { return std::numeric_limits::max() - storage.size(); } template void AdaptConveyorNode::getResult(ErrorOrValue &err_or_val) { if (!storage.empty()) { err_or_val.as() = std::move(storage.front()); storage.pop(); } else { err_or_val.as() = criticalError("Signal for retrieval of storage sent even though no " "data is present"); } } template void AdaptConveyorNode::childHasFired() { // Adapt node has no children assert(false); } template void AdaptConveyorNode::parentHasFired() { SAW_ASSERT(parent) { return; } if (parent->space() == 0) { return; } } template void AdaptConveyorNode::fire() { if (parent) { parent->childHasFired(); if (storage.size() > 0) { armLater(); } } } template OneTimeConveyorFeeder::~OneTimeConveyorFeeder() { if (feedee) { feedee->setFeeder(nullptr); feedee = nullptr; } } template void OneTimeConveyorFeeder::setFeedee(OneTimeConveyorNode *feedee_p) { feedee = feedee_p; } template void OneTimeConveyorFeeder::feed(T &&value) { if (feedee) { feedee->feed(std::move(value)); } } template void OneTimeConveyorFeeder::fail(Error &&error) { if (feedee) { feedee->fail(std::move(error)); } } template size_t OneTimeConveyorFeeder::queued() const { if (feedee) { return feedee->queued(); } return 0; } template size_t OneTimeConveyorFeeder::space() const { if (feedee) { return feedee->space(); } return 0; } template OneTimeConveyorNode::~OneTimeConveyorNode() { if (feeder) { feeder->setFeedee(nullptr); feeder = nullptr; } } template void OneTimeConveyorNode::setFeeder(OneTimeConveyorFeeder *feeder_p) { feeder = feeder_p; } template void OneTimeConveyorNode::feed(T &&value) { storage = std::move(value); armNext(); } template void OneTimeConveyorNode::fail(Error &&error) { storage = std::move(error); armNext(); } template size_t OneTimeConveyorNode::queued() const { return storage.has_value() ? 1 : 0; } template size_t OneTimeConveyorNode::space() const { return passed ? 0 : 1; } template void OneTimeConveyorNode::getResult(ErrorOrValue &err_or_val) { if (storage.has_value()) { err_or_val.as() = std::move(storage.value()); storage = std::nullopt; } else { err_or_val.as() = criticalError("Signal for retrieval of storage sent even though no " "data is present"); } } template void OneTimeConveyorNode::fire() { if (parent) { parent->childHasFired(); } } } // namespace saw