forstio/source/forstio/async.tmpl.h

648 lines
15 KiB
C++

#pragma once
#include "common.h"
#include <cassert>
// Template inlining
#include <iostream>
namespace saw {
template <typename Func> ConveyorResult<Func, void> execLater(Func &&func) {
Conveyor<void> conveyor{FixVoid<void>{}};
return conveyor.then(std::move(func));
}
template <typename T>
Conveyor<T>::Conveyor(FixVoid<T> value) : ConveyorBase(nullptr, nullptr) {
// Is there any way to do this?
// @todo new ConveyorBase constructor for Immediate values
Own<ImmediateConveyorNode<FixVoid<T>>> immediate =
heap<ImmediateConveyorNode<FixVoid<T>>>(std::move(value));
if (!immediate) {
return;
}
storage = static_cast<ConveyorStorage *>(immediate.get());
node = std::move(immediate);
}
template <typename T>
Conveyor<T>::Conveyor(Error &&error) : ConveyorBase(nullptr, nullptr) {
Own<ImmediateConveyorNode<FixVoid<T>>> immediate =
heap<ImmediateConveyorNode<FixVoid<T>>>(std::move(error));
if (!immediate) {
return;
}
storage = static_cast<ConveyorStorage *>(immediate.get());
node = std::move(immediate);
}
template <typename T>
Conveyor<T>::Conveyor(Own<ConveyorNode> node_p, ConveyorStorage *storage_p)
: ConveyorBase{std::move(node_p), storage_p} {}
template <typename T>
template <typename Func, typename ErrorFunc>
ConveyorResult<Func, T> Conveyor<T>::then(Func &&func, ErrorFunc &&error_func) {
Own<ConveyorNode> conversion_node =
heap<ConvertConveyorNode<FixVoid<ReturnType<Func, T>>, FixVoid<T>, Func,
ErrorFunc>>(std::move(node), std::move(func),
std::move(error_func));
return Conveyor<RemoveErrorOr<ReturnType<Func, T>>>::toConveyor(
std::move(conversion_node), storage);
}
template <typename T> Conveyor<T> Conveyor<T>::buffer(size_t size) {
Own<QueueBufferConveyorNode<FixVoid<T>>> storage_node =
heap<QueueBufferConveyorNode<FixVoid<T>>>(storage, std::move(node),
size);
ConveyorStorage *storage_ptr =
static_cast<ConveyorStorage *>(storage_node.get());
SAW_ASSERT(storage) { return Conveyor<T>{nullptr, nullptr}; }
storage->setParent(storage_ptr);
return Conveyor<T>{std::move(storage_node), storage_ptr};
}
template <typename T>
template <typename... Args>
Conveyor<T> Conveyor<T>::attach(Args &&...args) {
Own<AttachConveyorNode<Args...>> attach_node =
heap<AttachConveyorNode<Args...>>(std::move(node), std::move(args...));
return Conveyor<T>{std::move(attach_node), storage};
}
template <typename T>
std::pair<Conveyor<T>, MergeConveyor<T>> Conveyor<T>::merge() {
Our<MergeConveyorNodeData<T>> data = share<MergeConveyorNodeData<T>>();
Own<MergeConveyorNode<T>> merge_node = heap<MergeConveyorNode<T>>(data);
data->attach(Conveyor<T>::toConveyor(std::move(node), storage));
MergeConveyor<T> node_ref{data};
ConveyorStorage *merge_storage =
static_cast<ConveyorStorage *>(merge_node.get());
return std::make_pair(Conveyor<T>{std::move(merge_node), merge_storage},
std::move(node_ref));
}
template <>
template <typename ErrorFunc>
SinkConveyor Conveyor<void>::sink(ErrorFunc &&error_func) {
Own<SinkConveyorNode> sink_node =
heap<SinkConveyorNode>(storage, std::move(node));
ConveyorStorage *storage_ptr =
static_cast<ConveyorStorage *>(sink_node.get());
SAW_ASSERT(storage) { return SinkConveyor{}; }
storage->setParent(storage_ptr);
return SinkConveyor{std::move(sink_node)};
}
void detachConveyor(Conveyor<void> &&conveyor);
template <typename T>
template <typename ErrorFunc>
void Conveyor<T>::detach(ErrorFunc &&func) {
detachConveyor(std::move(then([](T &&) {}, std::move(func))));
}
template <>
template <typename ErrorFunc>
void Conveyor<void>::detach(ErrorFunc &&func) {
detachConveyor(std::move(then([]() {}, std::move(func))));
}
template <typename T>
Conveyor<T> Conveyor<T>::toConveyor(Own<ConveyorNode> node,
ConveyorStorage *storage) {
return Conveyor<T>{std::move(node), storage};
}
template <typename T>
std::pair<Own<ConveyorNode>, ConveyorStorage *>
Conveyor<T>::fromConveyor(Conveyor<T> conveyor) {
return std::make_pair(std::move(conveyor.node), conveyor.storage);
}
template <typename T> ErrorOr<FixVoid<T>> Conveyor<T>::take() {
if (storage) {
if (storage->queued() > 0) {
ErrorOr<FixVoid<T>> result;
node->getResult(result);
return result;
} else {
return ErrorOr<FixVoid<T>>{
recoverableError("Conveyor buffer has no elements")};
}
} else {
return ErrorOr<FixVoid<T>>{criticalError("Conveyor in invalid state")};
}
}
template <typename T> ConveyorAndFeeder<T> newConveyorAndFeeder() {
Own<AdaptConveyorFeeder<FixVoid<T>>> feeder =
heap<AdaptConveyorFeeder<FixVoid<T>>>();
Own<AdaptConveyorNode<FixVoid<T>>> node =
heap<AdaptConveyorNode<FixVoid<T>>>();
feeder->setFeedee(node.get());
node->setFeeder(feeder.get());
ConveyorStorage *storage_ptr = static_cast<ConveyorStorage *>(node.get());
return ConveyorAndFeeder<T>{
std::move(feeder),
Conveyor<T>::toConveyor(std::move(node), storage_ptr)};
}
// QueueBuffer
template <typename T> void QueueBufferConveyorNode<T>::fire() {
if (child) {
if (!storage.empty()) {
if (storage.front().isError()) {
if (storage.front().error().isCritical()) {
child = nullptr;
child_storage = nullptr;
}
}
}
}
bool has_space_before_fire = space() > 0;
if (parent) {
parent->childHasFired();
if (!storage.empty() && parent->space() > 0) {
armLater();
}
}
if (child_storage && !has_space_before_fire) {
child_storage->parentHasFired();
}
}
template <typename T>
void QueueBufferConveyorNode<T>::getResult(ErrorOrValue &eov) noexcept {
ErrorOr<T> &err_or_val = eov.as<T>();
err_or_val = std::move(storage.front());
storage.pop();
}
template <typename T> size_t QueueBufferConveyorNode<T>::space() const {
return max_store - storage.size();
}
template <typename T> size_t QueueBufferConveyorNode<T>::queued() const {
return storage.size();
}
template <typename T> void QueueBufferConveyorNode<T>::childHasFired() {
if (child && storage.size() < max_store) {
ErrorOr<T> eov;
child->getResult(eov);
if (eov.isError()) {
if (eov.error().isCritical()) {
child_storage = nullptr;
}
}
storage.push(std::move(eov));
if (!isArmed()) {
armLater();
}
}
}
template <typename T> void QueueBufferConveyorNode<T>::parentHasFired() {
SAW_ASSERT(parent) { return; }
if (parent->space() == 0) {
return;
}
if (queued() > 0) {
armLater();
}
}
template <typename T>
ImmediateConveyorNode<T>::ImmediateConveyorNode(FixVoid<T> &&val)
: value{std::move(val)}, retrieved{0} {}
template <typename T>
ImmediateConveyorNode<T>::ImmediateConveyorNode(Error &&error)
: value{std::move(error)}, retrieved{0} {}
template <typename T> size_t ImmediateConveyorNode<T>::space() const {
return 0;
}
template <typename T> size_t ImmediateConveyorNode<T>::queued() const {
return retrieved > 1 ? 0 : 1;
}
template <typename T> void ImmediateConveyorNode<T>::childHasFired() {
// Impossible case
assert(false);
}
template <typename T> void ImmediateConveyorNode<T>::parentHasFired() {
SAW_ASSERT(parent) { return; }
assert(parent->space() > 0);
if (queued() > 0) {
armNext();
}
}
template <typename T> void ImmediateConveyorNode<T>::fire() {
if (parent) {
parent->childHasFired();
if (queued() > 0 && parent->space() > 0) {
armLast();
}
}
}
template <typename T>
MergeConveyor<T>::MergeConveyor(Lent<MergeConveyorNodeData<T>> d)
: data{std::move(d)} {}
template <typename T> MergeConveyor<T>::~MergeConveyor() {}
template <typename T> void MergeConveyor<T>::attach(Conveyor<T> conveyor) {
auto sp = data.lock();
SAW_ASSERT(sp) { return; }
sp->attach(std::move(conveyor));
}
template <typename T>
MergeConveyorNode<T>::MergeConveyorNode(Our<MergeConveyorNodeData<T>> d)
: data{d} {
SAW_ASSERT(data) { return; }
data->merger = this;
}
template <typename T> MergeConveyorNode<T>::~MergeConveyorNode() {}
template <typename T>
void MergeConveyorNode<T>::getResult(ErrorOrValue &eov) noexcept {
ErrorOr<FixVoid<T>> &err_or_val = eov.as<FixVoid<T>>();
SAW_ASSERT(data) { return; }
/// @todo search appendages for result
auto &appendages = data->appendages;
next_appendage = std::min(appendages.size(), next_appendage);
for (size_t i = next_appendage; i < appendages.size(); ++i) {
if (appendages[i]->queued() > 0) {
err_or_val = std::move(appendages[i]->error_or_value.value());
appendages[i]->error_or_value = std::nullopt;
next_appendage = i + 1;
return;
}
}
for (size_t i = 0; i < next_appendage; ++i) {
if (appendages[i]->queued() > 0) {
err_or_val = std::move(appendages[i]->error_or_value.value());
appendages[i]->error_or_value = std::nullopt;
next_appendage = i + 1;
return;
}
}
err_or_val = criticalError("No value in Merge Appendages");
}
template <typename T> void MergeConveyorNode<T>::fire() {
SAW_ASSERT(queued() > 0) { return; }
if (parent) {
parent->childHasFired();
if (queued() > 0 && parent->space() > 0) {
armLater();
}
}
}
template <typename T> size_t MergeConveyorNode<T>::space() const { return 0; }
template <typename T> size_t MergeConveyorNode<T>::queued() const {
SAW_ASSERT(data) { return 0; }
size_t queue_count = 0;
for (auto &iter : data->appendages) {
queue_count += iter->queued();
}
return queue_count;
}
template <typename T> void MergeConveyorNode<T>::childHasFired() {
/// This can never happen
assert(false);
}
template <typename T> void MergeConveyorNode<T>::parentHasFired() {
SAW_ASSERT(parent) { return; }
if (queued() > 0) {
if (parent->space() > 0) {
armLater();
}
}
}
template <typename T> size_t MergeConveyorNode<T>::Appendage::space() const {
SAW_ASSERT(merger) { return 0; }
if (error_or_value.has_value()) {
return 0;
}
return 1;
}
template <typename T> size_t MergeConveyorNode<T>::Appendage::queued() const {
SAW_ASSERT(merger) { return 0; }
if (error_or_value.has_value()) {
return 1;
}
return 0;
}
template <typename T>
void MergeConveyorNode<T>::Appendage::getAppendageResult(ErrorOrValue &eov) {
ErrorOr<FixVoid<T>> &err_or_val = eov.as<FixVoid<T>>();
SAW_ASSERT(queued() > 0) {
err_or_val = criticalError("No element queued in Merge Appendage Node");
return;
}
err_or_val = std::move(error_or_value.value());
error_or_value = std::nullopt;
}
template <typename T> void MergeConveyorNode<T>::Appendage::childHasFired() {
SAW_ASSERT(!error_or_value.has_value()) { return; }
ErrorOr<FixVoid<T>> eov;
child->getResult(eov);
error_or_value = std::move(eov);
if (!merger->isArmed()) {
merger->armLater();
}
}
template <typename T> void MergeConveyorNode<T>::Appendage::parentHasFired() {
if (child_storage) {
child_storage->parentHasFired();
}
}
template <typename T>
void MergeConveyorNode<T>::Appendage::setParent(ConveyorStorage *par) {
SAW_ASSERT(merger) { return; }
SAW_ASSERT(child) { return; }
parent = par;
}
template <typename T>
void MergeConveyorNodeData<T>::attach(Conveyor<T> conveyor) {
auto nas = Conveyor<T>::fromConveyor(std::move(conveyor));
auto merge_node_appendage = heap<typename MergeConveyorNode<T>::Appendage>(
nas.second, std::move(nas.first), *merger);
if (nas.second) {
nas.second->setParent(merge_node_appendage.get());
}
appendages.push_back(std::move(merge_node_appendage));
}
template <typename T> void MergeConveyorNodeData<T>::governingNodeDestroyed() {
appendages.clear();
merger = nullptr;
}
template <typename T> AdaptConveyorFeeder<T>::~AdaptConveyorFeeder() {
if (feedee) {
feedee->setFeeder(nullptr);
feedee = nullptr;
}
}
template <typename T>
void AdaptConveyorFeeder<T>::setFeedee(AdaptConveyorNode<T> *feedee_p) {
feedee = feedee_p;
}
template <typename T> void AdaptConveyorFeeder<T>::feed(T &&value) {
if (feedee) {
feedee->feed(std::move(value));
}
}
template <typename T> void AdaptConveyorFeeder<T>::fail(Error &&error) {
if (feedee) {
feedee->fail(std::move(error));
}
}
template <typename T> size_t AdaptConveyorFeeder<T>::queued() const {
if (feedee) {
return feedee->queued();
}
return 0;
}
template <typename T> size_t AdaptConveyorFeeder<T>::space() const {
if (feedee) {
return feedee->space();
}
return 0;
}
template <typename T>
AdaptConveyorNode<T>::AdaptConveyorNode() : ConveyorEventStorage{nullptr} {}
template <typename T> AdaptConveyorNode<T>::~AdaptConveyorNode() {
if (feeder) {
feeder->setFeedee(nullptr);
feeder = nullptr;
}
}
template <typename T>
void AdaptConveyorNode<T>::setFeeder(AdaptConveyorFeeder<T> *feeder_p) {
feeder = feeder_p;
}
template <typename T> void AdaptConveyorNode<T>::feed(T &&value) {
storage.push(std::move(value));
armNext();
}
template <typename T> void AdaptConveyorNode<T>::fail(Error &&error) {
storage.push(std::move(error));
armNext();
}
template <typename T> size_t AdaptConveyorNode<T>::queued() const {
return storage.size();
}
template <typename T> size_t AdaptConveyorNode<T>::space() const {
return std::numeric_limits<size_t>::max() - storage.size();
}
template <typename T>
void AdaptConveyorNode<T>::getResult(ErrorOrValue &err_or_val) {
if (!storage.empty()) {
err_or_val.as<T>() = std::move(storage.front());
storage.pop();
} else {
err_or_val.as<T>() =
criticalError("Signal for retrieval of storage sent even though no "
"data is present");
}
}
template <typename T> void AdaptConveyorNode<T>::childHasFired() {
// Adapt node has no children
assert(false);
}
template <typename T> void AdaptConveyorNode<T>::parentHasFired() {
SAW_ASSERT(parent) { return; }
if (parent->space() == 0) {
return;
}
}
template <typename T> void AdaptConveyorNode<T>::fire() {
if (parent) {
parent->childHasFired();
if (storage.size() > 0) {
armLater();
}
}
}
template <typename T> OneTimeConveyorFeeder<T>::~OneTimeConveyorFeeder() {
if (feedee) {
feedee->setFeeder(nullptr);
feedee = nullptr;
}
}
template <typename T>
void OneTimeConveyorFeeder<T>::setFeedee(OneTimeConveyorNode<T> *feedee_p) {
feedee = feedee_p;
}
template <typename T> void OneTimeConveyorFeeder<T>::feed(T &&value) {
if (feedee) {
feedee->feed(std::move(value));
}
}
template <typename T> void OneTimeConveyorFeeder<T>::fail(Error &&error) {
if (feedee) {
feedee->fail(std::move(error));
}
}
template <typename T> size_t OneTimeConveyorFeeder<T>::queued() const {
if (feedee) {
return feedee->queued();
}
return 0;
}
template <typename T> size_t OneTimeConveyorFeeder<T>::space() const {
if (feedee) {
return feedee->space();
}
return 0;
}
template <typename T> OneTimeConveyorNode<T>::~OneTimeConveyorNode() {
if (feeder) {
feeder->setFeedee(nullptr);
feeder = nullptr;
}
}
template <typename T>
void OneTimeConveyorNode<T>::setFeeder(OneTimeConveyorFeeder<T> *feeder_p) {
feeder = feeder_p;
}
template <typename T> void OneTimeConveyorNode<T>::feed(T &&value) {
storage = std::move(value);
armNext();
}
template <typename T> void OneTimeConveyorNode<T>::fail(Error &&error) {
storage = std::move(error);
armNext();
}
template <typename T> size_t OneTimeConveyorNode<T>::queued() const {
return storage.has_value() ? 1 : 0;
}
template <typename T> size_t OneTimeConveyorNode<T>::space() const {
return passed ? 0 : 1;
}
template <typename T>
void OneTimeConveyorNode<T>::getResult(ErrorOrValue &err_or_val) {
if (storage.has_value()) {
err_or_val.as<T>() = std::move(storage.value());
storage = std::nullopt;
} else {
err_or_val.as<T>() =
criticalError("Signal for retrieval of storage sent even though no "
"data is present");
}
}
template <typename T> void OneTimeConveyorNode<T>::fire() {
if (parent) {
parent->childHasFired();
}
}
} // namespace saw