summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorClaudius Holeksa <mail@keldu.de>2023-05-03 20:34:02 +0200
committerClaudius Holeksa <mail@keldu.de>2023-05-03 20:34:02 +0200
commit2aa2af0007b7e969845642027c635cd3fd9c8aea (patch)
treee72a05a3c2bfe58442b160c0c8e98ce1d095f36f /src
parent9b81a2585142260f89d47cbe1e592cec9e1f778f (diff)
Moved dirs and added codec-json dir
Diffstat (limited to 'src')
-rw-r--r--src/SConscript8
-rw-r--r--src/async/.nix/derivation.nix31
-rw-r--r--src/async/SConscript38
-rw-r--r--src/async/SConstruct66
-rw-r--r--src/async/async.cpp419
-rw-r--r--src/async/async.h1023
-rw-r--r--src/async/async.tmpl.h769
-rw-r--r--src/codec-json/.nix/derivation.nix34
-rw-r--r--src/codec-json/SConscript38
-rw-r--r--src/codec-json/SConstruct66
-rw-r--r--src/codec-json/json.h12
-rw-r--r--src/codec/.nix/derivation.nix31
-rw-r--r--src/codec/SConscript38
-rw-r--r--src/codec/SConstruct66
-rw-r--r--src/codec/data.h89
-rw-r--r--src/codec/proto_kel.h41
-rw-r--r--src/codec/schema.h79
-rw-r--r--src/core/.nix/derivation.nix26
-rw-r--r--src/core/SConscript38
-rw-r--r--src/core/SConstruct66
-rw-r--r--src/core/buffer.cpp434
-rw-r--r--src/core/buffer.h195
-rw-r--r--src/core/common.h75
-rw-r--r--src/core/error.cpp121
-rw-r--r--src/core/error.h233
-rw-r--r--src/core/string_literal.h40
-rw-r--r--src/io-tls/.nix/derivation.nix35
-rw-r--r--src/io-tls/SConscript38
-rw-r--r--src/io-tls/SConstruct66
-rw-r--r--src/io-tls/tls.cpp252
-rw-r--r--src/io-tls/tls.h68
-rw-r--r--src/io/.nix/derivation.nix32
-rw-r--r--src/io/SConscript38
-rw-r--r--src/io/SConstruct66
-rw-r--r--src/io/io.cpp70
-rw-r--r--src/io/io.h214
-rw-r--r--src/io/io_helpers.cpp85
-rw-r--r--src/io/io_helpers.h53
38 files changed, 5093 insertions, 0 deletions
diff --git a/src/SConscript b/src/SConscript
new file mode 100644
index 0000000..8da5a3d
--- /dev/null
+++ b/src/SConscript
@@ -0,0 +1,8 @@
+#!/bin/false
+
+Import('env')
+
+# Export to other libs
+Export('env');
+SConscript('core/SConscript');
+SConscript('async/SConscript');
diff --git a/src/async/.nix/derivation.nix b/src/async/.nix/derivation.nix
new file mode 100644
index 0000000..8ceac08
--- /dev/null
+++ b/src/async/.nix/derivation.nix
@@ -0,0 +1,31 @@
+{ lib
+, stdenvNoCC
+, scons
+, clang
+, clang-tools
+, version
+, forstio
+}:
+
+let
+
+in stdenvNoCC.mkDerivation {
+ pname = "forstio-async";
+ inherit version;
+
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ nativeBuildInputs = [
+ scons
+ clang
+ clang-tools
+ ];
+
+ buildInputs = [
+ forstio.core
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/src/async/SConscript b/src/async/SConscript
new file mode 100644
index 0000000..69f8950
--- /dev/null
+++ b/src/async/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+async_env = env.Clone();
+
+async_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+async_env.headers = sorted(glob.glob(dir_path + "/*.h"))
+
+env.sources += async_env.sources;
+env.headers += async_env.headers;
+
+## Shared lib
+objects_shared = []
+async_env.add_source_files(objects_shared, async_env.sources, shared=True);
+async_env.library_shared = async_env.SharedLibrary('#build/forstio-async', [objects_shared]);
+
+## Static lib
+objects_static = []
+async_env.add_source_files(objects_static, async_env.sources, shared=False);
+async_env.library_static = async_env.StaticLibrary('#build/forstio-async', [objects_static]);
+
+# Set Alias
+env.Alias('library_async', [async_env.library_shared, async_env.library_static]);
+
+env.targets += ['library_async'];
+
+# Install
+env.Install('$prefix/lib/', [async_env.library_shared, async_env.library_static]);
+env.Install('$prefix/include/forstio/async/', [async_env.headers]);
diff --git a/src/async/SConstruct b/src/async/SConstruct
new file mode 100644
index 0000000..0d7b7c6
--- /dev/null
+++ b/src/async/SConstruct
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+)
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+)
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=['forstio-core'])
+env.__class__.add_source_files = add_kel_source_files
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+env.targets = [];
+
+Export('env')
+SConscript('SConscript')
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', [env.targets]);
+env.Default('all');
+
+env.Alias('install', '$prefix')
diff --git a/src/async/async.cpp b/src/async/async.cpp
new file mode 100644
index 0000000..c53ffa6
--- /dev/null
+++ b/src/async/async.cpp
@@ -0,0 +1,419 @@
+#include "async.h"
+#include <forstio/core/common.h>
+#include <forstio/core/error.h>
+
+#include <algorithm>
+#include <cassert>
+
+namespace saw {
+namespace {
+thread_local event_loop *local_loop = nullptr;
+
+event_loop &current_event_loop() {
+ event_loop *loop = local_loop;
+ assert(loop);
+ return *loop;
+}
+} // namespace
+
+conveyor_node::conveyor_node() {}
+
+conveyor_node_with_child_mixin::conveyor_node_with_child_mixin(
+ own<conveyor_node> &&child_, conveyor_node &owner)
+ : child{std::move(child_)} {
+ assert(child);
+
+ child->notify_parent_attached(owner);
+}
+
+error_or<own<conveyor_node>>
+conveyor_node_with_child_mixin::swap_child(own<conveyor_node> &&swapee) {
+ SAW_ASSERT(child) {
+ return make_error<err::invalid_state>("Child should exist if this function is called");
+ }
+ own<conveyor_node> old_child = std::move(child);
+
+ /**
+ * We need the parent of the old_child's next storage
+ */
+ conveyor_storage *old_storage = old_child->next_storage();
+ conveyor_storage *old_storage_parent =
+ old_storage ? old_storage->get_parent() : nullptr;
+
+ /**
+ * Swap in the new child
+ */
+ if (swapee) {
+ child = std::move(swapee);
+
+ /**
+ * Then we need to set the new child's storage parent since the next
+ * storage has a nullptr set And if the old_storage_parent is a nullptr,
+ * then it doesn't matter. So we don't check for it
+ */
+ conveyor_storage *swapee_storage = child->next_storage();
+ if (swapee_storage) {
+ swapee_storage->set_parent(old_storage_parent);
+ }
+ }
+
+ return old_child;
+}
+
+conveyor_storage::conveyor_storage() {}
+
+conveyor_storage::~conveyor_storage() {}
+
+conveyor_storage *conveyor_storage::get_parent() const { return parent_; }
+
+void conveyor_event_storage::set_parent(conveyor_storage *p) {
+ /*
+ * parent check isn't needed, but is used
+ * for the assert, because the storage should
+ * be armed if there was an element present
+ * and a valid parent
+ */
+ if (/*!parent && */ p && !is_armed() && queued() > 0) {
+ assert(!parent_);
+ if (p->space() > 0) {
+ arm_later();
+ }
+ }
+
+ parent_ = p;
+}
+
+conveyor_event_storage::conveyor_event_storage() : conveyor_storage{} {}
+
+conveyor_base::conveyor_base(own<conveyor_node> &&node_p)
+ : node_{std::move(node_p)} {}
+
+error propagate_error::operator()(const error &error) const {
+ return error.copy_error();
+}
+
+error propagate_error::operator()(error &&err) { return std::move(err); }
+
+event::event() : event(current_event_loop()) {}
+
+event::event(event_loop &loop) : loop_{loop} {}
+
+event::~event() { disarm(); }
+
+void event::arm_next() {
+ assert(&loop_ == local_loop);
+ if (prev_ == nullptr) {
+ // Push the next_insert_point back by one
+ // and inserts itself before that
+ next_ = *loop_.next_insert_point_;
+ prev_ = loop_.next_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ // Set the new insertion ptr location to next
+ loop_.next_insert_point_ = &next_;
+
+ // Pushes back the later insert point if it was pointing at the
+ // previous event
+ if (loop_.later_insert_point_ == prev_) {
+ loop_.later_insert_point_ = &next_;
+ }
+
+ // If tail_ points at the same location then
+ // we are at the end and have to update tail_ then.
+ // Technically should be possible by checking if
+ // next is a `nullptr`
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::arm_later() {
+ assert(&loop_ == local_loop);
+
+ if (prev_ == nullptr) {
+ next_ = *loop_.later_insert_point_;
+ prev_ = loop_.later_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ loop_.later_insert_point_ = &next_;
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::arm_last() {
+ assert(&loop_ == local_loop);
+
+ if (prev_ == nullptr) {
+ next_ = *loop_.later_insert_point_;
+ prev_ = loop_.later_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::disarm() {
+ if (prev_ != nullptr) {
+ if (loop_.tail_ == &next_) {
+ loop_.tail_ = prev_;
+ }
+
+ if (loop_.next_insert_point_ == &next_) {
+ loop_.next_insert_point_ = prev_;
+ }
+
+ *prev_ = next_;
+ if (next_) {
+ next_->prev_ = prev_;
+ }
+
+ prev_ = nullptr;
+ next_ = nullptr;
+ }
+}
+
+bool event::is_armed() const { return prev_ != nullptr; }
+
+conveyor_sink::conveyor_sink() : node_{nullptr} {}
+
+conveyor_sink::conveyor_sink(own<conveyor_node> &&node_p)
+ : node_{std::move(node_p)} {}
+
+void event_loop::set_runnable(bool runnable) { is_runnable_ = runnable; }
+
+event_loop::event_loop() {}
+
+event_loop::event_loop(own<class event_port> &&ep)
+ : event_port_{std::move(ep)} {}
+
+event_loop::~event_loop() { assert(local_loop != this); }
+
+void event_loop::enter_scope() {
+ assert(!local_loop);
+ local_loop = this;
+}
+
+void event_loop::leave_scope() {
+ assert(local_loop == this);
+ local_loop = nullptr;
+}
+
+bool event_loop::turn_loop() {
+ size_t turn_step = 0;
+ while (head_ && turn_step < 65536) {
+ if (!turn()) {
+ return false;
+ }
+ ++turn_step;
+ }
+ return true;
+}
+
+bool event_loop::turn() {
+ event *event = head_;
+
+ if (!event) {
+ return false;
+ }
+
+ head_ = event->next_;
+ if (head_) {
+ head_->prev_ = &head_;
+ }
+
+ next_insert_point_ = &head_;
+ if (later_insert_point_ == &event->next_) {
+ later_insert_point_ = &head_;
+ }
+ if (tail_ == &event->next_) {
+ tail_ = &head_;
+ }
+
+ event->next_ = nullptr;
+ event->prev_ = nullptr;
+
+ next_insert_point_ = &head_;
+
+ event->fire();
+
+ return true;
+}
+
+bool event_loop::wait(const std::chrono::steady_clock::duration &duration) {
+ if (event_port_) {
+ event_port_->wait(duration);
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::wait(const std::chrono::steady_clock::time_point &time_point) {
+ if (event_port_) {
+ event_port_->wait(time_point);
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::wait() {
+ if (event_port_) {
+ event_port_->wait();
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::poll() {
+ if (event_port_) {
+ event_port_->poll();
+ }
+
+ return turn_loop();
+}
+
+event_port *event_loop::event_port() { return event_port_.get(); }
+
+conveyor_sink_set &event_loop::daemon() {
+ if (!daemon_sink_) {
+ daemon_sink_ = heap<conveyor_sink_set>();
+ }
+ return *daemon_sink_;
+}
+
+wait_scope::wait_scope(event_loop &loop) : loop_{loop} { loop_.enter_scope(); }
+
+wait_scope::~wait_scope() { loop_.leave_scope(); }
+
+void wait_scope::wait() { loop_.wait(); }
+
+void wait_scope::wait(const std::chrono::steady_clock::duration &duration) {
+ loop_.wait(duration);
+}
+
+void wait_scope::wait(const std::chrono::steady_clock::time_point &time_point) {
+ loop_.wait(time_point);
+}
+
+void wait_scope::poll() { loop_.poll(); }
+
+error_or<own<conveyor_node>>
+convert_conveyor_node_base::swap_child(own<conveyor_node> &&swapee) noexcept {
+ return child_mixin_.swap_child(std::move(swapee));
+}
+
+conveyor_storage *convert_conveyor_node_base::next_storage() noexcept {
+ if (!child_mixin_.child) {
+ return nullptr;
+ }
+ return child_mixin_.child->next_storage();
+}
+
+immediate_conveyor_node_base::immediate_conveyor_node_base()
+ : conveyor_event_storage{} {}
+
+merge_conveyor_node_base::merge_conveyor_node_base()
+ : conveyor_event_storage{} {}
+
+error_or<own<conveyor_node>> queue_buffer_conveyor_node_base::swap_child(
+ own<conveyor_node> &&swapee_) noexcept {
+ return child_mixin_.swap_child(std::move(swapee_));
+}
+
+void conveyor_sink_set::destroy_sink_conveyor_node(conveyor_node &node) {
+ if (!is_armed()) {
+ arm_last();
+ }
+
+ delete_nodes_.push(&node);
+}
+
+void conveyor_sink_set::fail(error &&error) {
+ /// @todo call error_handler
+}
+
+conveyor_sink_set::conveyor_sink_set(event_loop &event_loop)
+ : event{event_loop} {}
+
+void conveyor_sink_set::add(conveyor<void> &&sink) {
+ auto nas = conveyor<void>::from_conveyor(std::move(sink));
+ SAW_ASSERT(nas) { return; }
+ conveyor_storage *storage = nas->next_storage();
+
+ own<sink_conveyor_node> sink_node = nullptr;
+ try {
+ sink_node = heap<sink_conveyor_node>(std::move(nas), *this);
+ } catch (std::bad_alloc &) {
+ return;
+ }
+ if (storage) {
+ storage->set_parent(sink_node.get());
+ }
+
+ sink_nodes_.emplace_back(std::move(sink_node));
+}
+
+void conveyor_sink_set::fire() {
+ while (!delete_nodes_.empty()) {
+ conveyor_node *node = delete_nodes_.front();
+ /*auto erased = */ std::remove_if(sink_nodes_.begin(),
+ sink_nodes_.end(),
+ [node](own<conveyor_node> &element) {
+ return node == element.get();
+ });
+ delete_nodes_.pop();
+ }
+}
+
+convert_conveyor_node_base::convert_conveyor_node_base(own<conveyor_node> &&dep)
+ : child_mixin_{std::move(dep), *this} {}
+
+void convert_conveyor_node_base::get_result(error_or_value &err_or_val) {
+ get_impl(err_or_val);
+}
+
+void attach_conveyor_node_base::get_result(
+ error_or_value &err_or_val) noexcept {
+ if (child_mixin_.child) {
+ child_mixin_.child->get_result(err_or_val);
+ }
+}
+
+error_or<own<conveyor_node>>
+attach_conveyor_node_base::swap_child(own<conveyor_node> &&swapee_) noexcept {
+ return child_mixin_.swap_child(std::move(swapee_));
+}
+
+conveyor_storage *attach_conveyor_node_base::next_storage() noexcept {
+ if (!child_mixin_.child) {
+ return nullptr;
+ }
+
+ return child_mixin_.child->next_storage();
+}
+
+void detach_conveyor(conveyor<void> &&conveyor) {
+ event_loop &loop = current_event_loop();
+ conveyor_sink_set &sink = loop.daemon();
+ sink.add(std::move(conveyor));
+}
+} // namespace saw
diff --git a/src/async/async.h b/src/async/async.h
new file mode 100644
index 0000000..4cfed60
--- /dev/null
+++ b/src/async/async.h
@@ -0,0 +1,1023 @@
+#pragma once
+
+#include <forstio/core/common.h>
+#include <forstio/core/error.h>
+
+#include <chrono>
+#include <functional>
+#include <limits>
+#include <list>
+#include <queue>
+#include <type_traits>
+
+namespace saw {
+class conveyor_storage;
+class conveyor_node {
+public:
+ conveyor_node();
+ virtual ~conveyor_node() = default;
+
+ /**
+ * Internal method to retrieve results from children
+ */
+ virtual void get_result(error_or_value &err_or_val) = 0;
+
+ /**
+ * Swap out child with another one
+ */
+ virtual error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) = 0;
+
+ /**
+ * Retrieve the next storage node
+ */
+ virtual conveyor_storage *next_storage() = 0;
+
+ /**
+ * Notify that a new parent was attached
+ * Only relevant for the feeding nodes
+ */
+ virtual void notify_parent_attached(conveyor_node &){};
+};
+
+class conveyor_node_with_child_mixin final {
+public:
+ own<conveyor_node> child = nullptr;
+
+ conveyor_node_with_child_mixin(own<conveyor_node> &&child_,
+ conveyor_node &owner_);
+ ~conveyor_node_with_child_mixin() = default;
+
+ /**
+ * Swap out children and return the child ptr, since the caller is the child
+ * itself. Stack needs to be cleared before the child is destroyed, so the
+ * swapped out node is returned as well.
+ */
+ error_or<own<conveyor_node>> swap_child(own<conveyor_node> &&swapee);
+};
+
+class conveyor_node_with_parent_mixin final {
+public:
+ conveyor_node *parent = nullptr;
+
+ error_or<own<conveyor_node>>
+ swap_child_of_parent(own<conveyor_node> &&swapee) {
+ SAW_ASSERT(parent) {
+ return make_error<err::invalid_state>(
+ "Can't swap child, because parent doesn't exist");
+ }
+
+ return parent->swap_child(std::move(swapee));
+ }
+ void change_parent(conveyor_node *p) { parent = p; }
+};
+
+class event_loop;
+class wait_scope;
+/*
+ * Event class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class event {
+private:
+ event_loop &loop_;
+ event **prev_ = nullptr;
+ event *next_ = nullptr;
+
+ friend class event_loop;
+
+public:
+ event();
+ event(event_loop &loop);
+ virtual ~event();
+
+ virtual void fire() = 0;
+
+ void arm_next();
+ void arm_later();
+ void arm_last();
+ void disarm();
+
+ bool is_armed() const;
+};
+
+class conveyor_storage {
+protected:
+ conveyor_storage *parent_ = nullptr;
+
+public:
+ conveyor_storage();
+ virtual ~conveyor_storage();
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+ virtual void child_has_fired() = 0;
+ virtual void parent_has_fired() = 0;
+
+ virtual void set_parent(conveyor_storage *parent) = 0;
+ conveyor_storage *get_parent() const;
+};
+
+class conveyor_event_storage : public conveyor_storage, public event {
+public:
+ conveyor_event_storage();
+ virtual ~conveyor_event_storage() = default;
+
+ void set_parent(conveyor_storage *parent) override;
+};
+
+class conveyor_base {
+protected:
+ own<conveyor_node> node_;
+
+public:
+ conveyor_base(own<conveyor_node> &&node_p);
+ virtual ~conveyor_base() = default;
+
+ conveyor_base(conveyor_base &&) = default;
+ conveyor_base &operator=(conveyor_base &&) = default;
+
+ void get(error_or_value &err_or_val);
+};
+
+template <typename T> class conveyor;
+
+template <typename T> conveyor<T> chained_conveyor_type(T *);
+
+// template <typename T> Conveyor<T> chainedConveyorType(Conveyor<T> *);
+
+template <typename T> T remove_error_or_type(T *);
+
+template <typename T> T remove_error_or_type(error_or<T> *);
+
+template <typename T>
+using remove_error_or = decltype(remove_error_or_type((T *)nullptr));
+
+template <typename T>
+using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr));
+
+template <typename Func, typename T>
+using conveyor_result =
+ chained_conveyors<remove_error_or<return_type<Func, T>>>;
+
+struct propagate_error {
+public:
+ error operator()(const error &err) const;
+ error operator()(error &&err);
+};
+
+class conveyor_sink {
+private:
+ own<conveyor_node> node_;
+
+public:
+ conveyor_sink();
+ conveyor_sink(own<conveyor_node> &&node);
+
+ conveyor_sink(conveyor_sink &&) = default;
+ conveyor_sink &operator=(conveyor_sink &&) = default;
+};
+
+template <typename T> class merge_conveyor_node_data;
+
+template <typename T> class merge_conveyor {
+private:
+ lent<merge_conveyor_node_data<T>> data_;
+
+public:
+ merge_conveyor() = default;
+ merge_conveyor(lent<merge_conveyor_node_data<T>> d);
+ ~merge_conveyor();
+
+ void attach(conveyor<T> conv);
+};
+
+/**
+ * Main interface for async operations.
+ */
+template <typename T> class conveyor final : public conveyor_base {
+public:
+ /**
+ * Construct an immediately fulfilled node
+ */
+ conveyor(fix_void<T> value);
+
+ /**
+ * Construct an immediately failed node
+ */
+ conveyor(error &&err);
+
+ /**
+ * Construct a conveyor with a child node
+ */
+ conveyor(own<conveyor_node> node_p);
+
+ conveyor(conveyor<T> &&) = default;
+ conveyor<T> &operator=(conveyor<T> &&) = default;
+
+ /**
+ * This method converts values or errors from children
+ */
+ template <typename Func, typename ErrorFunc = propagate_error>
+ [[nodiscard]] conveyor_result<Func, T>
+ then(Func &&func, ErrorFunc &&error_func = propagate_error());
+
+ /**
+ * This method adds a buffer node in the conveyor chains which acts as a
+ * scheduler interrupt point and collects elements up to the supplied limit.
+ */
+ [[nodiscard]] conveyor<T>
+ buffer(size_t limit = std::numeric_limits<size_t>::max());
+
+ /**
+ * This method just takes ownership of any supplied types,
+ * which are destroyed when the chain gets destroyed.
+ * Useful for resource lifetime control.
+ */
+ template <typename... Args>
+ [[nodiscard]] conveyor<T> attach(Args &&...args);
+
+ /** @todo implement
+ * This method limits the total amount of passed elements
+ * Be careful where you place this node into the chain.
+ * If you meant to fork it and destroy paths you shouldn't place
+ * an interrupt point between the fork and this limiter
+ */
+ [[nodiscard]] conveyor<T> limit(size_t val = 1);
+
+ /**
+ *
+ */
+ [[nodiscard]] std::pair<conveyor<T>, merge_conveyor<T>> merge();
+
+ /**
+ * Moves the conveyor chain into a thread local storage point which drops
+ * every element. Use sink() if you want to control the lifetime of a
+ * conveyor chain
+ */
+ template <typename ErrorFunc = propagate_error>
+ void detach(ErrorFunc &&err_func = propagate_error());
+ /**
+ * Creates a local sink which drops elements, but lifetime control remains
+ * in your hand.
+ */
+ template <typename ErrorFunc = propagate_error>
+ [[nodiscard]] conveyor_sink
+ sink(ErrorFunc &&error_func = propagate_error());
+
+ /**
+ * If no sink() or detach() is used you have to take elements out of the
+ * chain yourself.
+ */
+ error_or<fix_void<T>> take();
+
+ /** @todo implement
+ * Specifically pump elements through this chain with the provided
+ * wait_scope
+ */
+ void poll(wait_scope &wait_scope);
+
+ // helper
+ static conveyor<T> to_conveyor(own<conveyor_node> node);
+
+ // helper
+ static own<conveyor_node> from_conveyor(conveyor<T> conveyor);
+};
+
+template <typename Func> conveyor_result<Func, void> exec_later(Func &&func);
+
+/*
+ * Join Conveyors into a single one
+ */
+template <typename... Args>
+conveyor<std::tuple<Args...>>
+join_conveyors(std::tuple<conveyor<Args>...> &conveyors);
+
+template <typename T> class conveyor_feeder {
+public:
+ virtual ~conveyor_feeder() = default;
+
+ virtual void feed(T &&data) = 0;
+ virtual void fail(error &&error) = 0;
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+
+ virtual error swap(conveyor<T> &&conveyor) noexcept = 0;
+};
+
+template <> class conveyor_feeder<void> {
+public:
+ virtual ~conveyor_feeder() = default;
+
+ virtual void feed(void_t &&value = void_t{}) = 0;
+ virtual void fail(error &&error) = 0;
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+
+ virtual error swap(conveyor<void_t> &&conveyor) noexcept = 0;
+};
+
+template <typename T> struct conveyor_and_feeder {
+ own<conveyor_feeder<T>> feeder;
+ conveyor<T> conveyor;
+};
+
+template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder();
+
+template <typename T> conveyor_and_feeder<T> one_time_conveyor_and_feeder();
+
+enum class Signal : uint8_t { Terminate, User1 };
+
+/**
+ * Class which acts as a correspondent between the running framework and outside
+ * events which may be signals from the operating system or just other threads.
+ * Default EventPorts are supplied by setupAsyncIo() in io.h
+ */
+class event_port {
+public:
+ virtual ~event_port() = default;
+
+ virtual conveyor<void> on_signal(Signal signal) = 0;
+
+ virtual void poll() = 0;
+ virtual void wait() = 0;
+ virtual void wait(const std::chrono::steady_clock::duration &) = 0;
+ virtual void wait(const std::chrono::steady_clock::time_point &) = 0;
+
+ virtual void wake() = 0;
+};
+
+class sink_conveyor_node;
+
+class conveyor_sink_set final : public event {
+private:
+ /*
+ class Helper final : public Event {
+ private:
+ void destroySinkConveyorNode(ConveyorNode& sink);
+ void fail(Error&& error);
+
+ std::vector<Own<ConveyorNode>> sink_nodes;
+ std::queue<ConveyorNode*> delete_nodes;
+ std::function<void(Error&& error)> error_handler;
+
+ public:
+ ConveyorSinks() = default;
+ ConveyorSinks(EventLoop& event_loop);
+
+ void add(Conveyor<void> node);
+
+ void fire() override {}
+ };
+
+ gin::Own<Helper> helper;
+ */
+ friend class sink_conveyor_node;
+
+ void destroy_sink_conveyor_node(conveyor_node &sink_node);
+ void fail(error &&err);
+
+ std::list<own<conveyor_node>> sink_nodes_;
+
+ std::queue<conveyor_node *> delete_nodes_;
+
+ std::function<void(error &&)> error_handler_;
+
+public:
+ // ConveyorSinks();
+ // ConveyorSinks(EventLoop& event_loop);
+ conveyor_sink_set() = default;
+ conveyor_sink_set(event_loop &event_loop);
+
+ void add(conveyor<void> &&node);
+
+ void fire() override;
+};
+
+/*
+ * EventLoop class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class event_loop {
+private:
+ friend class event;
+ event *head_ = nullptr;
+ event **tail_ = &head_;
+ event **next_insert_point_ = &head_;
+ event **later_insert_point_ = &head_;
+
+ bool is_runnable_ = false;
+
+ own<event_port> event_port_ = nullptr;
+
+ own<conveyor_sink_set> daemon_sink_ = nullptr;
+
+ // functions
+ void set_runnable(bool runnable);
+
+ friend class wait_scope;
+ void enter_scope();
+ void leave_scope();
+
+ bool turn_loop();
+ bool turn();
+
+public:
+ event_loop();
+ event_loop(own<event_port> &&port);
+ ~event_loop();
+
+ event_loop(event_loop &&) = default;
+ event_loop &operator=(event_loop &&) = default;
+
+ bool wait();
+ bool wait(const std::chrono::steady_clock::duration &);
+ bool wait(const std::chrono::steady_clock::time_point &);
+ bool poll();
+
+ event_port *event_port();
+
+ conveyor_sink_set &daemon();
+};
+
+/*
+ * WaitScope class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class wait_scope {
+private:
+ event_loop &loop_;
+
+public:
+ wait_scope(event_loop &loop);
+ ~wait_scope();
+
+ void wait();
+ void wait(const std::chrono::steady_clock::duration &);
+ void wait(const std::chrono::steady_clock::time_point &);
+ void poll();
+};
+
+template <typename Func> conveyor_result<Func, void> yield_next(Func &&func);
+
+template <typename Func> conveyor_result<Func, void> yield_later(Func &&func);
+
+template <typename Func> conveyor_result<Func, void> yield_last(Func &&func);
+} // namespace saw
+
+// Secret stuff
+// Aka private semi hidden classes
+namespace saw {
+
+template <typename Out, typename In> struct fix_void_caller {
+ template <typename Func> static Out apply(Func &func, In &&in) {
+ return func(std::move(in));
+ }
+};
+
+template <typename Out> struct fix_void_caller<Out, void_t> {
+ template <typename Func> static Out apply(Func &func, void_t &&in) {
+ (void)in;
+ return func();
+ }
+};
+
+template <typename In> struct fix_void_caller<void_t, In> {
+ template <typename Func> static void_t apply(Func &func, In &&in) {
+ func(std::move(in));
+ return void_t{};
+ }
+};
+
+template <> struct fix_void_caller<void_t, void_t> {
+ template <typename Func> static void_t apply(Func &func, void_t &&in) {
+ (void)in;
+ func();
+ return void_t{};
+ }
+};
+
+template <typename T> class adapt_conveyor_node;
+
+template <typename T>
+class adapt_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
+private:
+ adapt_conveyor_node<T> *feedee_ = nullptr;
+
+public:
+ ~adapt_conveyor_feeder();
+
+ void set_feedee(adapt_conveyor_node<T> *feedee);
+
+ void feed(T &&value) override;
+ void fail(error &&error) override;
+
+ size_t space() const override;
+ size_t queued() const override;
+
+ error swap(conveyor<T> &&conv) noexcept override;
+};
+
+template <typename T>
+class adapt_conveyor_node final : public conveyor_node,
+ public conveyor_event_storage {
+private:
+ adapt_conveyor_feeder<T> *feeder_ = nullptr;
+
+ std::queue<error_or<unfix_void<T>>> storage_;
+
+ conveyor_node_with_parent_mixin parent_node_;
+
+public:
+ adapt_conveyor_node();
+ ~adapt_conveyor_node();
+
+ void set_feeder(adapt_conveyor_feeder<T> *feeder);
+
+ void feed(T &&value);
+ void fail(error &&error);
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) override;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+ void notify_parent_attached(conveyor_node &) noexcept override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+
+ // Event
+ void fire() override;
+};
+
+template <typename T> class one_time_conveyor_node;
+
+template <typename T>
+class one_time_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
+private:
+ one_time_conveyor_node<T> *feedee_ = nullptr;
+
+public:
+ ~one_time_conveyor_feeder();
+
+ void set_feedee(one_time_conveyor_node<T> *feedee);
+
+ void feed(T &&value) override;
+ void fail(error &&error) override;
+
+ size_t space() const override;
+ size_t queued() const override;
+};
+
+template <typename T>
+class one_time_conveyor_node final : public conveyor_node,
+ public conveyor_storage,
+ public event {
+private:
+ one_time_conveyor_feeder<T> *feeder_ = nullptr;
+
+ bool passed_ = false;
+ maybe<error_or<T>> storage_ = std::nullopt;
+
+public:
+ ~one_time_conveyor_node();
+
+ void set_feeder(one_time_conveyor_feeder<T> *feeder);
+
+ void feed(T &&value);
+ void fail(error &&error);
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) override;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override {}
+ void parent_has_fired() override;
+
+ // Event
+ void fire() override;
+};
+
+/**
+ * This class buffers and saves incoming elements and acts as an interrupt node
+ * for processing calls
+ */
+class queue_buffer_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+
+public:
+ queue_buffer_conveyor_node_base(own<conveyor_node> child_)
+ : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {}
+ virtual ~queue_buffer_conveyor_node_base() = default;
+
+ /**
+ * Use mixin
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class queue_buffer_conveyor_node final
+ : public queue_buffer_conveyor_node_base {
+private:
+ std::queue<error_or<T>> storage_;
+ size_t max_store_;
+
+public:
+ queue_buffer_conveyor_node(own<conveyor_node> dep, size_t max_size)
+ : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{
+ max_size} {}
+ // Event
+ void fire() override;
+ // ConveyorNode
+ void get_result(error_or_value &eov) noexcept override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+};
+
+class attach_conveyor_node_base : public conveyor_node {
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+
+public:
+ attach_conveyor_node_base(own<conveyor_node> &&child_)
+ : child_mixin_{std::move(child_), *this} {}
+
+ virtual ~attach_conveyor_node_base() = default;
+
+ void get_result(error_or_value &err_or_val) noexcept override;
+
+ /**
+ * Use mixin
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+};
+
+template <typename... Args>
+class attach_conveyor_node final : public attach_conveyor_node_base {
+public:
+ attach_conveyor_node(own<conveyor_node> &&dep, Args &&...args)
+ : attach_conveyor_node_base(std::move(dep)), attached_data_{
+ std::move(args...)} {}
+
+private:
+ std::tuple<Args...> attached_data_;
+};
+
+class convert_conveyor_node_base : public conveyor_node {
+public:
+ convert_conveyor_node_base(own<conveyor_node> &&dep);
+ virtual ~convert_conveyor_node_base() = default;
+
+ void get_result(error_or_value &err_or_val) override;
+
+ virtual void get_impl(error_or_value &err_or_val) = 0;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+};
+
+template <typename T, typename DepT, typename Func, typename ErrorFunc>
+class convert_conveyor_node final : public convert_conveyor_node_base {
+private:
+ Func func_;
+ ErrorFunc error_func_;
+
+ static_assert(std::is_same<DepT, remove_error_or<DepT>>::value,
+ "Should never be of type ErrorOr");
+
+public:
+ convert_conveyor_node(own<conveyor_node> &&dep, Func &&func,
+ ErrorFunc &&error_func)
+ : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)},
+ error_func_{std::move(error_func)} {}
+
+ void get_impl(error_or_value &err_or_val) noexcept override {
+ error_or<unfix_void<DepT>> dep_eov;
+ error_or<unfix_void<remove_error_or<T>>> &eov =
+ err_or_val.as<unfix_void<remove_error_or<T>>>();
+ if (child_mixin_.child) {
+ child_mixin_.child->get_result(dep_eov);
+ if (dep_eov.is_value()) {
+ try {
+
+ eov = fix_void_caller<T, DepT>::apply(
+ func_, std::move(dep_eov.get_value()));
+ } catch (const std::bad_alloc &) {
+ eov = make_error<err::out_of_memory>("Out of memory");
+ } catch (const std::exception &) {
+ eov = make_error<err::invalid_state>(
+ "Exception in chain occured. Return ErrorOr<T> if you "
+ "want to handle errors which are recoverable");
+ }
+ } else if (dep_eov.is_error()) {
+ eov = error_func_(std::move(dep_eov.get_error()));
+ } else {
+ eov = make_error<err::invalid_state>("No value set in dependency");
+ }
+ } else {
+ eov = make_error<err::invalid_state>("Conveyor doesn't have child");
+ }
+ }
+};
+
+class sink_conveyor_node final : public conveyor_node,
+ public conveyor_event_storage {
+private:
+ conveyor_node_with_child_mixin child_mixin_;
+ conveyor_sink_set *conveyor_sink_;
+
+public:
+ sink_conveyor_node(own<conveyor_node> node, conveyor_sink_set &conv_sink)
+ : conveyor_event_storage{}, child_mixin_{std::move(node), *this},
+ conveyor_sink_{&conv_sink} {}
+
+ sink_conveyor_node(own<conveyor_node> node)
+ : conveyor_event_storage{}, child_mixin_{std::move(node), *this},
+ conveyor_sink_{nullptr} {}
+
+ // Event only queued if a critical error occured
+ void fire() override {
+ // Queued for destruction of children, because this acts as a sink and
+ // no other event should be here
+ child_mixin_.child = nullptr;
+
+ if (conveyor_sink_) {
+ conveyor_sink_->destroy_sink_conveyor_node(*this);
+ conveyor_sink_ = nullptr;
+ }
+ }
+
+ // ConveyorStorage
+ size_t space() const override { return 1; }
+ size_t queued() const override { return 0; }
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) noexcept override {
+ err_or_val.as<void_t>() =
+ make_error<err::invalid_state>("In a sink node no result can be returned");
+ }
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override {
+ return child_mixin_.swap_child(std::move(swapee));
+ }
+
+ // ConveyorStorage
+ void child_has_fired() override {
+ if (child_mixin_.child) {
+ error_or<void> dep_eov;
+ child_mixin_.child->get_result(dep_eov);
+ if (dep_eov.is_error()) {
+ if (dep_eov.get_error().is_critical()) {
+ if (!is_armed()) {
+ arm_last();
+ }
+ }
+ if (conveyor_sink_) {
+ conveyor_sink_->fail(std::move(dep_eov.get_error()));
+ }
+ }
+ }
+ }
+
+ /*
+ * No parent needs to be fired since we always have space
+ */
+ void parent_has_fired() override {}
+
+ conveyor_storage *next_storage() override {
+ // Should never happen though
+ assert(false);
+ return nullptr;
+ // return static_cast<ConveyorStorage*>(this);
+ }
+};
+
+class immediate_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+private:
+public:
+ immediate_conveyor_node_base();
+
+ virtual ~immediate_conveyor_node_base() = default;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override {
+ (void)swapee;
+ return make_error<err::not_supported>("Node doesn't support swapping");
+ }
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class immediate_conveyor_node final : public immediate_conveyor_node_base {
+private:
+ error_or<fix_void<T>> value_;
+ uint8_t retrieved_;
+
+public:
+ immediate_conveyor_node(fix_void<T> &&val);
+ immediate_conveyor_node(error &&error);
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) noexcept override {
+ if (retrieved_ > 0) {
+ err_or_val.as<fix_void<T>>() =
+ make_error<err::buffer_exhausted>("Already taken value");
+ } else {
+ err_or_val.as<fix_void<T>>() = std::move(value_);
+ }
+ if (queued() > 0) {
+ ++retrieved_;
+ }
+ }
+
+ // Event
+ void fire() override;
+};
+
+/*
+ * Collects every incoming value and throws it in one lane
+ */
+class merge_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+public:
+ merge_conveyor_node_base();
+
+ virtual ~merge_conveyor_node_base() = default;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class merge_conveyor_node : public merge_conveyor_node_base {
+private:
+ class appendage final : public conveyor_node, public conveyor_storage {
+ public:
+ own<conveyor_node> child;
+ merge_conveyor_node *merger;
+
+ maybe<error_or<fix_void<T>>> error_or_value_;
+
+ public:
+ appendage(own<conveyor_node> n, merge_conveyor_node &m)
+ : conveyor_storage{}, child{std::move(n)}, merger{&m},
+ error_or_value_{std::nullopt} {}
+
+ bool child_storage_has_element_queued() const {
+ if (!child) {
+ return false;
+ }
+ conveyor_storage *storage = child->next_storage();
+ if (storage) {
+ return storage->queued() > 0;
+ }
+ return false;
+ }
+
+ void get_appendage_result(error_or_value &eov);
+
+ /**
+ * ConveyorNode
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) override;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+
+ void get_result(error_or_value &err_or_val) override;
+
+ /**
+ * ConveyorStorage
+ */
+ size_t space() const override;
+
+ size_t queued() const override;
+
+ void child_has_fired() override;
+
+ void parent_has_fired() override;
+
+ void set_parent(conveyor_storage *par) override;
+ };
+
+ friend class merge_conveyor_node_data<T>;
+ friend class appendage;
+
+ our<merge_conveyor_node_data<T>> data_;
+ size_t next_appendage_ = 0;
+
+public:
+ merge_conveyor_node(our<merge_conveyor_node_data<T>> data);
+ ~merge_conveyor_node();
+ // ConveyorNode
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&c) noexcept override;
+
+ // Event
+ void get_result(error_or_value &err_or_val) noexcept override;
+
+ void fire() override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+ void child_has_fired() override;
+ void parent_has_fired() override;
+};
+
+template <typename T> class merge_conveyor_node_data {
+public:
+ std::vector<own<typename merge_conveyor_node<T>::appendage>> appendages;
+
+ merge_conveyor_node<T> *merger = nullptr;
+
+public:
+ void attach(conveyor<T> conv);
+
+ void governing_node_destroyed();
+};
+
+/*
+class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage {
+private:
+
+public:
+};
+
+template <typename... Args>
+class JoinConveyorNode final : public JoinConveyorNodeBase {
+private:
+ template<typename T>
+ class Appendage : public ConveyorEventStorage {
+ private:
+ Maybe<T> data = std::nullopt;
+
+ public:
+ size_t space() const override;
+ size_t queued() const override;
+
+ void fire() override;
+ void get_result(ErrorOrValue& eov) override;
+ };
+
+ std::tuple<Appendage<Args>...> appendages;
+
+public:
+};
+
+*/
+
+} // namespace saw
+
+#include "async.tmpl.h"
diff --git a/src/async/async.tmpl.h b/src/async/async.tmpl.h
new file mode 100644
index 0000000..d081fa9
--- /dev/null
+++ b/src/async/async.tmpl.h
@@ -0,0 +1,769 @@
+#pragma once
+
+#include <forstio/core/common.h>
+#include <forstio/core/error.h>
+
+#include <cassert>
+// Template inlining
+
+#include <iostream>
+
+namespace saw {
+
+template <typename Func> conveyor_result<Func, void> execLater(Func &&func) {
+ conveyor<void> conveyor{fix_void<void>{}};
+ return conveyor.then(std::move(func));
+}
+
+template <typename T>
+conveyor<T>::conveyor(fix_void<T> value) : conveyor_base(nullptr) {
+ // Is there any way to do this?
+ // @todo new conveyor_base constructor for Immediate values
+
+ own<immediate_conveyor_node<fix_void<T>>> immediate =
+ heap<immediate_conveyor_node<fix_void<T>>>(std::move(value));
+
+ if (!immediate) {
+ return;
+ }
+
+ node_ = std::move(immediate);
+}
+
+template <typename T>
+conveyor<T>::conveyor(error &&err) : conveyor_base(nullptr) {
+ own<immediate_conveyor_node<fix_void<T>>> immediate =
+ heap<immediate_conveyor_node<fix_void<T>>>(std::move(err));
+
+ if (!immediate) {
+ return;
+ }
+
+ node_ = std::move(immediate);
+}
+
+template <typename T>
+conveyor<T>::conveyor(own<conveyor_node> node_p)
+ : conveyor_base{std::move(node_p)} {}
+
+template <typename T>
+template <typename Func, typename ErrorFunc>
+conveyor_result<Func, T> conveyor<T>::then(Func &&func,
+ ErrorFunc &&error_func) {
+ own<conveyor_node> conversion_node =
+ heap<convert_conveyor_node<fix_void<return_type<Func, T>>, fix_void<T>,
+ Func, ErrorFunc>>(
+ std::move(node_), std::move(func), std::move(error_func));
+
+ return conveyor<remove_error_or<return_type<Func, T>>>::to_conveyor(
+ std::move(conversion_node));
+}
+
+template <typename T> conveyor<T> conveyor<T>::buffer(size_t size) {
+ SAW_ASSERT(node_) { return conveyor<T>{own<conveyor_node>{nullptr}}; }
+ conveyor_storage *storage = node_->next_storage();
+ SAW_ASSERT(storage) { return conveyor<T>{own<conveyor_node>{nullptr}}; }
+
+ own<queue_buffer_conveyor_node<fix_void<T>>> storage_node =
+ heap<queue_buffer_conveyor_node<fix_void<T>>>(std::move(node_), size);
+
+ conveyor_storage *storage_ptr =
+ static_cast<conveyor_storage *>(storage_node.get());
+
+ storage->set_parent(storage_ptr);
+ return conveyor<T>{std::move(storage_node)};
+}
+
+template <typename T>
+template <typename... Args>
+conveyor<T> conveyor<T>::attach(Args &&...args) {
+ own<attach_conveyor_node<Args...>> attach_node =
+ heap<attach_conveyor_node<Args...>>(std::move(node_),
+ std::move(args...));
+ return conveyor<T>{std::move(attach_node)};
+}
+
+template <typename T>
+std::pair<conveyor<T>, merge_conveyor<T>> conveyor<T>::merge() {
+ our<merge_conveyor_node_data<T>> data =
+ share<merge_conveyor_node_data<T>>();
+
+ own<merge_conveyor_node<T>> merge_node = heap<merge_conveyor_node<T>>(data);
+
+ SAW_ASSERT(node_) {
+ return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}},
+ merge_conveyor<T>{});
+ }
+ conveyor_storage *storage = node_->next_storage();
+ SAW_ASSERT(storage) {
+ return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}},
+ merge_conveyor<T>{});
+ }
+
+ data->attach(conveyor<T>::to_conveyor(std::move(node_)));
+
+ merge_conveyor<T> node_ref{data};
+
+ return std::make_pair(conveyor<T>{std::move(merge_node)},
+ std::move(node_ref));
+}
+
+template <>
+template <typename ErrorFunc>
+conveyor_sink conveyor<void>::sink(ErrorFunc &&error_func) {
+ conveyor_storage *storage = node_->next_storage();
+ SAW_ASSERT(storage) { return conveyor_sink{}; }
+
+ own<sink_conveyor_node> sink_node =
+ heap<sink_conveyor_node>(std::move(node_));
+ conveyor_storage *storage_ptr =
+ static_cast<conveyor_storage *>(sink_node.get());
+
+ storage->set_parent(storage_ptr);
+
+ return conveyor_sink{std::move(sink_node)};
+}
+
+void detach_conveyor(conveyor<void> &&conveyor);
+
+template <typename T>
+template <typename ErrorFunc>
+void conveyor<T>::detach(ErrorFunc &&func) {
+ detach_conveyor(std::move(then([](T &&) {}, std::move(func))));
+}
+
+template <>
+template <typename ErrorFunc>
+void conveyor<void>::detach(ErrorFunc &&func) {
+ detach_conveyor(std::move(then([]() {}, std::move(func))));
+}
+
+template <typename T>
+conveyor<T> conveyor<T>::to_conveyor(own<conveyor_node> node) {
+ return conveyor<T>{std::move(node)};
+}
+
+template <typename T>
+own<conveyor_node> conveyor<T>::from_conveyor(conveyor<T> conveyor) {
+ return std::move(conveyor.node_);
+}
+
+template <typename T> error_or<fix_void<T>> conveyor<T>::take() {
+ SAW_ASSERT(node_) {
+ return error_or<fix_void<T>>{
+ make_error<err::invalid_state>("conveyor in invalid state")};
+ }
+ conveyor_storage *storage = node_->next_storage();
+ if (storage) {
+ if (storage->queued() > 0) {
+ error_or<fix_void<T>> result;
+ node_->get_result(result);
+ return result;
+ } else {
+ return error_or<fix_void<T>>{
+ make_error<err::buffer_exhausted>("conveyor buffer has no elements")};
+ }
+ } else {
+ return error_or<fix_void<T>>{
+ make_error<err::invalid_state>("conveyor node has no child storage")};
+ }
+}
+
+template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder() {
+ own<adapt_conveyor_feeder<fix_void<T>>> feeder =
+ heap<adapt_conveyor_feeder<fix_void<T>>>();
+ own<adapt_conveyor_node<fix_void<T>>> node =
+ heap<adapt_conveyor_node<fix_void<T>>>();
+
+ feeder->set_feedee(node.get());
+ node->set_feeder(feeder.get());
+
+ return conveyor_and_feeder<T>{std::move(feeder),
+ conveyor<T>::to_conveyor(std::move(node))};
+}
+
+// QueueBuffer
+template <typename T> void queue_buffer_conveyor_node<T>::fire() {
+ if (child_mixin_.child) {
+ if (!storage_.empty()) {
+ if (storage_.front().is_error()) {
+ if (storage_.front().get_error().is_critical()) {
+ child_mixin_.child = nullptr;
+ }
+ }
+ }
+ }
+
+ bool has_space_before_fire = space() > 0;
+
+ if (parent_) {
+ parent_->child_has_fired();
+ if (!storage_.empty() && parent_->space() > 0) {
+ arm_later();
+ }
+ }
+
+ if (!child_mixin_.child) {
+ while (!storage_.empty()) {
+ storage_.pop();
+ }
+ return;
+ }
+
+ conveyor_storage *ch_storage = child_mixin_.child->next_storage();
+ if (ch_storage && !has_space_before_fire) {
+ ch_storage->parent_has_fired();
+ }
+}
+
+template <typename T>
+void queue_buffer_conveyor_node<T>::get_result(error_or_value &eov) noexcept {
+ error_or<T> &err_or_val = eov.as<T>();
+ err_or_val = std::move(storage_.front());
+ storage_.pop();
+}
+
+template <typename T> size_t queue_buffer_conveyor_node<T>::space() const {
+ return max_store_ - storage_.size();
+}
+
+template <typename T> size_t queue_buffer_conveyor_node<T>::queued() const {
+ return storage_.size();
+}
+
+template <typename T> void queue_buffer_conveyor_node<T>::child_has_fired() {
+ if (child_mixin_.child && storage_.size() < max_store_) {
+ error_or<T> eov;
+ child_mixin_.child->get_result(eov);
+
+ if (eov.is_error()) {
+ if (eov.get_error().is_critical()) {
+ }
+ }
+
+ storage_.push(std::move(eov));
+ if (!is_armed()) {
+ arm_later();
+ }
+ }
+}
+
+template <typename T> void queue_buffer_conveyor_node<T>::parent_has_fired() {
+ SAW_ASSERT(parent_) { return; }
+
+ if (parent_->space() == 0) {
+ return;
+ }
+
+ if (queued() > 0) {
+ arm_later();
+ }
+}
+
+template <typename T>
+immediate_conveyor_node<T>::immediate_conveyor_node(fix_void<T> &&val)
+ : value_{std::move(val)}, retrieved_{0} {}
+
+template <typename T>
+immediate_conveyor_node<T>::immediate_conveyor_node(error &&error)
+ : value_{std::move(error)}, retrieved_{0} {}
+
+template <typename T> size_t immediate_conveyor_node<T>::space() const {
+ return 0;
+}
+
+template <typename T> size_t immediate_conveyor_node<T>::queued() const {
+ return retrieved_ > 1 ? 0 : 1;
+}
+
+template <typename T> void immediate_conveyor_node<T>::child_has_fired() {
+ // Impossible case
+ assert(false);
+}
+
+template <typename T> void immediate_conveyor_node<T>::parent_has_fired() {
+ SAW_ASSERT(parent_) { return; }
+ assert(parent_->space() > 0);
+
+ if (queued() > 0) {
+ arm_next();
+ }
+}
+
+template <typename T> void immediate_conveyor_node<T>::fire() {
+
+ if (parent_) {
+ parent_->child_has_fired();
+ if (queued() > 0 && parent_->space() > 0) {
+ arm_last();
+ }
+ }
+}
+
+template <typename T>
+merge_conveyor<T>::merge_conveyor(lent<merge_conveyor_node_data<T>> d)
+ : data_{std::move(d)} {}
+
+template <typename T> merge_conveyor<T>::~merge_conveyor() {}
+
+template <typename T> void merge_conveyor<T>::attach(conveyor<T> conveyor) {
+ auto sp = data_.lock();
+ SAW_ASSERT(sp) { return; }
+
+ sp->attach(std::move(conveyor));
+}
+
+template <typename T>
+merge_conveyor_node<T>::merge_conveyor_node(our<merge_conveyor_node_data<T>> d)
+ : data_{d} {
+ SAW_ASSERT(data_) { return; }
+
+ data_->merger = this;
+}
+
+template <typename T> merge_conveyor_node<T>::~merge_conveyor_node() {}
+
+template <typename T>
+error_or<own<conveyor_node>>
+merge_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee_) noexcept {
+ (void)swapee_;
+ return make_error<err::invalid_state>(
+ "merge_conveyor_node<T>::appendage should block calls to this class");
+}
+
+template <typename T>
+void merge_conveyor_node<T>::get_result(error_or_value &eov) noexcept {
+ error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
+
+ SAW_ASSERT(data_) { return; }
+
+ /// @todo search appendages for result
+
+ auto &appendages = data_->appendages;
+ next_appendage_ = std::min(appendages.size(), next_appendage_);
+
+ for (size_t i = next_appendage_; i < appendages.size(); ++i) {
+ if (appendages[i]->queued() > 0) {
+ err_or_val = std::move(appendages[i]->error_or_value_.value());
+ appendages[i]->error_or_value_ = std::nullopt;
+ next_appendage_ = i + 1;
+ return;
+ }
+ }
+ for (size_t i = 0; i < next_appendage_; ++i) {
+ if (appendages[i]->queued() > 0) {
+ err_or_val = std::move(appendages[i]->error_or_value_.value());
+ appendages[i]->error_or_value_ = std::nullopt;
+ next_appendage_ = i + 1;
+ return;
+ }
+ }
+
+ err_or_val = make_error<err::invalid_state>("No value in Merge appendages");
+}
+
+template <typename T> void merge_conveyor_node<T>::fire() {
+ SAW_ASSERT(queued() > 0) { return; }
+
+ if (parent_) {
+ parent_->child_has_fired();
+
+ if (queued() > 0 && parent_->space() > 0) {
+ arm_later();
+ }
+ }
+}
+
+template <typename T> size_t merge_conveyor_node<T>::space() const { return 0; }
+
+template <typename T> size_t merge_conveyor_node<T>::queued() const {
+ SAW_ASSERT(data_) { return 0; }
+
+ size_t queue_count = 0;
+
+ for (auto &iter : data_->appendages) {
+ queue_count += iter->queued();
+ }
+
+ return queue_count;
+}
+
+template <typename T> void merge_conveyor_node<T>::child_has_fired() {
+ /// This can never happen
+ assert(false);
+}
+
+template <typename T> void merge_conveyor_node<T>::parent_has_fired() {
+ SAW_ASSERT(parent_) { return; }
+ if (queued() > 0) {
+ if (parent_->space() > 0) {
+ arm_later();
+ }
+ }
+}
+
+/**
+ * merge_conveyor_node<T>::Apendage
+ */
+
+template <typename T>
+error_or<own<conveyor_node>>
+merge_conveyor_node<T>::appendage::swap_child(own<conveyor_node> &&swapee_) {
+ own<conveyor_node> old_child = std::move(child);
+
+ child = std::move(swapee_);
+
+ // This case should never happen
+ SAW_ASSERT(old_child) { return make_error<err::invalid_state>("No child exists"); }
+
+ return old_child;
+}
+
+template <typename T>
+void merge_conveyor_node<T>::appendage::get_result(error_or_value &eov) {
+ error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
+
+ SAW_ASSERT(queued() > 0) {
+ err_or_val =
+ make_error<err::invalid_state>("No element queued in Merge appendage Node");
+ return;
+ }
+
+ err_or_val = std::move(error_or_value_.value());
+ error_or_value_ = std::nullopt;
+}
+
+template <typename T> size_t merge_conveyor_node<T>::appendage::space() const {
+ SAW_ASSERT(merger) { return 0; }
+
+ if (error_or_value_.has_value()) {
+ return 0;
+ }
+
+ return 1;
+}
+
+template <typename T> size_t merge_conveyor_node<T>::appendage::queued() const {
+ SAW_ASSERT(merger) { return 0; }
+
+ if (error_or_value_.has_value()) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/// @todo delete this function. Replaced by the regular get_result
+template <typename T>
+void merge_conveyor_node<T>::appendage::get_appendage_result(
+ error_or_value &eov) {
+ error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
+
+ SAW_ASSERT(queued() > 0) {
+ err_or_val =
+ make_error<err::invalid_state>("No element queued in Merge appendage Node");
+ return;
+ }
+
+ err_or_val = std::move(error_or_value_.value());
+ error_or_value_ = std::nullopt;
+}
+
+template <typename T>
+void merge_conveyor_node<T>::appendage::child_has_fired() {
+ SAW_ASSERT(!error_or_value_.has_value()) { return; }
+ error_or<fix_void<T>> eov;
+ child->get_result(eov);
+
+ error_or_value_ = std::move(eov);
+
+ if (!merger->is_armed()) {
+ merger->arm_later();
+ }
+}
+
+template <typename T>
+void merge_conveyor_node<T>::appendage::parent_has_fired() {
+ conveyor_storage *child_storage = child->next_storage();
+ if (child_storage) {
+ child_storage->parent_has_fired();
+ }
+}
+
+template <typename T>
+void merge_conveyor_node<T>::appendage::set_parent(conveyor_storage *par) {
+ SAW_ASSERT(merger) { return; }
+
+ SAW_ASSERT(child) { return; }
+
+ parent_ = par;
+}
+
+template <typename T>
+void merge_conveyor_node_data<T>::attach(conveyor<T> conv) {
+ auto nas = conveyor<T>::from_conveyor(std::move(conv));
+ SAW_ASSERT(nas) { return; }
+ conveyor_storage *storage = nas->next_storage();
+ SAW_ASSERT(storage) { return; }
+
+ auto merge_node_appendage =
+ heap<typename merge_conveyor_node<T>::appendage>(std::move(nas),
+ *merger);
+ auto merge_node_appendage_ptr = merge_node_appendage.get();
+
+ storage->set_parent(merge_node_appendage.get());
+
+ SAW_ASSERT(merger) { return; }
+
+ conveyor_storage *mrg_storage = merger->next_storage();
+ SAW_ASSERT(mrg_storage) { return; }
+
+ merge_node_appendage->set_parent(mrg_storage);
+
+ appendages.push_back(std::move(merge_node_appendage));
+
+ /// @todo return this. necessary? maybe for the weird linking setup
+ /// maybe not
+ // return merge_node_appendage_ptr;
+}
+
+template <typename T>
+void merge_conveyor_node_data<T>::governing_node_destroyed() {
+ appendages.clear();
+ merger = nullptr;
+}
+
+template <typename T> adapt_conveyor_feeder<T>::~adapt_conveyor_feeder() {
+ if (feedee_) {
+ feedee_->set_feeder(nullptr);
+ feedee_ = nullptr;
+ }
+}
+
+template <typename T>
+void adapt_conveyor_feeder<T>::set_feedee(adapt_conveyor_node<T> *feedee_p) {
+ feedee_ = feedee_p;
+}
+
+template <typename T> void adapt_conveyor_feeder<T>::feed(T &&value) {
+ if (feedee_) {
+ feedee_->feed(std::move(value));
+ }
+}
+
+template <typename T> void adapt_conveyor_feeder<T>::fail(error &&error) {
+ if (feedee_) {
+ feedee_->fail(std::move(error));
+ }
+}
+
+template <typename T> size_t adapt_conveyor_feeder<T>::queued() const {
+ if (feedee_) {
+ return feedee_->queued();
+ }
+ return 0;
+}
+
+template <typename T> size_t adapt_conveyor_feeder<T>::space() const {
+ if (feedee_) {
+ return feedee_->space();
+ }
+ return 0;
+}
+
+template <typename T>
+error adapt_conveyor_feeder<T>::swap(conveyor<T> &&conv) noexcept {
+ SAW_ASSERT(feedee_) { return make_error<err::invalid_state>("No feedee connected"); }
+
+ auto node = conveyor<T>::from_conveyor(std::move(conv));
+
+ feedee_->swap_child(std::move(node));
+
+ return no_error();
+}
+
+template <typename T>
+adapt_conveyor_node<T>::adapt_conveyor_node() : conveyor_event_storage{} {}
+
+template <typename T> adapt_conveyor_node<T>::~adapt_conveyor_node() {
+ if (feeder_) {
+ feeder_->set_feedee(nullptr);
+ feeder_ = nullptr;
+ }
+}
+
+template <typename T>
+error_or<own<conveyor_node>>
+adapt_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee) noexcept {
+ // This should return the owning pointer of this instance
+ auto myself_err = parent_node_.swap_child_of_parent(std::move(swapee));
+
+ if (myself_err.is_error()) {
+ return myself_err;
+ }
+
+ auto &myself = myself_err.get_value();
+
+ assert(myself.get() == this);
+
+ return myself_err;
+}
+
+template <typename T>
+conveyor_storage *adapt_conveyor_node<T>::next_storage() noexcept {
+ return static_cast<conveyor_storage *>(this);
+}
+
+template <typename T>
+void adapt_conveyor_node<T>::notify_parent_attached(
+ conveyor_node &par) noexcept {
+ parent_node_.change_parent(&par);
+}
+
+template <typename T>
+void adapt_conveyor_node<T>::set_feeder(adapt_conveyor_feeder<T> *feeder_p) {
+ feeder_ = feeder_p;
+}
+
+template <typename T> void adapt_conveyor_node<T>::feed(T &&value) {
+ storage_.push(std::move(value));
+ arm_next();
+}
+
+template <typename T> void adapt_conveyor_node<T>::fail(error &&error) {
+ storage_.push(std::move(error));
+ arm_next();
+}
+
+template <typename T> size_t adapt_conveyor_node<T>::queued() const {
+ return storage_.size();
+}
+
+template <typename T> size_t adapt_conveyor_node<T>::space() const {
+ return std::numeric_limits<size_t>::max() - storage_.size();
+}
+
+template <typename T>
+void adapt_conveyor_node<T>::get_result(error_or_value &err_or_val) {
+ if (!storage_.empty()) {
+ err_or_val.as<T>() = std::move(storage_.front());
+ storage_.pop();
+ } else {
+ err_or_val.as<T>() = make_error<err::invalid_state>(
+ "Signal for retrieval of storage sent even though no "
+ "data is present");
+ }
+}
+
+template <typename T> void adapt_conveyor_node<T>::child_has_fired() {
+ // Adapt node has no children
+ assert(false);
+}
+
+template <typename T> void adapt_conveyor_node<T>::parent_has_fired() {
+ SAW_ASSERT(parent_) { return; }
+
+ if (parent_->space() == 0) {
+ return;
+ }
+}
+
+template <typename T> void adapt_conveyor_node<T>::fire() {
+ if (parent_) {
+ parent_->child_has_fired();
+
+ if (storage_.size() > 0) {
+ arm_later();
+ }
+ }
+}
+
+template <typename T> one_time_conveyor_feeder<T>::~one_time_conveyor_feeder() {
+ if (feedee_) {
+ feedee_->set_feeder(nullptr);
+ feedee_ = nullptr;
+ }
+}
+
+template <typename T>
+void one_time_conveyor_feeder<T>::set_feedee(
+ one_time_conveyor_node<T> *feedee_p) {
+ feedee_ = feedee_p;
+}
+
+template <typename T> void one_time_conveyor_feeder<T>::feed(T &&value) {
+ if (feedee_) {
+ feedee_->feed(std::move(value));
+ }
+}
+
+template <typename T> void one_time_conveyor_feeder<T>::fail(error &&error) {
+ if (feedee_) {
+ feedee_->fail(std::move(error));
+ }
+}
+
+template <typename T> size_t one_time_conveyor_feeder<T>::queued() const {
+ if (feedee_) {
+ return feedee_->queued();
+ }
+ return 0;
+}
+
+template <typename T> size_t one_time_conveyor_feeder<T>::space() const {
+ if (feedee_) {
+ return feedee_->space();
+ }
+ return 0;
+}
+
+template <typename T> one_time_conveyor_node<T>::~one_time_conveyor_node() {
+ if (feeder_) {
+ feeder_->set_feedee(nullptr);
+ feeder_ = nullptr;
+ }
+}
+
+template <typename T>
+void one_time_conveyor_node<T>::set_feeder(
+ one_time_conveyor_feeder<T> *feeder_p) {
+ feeder_ = feeder_p;
+}
+
+template <typename T> void one_time_conveyor_node<T>::feed(T &&value) {
+ storage_ = std::move(value);
+ arm_next();
+}
+
+template <typename T> void one_time_conveyor_node<T>::fail(error &&error) {
+ storage_ = std::move(error);
+ arm_next();
+}
+
+template <typename T> size_t one_time_conveyor_node<T>::queued() const {
+ return storage_.has_value() ? 1 : 0;
+}
+
+template <typename T> size_t one_time_conveyor_node<T>::space() const {
+ return passed_ ? 0 : 1;
+}
+
+template <typename T>
+void one_time_conveyor_node<T>::get_result(error_or_value &err_or_val) {
+ if (storage_.has_value()) {
+ err_or_val.as<T>() = std::move(storage_.value());
+ storage_ = std::nullopt;
+ } else {
+ err_or_val.as<T>() = make_error<err::invalid_state>(
+ "Signal for retrieval of storage sent even though no "
+ "data is present");
+ }
+}
+
+template <typename T> void one_time_conveyor_node<T>::fire() {
+ if (parent_) {
+ parent_->child_has_fired();
+ }
+}
+
+} // namespace saw
diff --git a/src/codec-json/.nix/derivation.nix b/src/codec-json/.nix/derivation.nix
new file mode 100644
index 0000000..fcc276d
--- /dev/null
+++ b/src/codec-json/.nix/derivation.nix
@@ -0,0 +1,34 @@
+{ lib
+, stdenvNoCC
+, scons
+, clang
+, clang-tools
+, version
+, forstio
+, gnutls
+}:
+
+let
+
+in stdenvNoCC.mkDerivation {
+ pname = "forstio-codec-json";
+ inherit version;
+
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ nativeBuildInputs = [
+ scons
+ clang
+ clang-tools
+ ];
+
+ buildInputs = [
+ forstio.core
+ forstio.async
+ forstio.codec
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/src/codec-json/SConscript b/src/codec-json/SConscript
new file mode 100644
index 0000000..772ac0b
--- /dev/null
+++ b/src/codec-json/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+codec_json_env = env.Clone();
+
+codec_json_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+codec_json_env.headers = sorted(glob.glob(dir_path + "/*.h"))
+
+env.sources += codec_json_env.sources;
+env.headers += codec_json_env.headers;
+
+## Shared lib
+objects_shared = []
+codec_json_env.add_source_files(objects_shared, codec_json_env.sources, shared=True);
+codec_json_env.library_shared = codec_json_env.SharedLibrary('#build/forstio-codec-json', [objects_shared]);
+
+## Static lib
+objects_static = []
+codec_json_env.add_source_files(objects_static, codec_json_env.sources, shared=False);
+codec_json_env.library_static = codec_json_env.StaticLibrary('#build/forstio-codec-json', [objects_static]);
+
+# Set Alias
+env.Alias('library_codec_json', [codec_json_env.library_shared, codec_json_env.library_static]);
+
+env.targets += ['library_codec_json'];
+
+# Install
+env.Install('$prefix/lib/', [codec_json_env.library_shared, codec_json_env.library_static]);
+env.Install('$prefix/include/forstio/codec/json/', [codec_json_env.headers]);
diff --git a/src/codec-json/SConstruct b/src/codec-json/SConstruct
new file mode 100644
index 0000000..edd5f57
--- /dev/null
+++ b/src/codec-json/SConstruct
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+)
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+)
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=['forstio-codec'])
+env.__class__.add_source_files = add_kel_source_files
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+env.targets = [];
+
+Export('env')
+SConscript('SConscript')
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', [env.targets]);
+env.Default('all');
+
+env.Alias('install', '$prefix')
diff --git a/src/codec-json/json.h b/src/codec-json/json.h
new file mode 100644
index 0000000..2c5b83e
--- /dev/null
+++ b/src/codec-json/json.h
@@ -0,0 +1,12 @@
+#pragma once
+
+namespace saw {
+namespace encoded {
+struct Json {};
+}
+
+template<typename Schema>
+class codec<Schema, Json> {
+
+};
+}
diff --git a/src/codec/.nix/derivation.nix b/src/codec/.nix/derivation.nix
new file mode 100644
index 0000000..c9fac2e
--- /dev/null
+++ b/src/codec/.nix/derivation.nix
@@ -0,0 +1,31 @@
+{ lib
+, stdenvNoCC
+, scons
+, clang
+, clang-tools
+, version
+, forstio
+}:
+
+let
+
+in stdenvNoCC.mkDerivation {
+ pname = "forstio-codec";
+ inherit version;
+
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ buildInputs = [
+ forstio.core
+ ];
+
+ nativeBuildInputs = [
+ scons
+ clang
+ clang-tools
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/src/codec/SConscript b/src/codec/SConscript
new file mode 100644
index 0000000..c038d42
--- /dev/null
+++ b/src/codec/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+codec_env = env.Clone();
+
+codec_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+codec_env.headers = sorted(glob.glob(dir_path + "/*.h"))
+
+env.sources += codec_env.sources;
+env.headers += codec_env.headers;
+
+## Shared lib
+objects_shared = []
+codec_env.add_source_files(objects_shared, codec_env.sources, shared=True);
+codec_env.library_shared = codec_env.SharedLibrary('#build/forstio-codec', [objects_shared]);
+
+## Static lib
+objects_static = []
+codec_env.add_source_files(objects_static, codec_env.sources, shared=False);
+codec_env.library_static = codec_env.StaticLibrary('#build/forstio-codec', [objects_static]);
+
+# Set Alias
+env.Alias('library_codec', [codec_env.library_shared, codec_env.library_static]);
+
+env.targets += ['library_codec'];
+
+# Install
+env.Install('$prefix/lib/', [codec_env.library_shared, codec_env.library_static]);
+env.Install('$prefix/include/forstio/codec/', [codec_env.headers]);
diff --git a/src/codec/SConstruct b/src/codec/SConstruct
new file mode 100644
index 0000000..0d7b7c6
--- /dev/null
+++ b/src/codec/SConstruct
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+)
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+)
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=['forstio-core'])
+env.__class__.add_source_files = add_kel_source_files
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+env.targets = [];
+
+Export('env')
+SConscript('SConscript')
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', [env.targets]);
+env.Default('all');
+
+env.Alias('install', '$prefix')
diff --git a/src/codec/data.h b/src/codec/data.h
new file mode 100644
index 0000000..1682ae7
--- /dev/null
+++ b/src/codec/data.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <forstio/core/common.h>
+#include "schema.h"
+
+namespace saw {
+namespace encode {
+struct Native {};
+}
+/*
+ * Helper for the basic message container, so the class doesn't have to be
+ * specialized 10 times.
+ */
+template <class T> struct native_data_type;
+
+template <>
+struct native_data_type<schema::Primitive<schema::SignedInteger, 1>> {
+ using type = int8_t;
+};
+
+template <>
+struct native_data_type<schema::Primitive<schema::SignedInteger, 2>> {
+ using type = int16_t;
+};
+
+template <>
+struct native_data_type<schema::Primitive<schema::SignedInteger, 4>> {
+ using type = int32_t;
+};
+
+template <>
+struct native_data_type<schema::Primitive<schema::SignedInteger, 8>> {
+ using type = int64_t;
+};
+
+template <>
+struct native_data_type<schema::Primitive<schema::UnsignedInteger, 1>> {
+ using type = uint8_t;
+};
+
+template <>
+struct native_data_type<schema::Primitive<schema::UnsignedInteger, 2>> {
+ using type = uint16_t;
+};
+
+template <>
+struct native_data_type<schema::Primitive<schema::UnsignedInteger, 4>> {
+ using type = uint32_t;
+};
+
+template <>
+struct native_data_type<schema::Primitive<schema::UnsignedInteger, 8>> {
+ using type = uint64_t;
+};
+
+template <>
+struct native_data_type<schema::Primitive<schema::FloatingPoint, 4>> {
+ using type = float;
+};
+
+template<typename T, typename Encoding = encode::Native>
+class data {
+private:
+ static_assert(always_false<T>, "Type not supported");
+};
+
+template<>
+class data<schema::String, encode::Native> {
+private:
+ std::string value_;
+public:
+ SAW_FORBID_COPY(data);
+
+ data(std::string&& value__):value_{std::move(value__)}{}
+
+ std::size_t size() const {
+ return value_.size();
+ }
+
+ bool operator==(const data<schema::String, encode::Native>& data){
+ return value_ == data.value_;
+ }
+};
+
+template<typename T, size_t N>
+class data<schema::Primitive<T,N>, encode::Native> {
+private:
+};
+}
diff --git a/src/codec/proto_kel.h b/src/codec/proto_kel.h
new file mode 100644
index 0000000..3b4ebac
--- /dev/null
+++ b/src/codec/proto_kel.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include "data.h"
+
+#include <forstio/core/error.h>
+
+namespace saw {
+namespace encode {
+struct ProtoKel {};
+}
+
+template<typename Schema>
+class data<Schema, encode::ProtoKel> {
+private:
+ own<buffer> buffer_;
+public:
+ data(own<buffer>&& buffer__):buffer_{std::move(buffer__)}{}
+
+ buffer& get_buffer(){
+ return *buffer_;
+ }
+
+ const buffer& get_buffer() const {
+ return *buffer_;
+ }
+};
+
+template<typename Schema>
+class codec<Schema, encode::ProtoKel> {
+private:
+public:
+ error_or<data<Schema, encode::Native>> decode(const data<Schema, encode::ProtoKel>& encoded){
+ return make_error<err::not_implemented>();
+ }
+
+ error_or<data<Schema, encode::ProtoKel>> encode(const data<Schema, encode::Native>& native){
+ return make_error<err::not_implemented>();
+ }
+};
+}
+}
diff --git a/src/codec/schema.h b/src/codec/schema.h
new file mode 100644
index 0000000..b23aaa1
--- /dev/null
+++ b/src/codec/schema.h
@@ -0,0 +1,79 @@
+#pragma once
+
+#include <forstio/common.h>
+#include <forstio/string_literal.h>
+
+namespace saw {
+namespace schema {
+// NOLINTBEGIN
+template <typename T, string_literal Literal> struct NamedMember {};
+
+template <typename... T> struct Struct {
+ static_assert(
+ always_false<T...>,
+ "This schema template doesn't support this type of template argument");
+};
+
+template <typename... V, string_literal... K>
+struct Struct<NamedMember<V, K>...> {};
+
+template <typename... T> struct Union {
+ static_assert(
+ always_false<T...>,
+ "This schema template doesn't support this type of template argument");
+};
+
+template <typename... V, string_literal... K>
+struct Union<NamedMember<V, K>...> {};
+
+template <typename T> struct Array {};
+
+template<typename T, size_t S> FixedArray {};
+
+template <typename... T> struct Tuple {};
+
+struct String {};
+
+struct SignedInteger {};
+struct UnsignedInteger {};
+struct FloatingPoint {};
+
+template <class T, size_t N> struct Primitive {
+ static_assert(((std::is_same_v<T, SignedInteger> ||
+ std::is_same_v<T, UnsignedInteger>)&&(N == 1 || N == 2 ||
+ N == 4 || N == 8)) ||
+ (std::is_same_v<T, FloatingPoint> && (N == 4 || N == 8)),
+ "Primitive Type is not supported");
+};
+
+using Int8 = Primitive<SignedInteger, 1>;
+using Int16 = Primitive<SignedInteger, 2>;
+using Int32 = Primitive<SignedInteger, 4>;
+using Int64 = Primitive<SignedInteger, 8>;
+
+using UInt8 = Primitive<UnsignedInteger, 1>;
+using UInt16 = Primitive<UnsignedInteger, 2>;
+using UInt32 = Primitive<UnsignedInteger, 4>;
+using UInt64 = Primitive<UnsignedInteger, 8>;
+
+using Float32 = Primitive<FloatingPoint, 4>;
+using Float64 = Primitive<FloatingPoint, 8>;
+
+/**
+ * Classes enabling Rpc calls
+ */
+template <class Request, class Response, string_literal Literal>
+struct Function {};
+
+template <class... T> struct Interface {
+ static_assert(
+ always_false<T...>,
+ "This schema template doesn't support this type of template argument");
+};
+
+template <class... Request, class... Response, string_literal... Literal>
+struct Interface<Function<Request, Response, Literal>...> {};
+
+// NOLINTEND
+} // namespace schema
+} // namespace saw
diff --git a/src/core/.nix/derivation.nix b/src/core/.nix/derivation.nix
new file mode 100644
index 0000000..adf0cb4
--- /dev/null
+++ b/src/core/.nix/derivation.nix
@@ -0,0 +1,26 @@
+{ lib
+, stdenvNoCC
+, scons
+, clang
+, clang-tools
+, version
+}:
+
+let
+
+in stdenvNoCC.mkDerivation {
+ pname = "forstio-core";
+ inherit version;
+
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ nativeBuildInputs = [
+ scons
+ clang
+ clang-tools
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/src/core/SConscript b/src/core/SConscript
new file mode 100644
index 0000000..04eb4c3
--- /dev/null
+++ b/src/core/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+core_env = env.Clone();
+
+core_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+core_env.headers = sorted(glob.glob(dir_path + "/*.h"))
+
+env.sources += core_env.sources;
+env.headers += core_env.headers;
+
+## Shared lib
+objects_shared = []
+core_env.add_source_files(objects_shared, core_env.sources, shared=True);
+core_env.library_shared = core_env.SharedLibrary('#build/forstio-core', [objects_shared]);
+
+## Static lib
+objects_static = []
+core_env.add_source_files(objects_static, core_env.sources, shared=False);
+core_env.library_static = core_env.StaticLibrary('#build/forstio-core', [objects_static]);
+
+# Set Alias
+env.Alias('library_core', [core_env.library_shared, core_env.library_static]);
+
+env.targets += ['library_core'];
+
+# Install
+env.Install('$prefix/lib/', [core_env.library_shared, core_env.library_static]);
+env.Install('$prefix/include/forstio/core/', [core_env.headers]);
diff --git a/src/core/SConstruct b/src/core/SConstruct
new file mode 100644
index 0000000..865d131
--- /dev/null
+++ b/src/core/SConstruct
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+)
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+)
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=[])
+env.__class__.add_source_files = add_kel_source_files
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+env.targets = [];
+
+Export('env')
+SConscript('SConscript')
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', [env.targets]);
+env.Default('all');
+
+env.Alias('install', '$prefix')
diff --git a/src/core/buffer.cpp b/src/core/buffer.cpp
new file mode 100644
index 0000000..ad471d7
--- /dev/null
+++ b/src/core/buffer.cpp
@@ -0,0 +1,434 @@
+#include "buffer.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cstring>
+#include <iomanip>
+#include <sstream>
+
+namespace saw {
+error buffer::push(const uint8_t &value) {
+ size_t write_remain = write_composite_length();
+ if (write_remain > 0) {
+ write() = value;
+ write_advance(1);
+ } else {
+ return make_error<err::buffer_exhausted>();
+ }
+ return no_error();
+}
+
+error buffer::push(const uint8_t &buffer, size_t size) {
+ error error = write_require_length(size);
+ if (error.failed()) {
+ return error;
+ }
+ const uint8_t *buffer_ptr = &buffer;
+ while (size > 0) {
+ size_t segment = std::min(write_segment_length(), size);
+ memcpy(&write(), buffer_ptr, segment);
+ write_advance(segment);
+ size -= segment;
+ buffer_ptr += segment;
+ }
+ return no_error();
+}
+
+error buffer::pop(uint8_t &value) {
+ if (read_composite_length() > 0) {
+ value = read();
+ read_advance(1);
+ } else {
+ return make_error<err::buffer_exhausted>();
+ }
+ return no_error();
+}
+
+error buffer::pop(uint8_t &buffer, size_t size) {
+ if (read_composite_length() >= size) {
+ uint8_t *buffer_ptr = &buffer;
+ while (size > 0) {
+ size_t segment = std::min(read_segment_length(), size);
+ memcpy(buffer_ptr, &read(), segment);
+ read_advance(segment);
+ size -= segment;
+ buffer_ptr += segment;
+ }
+ } else {
+ return make_error<err::buffer_exhausted>();
+ }
+ return no_error();
+}
+
+std::string buffer::to_string() const {
+ std::ostringstream oss;
+ for (size_t i = 0; i < read_composite_length(); ++i) {
+ oss << read(i);
+ }
+ return oss.str();
+}
+
+std::string buffer::to_hex() const {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ for (size_t i = 0; i < read_composite_length(); ++i) {
+ oss << std::setw(2) << (uint16_t)read(i);
+ if ((i + 1) < read_composite_length()) {
+ oss << ((i % 4 == 3) ? '\n' : ' ');
+ }
+ }
+ return oss.str();
+}
+
+buffer_view::buffer_view(buffer &buffer)
+ : buffer_{buffer}, read_offset_{0}, write_offset_{0} {}
+
+size_t buffer_view::read_position() const {
+ return read_offset_ + buffer_.read_position();
+}
+
+size_t buffer_view::read_composite_length() const {
+ assert(read_offset_ <= buffer_.read_composite_length());
+ if (read_offset_ > buffer_.read_composite_length()) {
+ return 0;
+ }
+
+ return buffer_.read_composite_length() - read_offset_;
+}
+
+size_t buffer_view::read_segment_length(size_t offset) const {
+ size_t off = offset + read_offset_;
+ assert(off <= buffer_.read_composite_length());
+ if (off > buffer_.read_composite_length()) {
+ return 0;
+ }
+
+ return buffer_.read_segment_length(off);
+}
+
+void buffer_view::read_advance(size_t bytes) {
+ size_t offset = bytes + read_offset_;
+ assert(offset <= buffer_.read_composite_length());
+ if (offset > buffer_.read_composite_length()) {
+ read_offset_ += buffer_.read_composite_length();
+ return;
+ }
+
+ read_offset_ += bytes;
+}
+
+uint8_t &buffer_view::read(size_t i) {
+ size_t pos = i + read_offset_;
+
+ assert(pos < buffer_.read_composite_length());
+
+ return buffer_.read(pos);
+}
+
+const uint8_t &buffer_view::read(size_t i) const {
+ size_t pos = i + read_offset_;
+
+ assert(pos < buffer_.read_composite_length());
+
+ return buffer_.read(pos);
+}
+
+size_t buffer_view::write_position() const {
+ return write_offset_ + buffer_.write_position();
+}
+
+size_t buffer_view::write_composite_length() const {
+ assert(write_offset_ <= buffer_.write_composite_length());
+ if (write_offset_ > buffer_.write_composite_length()) {
+ return 0;
+ }
+
+ return buffer_.write_composite_length() - write_offset_;
+}
+
+size_t buffer_view::write_segment_length(size_t offset) const {
+ size_t off = offset + write_offset_;
+ assert(off <= buffer_.write_composite_length());
+ if (off > buffer_.write_composite_length()) {
+ return 0;
+ }
+
+ return buffer_.write_segment_length(off);
+}
+
+void buffer_view::write_advance(size_t bytes) {
+ size_t offset = bytes + write_offset_;
+ assert(offset <= buffer_.write_composite_length());
+ if (offset > buffer_.write_composite_length()) {
+ write_offset_ += buffer_.write_composite_length();
+ return;
+ }
+
+ write_offset_ += bytes;
+}
+
+uint8_t &buffer_view::write(size_t i) {
+ size_t pos = i + write_offset_;
+
+ assert(pos < buffer_.write_composite_length());
+
+ return buffer_.write(pos);
+}
+
+const uint8_t &buffer_view::write(size_t i) const {
+ size_t pos = i + write_offset_;
+
+ assert(pos < buffer_.write_composite_length());
+
+ return buffer_.write(pos);
+}
+
+error buffer_view::write_require_length(size_t bytes) {
+ return buffer_.write_require_length(bytes + write_offset_);
+}
+
+size_t buffer_view::read_offset() const { return read_offset_; }
+
+size_t buffer_view::write_offset() const { return write_offset_; }
+
+ring_buffer::ring_buffer() : read_position_{0}, write_position_{0} {
+ buffer_.resize(RING_BUFFER_MAX_SIZE);
+}
+
+ring_buffer::ring_buffer(size_t size) : read_position_{0}, write_position_{0} {
+ buffer_.resize(size);
+}
+
+size_t ring_buffer::read_position() const { return read_position_; }
+
+/*
+ * If write is ahead of read it is a simple distance, but if read ist ahead of
+ * write then there are two segments
+ *
+ */
+size_t ring_buffer::read_composite_length() const {
+ return write_position() < read_position()
+ ? buffer_.size() - (read_position() - write_position())
+ : (write_reached_read_ ? buffer_.size()
+ : write_position() - read_position());
+}
+
+/*
+ * If write is ahead then it's the simple distance again. If read is ahead it's
+ * until the end of the buffer/segment
+ */
+size_t ring_buffer::read_segment_length(size_t offset) const {
+ size_t read_composite = read_composite_length();
+ assert(offset <= read_composite);
+ offset = std::min(offset, read_composite);
+ size_t remaining = read_composite - offset;
+
+ size_t read_offset = read_position() + offset;
+ read_offset = read_offset >= buffer_.size() ? read_offset - buffer_.size()
+ : read_offset;
+
+ // case 1 write is located before read and reached read
+ // then offset can be used normally
+ // case 2 write is located at read, but read reached write
+ // then it is set to zero by readCompositeLength()
+ // case 3 write is located after read
+ // since std::min you can use simple subtraction
+ if (write_position() < read_offset) {
+ return buffer_.size() - read_offset;
+ }
+
+ if (write_position() == read_offset) {
+ if (remaining > 0) {
+ return buffer_.size() - read_offset;
+ } else {
+ return 0;
+ }
+ }
+
+ return write_position() - read_offset;
+}
+
+void ring_buffer::read_advance(size_t bytes) {
+ size_t read_composite = read_composite_length();
+
+ assert(bytes <= read_composite);
+ bytes = std::min(bytes, read_composite);
+ size_t advanced = read_position_ + bytes;
+ read_position_ = advanced >= buffer_.size() ? advanced - buffer_.size()
+ : advanced;
+ write_reached_read_ = bytes > 0 ? false : write_reached_read_;
+}
+
+uint8_t &ring_buffer::read(size_t i) {
+ assert(i < read_composite_length());
+ size_t pos = read_position_ + i;
+ pos = pos >= buffer_.size() ? pos - buffer_.size() : pos;
+ return buffer_[pos];
+}
+
+const uint8_t &ring_buffer::read(size_t i) const {
+ assert(i < read_composite_length());
+ size_t pos = read_position_ + i;
+ pos = pos >= buffer_.size() ? pos - buffer_.size() : pos;
+ return buffer_[pos];
+}
+
+size_t ring_buffer::write_position() const { return write_position_; }
+
+size_t ring_buffer::write_composite_length() const {
+ return read_position() > write_position()
+ ? (read_position() - write_position())
+ : (write_reached_read_
+ ? 0
+ : buffer_.size() - (write_position() - read_position()));
+}
+
+size_t ring_buffer::write_segment_length(size_t offset) const {
+ size_t write_composite = write_composite_length();
+ assert(offset <= write_composite);
+ offset = std::min(offset, write_composite);
+
+ size_t write_offset = write_position() + offset;
+ write_offset = write_offset >= buffer_.size()
+ ? write_offset - buffer_.size()
+ : write_offset;
+
+ if (read_position_ > write_offset) {
+ return read_position_ - write_offset;
+ }
+
+ if (write_reached_read_) {
+ return 0;
+ }
+
+ return buffer_.size() - write_offset;
+}
+
+void ring_buffer::write_advance(size_t bytes) {
+ assert(bytes <= write_composite_length());
+ size_t advanced = write_position_ + bytes;
+ write_position_ = advanced >= buffer_.size() ? advanced - buffer_.size()
+ : advanced;
+
+ write_reached_read_ =
+ (write_position_ == read_position_ && bytes > 0 ? true : false);
+}
+
+uint8_t &ring_buffer::write(size_t i) {
+ assert(i < write_composite_length());
+ size_t pos = write_position_ + i;
+ pos = pos >= buffer_.size() ? pos - buffer_.size() : pos;
+ return buffer_[pos];
+}
+
+const uint8_t &ring_buffer::write(size_t i) const {
+ assert(i < write_composite_length());
+ size_t pos = write_position_ + i;
+ pos = pos >= buffer_.size() ? pos - buffer_.size() : pos;
+ return buffer_[pos];
+}
+/*
+ Error RingBuffer::increaseSize(size_t size){
+ size_t old_size = buffer.size();
+ size_t new_size = old_size + size;
+ buffer.resize(new_size);
+ if(readPosition() > writePosition() || (readPosition() ==
+ writePosition() && write_reached_read)){ size_t remaining = old_size -
+ writePosition(); size_t real_remaining = 0; while(remaining > 0){ size_t
+ segment = std::min(remaining, size); memcpy(&buffer[new_size-segment],
+ &buffer[old_size-segment], segment); remaining -= segment; size -= segment;
+ old_size -= segment;
+ new_size -= segment;
+ }
+ }
+
+ return noError();
+ }
+*/
+error ring_buffer::write_require_length(size_t bytes) {
+ size_t write_remain = write_composite_length();
+ if (bytes > write_remain) {
+ return make_error<err::buffer_exhausted>();
+ }
+ return no_error();
+}
+
+array_buffer::array_buffer(size_t size)
+ : read_position_{0}, write_position_{0} {
+ buffer_.resize(size);
+}
+
+size_t array_buffer::read_position() const { return read_position_; }
+
+size_t array_buffer::read_composite_length() const {
+ return write_position_ - read_position_;
+}
+
+size_t array_buffer::read_segment_length(size_t offset) const {
+ size_t read_composite = read_composite_length();
+ assert(offset <= read_composite);
+
+ offset = std::min(read_composite, offset);
+ size_t read_offset = read_position_ + offset;
+
+ return write_position_ - read_offset;
+}
+
+void array_buffer::read_advance(size_t bytes) {
+ assert(bytes <= read_composite_length());
+ read_position_ += bytes;
+}
+
+uint8_t &array_buffer::read(size_t i) {
+ assert(i < read_composite_length());
+
+ return buffer_[i + read_position_];
+}
+
+const uint8_t &array_buffer::read(size_t i) const {
+ assert(i + read_position_ < buffer_.size());
+
+ return buffer_[i + read_position_];
+}
+
+size_t array_buffer::write_position() const { return write_position_; }
+
+size_t array_buffer::write_composite_length() const {
+ assert(write_position_ <= buffer_.size());
+ return buffer_.size() - write_position_;
+}
+
+size_t array_buffer::write_segment_length(size_t offset) const {
+ assert(write_position_ <= buffer_.size());
+ size_t write_composite = write_composite_length();
+
+ assert(offset <= write_composite);
+ offset = std::min(write_composite, offset);
+ size_t write_offset = write_position_ + offset;
+
+ return buffer_.size() - write_offset;
+}
+
+void array_buffer::write_advance(size_t bytes) {
+ assert(bytes <= write_composite_length());
+ write_position_ += bytes;
+}
+
+uint8_t &array_buffer::write(size_t i) {
+ assert(i < write_composite_length());
+ return buffer_[i + write_position_];
+}
+
+const uint8_t &array_buffer::write(size_t i) const {
+ assert(i < write_composite_length());
+ return buffer_[i + write_position_];
+}
+error array_buffer::write_require_length(size_t bytes) {
+ size_t write_remain = write_composite_length();
+ if (bytes > write_remain) {
+ return make_error<err::buffer_exhausted>();
+ }
+ return no_error();
+}
+
+} // namespace saw
diff --git a/src/core/buffer.h b/src/core/buffer.h
new file mode 100644
index 0000000..4485ff1
--- /dev/null
+++ b/src/core/buffer.h
@@ -0,0 +1,195 @@
+#pragma once
+
+#include "error.h"
+
+#include <array>
+#include <cstdint>
+#include <deque>
+#include <list>
+#include <string>
+#include <vector>
+
+namespace saw {
+/*
+ * Access class to reduce templated BufferSegments bloat
+ */
+class buffer {
+protected:
+ ~buffer() = default;
+
+public:
+ virtual size_t read_position() const = 0;
+ virtual size_t read_composite_length() const = 0;
+ virtual size_t read_segment_length(size_t offset = 0) const = 0;
+ virtual void read_advance(size_t bytes) = 0;
+
+ virtual uint8_t &read(size_t i = 0) = 0;
+ virtual const uint8_t &read(size_t i = 0) const = 0;
+
+ virtual size_t write_position() const = 0;
+ virtual size_t write_composite_length() const = 0;
+ virtual size_t write_segment_length(size_t offset = 0) const = 0;
+ virtual void write_advance(size_t bytes) = 0;
+
+ virtual uint8_t &write(size_t i = 0) = 0;
+ virtual const uint8_t &write(size_t i = 0) const = 0;
+
+ /*
+ * Sometime buffers need to grow with a little more control
+ * than with push and pop for more efficient calls.
+ * There is nothing you can do if read hasn't been filled, but at
+ * least write can be increased if it is demanded.
+ */
+ virtual error write_require_length(size_t bytes) = 0;
+
+ error push(const uint8_t &value);
+ error push(const uint8_t &buffer, size_t size);
+ error pop(uint8_t &value);
+ error pop(uint8_t &buffer, size_t size);
+
+ /*
+ * Subject to change
+ */
+ std::string to_string() const;
+ std::string to_hex() const;
+};
+
+/*
+ * A viewer class for buffers.
+ * Working on the reference buffer invalidates the buffer view
+ */
+class buffer_view : public buffer {
+private:
+ buffer &buffer_;
+ size_t read_offset_;
+ size_t write_offset_;
+
+public:
+ buffer_view(buffer &);
+
+ size_t read_position() const override;
+ size_t read_composite_length() const override;
+ size_t read_segment_length(size_t offset = 0) const override;
+ void read_advance(size_t bytes) override;
+
+ uint8_t &read(size_t i = 0) override;
+ const uint8_t &read(size_t i = 0) const override;
+
+ size_t write_position() const override;
+ size_t write_composite_length() const override;
+ size_t write_segment_length(size_t offset = 0) const override;
+ void write_advance(size_t bytes) override;
+
+ uint8_t &write(size_t i = 0) override;
+ const uint8_t &write(size_t i = 0) const override;
+
+ error write_require_length(size_t bytes) override;
+
+ size_t read_offset() const;
+ size_t write_offset() const;
+};
+
+/*
+ * Buffer size meant for default allocation size of the ringbuffer since
+ * this class currently doesn't support proper resizing
+ */
+constexpr size_t RING_BUFFER_MAX_SIZE = 4096;
+/*
+ * Buffer wrapping around if read caught up
+ */
+class ring_buffer final : public buffer {
+private:
+ std::vector<uint8_t> buffer_;
+ size_t read_position_;
+ size_t write_position_;
+ bool write_reached_read_ = false;
+
+public:
+ ring_buffer();
+ ring_buffer(size_t size);
+
+ inline size_t size() const { return buffer_.size(); }
+
+ inline uint8_t &operator[](size_t i) { return buffer_[i]; }
+ inline const uint8_t &operator[](size_t i) const { return buffer_[i]; }
+
+ size_t read_position() const override;
+ size_t read_composite_length() const override;
+ size_t read_segment_length(size_t offset = 0) const override;
+ void read_advance(size_t bytes) override;
+
+ uint8_t &read(size_t i = 0) override;
+ const uint8_t &read(size_t i = 0) const override;
+
+ size_t write_position() const override;
+ size_t write_composite_length() const override;
+ size_t write_segment_length(size_t offset = 0) const override;
+ void write_advance(size_t bytes) override;
+
+ uint8_t &write(size_t i = 0) override;
+ const uint8_t &write(size_t i = 0) const override;
+
+ error write_require_length(size_t bytes) override;
+};
+
+/*
+ * One time buffer
+ */
+class array_buffer : public buffer {
+private:
+ std::vector<uint8_t> buffer_;
+
+ size_t read_position_;
+ size_t write_position_;
+
+public:
+ array_buffer(size_t size);
+
+ size_t read_position() const override;
+ size_t read_composite_length() const override;
+ size_t read_segment_length(size_t offset = 0) const override;
+ void read_advance(size_t bytes) override;
+
+ uint8_t &read(size_t i = 0) override;
+ const uint8_t &read(size_t i = 0) const override;
+
+ size_t write_position() const override;
+ size_t write_composite_length() const override;
+ size_t write_segment_length(size_t offset = 0) const override;
+ void write_advance(size_t bytes) override;
+
+ uint8_t &write(size_t i = 0) override;
+ const uint8_t &write(size_t i = 0) const override;
+
+ error write_require_length(size_t bytes) override;
+};
+
+class chain_array_buffer : public buffer {
+private:
+ std::deque<array_buffer> buffer_;
+
+ size_t read_position_;
+ size_t write_position_;
+
+public:
+ chain_array_buffer();
+
+ size_t read_position() const override;
+ size_t read_composite_length() const override;
+ size_t read_segment_length(size_t offset = 0) const override;
+ void read_advance(size_t bytes) override;
+
+ uint8_t &read(size_t i = 0) override;
+ const uint8_t &read(size_t i = 0) const override;
+
+ size_t write_position() const override;
+ size_t write_composite_length() const override;
+ size_t write_segment_length(size_t offset = 0) const override;
+ void write_advance(size_t bytes) override;
+
+ uint8_t &write(size_t i = 0) override;
+ const uint8_t &write(size_t i = 0) const override;
+
+ error write_require_length(size_t bytes) override;
+};
+} // namespace saw
diff --git a/src/core/common.h b/src/core/common.h
new file mode 100644
index 0000000..a06c238
--- /dev/null
+++ b/src/core/common.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <utility>
+
+namespace saw {
+
+#define SAW_CONCAT_(x, y) x##y
+#define SAW_CONCAT(x, y) SAW_CONCAT_(x, y)
+#define SAW_UNIQUE_NAME(prefix) SAW_CONCAT(prefix, __LINE__)
+
+#define SAW_FORBID_COPY(classname) \
+ classname(const classname &) = delete; \
+ classname &operator=(const classname &) = delete
+
+#define SAW_FORBID_MOVE(classname) \
+ classname(classname &&) = delete; \
+ classname &operator=(classname &&) = delete
+
+#define SAW_DEFAULT_COPY(classname) \
+ classname(const classname &) = default; \
+ classname &operator=(const classname &) = default
+
+#define SAW_DEFAULT_MOVE(classname) \
+ classname(classname &&) = default; \
+ classname &operator=(classname &&) = default
+
+// In case of C++20
+#define SAW_ASSERT(expression) \
+ assert(expression); \
+ if (!(expression)) [[unlikely]]
+
+template <typename T> using maybe = std::optional<T>;
+
+template <typename T> using own = std::unique_ptr<T>;
+
+template <typename T> using our = std::shared_ptr<T>;
+
+template <typename T> using lent = std::weak_ptr<T>;
+
+template <typename T, class... Args> own<T> heap(Args &&...args) {
+ return own<T>(new T(std::forward<Args>(args)...));
+}
+
+template <typename T, class... Args> our<T> share(Args &&...args) {
+ return std::make_shared<T>(std::forward<Args>(args)...);
+}
+
+template <typename T> T instance() noexcept;
+
+template <typename Func, typename T> struct return_type_helper {
+ typedef decltype(instance<Func>()(instance<T>())) Type;
+};
+template <typename Func> struct return_type_helper<Func, void> {
+ typedef decltype(instance<Func>()()) Type;
+};
+
+template <typename Func, typename T>
+using return_type = typename return_type_helper<Func, T>::Type;
+
+struct void_t {};
+
+template <typename T> struct void_fix { typedef T Type; };
+template <> struct void_fix<void> { typedef void_t Type; };
+template <typename T> using fix_void = typename void_fix<T>::Type;
+
+template <typename T> struct void_unfix { typedef T Type; };
+template <> struct void_unfix<void_t> { typedef void Type; };
+template <typename T> using unfix_void = typename void_unfix<T>::Type;
+
+template <typename... T> constexpr bool always_false = false;
+
+} // namespace saw
diff --git a/src/core/error.cpp b/src/core/error.cpp
new file mode 100644
index 0000000..727ca95
--- /dev/null
+++ b/src/core/error.cpp
@@ -0,0 +1,121 @@
+#include "error.h"
+
+namespace saw {
+error::error(error::code code_, bool is_critical__)
+ : error_code_{static_cast<error::code>(code_)}, is_critical_{is_critical__} {}
+
+error::error(error::code code_, bool is_critical__, const std::string_view &msg)
+ :
+ error_code_{static_cast<error::code>(code_)}
+ , is_critical_{is_critical__}, error_message_{msg}{}
+
+error::error(error &&error)
+ :
+ error_code_{std::move(error.error_code_)}
+ , is_critical_{std::move(error.is_critical_)}
+ , error_message_{std::move(error.error_message_)}{}
+
+const std::string_view error::message() const {
+
+ return std::visit(
+ [this](auto &&arg) -> const std::string_view {
+ using T = std::decay_t<decltype(arg)>;
+
+ if constexpr (std::is_same_v<T, std::string>) {
+ return std::string_view{arg};
+ } else if constexpr (std::is_same_v<T, std::string_view>) {
+ return arg;
+ } else {
+ return "Error in class Error. Good luck :)";
+ }
+ },
+ error_message_);
+}
+
+bool error::failed() const {
+ return this->is_error<err::no_error>();
+}
+
+bool error::is_critical() const {
+ return is_critical_;
+}
+
+bool error::is_recoverable() const {
+ return !is_critical_;
+}
+
+error error::copy_error() const {
+ auto copy_error_code = error_code_;
+ error error{copy_error_code, is_critical_};
+
+ try {
+ error.error_message_ = error_message_;
+ } catch (const std::bad_alloc &) {
+ error.error_message_ =
+ std::string_view{"Error while copying Error string. Out of memory"};
+ }
+
+ return error;
+}
+
+error::code error::get_id() const { return error_code_; }
+
+namespace impl {
+error_registry& get_error_registry() {
+ static own<error_registry> reg = nullptr;
+ if(!reg){
+ reg = heap<error_registry>();
+ }
+
+ assert(reg);
+ return *reg;
+}
+}
+
+error no_error(){
+ return make_error<err::no_error>();
+}
+
+namespace impl {
+error_or<error::code> error_registry::search_id(const std::string_view& desc)const{
+ /**
+ * Search the index in the vector
+ */
+ size_t i{};
+ size_t info_max_size = std::min<std::size_t>(infos.size(), std::numeric_limits<error::code>::max());
+ for(i = 0; i < info_max_size; ++i){
+ if(infos.at(i).description == desc){
+ break;
+ }
+ }
+
+ if(i == info_max_size){
+ return make_error<err::not_found>();
+ }
+
+ return static_cast<error::code>(i);
+}
+
+error_or<error::code> error_registry::search_or_register_id(const std::string_view& desc, bool is_critical){
+ auto err_or_id = search_id(desc);
+
+ if(err_or_id.is_value()){
+ return err_or_id.get_value();
+ }
+
+ auto& err = err_or_id.get_error();
+
+ if(err.is_error<err::not_found>()){
+ size_t new_index = infos.size();
+ if(new_index == std::numeric_limits<error::code>::max()){
+ return make_error<err::out_of_memory>("Error registry ids are exhausted");
+ }
+ infos.emplace_back(error_info{desc, is_critical});
+ return static_cast<error::code>(new_index);
+ }
+
+ return std::move(err);
+}
+}
+
+} // namespace saw
diff --git a/src/core/error.h b/src/core/error.h
new file mode 100644
index 0000000..3d242b9
--- /dev/null
+++ b/src/core/error.h
@@ -0,0 +1,233 @@
+#pragma once
+
+#include <algorithm>
+#include <limits>
+#include <string>
+#include <string_view>
+#include <variant>
+#include <vector>
+
+#include <cassert>
+
+#include "common.h"
+
+namespace saw {
+/**
+ * Utility class for generating errors. Has a base distinction between
+ * critical and recoverable errors. Additional code ids can be provided to the
+ * constructor if additional distinctions are necessary.
+ */
+class error {
+public:
+ using code = uint32_t;
+private:
+ code error_code_;
+ bool is_critical_;
+ std::variant<std::string_view, std::string> error_message_;
+
+public:
+ error(error::code id, bool is_critical);
+ error(error::code id, bool is_critical, const std::string_view &msg);
+ error(error &&error);
+
+ SAW_FORBID_COPY(error);
+
+ error &operator=(error &&) = default;
+
+ const std::string_view message() const;
+ bool failed() const;
+
+ bool is_critical() const;
+ bool is_recoverable() const;
+
+ /**
+ * Replaces the copy constructor. We need this since we want to explicitly copy and not implicitly
+ */
+ error copy_error() const;
+
+ code get_id() const;
+
+ template<typename T>
+ bool is_error() const;
+};
+
+template<typename T>
+class error_or;
+
+namespace impl {
+
+class error_registry {
+private:
+ struct error_info {
+ error_info() = delete;
+ error_info(const std::string_view& d_, bool critical_):description{d_}, is_critical{critical_}{}
+
+ std::string_view description;
+ bool is_critical;
+ };
+
+ std::vector<error_info> infos;
+public:
+ error_or<error::code> search_id(const std::string_view& desc) const;
+
+ error_or<error::code> search_or_register_id(const std::string_view& desc, bool is_critical);
+};
+
+error_registry& get_error_registry();
+
+template<typename T>
+error::code get_template_id(){
+ static error::code id = std::numeric_limits<error::code>::max();
+
+ if(id == std::numeric_limits<error::code>::max()){
+ auto& reg = get_error_registry();
+
+ auto err_or_id = reg.search_or_register_id(T::description, T::is_critical);
+ if(err_or_id.is_error()){
+ return std::numeric_limits<error::code>::max();
+ }
+
+ id = err_or_id.get_value();
+ }
+
+ return id;
+}
+}
+
+template<typename T> error make_error(const std::string_view& generic){
+ error::code id = impl::get_template_id<T>();
+
+ return error{id, T::is_critical, generic};
+}
+
+template<typename T> error make_error(){
+ error::code id = impl::get_template_id<T>();
+
+ return error{id, T::is_critical};
+}
+
+error make_error(error::code code, const std::string_view &generic);
+
+
+namespace err {
+struct no_error {
+ static constexpr std::string_view description = "No error has occured";
+ static constexpr bool is_critical = false;
+};
+
+struct recoverable {
+ static constexpr std::string_view description = "No error has occured";
+ static constexpr bool is_critical = false;
+};
+
+struct critical {
+ static constexpr std::string_view description = "No error has occured";
+ static constexpr bool is_critical = true;
+};
+
+struct buffer_exhausted {
+ static constexpr std::string_view description = "Buffer is too small";
+ static constexpr bool is_critical = false;
+};
+
+struct not_found {
+ static constexpr std::string_view description = "Not found";
+ static constexpr bool is_critical = false;
+};
+
+struct out_of_memory {
+ static constexpr std::string_view description = "Out of memory";
+ static constexpr bool is_critical = true;
+};
+
+struct invalid_state {
+ static constexpr std::string_view description = "Invalid state";
+ static constexpr bool is_critical = true;
+};
+
+struct not_supported {
+ static constexpr std::string_view description = "Not supported";
+ static constexpr bool is_critical = false;
+};
+
+struct not_implemented {
+ static constexpr std::string_view description = "Not implemented";
+ static constexpr bool is_critical = true;
+};
+}
+
+/**
+ * Shorthand for no error happened
+ */
+error no_error();
+
+/**
+ * Exception alternative. Since I code without exceptions this class is
+ * essentially a kind of exception replacement.
+ */
+template <typename T> class error_or;
+
+class error_or_value {
+public:
+ virtual ~error_or_value() = default;
+
+ template <typename T> error_or<unfix_void<T>> &as() {
+ return static_cast<error_or<unfix_void<T>> &>(*this);
+ }
+
+ template <typename T> const error_or<unfix_void<T>> &as() const {
+ return static_cast<const error_or<unfix_void<T>> &>(*this);
+ }
+};
+
+template <typename T> class error_or final : public error_or_value {
+private:
+ std::variant<error, fix_void<T>> value_or_error_;
+
+ static_assert(!std::is_same_v<T, void_t>,
+ "Don't use internal private types");
+
+public:
+ error_or():value_or_error_{fix_void<T>{}}{}
+ error_or(const fix_void<T> &value) : value_or_error_{value} {}
+
+ error_or(fix_void<T> &&value) : value_or_error_{std::move(value)} {}
+
+ error_or(const error &error) : value_or_error_{error} {}
+ error_or(error &&error) : value_or_error_{std::move(error)} {}
+
+ bool is_value() const {
+ return std::holds_alternative<fix_void<T>>(value_or_error_);
+ }
+
+ bool is_error() const {
+ return std::holds_alternative<class error>(value_or_error_);
+ }
+
+ class error &get_error() {
+ return std::get<class error>(value_or_error_);
+ }
+
+ const class error &get_error() const {
+ return std::get<class error>(value_or_error_);
+ }
+
+ fix_void<T> &get_value() { return std::get<fix_void<T>>(value_or_error_); }
+
+ const fix_void<T> &get_value() const {
+ return std::get<fix_void<T>>(value_or_error_);
+ }
+};
+
+template <typename T> class error_or<error_or<T>> {
+private:
+ error_or() = delete;
+};
+
+template<typename T>
+bool error::is_error() const {
+
+ return error_code_ == impl::get_template_id<T>();
+}
+
+} // namespace saw
diff --git a/src/core/string_literal.h b/src/core/string_literal.h
new file mode 100644
index 0000000..d530a54
--- /dev/null
+++ b/src/core/string_literal.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <array>
+#include <string_view>
+
+namespace saw {
+/**
+ * Helper object which creates a templated string from the provided string
+ * literal. It guarantees compile time uniqueness and thus allows using strings
+ * in template parameters.
+ */
+template <class CharT, size_t N> class string_literal {
+public:
+ constexpr string_literal(const CharT (&input)[N]) noexcept {
+ for (size_t i = 0; i < N; ++i) {
+ data[i] = input[i];
+ }
+ }
+
+ std::array<CharT, N> data{};
+
+ constexpr std::string_view view() const noexcept {
+ return std::string_view{data.data()};
+ }
+
+ constexpr bool
+ operator==(const string_literal<CharT, N> &) const noexcept = default;
+
+ template <class CharTR, size_t NR>
+ constexpr bool
+ operator==(const string_literal<CharTR, NR> &) const noexcept {
+ return false;
+ }
+};
+
+template <typename T, T... Chars>
+constexpr string_literal<T, sizeof...(Chars)> operator""_key() {
+ return string_literal<T, sizeof...(Chars) + 1u>{Chars..., '\0'};
+}
+} // namespace saw
diff --git a/src/io-tls/.nix/derivation.nix b/src/io-tls/.nix/derivation.nix
new file mode 100644
index 0000000..6c62b51
--- /dev/null
+++ b/src/io-tls/.nix/derivation.nix
@@ -0,0 +1,35 @@
+{ lib
+, stdenvNoCC
+, scons
+, clang
+, clang-tools
+, version
+, forstio
+, gnutls
+}:
+
+let
+
+in stdenvNoCC.mkDerivation {
+ pname = "forstio-io-tls";
+ inherit version;
+
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ nativeBuildInputs = [
+ scons
+ clang
+ clang-tools
+ ];
+
+ buildInputs = [
+ forstio.core
+ forstio.async
+ forstio.io
+ gnutls
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/src/io-tls/SConscript b/src/io-tls/SConscript
new file mode 100644
index 0000000..4f88f37
--- /dev/null
+++ b/src/io-tls/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+io_tls_env = env.Clone();
+
+io_tls_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+io_tls_env.headers = sorted(glob.glob(dir_path + "/*.h"))
+
+env.sources += io_tls_env.sources;
+env.headers += io_tls_env.headers;
+
+## Shared lib
+objects_shared = []
+io_tls_env.add_source_files(objects_shared, io_tls_env.sources, shared=True);
+io_tls_env.library_shared = io_tls_env.SharedLibrary('#build/forstio-io-tls', [objects_shared]);
+
+## Static lib
+objects_static = []
+io_tls_env.add_source_files(objects_static, io_tls_env.sources, shared=False);
+io_tls_env.library_static = io_tls_env.StaticLibrary('#build/forstio-io-tls', [objects_static]);
+
+# Set Alias
+env.Alias('library_io_tls', [io_tls_env.library_shared, io_tls_env.library_static]);
+
+env.targets += ['library_io_tls'];
+
+# Install
+env.Install('$prefix/lib/', [io_tls_env.library_shared, io_tls_env.library_static]);
+env.Install('$prefix/include/forstio/io/tls/', [io_tls_env.headers]);
diff --git a/src/io-tls/SConstruct b/src/io-tls/SConstruct
new file mode 100644
index 0000000..fbd8657
--- /dev/null
+++ b/src/io-tls/SConstruct
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+)
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+)
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=['gnutls','forstio-io'])
+env.__class__.add_source_files = add_kel_source_files
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+env.targets = [];
+
+Export('env')
+SConscript('SConscript')
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', [env.targets]);
+env.Default('all');
+
+env.Alias('install', '$prefix')
diff --git a/src/io-tls/tls.cpp b/src/io-tls/tls.cpp
new file mode 100644
index 0000000..9fa143c
--- /dev/null
+++ b/src/io-tls/tls.cpp
@@ -0,0 +1,252 @@
+#include "tls.h"
+
+#include <gnutls/gnutls.h>
+#include <gnutls/x509.h>
+
+#include <forstio/io/io_helpers.h>
+
+#include <cassert>
+
+#include <iostream>
+
+namespace saw {
+
+class tls::impl {
+public:
+ gnutls_certificate_credentials_t xcred;
+
+public:
+ impl() {
+ gnutls_global_init();
+ gnutls_certificate_allocate_credentials(&xcred);
+ gnutls_certificate_set_x509_system_trust(xcred);
+ }
+
+ ~impl() {
+ gnutls_certificate_free_credentials(xcred);
+ gnutls_global_deinit();
+ }
+};
+
+static ssize_t forst_tls_push_func(gnutls_transport_ptr_t p, const void *data,
+ size_t size);
+static ssize_t forst_tls_pull_func(gnutls_transport_ptr_t p, void *data, size_t size);
+
+tls::tls() : impl_{heap<tls::impl>()} {}
+
+tls::~tls() {}
+
+tls::impl &tls::get_impl() { return *impl_; }
+
+class tls_io_stream final : public io_stream {
+private:
+ own<io_stream> internal;
+ gnutls_session_t session_handle;
+
+public:
+ tls_io_stream(own<io_stream> internal_) : internal{std::move(internal_)} {}
+
+ ~tls_io_stream() { gnutls_bye(session_handle, GNUTLS_SHUT_RDWR); }
+
+ error_or<size_t> read(void *buffer, size_t length) override {
+ ssize_t size = gnutls_record_recv(session_handle, buffer, length);
+ if (size < 0) {
+ if(gnutls_error_is_fatal(size) == 0){
+ return make_error<err::recoverable>("Recoverable error on read in gnutls. TODO better error msg handling");
+ // Leaving proper message handling done in previous error framework
+ //return recoverable_error([size](){return std::string{"Read recoverable Error "}+std::string{gnutls_strerror(size)};}, "Error read r");
+ }else{
+ return make_error<err::critical>("Fatal error on read in gnutls. TODO better error msg handling");
+ }
+ }else if(size == 0){
+ return make_error<err::disconnected>();
+ }
+
+ return static_cast<size_t>(length);
+ }
+
+ conveyor<void> read_ready() override { return internal->read_ready(); }
+
+ conveyor<void> on_read_disconnected() override {
+ return internal->on_read_disconnected();
+ }
+
+ error_or<size_t> write(const void *buffer, size_t length) override {
+ ssize_t size = gnutls_record_send(session_handle, buffer, length);
+ if(size < 0){
+ if(gnutls_error_is_fatal(size) == 0){
+ return make_error<err::recoverable>("Recoverable error on write in gnutls. TODO better error msg handling");
+ }else{
+ return make_error<err::critical>("Fatal error on write in gnutls. TODO better error msg handling");
+ }
+ }
+
+ return static_cast<size_t>(size);
+ }
+
+ conveyor<void> write_ready() override { return internal->write_ready(); }
+
+ gnutls_session_t &session() { return session_handle; }
+};
+
+tls_server::tls_server(own<server> srv) : internal{std::move(srv)} {}
+
+conveyor<own<io_stream>> tls_server::accept() {
+ SAW_ASSERT(internal) { return conveyor<own<io_stream>>{fix_void<own<io_stream>>{nullptr}}; }
+ return internal->accept().then([](own<io_stream> stream) -> own<io_stream> {
+ /// @todo handshake
+
+
+ return heap<tls_io_stream>(std::move(stream));
+ });
+}
+
+namespace {
+/*
+* Small helper for setting up the nonblocking connection handshake
+*/
+struct tls_client_stream_helper {
+public:
+ own<conveyor_feeder<own<io_stream>>> feeder;
+ conveyor_sink connection_sink;
+ conveyor_sink stream_reader;
+ conveyor_sink stream_writer;
+
+ own<tls_io_stream> stream = nullptr;
+public:
+ tls_client_stream_helper(own<conveyor_feeder<own<io_stream>>> f):
+ feeder{std::move(f)}
+ {}
+
+ void setupTurn(){
+ SAW_ASSERT(stream){
+ return;
+ }
+
+ stream_reader = stream->read_ready().then([this](){
+ turn();
+ }).sink();
+
+ stream_writer = stream->write_ready().then([this](){
+ turn();
+ }).sink();
+ }
+
+ void turn(){
+ if(stream){
+ // Guarantee that the receiving end is already setup
+ SAW_ASSERT(feeder){
+ return;
+ }
+
+ auto &session = stream->session();
+
+ int ret;
+ do {
+ ret = gnutls_handshake(session);
+ } while ( (ret == GNUTLS_E_AGAIN || ret == GNUTLS_E_INTERRUPTED) && gnutls_error_is_fatal(ret) == 0);
+
+ if(gnutls_error_is_fatal(ret)){
+ feeder->fail(make_error<err::critical>("Couldn't create Tls connection"));
+ stream = nullptr;
+ }else if(ret == GNUTLS_E_SUCCESS){
+ feeder->feed(std::move(stream));
+ }
+ }
+ }
+};
+}
+
+own<server> tls_network::listen(network_address& address) {
+ return heap<tls_server>(internal.listen(address));
+}
+
+conveyor<own<io_stream>> tls_network::connect(network_address& address) {
+ // Helper setups
+ auto caf = new_conveyor_and_feeder<own<io_stream>>();
+ own<tls_client_stream_helper> helper = heap<tls_client_stream_helper>(std::move(caf.feeder));
+ tls_client_stream_helper* hlp_ptr = helper.get();
+
+ // Conveyor entangled structure
+ auto prim_conv = internal.connect(address).then([this, hlp_ptr, addr = address.address()](
+ own<io_stream> stream) -> error_or<void> {
+ io_stream* inner_stream = stream.get();
+ auto tls_stream = heap<tls_io_stream>(std::move(stream));
+
+ auto &session = tls_stream->session();
+
+ gnutls_init(&session, GNUTLS_CLIENT);
+
+ gnutls_server_name_set(session, GNUTLS_NAME_DNS, addr.c_str(),
+ addr.size());
+
+ gnutls_set_default_priority(session);
+ gnutls_credentials_set(session, GNUTLS_CRD_CERTIFICATE,
+ tls_.get_impl().xcred);
+ gnutls_session_set_verify_cert(session, addr.c_str(), 0);
+
+ gnutls_transport_set_ptr(session, reinterpret_cast<gnutls_transport_ptr_t>(inner_stream));
+ gnutls_transport_set_push_function(session, forst_tls_push_func);
+ gnutls_transport_set_pull_function(session, forst_tls_pull_func);
+
+ // gnutls_handshake_set_timeout(session, GNUTLS_DEFAULT_HANDSHAKE_TIMEOUT);
+
+ hlp_ptr->stream = std::move(tls_stream);
+ hlp_ptr->setupTurn();
+ hlp_ptr->turn();
+
+ return void_t{};
+ });
+
+ helper->connection_sink = prim_conv.sink();
+
+ return caf.conveyor.attach(std::move(helper));
+}
+
+own<datagram> tls_network::datagram(network_address& address){
+ ///@unimplemented
+ return nullptr;
+}
+
+static ssize_t forst_tls_push_func(gnutls_transport_ptr_t p, const void *data,
+ size_t size) {
+ io_stream *stream = reinterpret_cast<io_stream *>(p);
+ if (!stream) {
+ return -1;
+ }
+
+ error_or<size_t> length = stream->write(data, size);
+ if (length.is_error() || !length.is_value()) {
+ return -1;
+ }
+
+ return static_cast<ssize_t>(length.get_value());
+}
+
+static ssize_t forst_tls_pull_func(gnutls_transport_ptr_t p, void *data, size_t size) {
+ io_stream *stream = reinterpret_cast<io_stream *>(p);
+ if (!stream) {
+ return -1;
+ }
+
+ error_or<size_t> length = stream->read(data, size);
+ if (length.is_error() || !length.is_value()) {
+ return -1;
+ }
+
+ return static_cast<ssize_t>(length.get_value());
+}
+
+tls_network::tls_network(tls& tls_, network &network) : tls_{tls_},internal{network} {}
+
+conveyor<own<network_address>> tls_network::resolve_address(const std::string &addr,
+ uint16_t port) {
+ /// @todo tls server name needed. Check validity. Won't matter later on, because gnutls should fail anyway. But
+ /// it's better to find the error source sooner rather than later
+ return internal.resolve_address(addr, port);
+}
+
+std::optional<own<tls_network>> setup_tls_network(network &network) {
+ return std::nullopt;
+}
+} // namespace saw
diff --git a/src/io-tls/tls.h b/src/io-tls/tls.h
new file mode 100644
index 0000000..74b39ff
--- /dev/null
+++ b/src/io-tls/tls.h
@@ -0,0 +1,68 @@
+#pragma once
+
+#include <forstio/core/common.h>
+#include <forstio/io/io.h>
+
+#include <optional>
+#include <variant>
+
+namespace saw {
+class tls;
+
+class tls_server final : public server {
+private:
+ own<server> internal;
+
+public:
+ tls_server(own<server> srv);
+
+ conveyor<own<io_stream>> accept() override;
+};
+
+class tls_network final : public network {
+private:
+ tls& tls_;
+ network &internal;
+public:
+ tls_network(tls& tls_, network &network_);
+
+ conveyor<own<network_address>> resolve_address(const std::string &addr, uint16_t port = 0) override;
+
+ own<server> listen(network_address& address) override;
+
+ conveyor<own<io_stream>> connect(network_address& address) override;
+
+ own<class datagram> datagram(network_address& address) override;
+};
+
+/**
+* tls context class.
+* Provides tls network class which ensures the usage of tls encrypted connections
+*/
+class tls {
+private:
+ class impl;
+ own<impl> impl_;
+public:
+ tls();
+ ~tls();
+
+ struct version {
+ struct tls_1_0{};
+ struct tls_1_1{};
+ struct tls_1_2{};
+ };
+
+ struct options {
+ public:
+ version version;
+ };
+
+ impl &get_impl();
+private:
+ options options_;
+};
+
+std::optional<own<tls_network>> setup_tls_network(network &network);
+
+} // namespace saw
diff --git a/src/io/.nix/derivation.nix b/src/io/.nix/derivation.nix
new file mode 100644
index 0000000..0d213d3
--- /dev/null
+++ b/src/io/.nix/derivation.nix
@@ -0,0 +1,32 @@
+{ lib
+, stdenvNoCC
+, scons
+, clang
+, clang-tools
+, version
+, forstio
+}:
+
+let
+
+in stdenvNoCC.mkDerivation {
+ pname = "forstio-io";
+ inherit version;
+
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ nativeBuildInputs = [
+ scons
+ clang
+ clang-tools
+ ];
+
+ buildInputs = [
+ forstio.core
+ forstio.async
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/src/io/SConscript b/src/io/SConscript
new file mode 100644
index 0000000..62ad58a
--- /dev/null
+++ b/src/io/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+io_env = env.Clone();
+
+io_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+io_env.headers = sorted(glob.glob(dir_path + "/*.h"))
+
+env.sources += io_env.sources;
+env.headers += io_env.headers;
+
+## Shared lib
+objects_shared = []
+io_env.add_source_files(objects_shared, io_env.sources, shared=True);
+io_env.library_shared = io_env.SharedLibrary('#build/forstio-io', [objects_shared]);
+
+## Static lib
+objects_static = []
+io_env.add_source_files(objects_static, io_env.sources, shared=False);
+io_env.library_static = io_env.StaticLibrary('#build/forstio-io', [objects_static]);
+
+# Set Alias
+env.Alias('library_io', [io_env.library_shared, io_env.library_static]);
+
+env.targets += ['library_io'];
+
+# Install
+env.Install('$prefix/lib/', [io_env.library_shared, io_env.library_static]);
+env.Install('$prefix/include/forstio/io/', [io_env.headers]);
diff --git a/src/io/SConstruct b/src/io/SConstruct
new file mode 100644
index 0000000..4cccf82
--- /dev/null
+++ b/src/io/SConstruct
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+)
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+)
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=['forstio-async'])
+env.__class__.add_source_files = add_kel_source_files
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+env.targets = [];
+
+Export('env')
+SConscript('SConscript')
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', [env.targets]);
+env.Default('all');
+
+env.Alias('install', '$prefix')
diff --git a/src/io/io.cpp b/src/io/io.cpp
new file mode 100644
index 0000000..f0705d2
--- /dev/null
+++ b/src/io/io.cpp
@@ -0,0 +1,70 @@
+#include "io.h"
+
+#include <cassert>
+
+namespace saw {
+
+async_io_stream::async_io_stream(own<io_stream> str)
+ : stream_{std::move(str)},
+ read_ready_{stream_->read_ready()
+ .then([this]() { read_stepper_.read_step(*stream_); })
+ .sink()},
+ write_ready_{stream_->write_ready()
+ .then([this]() { write_stepper_.write_step(*stream_); })
+ .sink()},
+ read_disconnected_{stream_->on_read_disconnected()
+ .then([this]() {
+ if (read_stepper_.on_read_disconnect) {
+ read_stepper_.on_read_disconnect->feed();
+ }
+ })
+ .sink()} {}
+
+void async_io_stream::read(void *buffer, size_t min_length, size_t max_length) {
+ SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; }
+
+ SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; }
+
+ read_stepper_.read_task = read_task_and_step_helper::read_io_task{
+ buffer, min_length, max_length, 0};
+ read_stepper_.read_step(*stream_);
+}
+
+conveyor<size_t> async_io_stream::read_done() {
+ auto caf = new_conveyor_and_feeder<size_t>();
+ read_stepper_.read_done = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+conveyor<void> async_io_stream::on_read_disconnected() {
+ auto caf = new_conveyor_and_feeder<void>();
+ read_stepper_.on_read_disconnect = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+void async_io_stream::write(const void *buffer, size_t length) {
+ SAW_ASSERT(buffer && length > 0) { return; }
+
+ SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; }
+
+ write_stepper_.write_task =
+ write_task_and_step_helper::write_io_task{buffer, length, 0};
+ write_stepper_.write_step(*stream_);
+}
+
+conveyor<size_t> async_io_stream::write_done() {
+ auto caf = new_conveyor_and_feeder<size_t>();
+ write_stepper_.write_done = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+string_network_address::string_network_address(const std::string &address,
+ uint16_t port)
+ : address_value_{address}, port_value_{port} {}
+
+const std::string &string_network_address::address() const {
+ return address_value_;
+}
+
+uint16_t string_network_address::port() const { return port_value_; }
+} // namespace saw
diff --git a/src/io/io.h b/src/io/io.h
new file mode 100644
index 0000000..bcc59fd
--- /dev/null
+++ b/src/io/io.h
@@ -0,0 +1,214 @@
+#pragma once
+
+#include <forstio/async/async.h>
+#include <forstio/core/common.h>
+#include "io_helpers.h"
+
+#include <string>
+#include <variant>
+
+namespace saw {
+/**
+ * Set of error common in io
+ */
+namespace err {
+struct disconnected {
+ static constexpr std::string_view description = "Disconnected";
+ static constexpr bool is_critical = true;
+};
+}
+/*
+ * Input stream
+ */
+class input_stream {
+public:
+ virtual ~input_stream() = default;
+
+ virtual error_or<size_t> read(void *buffer, size_t length) = 0;
+
+ virtual conveyor<void> read_ready() = 0;
+
+ virtual conveyor<void> on_read_disconnected() = 0;
+};
+
+/*
+ * Output stream
+ */
+class output_stream {
+public:
+ virtual ~output_stream() = default;
+
+ virtual error_or<size_t> write(const void *buffer, size_t length) = 0;
+
+ virtual conveyor<void> write_ready() = 0;
+};
+
+/*
+ * Io stream
+ */
+class io_stream : public input_stream, public output_stream {
+public:
+ virtual ~io_stream() = default;
+};
+
+class async_input_stream {
+public:
+ virtual ~async_input_stream() = default;
+
+ virtual void read(void *buffer, size_t min_length, size_t max_length) = 0;
+
+ virtual conveyor<size_t> read_done() = 0;
+ virtual conveyor<void> on_read_disconnected() = 0;
+};
+
+class async_output_stream {
+public:
+ virtual ~async_output_stream() = default;
+
+ virtual void write(const void *buffer, size_t length) = 0;
+
+ virtual conveyor<size_t> write_done() = 0;
+};
+
+class async_io_stream final : public async_input_stream,
+ public async_output_stream {
+private:
+ own<io_stream> stream_;
+
+ conveyor_sink read_ready_;
+ conveyor_sink write_ready_;
+ conveyor_sink read_disconnected_;
+
+ read_task_and_step_helper read_stepper_;
+ write_task_and_step_helper write_stepper_;
+
+public:
+ async_io_stream(own<io_stream> str);
+
+ SAW_FORBID_COPY(async_io_stream);
+ SAW_FORBID_MOVE(async_io_stream);
+
+ void read(void *buffer, size_t length, size_t max_length) override;
+
+ conveyor<size_t> read_done() override;
+
+ conveyor<void> on_read_disconnected() override;
+
+ void write(const void *buffer, size_t length) override;
+
+ conveyor<size_t> write_done() override;
+};
+
+class server {
+public:
+ virtual ~server() = default;
+
+ virtual conveyor<own<io_stream>> accept() = 0;
+};
+
+class network_address;
+/**
+ * Datagram class. Bound to a local address it is able to receive inbound
+ * datagram messages and send them as well as long as an address is provided as
+ * well
+ */
+class datagram {
+public:
+ virtual ~datagram() = default;
+
+ virtual error_or<size_t> read(void *buffer, size_t length) = 0;
+ virtual conveyor<void> read_ready() = 0;
+
+ virtual error_or<size_t> write(const void *buffer, size_t length,
+ network_address &dest) = 0;
+ virtual conveyor<void> write_ready() = 0;
+};
+
+class os_network_address;
+class string_network_address;
+
+class network_address {
+public:
+ using child_variant =
+ std::variant<os_network_address *, string_network_address *>;
+
+ virtual ~network_address() = default;
+
+ virtual network_address::child_variant representation() = 0;
+
+ virtual const std::string &address() const = 0;
+ virtual uint16_t port() const = 0;
+};
+
+class os_network_address : public network_address {
+public:
+ virtual ~os_network_address() = default;
+
+ network_address::child_variant representation() override { return this; }
+};
+
+class string_network_address final : public network_address {
+private:
+ std::string address_value_;
+ uint16_t port_value_;
+
+public:
+ string_network_address(const std::string &address, uint16_t port);
+
+ const std::string &address() const override;
+ uint16_t port() const override;
+
+ network_address::child_variant representation() override { return this; }
+};
+
+class network {
+public:
+ virtual ~network() = default;
+
+ /**
+ * Resolve the provided string and uint16 to the preferred storage method
+ */
+ virtual conveyor<own<network_address>>
+ resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0;
+
+ /**
+ * Parse the provided string and uint16 to the preferred storage method
+ * Since no dns request is made here, no async conveyors have to be used.
+ */
+ /// @todo implement
+ // virtual Own<NetworkAddress> parseAddress(const std::string& addr,
+ // uint16_t port_hint = 0) = 0;
+
+ /**
+ * Set up a listener on this address
+ */
+ virtual own<server> listen(network_address &bind_addr) = 0;
+
+ /**
+ * Connect to a remote address
+ */
+ virtual conveyor<own<io_stream>> connect(network_address &address) = 0;
+
+ /**
+ * Bind a datagram socket at this address.
+ */
+ virtual own<datagram> datagram(network_address &address) = 0;
+};
+
+class io_provider {
+public:
+ virtual ~io_provider() = default;
+
+ virtual own<input_stream> wrap_input_fd(int fd) = 0;
+
+ virtual network &network() = 0;
+};
+
+struct async_io_context {
+ own<io_provider> io;
+ event_loop &event_loop;
+ event_port &event_port;
+};
+
+error_or<async_io_context> setup_async_io();
+} // namespace saw
diff --git a/src/io/io_helpers.cpp b/src/io/io_helpers.cpp
new file mode 100644
index 0000000..c2cf2be
--- /dev/null
+++ b/src/io/io_helpers.cpp
@@ -0,0 +1,85 @@
+#include "io_helpers.h"
+
+#include "io.h"
+
+#include <cassert>
+
+namespace saw {
+void read_task_and_step_helper::read_step(input_stream &reader) {
+ while (read_task.has_value()) {
+ read_io_task &task = *read_task;
+
+ error_or<size_t> n_err = reader.read(task.buffer, task.max_length);
+ if (n_err.is_error()) {
+ const error &error = n_err.get_error();
+ if (error.is_critical()) {
+ if (read_done) {
+ read_done->fail(error.copy_error());
+ }
+ read_task = std::nullopt;
+ }
+
+ break;
+ } else if (n_err.is_value()) {
+ size_t n = n_err.get_value();
+ if (static_cast<size_t>(n) >= task.min_length &&
+ static_cast<size_t>(n) <= task.max_length) {
+ if (read_done) {
+ read_done->feed(n + task.already_read);
+ }
+ read_task = std::nullopt;
+ } else {
+ task.buffer = static_cast<uint8_t *>(task.buffer) + n;
+ task.min_length -= static_cast<size_t>(n);
+ task.max_length -= static_cast<size_t>(n);
+ task.already_read += n;
+ }
+
+ } else {
+ if (read_done) {
+ read_done->fail(make_error<err::invalid_state>("Read failed"));
+ }
+ read_task = std::nullopt;
+ }
+ }
+}
+
+void write_task_and_step_helper::write_step(output_stream &writer) {
+ while (write_task.has_value()) {
+ write_io_task &task = *write_task;
+
+ error_or<size_t> n_err = writer.write(task.buffer, task.length);
+
+ if (n_err.is_value()) {
+
+ size_t n = n_err.get_value();
+ assert(n <= task.length);
+ if (n == task.length) {
+ if (write_done) {
+ write_done->feed(n + task.already_written);
+ }
+ write_task = std::nullopt;
+ } else {
+ task.buffer = static_cast<const uint8_t *>(task.buffer) + n;
+ task.length -= n;
+ task.already_written += n;
+ }
+ } else if (n_err.is_error()) {
+ const error &error = n_err.get_error();
+ if (error.is_critical()) {
+ if (write_done) {
+ write_done->fail(error.copy_error());
+ }
+ write_task = std::nullopt;
+ }
+ break;
+ } else {
+ if (write_done) {
+ write_done->fail(make_error<err::invalid_state>("Write failed"));
+ }
+ write_task = std::nullopt;
+ }
+ }
+}
+
+} // namespace saw
diff --git a/src/io/io_helpers.h b/src/io/io_helpers.h
new file mode 100644
index 0000000..94e37f4
--- /dev/null
+++ b/src/io/io_helpers.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <forstio/async/async.h>
+#include <forstio/core/common.h>
+
+#include <cstdint>
+#include <optional>
+
+namespace saw {
+/*
+ * Helper classes for the specific driver implementations
+ */
+
+/*
+ * Since I don't want to repeat these implementations for tls on unix systems
+ * and gnutls doesn't let me write or read into buffers I have to have this kind
+ * of strange abstraction. This may also be reusable for windows/macOS though.
+ */
+class input_stream;
+
+class read_task_and_step_helper {
+public:
+ struct read_io_task {
+ void *buffer;
+ size_t min_length;
+ size_t max_length;
+ size_t already_read = 0;
+ };
+ std::optional<read_io_task> read_task;
+ own<conveyor_feeder<size_t>> read_done = nullptr;
+
+ own<conveyor_feeder<void>> on_read_disconnect = nullptr;
+
+public:
+ void read_step(input_stream &reader);
+};
+
+class output_stream;
+
+class write_task_and_step_helper {
+public:
+ struct write_io_task {
+ const void *buffer;
+ size_t length;
+ size_t already_written = 0;
+ };
+ std::optional<write_io_task> write_task;
+ own<conveyor_feeder<size_t>> write_done = nullptr;
+
+public:
+ void write_step(output_stream &writer);
+};
+} // namespace saw