348 lines
6.9 KiB
C++
348 lines
6.9 KiB
C++
#include "async.h"
|
|
|
|
#include <algorithm>
|
|
#include <cassert>
|
|
|
|
namespace saw {
|
|
namespace {
|
|
thread_local EventLoop *local_loop = nullptr;
|
|
|
|
EventLoop ¤tEventLoop() {
|
|
EventLoop *loop = local_loop;
|
|
assert(loop);
|
|
return *loop;
|
|
}
|
|
} // namespace
|
|
|
|
ConveyorNode::ConveyorNode() {}
|
|
|
|
ConveyorStorage::ConveyorStorage(ConveyorStorage *c) : child_storage{c} {}
|
|
|
|
ConveyorStorage::~ConveyorStorage() {
|
|
if (parent) {
|
|
parent->unlinkChild();
|
|
}
|
|
}
|
|
|
|
void ConveyorStorage::unlinkChild() { child_storage = nullptr; }
|
|
|
|
void ConveyorEventStorage::setParent(ConveyorStorage *p) {
|
|
/*
|
|
* parent check isn't needed, but is used
|
|
* for the assert, because the storage should
|
|
* be armed if there was an element present
|
|
* and a valid parent
|
|
*/
|
|
if (/*!parent && */ p && !isArmed() && queued() > 0) {
|
|
assert(!parent);
|
|
if (p->space() > 0) {
|
|
armLater();
|
|
}
|
|
}
|
|
|
|
parent = p;
|
|
}
|
|
|
|
ConveyorEventStorage::ConveyorEventStorage(ConveyorStorage *c)
|
|
: ConveyorStorage{c} {}
|
|
|
|
ConveyorBase::ConveyorBase(Own<ConveyorNode> &&node_p,
|
|
ConveyorStorage *storage_p)
|
|
: node{std::move(node_p)}, storage{storage_p} {}
|
|
|
|
Error PropagateError::operator()(const Error &error) const {
|
|
return error.copyError();
|
|
}
|
|
|
|
Error PropagateError::operator()(Error &&error) { return std::move(error); }
|
|
|
|
Event::Event() : Event(currentEventLoop()) {}
|
|
|
|
Event::Event(EventLoop &loop) : loop{loop} {}
|
|
|
|
Event::~Event() { disarm(); }
|
|
|
|
void Event::armNext() {
|
|
assert(&loop == local_loop);
|
|
if (prev == nullptr) {
|
|
// Push the next_insert_point back by one
|
|
// and inserts itself before that
|
|
next = *loop.next_insert_point;
|
|
prev = loop.next_insert_point;
|
|
*prev = this;
|
|
if (next) {
|
|
next->prev = &next;
|
|
}
|
|
|
|
// Set the new insertion ptr location to next
|
|
loop.next_insert_point = &next;
|
|
|
|
// Pushes back the later insert point if it was pointing at the
|
|
// previous event
|
|
if (loop.later_insert_point == prev) {
|
|
loop.later_insert_point = &next;
|
|
}
|
|
|
|
// If tail points at the same location then
|
|
// we are at the end and have to update tail then.
|
|
// Technically should be possible by checking if
|
|
// next is a `nullptr`
|
|
if (loop.tail == prev) {
|
|
loop.tail = &next;
|
|
}
|
|
|
|
loop.setRunnable(true);
|
|
}
|
|
}
|
|
|
|
void Event::armLater() {
|
|
assert(&loop == local_loop);
|
|
|
|
if (prev == nullptr) {
|
|
next = *loop.later_insert_point;
|
|
prev = loop.later_insert_point;
|
|
*prev = this;
|
|
if (next) {
|
|
next->prev = &next;
|
|
}
|
|
|
|
loop.later_insert_point = &next;
|
|
if (loop.tail == prev) {
|
|
loop.tail = &next;
|
|
}
|
|
|
|
loop.setRunnable(true);
|
|
}
|
|
}
|
|
|
|
void Event::armLast() {
|
|
assert(&loop == local_loop);
|
|
|
|
if (prev == nullptr) {
|
|
next = *loop.later_insert_point;
|
|
prev = loop.later_insert_point;
|
|
*prev = this;
|
|
if (next) {
|
|
next->prev = &next;
|
|
}
|
|
|
|
if (loop.tail == prev) {
|
|
loop.tail = &next;
|
|
}
|
|
|
|
loop.setRunnable(true);
|
|
}
|
|
}
|
|
|
|
void Event::disarm() {
|
|
if (prev != nullptr) {
|
|
if (loop.tail == &next) {
|
|
loop.tail = prev;
|
|
}
|
|
|
|
if (loop.next_insert_point == &next) {
|
|
loop.next_insert_point = prev;
|
|
}
|
|
|
|
*prev = next;
|
|
if (next) {
|
|
next->prev = prev;
|
|
}
|
|
|
|
prev = nullptr;
|
|
next = nullptr;
|
|
}
|
|
}
|
|
|
|
bool Event::isArmed() const { return prev != nullptr; }
|
|
|
|
SinkConveyor::SinkConveyor() : node{nullptr} {}
|
|
|
|
SinkConveyor::SinkConveyor(Own<ConveyorNode> &&node_p)
|
|
: node{std::move(node_p)} {}
|
|
|
|
void EventLoop::setRunnable(bool runnable) { is_runnable = runnable; }
|
|
|
|
EventLoop::EventLoop() {}
|
|
|
|
EventLoop::EventLoop(Own<EventPort> &&event_port)
|
|
: event_port{std::move(event_port)} {}
|
|
|
|
EventLoop::~EventLoop() { assert(local_loop != this); }
|
|
|
|
void EventLoop::enterScope() {
|
|
assert(!local_loop);
|
|
local_loop = this;
|
|
}
|
|
|
|
void EventLoop::leaveScope() {
|
|
assert(local_loop == this);
|
|
local_loop = nullptr;
|
|
}
|
|
|
|
bool EventLoop::turnLoop() {
|
|
size_t turn_step = 0;
|
|
while (head && turn_step < 65536) {
|
|
if (!turn()) {
|
|
return false;
|
|
}
|
|
++turn_step;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool EventLoop::turn() {
|
|
Event *event = head;
|
|
|
|
if (!event) {
|
|
return false;
|
|
}
|
|
|
|
head = event->next;
|
|
if (head) {
|
|
head->prev = &head;
|
|
}
|
|
|
|
next_insert_point = &head;
|
|
if (later_insert_point == &event->next) {
|
|
later_insert_point = &head;
|
|
}
|
|
if (tail == &event->next) {
|
|
tail = &head;
|
|
}
|
|
|
|
event->next = nullptr;
|
|
event->prev = nullptr;
|
|
|
|
next_insert_point = &head;
|
|
|
|
event->fire();
|
|
|
|
return true;
|
|
}
|
|
|
|
bool EventLoop::wait(const std::chrono::steady_clock::duration &duration) {
|
|
if (event_port) {
|
|
event_port->wait(duration);
|
|
}
|
|
|
|
return turnLoop();
|
|
}
|
|
|
|
bool EventLoop::wait(const std::chrono::steady_clock::time_point &time_point) {
|
|
if (event_port) {
|
|
event_port->wait(time_point);
|
|
}
|
|
|
|
return turnLoop();
|
|
}
|
|
|
|
bool EventLoop::wait() {
|
|
if (event_port) {
|
|
event_port->wait();
|
|
}
|
|
|
|
return turnLoop();
|
|
}
|
|
|
|
bool EventLoop::poll() {
|
|
if (event_port) {
|
|
event_port->poll();
|
|
}
|
|
|
|
return turnLoop();
|
|
}
|
|
|
|
EventPort *EventLoop::eventPort() { return event_port.get(); }
|
|
|
|
ConveyorSinks &EventLoop::daemon() {
|
|
if (!daemon_sink) {
|
|
daemon_sink = heap<ConveyorSinks>();
|
|
}
|
|
return *daemon_sink;
|
|
}
|
|
|
|
WaitScope::WaitScope(EventLoop &loop) : loop{loop} { loop.enterScope(); }
|
|
|
|
WaitScope::~WaitScope() { loop.leaveScope(); }
|
|
|
|
void WaitScope::wait() { loop.wait(); }
|
|
|
|
void WaitScope::wait(const std::chrono::steady_clock::duration &duration) {
|
|
loop.wait(duration);
|
|
}
|
|
|
|
void WaitScope::wait(const std::chrono::steady_clock::time_point &time_point) {
|
|
loop.wait(time_point);
|
|
}
|
|
|
|
void WaitScope::poll() { loop.poll(); }
|
|
|
|
ImmediateConveyorNodeBase::ImmediateConveyorNodeBase()
|
|
: ConveyorEventStorage{nullptr} {}
|
|
|
|
MergeConveyorNodeBase::MergeConveyorNodeBase()
|
|
: ConveyorEventStorage{nullptr} {}
|
|
|
|
void ConveyorSinks::destroySinkConveyorNode(ConveyorNode &node) {
|
|
if (!isArmed()) {
|
|
armLast();
|
|
}
|
|
|
|
delete_nodes.push(&node);
|
|
}
|
|
|
|
void ConveyorSinks::fail(Error &&error) {
|
|
/// @todo call error_handler
|
|
}
|
|
|
|
ConveyorSinks::ConveyorSinks(EventLoop &event_loop) : Event{event_loop} {}
|
|
|
|
void ConveyorSinks::add(Conveyor<void> &&sink) {
|
|
auto nas = Conveyor<void>::fromConveyor(std::move(sink));
|
|
|
|
Own<SinkConveyorNode> sink_node = nullptr;
|
|
try {
|
|
sink_node =
|
|
heap<SinkConveyorNode>(nas.second, std::move(nas.first), *this);
|
|
} catch (std::bad_alloc &) {
|
|
return;
|
|
}
|
|
if (nas.second) {
|
|
nas.second->setParent(sink_node.get());
|
|
}
|
|
|
|
sink_nodes.emplace_back(std::move(sink_node));
|
|
}
|
|
|
|
void ConveyorSinks::fire() {
|
|
while (!delete_nodes.empty()) {
|
|
ConveyorNode *node = delete_nodes.front();
|
|
/*auto erased = */ std::remove_if(sink_nodes.begin(), sink_nodes.end(),
|
|
[node](Own<ConveyorNode> &element) {
|
|
return node == element.get();
|
|
});
|
|
delete_nodes.pop();
|
|
}
|
|
}
|
|
|
|
ConvertConveyorNodeBase::ConvertConveyorNodeBase(Own<ConveyorNode> &&dep)
|
|
: child{std::move(dep)} {}
|
|
|
|
void ConvertConveyorNodeBase::getResult(ErrorOrValue &err_or_val) {
|
|
getImpl(err_or_val);
|
|
}
|
|
|
|
void AttachConveyorNodeBase::getResult(ErrorOrValue &err_or_val) noexcept {
|
|
if (child) {
|
|
child->getResult(err_or_val);
|
|
}
|
|
}
|
|
|
|
void detachConveyor(Conveyor<void> &&conveyor) {
|
|
EventLoop &loop = currentEventLoop();
|
|
ConveyorSinks &sink = loop.daemon();
|
|
sink.add(std::move(conveyor));
|
|
}
|
|
} // namespace saw
|