summaryrefslogtreecommitdiff
path: root/forstio/io
diff options
context:
space:
mode:
Diffstat (limited to 'forstio/io')
-rw-r--r--forstio/io/.nix/derivation.nix32
-rw-r--r--forstio/io/SConscript38
-rw-r--r--forstio/io/SConstruct66
-rw-r--r--forstio/io/io.cpp70
-rw-r--r--forstio/io/io.h205
-rw-r--r--forstio/io/io_helpers.cpp85
-rw-r--r--forstio/io/io_helpers.h53
7 files changed, 549 insertions, 0 deletions
diff --git a/forstio/io/.nix/derivation.nix b/forstio/io/.nix/derivation.nix
new file mode 100644
index 0000000..0d213d3
--- /dev/null
+++ b/forstio/io/.nix/derivation.nix
@@ -0,0 +1,32 @@
+{ lib
+, stdenvNoCC
+, scons
+, clang
+, clang-tools
+, version
+, forstio
+}:
+
+let
+
+in stdenvNoCC.mkDerivation {
+ pname = "forstio-io";
+ inherit version;
+
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ nativeBuildInputs = [
+ scons
+ clang
+ clang-tools
+ ];
+
+ buildInputs = [
+ forstio.core
+ forstio.async
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/forstio/io/SConscript b/forstio/io/SConscript
new file mode 100644
index 0000000..62ad58a
--- /dev/null
+++ b/forstio/io/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+io_env = env.Clone();
+
+io_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+io_env.headers = sorted(glob.glob(dir_path + "/*.h"))
+
+env.sources += io_env.sources;
+env.headers += io_env.headers;
+
+## Shared lib
+objects_shared = []
+io_env.add_source_files(objects_shared, io_env.sources, shared=True);
+io_env.library_shared = io_env.SharedLibrary('#build/forstio-io', [objects_shared]);
+
+## Static lib
+objects_static = []
+io_env.add_source_files(objects_static, io_env.sources, shared=False);
+io_env.library_static = io_env.StaticLibrary('#build/forstio-io', [objects_static]);
+
+# Set Alias
+env.Alias('library_io', [io_env.library_shared, io_env.library_static]);
+
+env.targets += ['library_io'];
+
+# Install
+env.Install('$prefix/lib/', [io_env.library_shared, io_env.library_static]);
+env.Install('$prefix/include/forstio/io/', [io_env.headers]);
diff --git a/forstio/io/SConstruct b/forstio/io/SConstruct
new file mode 100644
index 0000000..4cccf82
--- /dev/null
+++ b/forstio/io/SConstruct
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+)
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+)
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=['forstio-async'])
+env.__class__.add_source_files = add_kel_source_files
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+env.targets = [];
+
+Export('env')
+SConscript('SConscript')
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', [env.targets]);
+env.Default('all');
+
+env.Alias('install', '$prefix')
diff --git a/forstio/io/io.cpp b/forstio/io/io.cpp
new file mode 100644
index 0000000..f0705d2
--- /dev/null
+++ b/forstio/io/io.cpp
@@ -0,0 +1,70 @@
+#include "io.h"
+
+#include <cassert>
+
+namespace saw {
+
+async_io_stream::async_io_stream(own<io_stream> str)
+ : stream_{std::move(str)},
+ read_ready_{stream_->read_ready()
+ .then([this]() { read_stepper_.read_step(*stream_); })
+ .sink()},
+ write_ready_{stream_->write_ready()
+ .then([this]() { write_stepper_.write_step(*stream_); })
+ .sink()},
+ read_disconnected_{stream_->on_read_disconnected()
+ .then([this]() {
+ if (read_stepper_.on_read_disconnect) {
+ read_stepper_.on_read_disconnect->feed();
+ }
+ })
+ .sink()} {}
+
+void async_io_stream::read(void *buffer, size_t min_length, size_t max_length) {
+ SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; }
+
+ SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; }
+
+ read_stepper_.read_task = read_task_and_step_helper::read_io_task{
+ buffer, min_length, max_length, 0};
+ read_stepper_.read_step(*stream_);
+}
+
+conveyor<size_t> async_io_stream::read_done() {
+ auto caf = new_conveyor_and_feeder<size_t>();
+ read_stepper_.read_done = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+conveyor<void> async_io_stream::on_read_disconnected() {
+ auto caf = new_conveyor_and_feeder<void>();
+ read_stepper_.on_read_disconnect = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+void async_io_stream::write(const void *buffer, size_t length) {
+ SAW_ASSERT(buffer && length > 0) { return; }
+
+ SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; }
+
+ write_stepper_.write_task =
+ write_task_and_step_helper::write_io_task{buffer, length, 0};
+ write_stepper_.write_step(*stream_);
+}
+
+conveyor<size_t> async_io_stream::write_done() {
+ auto caf = new_conveyor_and_feeder<size_t>();
+ write_stepper_.write_done = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+string_network_address::string_network_address(const std::string &address,
+ uint16_t port)
+ : address_value_{address}, port_value_{port} {}
+
+const std::string &string_network_address::address() const {
+ return address_value_;
+}
+
+uint16_t string_network_address::port() const { return port_value_; }
+} // namespace saw
diff --git a/forstio/io/io.h b/forstio/io/io.h
new file mode 100644
index 0000000..4a87da5
--- /dev/null
+++ b/forstio/io/io.h
@@ -0,0 +1,205 @@
+#pragma once
+
+#include <forstio/async/async.h>
+#include <forstio/core/common.h>
+#include "io_helpers.h"
+
+#include <string>
+#include <variant>
+
+namespace saw {
+/*
+ * Input stream
+ */
+class input_stream {
+public:
+ virtual ~input_stream() = default;
+
+ virtual error_or<size_t> read(void *buffer, size_t length) = 0;
+
+ virtual conveyor<void> read_ready() = 0;
+
+ virtual conveyor<void> on_read_disconnected() = 0;
+};
+
+/*
+ * Output stream
+ */
+class output_stream {
+public:
+ virtual ~output_stream() = default;
+
+ virtual error_or<size_t> write(const void *buffer, size_t length) = 0;
+
+ virtual conveyor<void> write_ready() = 0;
+};
+
+/*
+ * Io stream
+ */
+class io_stream : public input_stream, public output_stream {
+public:
+ virtual ~io_stream() = default;
+};
+
+class async_input_stream {
+public:
+ virtual ~async_input_stream() = default;
+
+ virtual void read(void *buffer, size_t min_length, size_t max_length) = 0;
+
+ virtual conveyor<size_t> read_done() = 0;
+ virtual conveyor<void> on_read_disconnected() = 0;
+};
+
+class async_output_stream {
+public:
+ virtual ~async_output_stream() = default;
+
+ virtual void write(const void *buffer, size_t length) = 0;
+
+ virtual conveyor<size_t> write_done() = 0;
+};
+
+class async_io_stream final : public async_input_stream,
+ public async_output_stream {
+private:
+ own<io_stream> stream_;
+
+ conveyor_sink read_ready_;
+ conveyor_sink write_ready_;
+ conveyor_sink read_disconnected_;
+
+ read_task_and_step_helper read_stepper_;
+ write_task_and_step_helper write_stepper_;
+
+public:
+ async_io_stream(own<io_stream> str);
+
+ SAW_FORBID_COPY(async_io_stream);
+ SAW_FORBID_MOVE(async_io_stream);
+
+ void read(void *buffer, size_t length, size_t max_length) override;
+
+ conveyor<size_t> read_done() override;
+
+ conveyor<void> on_read_disconnected() override;
+
+ void write(const void *buffer, size_t length) override;
+
+ conveyor<size_t> write_done() override;
+};
+
+class server {
+public:
+ virtual ~server() = default;
+
+ virtual conveyor<own<io_stream>> accept() = 0;
+};
+
+class network_address;
+/**
+ * Datagram class. Bound to a local address it is able to receive inbound
+ * datagram messages and send them as well as long as an address is provided as
+ * well
+ */
+class datagram {
+public:
+ virtual ~datagram() = default;
+
+ virtual error_or<size_t> read(void *buffer, size_t length) = 0;
+ virtual conveyor<void> read_ready() = 0;
+
+ virtual error_or<size_t> write(const void *buffer, size_t length,
+ network_address &dest) = 0;
+ virtual conveyor<void> write_ready() = 0;
+};
+
+class os_network_address;
+class string_network_address;
+
+class network_address {
+public:
+ using child_variant =
+ std::variant<os_network_address *, string_network_address *>;
+
+ virtual ~network_address() = default;
+
+ virtual network_address::child_variant representation() = 0;
+
+ virtual const std::string &address() const = 0;
+ virtual uint16_t port() const = 0;
+};
+
+class os_network_address : public network_address {
+public:
+ virtual ~os_network_address() = default;
+
+ network_address::child_variant representation() override { return this; }
+};
+
+class string_network_address final : public network_address {
+private:
+ std::string address_value_;
+ uint16_t port_value_;
+
+public:
+ string_network_address(const std::string &address, uint16_t port);
+
+ const std::string &address() const override;
+ uint16_t port() const override;
+
+ network_address::child_variant representation() override { return this; }
+};
+
+class network {
+public:
+ virtual ~network() = default;
+
+ /**
+ * Resolve the provided string and uint16 to the preferred storage method
+ */
+ virtual conveyor<own<network_address>>
+ resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0;
+
+ /**
+ * Parse the provided string and uint16 to the preferred storage method
+ * Since no dns request is made here, no async conveyors have to be used.
+ */
+ /// @todo implement
+ // virtual Own<NetworkAddress> parseAddress(const std::string& addr,
+ // uint16_t port_hint = 0) = 0;
+
+ /**
+ * Set up a listener on this address
+ */
+ virtual own<server> listen(network_address &bind_addr) = 0;
+
+ /**
+ * Connect to a remote address
+ */
+ virtual conveyor<own<io_stream>> connect(network_address &address) = 0;
+
+ /**
+ * Bind a datagram socket at this address.
+ */
+ virtual own<datagram> datagram(network_address &address) = 0;
+};
+
+class io_provider {
+public:
+ virtual ~io_provider() = default;
+
+ virtual own<input_stream> wrap_input_fd(int fd) = 0;
+
+ virtual network &network() = 0;
+};
+
+struct async_io_context {
+ own<io_provider> io;
+ event_loop &event_loop;
+ event_port &event_port;
+};
+
+error_or<async_io_context> setup_async_io();
+} // namespace saw
diff --git a/forstio/io/io_helpers.cpp b/forstio/io/io_helpers.cpp
new file mode 100644
index 0000000..47c5017
--- /dev/null
+++ b/forstio/io/io_helpers.cpp
@@ -0,0 +1,85 @@
+#include "io_helpers.h"
+
+#include "io.h"
+
+#include <cassert>
+
+namespace saw {
+void read_task_and_step_helper::read_step(input_stream &reader) {
+ while (read_task.has_value()) {
+ read_io_task &task = *read_task;
+
+ error_or<size_t> n_err = reader.read(task.buffer, task.max_length);
+ if (n_err.is_error()) {
+ const error &error = n_err.error();
+ if (error.is_critical()) {
+ if (read_done) {
+ read_done->fail(error.copy_error());
+ }
+ read_task = std::nullopt;
+ }
+
+ break;
+ } else if (n_err.is_value()) {
+ size_t n = n_err.value();
+ if (static_cast<size_t>(n) >= task.min_length &&
+ static_cast<size_t>(n) <= task.max_length) {
+ if (read_done) {
+ read_done->feed(n + task.already_read);
+ }
+ read_task = std::nullopt;
+ } else {
+ task.buffer = static_cast<uint8_t *>(task.buffer) + n;
+ task.min_length -= static_cast<size_t>(n);
+ task.max_length -= static_cast<size_t>(n);
+ task.already_read += n;
+ }
+
+ } else {
+ if (read_done) {
+ read_done->fail(critical_error("Read failed"));
+ }
+ read_task = std::nullopt;
+ }
+ }
+}
+
+void write_task_and_step_helper::write_step(output_stream &writer) {
+ while (write_task.has_value()) {
+ write_io_task &task = *write_task;
+
+ error_or<size_t> n_err = writer.write(task.buffer, task.length);
+
+ if (n_err.is_value()) {
+
+ size_t n = n_err.value();
+ assert(n <= task.length);
+ if (n == task.length) {
+ if (write_done) {
+ write_done->feed(n + task.already_written);
+ }
+ write_task = std::nullopt;
+ } else {
+ task.buffer = static_cast<const uint8_t *>(task.buffer) + n;
+ task.length -= n;
+ task.already_written += n;
+ }
+ } else if (n_err.is_error()) {
+ const error &error = n_err.error();
+ if (error.is_critical()) {
+ if (write_done) {
+ write_done->fail(error.copy_error());
+ }
+ write_task = std::nullopt;
+ }
+ break;
+ } else {
+ if (write_done) {
+ write_done->fail(critical_error("Write failed"));
+ }
+ write_task = std::nullopt;
+ }
+ }
+}
+
+} // namespace saw
diff --git a/forstio/io/io_helpers.h b/forstio/io/io_helpers.h
new file mode 100644
index 0000000..94e37f4
--- /dev/null
+++ b/forstio/io/io_helpers.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <forstio/async/async.h>
+#include <forstio/core/common.h>
+
+#include <cstdint>
+#include <optional>
+
+namespace saw {
+/*
+ * Helper classes for the specific driver implementations
+ */
+
+/*
+ * Since I don't want to repeat these implementations for tls on unix systems
+ * and gnutls doesn't let me write or read into buffers I have to have this kind
+ * of strange abstraction. This may also be reusable for windows/macOS though.
+ */
+class input_stream;
+
+class read_task_and_step_helper {
+public:
+ struct read_io_task {
+ void *buffer;
+ size_t min_length;
+ size_t max_length;
+ size_t already_read = 0;
+ };
+ std::optional<read_io_task> read_task;
+ own<conveyor_feeder<size_t>> read_done = nullptr;
+
+ own<conveyor_feeder<void>> on_read_disconnect = nullptr;
+
+public:
+ void read_step(input_stream &reader);
+};
+
+class output_stream;
+
+class write_task_and_step_helper {
+public:
+ struct write_io_task {
+ const void *buffer;
+ size_t length;
+ size_t already_written = 0;
+ };
+ std::optional<write_io_task> write_task;
+ own<conveyor_feeder<size_t>> write_done = nullptr;
+
+public:
+ void write_step(output_stream &writer);
+};
+} // namespace saw