From f07487ce8f0f3ebd5c4d1082a9521f09588fa34a Mon Sep 17 00:00:00 2001 From: Claudius Holeksa Date: Sat, 29 Apr 2023 18:44:59 +0200 Subject: Added io to new repo --- default.nix | 7 ++ forstio/io/.nix/derivation.nix | 32 +++++++ forstio/io/SConscript | 38 ++++++++ forstio/io/SConstruct | 66 +++++++++++++ forstio/io/io.cpp | 70 ++++++++++++++ forstio/io/io.h | 205 +++++++++++++++++++++++++++++++++++++++++ forstio/io/io_helpers.cpp | 85 +++++++++++++++++ forstio/io/io_helpers.h | 53 +++++++++++ 8 files changed, 556 insertions(+) create mode 100644 forstio/io/.nix/derivation.nix create mode 100644 forstio/io/SConscript create mode 100644 forstio/io/SConstruct create mode 100644 forstio/io/io.cpp create mode 100644 forstio/io/io.h create mode 100644 forstio/io/io_helpers.cpp create mode 100644 forstio/io/io_helpers.h diff --git a/default.nix b/default.nix index 8311373..334b692 100644 --- a/default.nix +++ b/default.nix @@ -24,5 +24,12 @@ in rec { clang = pkgs.clang_15; clang-tools = pkgs.clang-tools_15; }; + + io = pkgs.callPackage forstio/io/.nix/derivation.nix { + inherit version; + inherit forstio; + clang = pkgs.clang_15; + clang-tools = pkgs.clang-tools_15; + }; }; } diff --git a/forstio/io/.nix/derivation.nix b/forstio/io/.nix/derivation.nix new file mode 100644 index 0000000..0d213d3 --- /dev/null +++ b/forstio/io/.nix/derivation.nix @@ -0,0 +1,32 @@ +{ lib +, stdenvNoCC +, scons +, clang +, clang-tools +, version +, forstio +}: + +let + +in stdenvNoCC.mkDerivation { + pname = "forstio-io"; + inherit version; + + src = ./..; + + enableParallelBuilding = true; + + nativeBuildInputs = [ + scons + clang + clang-tools + ]; + + buildInputs = [ + forstio.core + forstio.async + ]; + + outputs = ["out" "dev"]; +} diff --git a/forstio/io/SConscript b/forstio/io/SConscript new file mode 100644 index 0000000..62ad58a --- /dev/null +++ b/forstio/io/SConscript @@ -0,0 +1,38 @@ +#!/bin/false + +import os +import os.path +import glob + + +Import('env') + +dir_path = Dir('.').abspath + +# Environment for base library +io_env = env.Clone(); + +io_env.sources = sorted(glob.glob(dir_path + "/*.cpp")) +io_env.headers = sorted(glob.glob(dir_path + "/*.h")) + +env.sources += io_env.sources; +env.headers += io_env.headers; + +## Shared lib +objects_shared = [] +io_env.add_source_files(objects_shared, io_env.sources, shared=True); +io_env.library_shared = io_env.SharedLibrary('#build/forstio-io', [objects_shared]); + +## Static lib +objects_static = [] +io_env.add_source_files(objects_static, io_env.sources, shared=False); +io_env.library_static = io_env.StaticLibrary('#build/forstio-io', [objects_static]); + +# Set Alias +env.Alias('library_io', [io_env.library_shared, io_env.library_static]); + +env.targets += ['library_io']; + +# Install +env.Install('$prefix/lib/', [io_env.library_shared, io_env.library_static]); +env.Install('$prefix/include/forstio/io/', [io_env.headers]); diff --git a/forstio/io/SConstruct b/forstio/io/SConstruct new file mode 100644 index 0000000..4cccf82 --- /dev/null +++ b/forstio/io/SConstruct @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 + +import sys +import os +import os.path +import glob +import re + + +if sys.version_info < (3,): + def isbasestring(s): + return isinstance(s,basestring) +else: + def isbasestring(s): + return isinstance(s, (str,bytes)) + +def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""): + + if isbasestring(filetype): + dir_path = self.Dir('.').abspath + filetype = sorted(glob.glob(dir_path+"/"+filetype)) + + for path in filetype: + target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path ) + if shared: + target_name+='.os' + sources.append( self.SharedObject( target=target_name, source=path ) ) + else: + target_name+='.o' + sources.append( self.StaticObject( target=target_name, source=path ) ) + pass + +def isAbsolutePath(key, dirname, env): + assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,) + +env_vars = Variables( + args=ARGUMENTS +) + +env_vars.Add('prefix', + help='Installation target location of build results and headers', + default='/usr/local/', + validator=isAbsolutePath +) + +env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[], + CPPDEFINES=['SAW_UNIX'], + CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'], + LIBS=['forstio-async']) +env.__class__.add_source_files = add_kel_source_files +env.Tool('compilation_db'); +env.cdb = env.CompilationDatabase('compile_commands.json'); + +env.objects = []; +env.sources = []; +env.headers = []; +env.targets = []; + +Export('env') +SConscript('SConscript') + +env.Alias('cdb', env.cdb); +env.Alias('all', [env.targets]); +env.Default('all'); + +env.Alias('install', '$prefix') diff --git a/forstio/io/io.cpp b/forstio/io/io.cpp new file mode 100644 index 0000000..f0705d2 --- /dev/null +++ b/forstio/io/io.cpp @@ -0,0 +1,70 @@ +#include "io.h" + +#include + +namespace saw { + +async_io_stream::async_io_stream(own str) + : stream_{std::move(str)}, + read_ready_{stream_->read_ready() + .then([this]() { read_stepper_.read_step(*stream_); }) + .sink()}, + write_ready_{stream_->write_ready() + .then([this]() { write_stepper_.write_step(*stream_); }) + .sink()}, + read_disconnected_{stream_->on_read_disconnected() + .then([this]() { + if (read_stepper_.on_read_disconnect) { + read_stepper_.on_read_disconnect->feed(); + } + }) + .sink()} {} + +void async_io_stream::read(void *buffer, size_t min_length, size_t max_length) { + SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; } + + SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; } + + read_stepper_.read_task = read_task_and_step_helper::read_io_task{ + buffer, min_length, max_length, 0}; + read_stepper_.read_step(*stream_); +} + +conveyor async_io_stream::read_done() { + auto caf = new_conveyor_and_feeder(); + read_stepper_.read_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +conveyor async_io_stream::on_read_disconnected() { + auto caf = new_conveyor_and_feeder(); + read_stepper_.on_read_disconnect = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +void async_io_stream::write(const void *buffer, size_t length) { + SAW_ASSERT(buffer && length > 0) { return; } + + SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; } + + write_stepper_.write_task = + write_task_and_step_helper::write_io_task{buffer, length, 0}; + write_stepper_.write_step(*stream_); +} + +conveyor async_io_stream::write_done() { + auto caf = new_conveyor_and_feeder(); + write_stepper_.write_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +string_network_address::string_network_address(const std::string &address, + uint16_t port) + : address_value_{address}, port_value_{port} {} + +const std::string &string_network_address::address() const { + return address_value_; +} + +uint16_t string_network_address::port() const { return port_value_; } +} // namespace saw diff --git a/forstio/io/io.h b/forstio/io/io.h new file mode 100644 index 0000000..4a87da5 --- /dev/null +++ b/forstio/io/io.h @@ -0,0 +1,205 @@ +#pragma once + +#include +#include +#include "io_helpers.h" + +#include +#include + +namespace saw { +/* + * Input stream + */ +class input_stream { +public: + virtual ~input_stream() = default; + + virtual error_or read(void *buffer, size_t length) = 0; + + virtual conveyor read_ready() = 0; + + virtual conveyor on_read_disconnected() = 0; +}; + +/* + * Output stream + */ +class output_stream { +public: + virtual ~output_stream() = default; + + virtual error_or write(const void *buffer, size_t length) = 0; + + virtual conveyor write_ready() = 0; +}; + +/* + * Io stream + */ +class io_stream : public input_stream, public output_stream { +public: + virtual ~io_stream() = default; +}; + +class async_input_stream { +public: + virtual ~async_input_stream() = default; + + virtual void read(void *buffer, size_t min_length, size_t max_length) = 0; + + virtual conveyor read_done() = 0; + virtual conveyor on_read_disconnected() = 0; +}; + +class async_output_stream { +public: + virtual ~async_output_stream() = default; + + virtual void write(const void *buffer, size_t length) = 0; + + virtual conveyor write_done() = 0; +}; + +class async_io_stream final : public async_input_stream, + public async_output_stream { +private: + own stream_; + + conveyor_sink read_ready_; + conveyor_sink write_ready_; + conveyor_sink read_disconnected_; + + read_task_and_step_helper read_stepper_; + write_task_and_step_helper write_stepper_; + +public: + async_io_stream(own str); + + SAW_FORBID_COPY(async_io_stream); + SAW_FORBID_MOVE(async_io_stream); + + void read(void *buffer, size_t length, size_t max_length) override; + + conveyor read_done() override; + + conveyor on_read_disconnected() override; + + void write(const void *buffer, size_t length) override; + + conveyor write_done() override; +}; + +class server { +public: + virtual ~server() = default; + + virtual conveyor> accept() = 0; +}; + +class network_address; +/** + * Datagram class. Bound to a local address it is able to receive inbound + * datagram messages and send them as well as long as an address is provided as + * well + */ +class datagram { +public: + virtual ~datagram() = default; + + virtual error_or read(void *buffer, size_t length) = 0; + virtual conveyor read_ready() = 0; + + virtual error_or write(const void *buffer, size_t length, + network_address &dest) = 0; + virtual conveyor write_ready() = 0; +}; + +class os_network_address; +class string_network_address; + +class network_address { +public: + using child_variant = + std::variant; + + virtual ~network_address() = default; + + virtual network_address::child_variant representation() = 0; + + virtual const std::string &address() const = 0; + virtual uint16_t port() const = 0; +}; + +class os_network_address : public network_address { +public: + virtual ~os_network_address() = default; + + network_address::child_variant representation() override { return this; } +}; + +class string_network_address final : public network_address { +private: + std::string address_value_; + uint16_t port_value_; + +public: + string_network_address(const std::string &address, uint16_t port); + + const std::string &address() const override; + uint16_t port() const override; + + network_address::child_variant representation() override { return this; } +}; + +class network { +public: + virtual ~network() = default; + + /** + * Resolve the provided string and uint16 to the preferred storage method + */ + virtual conveyor> + resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0; + + /** + * Parse the provided string and uint16 to the preferred storage method + * Since no dns request is made here, no async conveyors have to be used. + */ + /// @todo implement + // virtual Own parseAddress(const std::string& addr, + // uint16_t port_hint = 0) = 0; + + /** + * Set up a listener on this address + */ + virtual own listen(network_address &bind_addr) = 0; + + /** + * Connect to a remote address + */ + virtual conveyor> connect(network_address &address) = 0; + + /** + * Bind a datagram socket at this address. + */ + virtual own datagram(network_address &address) = 0; +}; + +class io_provider { +public: + virtual ~io_provider() = default; + + virtual own wrap_input_fd(int fd) = 0; + + virtual network &network() = 0; +}; + +struct async_io_context { + own io; + event_loop &event_loop; + event_port &event_port; +}; + +error_or setup_async_io(); +} // namespace saw diff --git a/forstio/io/io_helpers.cpp b/forstio/io/io_helpers.cpp new file mode 100644 index 0000000..47c5017 --- /dev/null +++ b/forstio/io/io_helpers.cpp @@ -0,0 +1,85 @@ +#include "io_helpers.h" + +#include "io.h" + +#include + +namespace saw { +void read_task_and_step_helper::read_step(input_stream &reader) { + while (read_task.has_value()) { + read_io_task &task = *read_task; + + error_or n_err = reader.read(task.buffer, task.max_length); + if (n_err.is_error()) { + const error &error = n_err.error(); + if (error.is_critical()) { + if (read_done) { + read_done->fail(error.copy_error()); + } + read_task = std::nullopt; + } + + break; + } else if (n_err.is_value()) { + size_t n = n_err.value(); + if (static_cast(n) >= task.min_length && + static_cast(n) <= task.max_length) { + if (read_done) { + read_done->feed(n + task.already_read); + } + read_task = std::nullopt; + } else { + task.buffer = static_cast(task.buffer) + n; + task.min_length -= static_cast(n); + task.max_length -= static_cast(n); + task.already_read += n; + } + + } else { + if (read_done) { + read_done->fail(critical_error("Read failed")); + } + read_task = std::nullopt; + } + } +} + +void write_task_and_step_helper::write_step(output_stream &writer) { + while (write_task.has_value()) { + write_io_task &task = *write_task; + + error_or n_err = writer.write(task.buffer, task.length); + + if (n_err.is_value()) { + + size_t n = n_err.value(); + assert(n <= task.length); + if (n == task.length) { + if (write_done) { + write_done->feed(n + task.already_written); + } + write_task = std::nullopt; + } else { + task.buffer = static_cast(task.buffer) + n; + task.length -= n; + task.already_written += n; + } + } else if (n_err.is_error()) { + const error &error = n_err.error(); + if (error.is_critical()) { + if (write_done) { + write_done->fail(error.copy_error()); + } + write_task = std::nullopt; + } + break; + } else { + if (write_done) { + write_done->fail(critical_error("Write failed")); + } + write_task = std::nullopt; + } + } +} + +} // namespace saw diff --git a/forstio/io/io_helpers.h b/forstio/io/io_helpers.h new file mode 100644 index 0000000..94e37f4 --- /dev/null +++ b/forstio/io/io_helpers.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include + +#include +#include + +namespace saw { +/* + * Helper classes for the specific driver implementations + */ + +/* + * Since I don't want to repeat these implementations for tls on unix systems + * and gnutls doesn't let me write or read into buffers I have to have this kind + * of strange abstraction. This may also be reusable for windows/macOS though. + */ +class input_stream; + +class read_task_and_step_helper { +public: + struct read_io_task { + void *buffer; + size_t min_length; + size_t max_length; + size_t already_read = 0; + }; + std::optional read_task; + own> read_done = nullptr; + + own> on_read_disconnect = nullptr; + +public: + void read_step(input_stream &reader); +}; + +class output_stream; + +class write_task_and_step_helper { +public: + struct write_io_task { + const void *buffer; + size_t length; + size_t already_written = 0; + }; + std::optional write_task; + own> write_done = nullptr; + +public: + void write_step(output_stream &writer); +}; +} // namespace saw -- cgit v1.2.3