summaryrefslogtreecommitdiff
path: root/modules/io_codec
diff options
context:
space:
mode:
Diffstat (limited to 'modules/io_codec')
-rw-r--r--modules/io_codec/.nix/derivation.nix31
-rw-r--r--modules/io_codec/SConscript38
-rw-r--r--modules/io_codec/SConstruct66
-rw-r--r--modules/io_codec/io_peer.h104
-rw-r--r--modules/io_codec/io_peer.tmpl.h117
-rw-r--r--modules/io_codec/rpc.h31
6 files changed, 387 insertions, 0 deletions
diff --git a/modules/io_codec/.nix/derivation.nix b/modules/io_codec/.nix/derivation.nix
new file mode 100644
index 0000000..7cd55a8
--- /dev/null
+++ b/modules/io_codec/.nix/derivation.nix
@@ -0,0 +1,31 @@
+{ lib
+, stdenv
+, scons
+, clang-tools
+, version
+, forstio
+}:
+
+let
+
+in stdenv.mkDerivation {
+ pname = "forstio-io_codec";
+ inherit version;
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ nativeBuildInputs = [
+ scons
+ clang-tools
+ ];
+
+ buildInputs = [
+ forstio.core
+ forstio.async
+ forstio.io
+ forstio.codec
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/modules/io_codec/SConscript b/modules/io_codec/SConscript
new file mode 100644
index 0000000..0afd6d6
--- /dev/null
+++ b/modules/io_codec/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+io_env = env.Clone();
+
+io_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+io_env.headers = sorted(glob.glob(dir_path + "/*.h"))
+
+env.sources += io_env.sources;
+env.headers += io_env.headers;
+
+## Shared lib
+objects_shared = []
+io_env.add_source_files(objects_shared, io_env.sources, shared=True);
+io_env.library_shared = io_env.SharedLibrary('#build/forstio-io_codec', [objects_shared]);
+
+## Static lib
+objects_static = []
+io_env.add_source_files(objects_static, io_env.sources, shared=False);
+io_env.library_static = io_env.StaticLibrary('#build/forstio-io_codec', [objects_static]);
+
+# Set Alias
+env.Alias('library_io_codec', [io_env.library_shared, io_env.library_static]);
+
+env.targets += ['library_io_codec'];
+
+# Install
+env.Install('$prefix/lib/', [io_env.library_shared, io_env.library_static]);
+env.Install('$prefix/include/forstio/io_codec/', [io_env.headers]);
diff --git a/modules/io_codec/SConstruct b/modules/io_codec/SConstruct
new file mode 100644
index 0000000..4e6e150
--- /dev/null
+++ b/modules/io_codec/SConstruct
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+)
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+)
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=['forstio-io'])
+env.__class__.add_source_files = add_kel_source_files
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+env.targets = [];
+
+Export('env')
+SConscript('SConscript')
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', [env.targets]);
+env.Default('all');
+
+env.Alias('install', '$prefix')
diff --git a/modules/io_codec/io_peer.h b/modules/io_codec/io_peer.h
new file mode 100644
index 0000000..b9a4b34
--- /dev/null
+++ b/modules/io_codec/io_peer.h
@@ -0,0 +1,104 @@
+#pragma once
+
+#include <forstio/async/async.h>
+#include <forstio/buffer.h>
+#include <forsto/io/io.h>
+#include <forstio/schema/message.h>
+
+namespace saw {
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer = message_container<Incoming>,
+ typename OutContainer = message_container<Outgoing>,
+ typename BufferT = ring_buffer>
+class streaming_io_peer {
+public:
+ /**
+ *
+ */
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream, Codec codec, BufferT in, BufferT out);
+ /**
+ *
+ */
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream);
+
+ /**
+ * Deleted copy and move constructors
+ */
+ SAW_FORBID_COPY(streaming_io_peer);
+ SAW_FORBID_MOVE(streaming_io_peer);
+
+ /**
+ * Send a message to the remote peer
+ */
+ error send(heap_message_root<Outgoing, OutContainer> builder);
+
+ /**
+ * A phantom conveyor feeder. Meant for interfacing with other components
+ */
+ conveyor_feeder<heap_message_root<Outgoing, OutContainer>> &feeder();
+
+ conveyor<void> on_read_disconnected();
+
+private:
+ /// @unimplemented
+ class peer_conveyor_feeder final
+ : public conveyor_feeder<heap_message_root<Outgoing, OutContainer>> {
+ public:
+ peer_conveyor_feeder(
+ streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT> &peer_)
+ : peer_{peer_} {}
+
+ void feed(heap_message_root<Outgoing, OutContainer> &&data) override {
+ (void)data;
+ }
+
+ void fail(error &&error) override { (void)error; }
+
+ size_t space() const override { return 0; }
+
+ size_t queued() const override { return 0; }
+
+ private:
+ streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT> &peer_;
+ };
+
+private:
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>>
+ incoming_feeder_ = nullptr;
+
+ own<async_io_stream> io_stream_;
+
+ Codec codec_;
+
+ BufferT in_buffer_;
+ BufferT out_buffer_;
+
+ conveyor_sink sink_read_;
+ conveyor_sink sink_write_;
+
+ peer_conveyor_feeder conveyor_feeder_;
+};
+
+/**
+ * Setup new streaming io peer with the provided network protocols.
+ * This is a convenience wrapper intended for a faster setup of this class
+ */
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer = message_container<Incoming>,
+ typename OutContainer = message_container<Outgoing>,
+ typename BufferT = ring_buffer>
+std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>,
+ conveyor<heap_message_root<Incoming, InContainer>>>
+new_streaming_io_peer(own<async_io_stream> stream);
+
+} // namespace saw
+
+#include "io_peer.tmpl.h"
diff --git a/modules/io_codec/io_peer.tmpl.h b/modules/io_codec/io_peer.tmpl.h
new file mode 100644
index 0000000..880a58a
--- /dev/null
+++ b/modules/io_codec/io_peer.tmpl.h
@@ -0,0 +1,117 @@
+namespace saw {
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> str)
+ : streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}} {}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream, Codec codec, BufferT in, BufferT out)
+ : incoming_feeder_{std::move(feed)},
+ io_stream_{std::move(stream)}, codec_{std::move(codec)},
+ in_buffer_{std::move(in)}, out_buffer_{std::move(out)},
+ sink_read_{
+ io_stream_->read_done()
+ .then([this](size_t bytes) -> error_or<void> {
+ in_buffer_.write_advance(bytes);
+
+ if (in_buffer_.write_segment_length() == 0) {
+ return critical_error("Message too long");
+ }
+
+ io_stream_->read(&in_buffer_.write(), 1,
+ in_buffer_.write_segment_length());
+
+ while (true) {
+ auto root = heap_message_root<Incoming, InContainer>();
+ auto builder = root.build();
+
+ error err = codec_.template decode<Incoming, InContainer>(
+ builder, in_buffer_);
+ if (err.is_critical()) {
+ return err;
+ }
+
+ if (!err.failed()) {
+ incoming_feeder_->feed(std::move(root));
+ } else {
+ break;
+ }
+ }
+
+ return void_t{};
+ })
+ .sink([this](error err) {
+ incoming_feeder_->fail(err.copy_error());
+
+ return err;
+ })},
+ sink_write_{io_stream_->write_done()
+ .then([this](size_t bytes) -> error_or<void> {
+ out_buffer_.read_advance(bytes);
+ if (out_buffer_.readCompositeLength() > 0) {
+ io_stream_->write(
+ &out_buffer_.read(),
+ out_buffer_.read_segment_length());
+ }
+
+ return void_t{};
+ })
+ .sink()} {
+ io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length());
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+error streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::send(heap_message_root<Outgoing, OutContainer>
+ msg) {
+ bool restart_write = out_buffer_.read_segment_length() == 0;
+
+ error err =
+ codec_.template encode<Outgoing, OutContainer>(msg.read(), out_buffer_);
+ if (err.failed()) {
+ return err;
+ }
+
+ if (restart_write) {
+ io_stream_->write(&out_buffer_.read(),
+ out_buffer_.read_segment_length());
+ }
+
+ return no_error();
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+conveyor<void>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::on_read_disconnected() {
+ return io_stream_->on_read_disconnected();
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>,
+ conveyor<heap_message_root<Incoming, InContainer>>>
+newstreaming_io_peer(own<async_io_stream> stream) {
+ auto caf =
+ new_conveyor_and_feeder<heap_message_root<Incoming, InContainer>>();
+
+ return {heap<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>(
+ std::move(caf.feeder), std::move(stream)),
+ std::move(caf.conveyor)};
+}
+
+} // namespace saw
diff --git a/modules/io_codec/rpc.h b/modules/io_codec/rpc.h
new file mode 100644
index 0000000..020bf96
--- /dev/null
+++ b/modules/io_codec/rpc.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <forstio/codec/rpc.h>
+
+namespace saw {
+namespace rmt {
+struct Network {};
+}
+
+template<>
+class remote<rmt::Network> {
+private:
+ std::string addr_str_;
+public:
+ remote(std::string addr_str);
+
+ template<typename Interface>
+ conveyor<rpc_client<rmt::Network, Interface>> create_client();
+};
+
+template<template Interface>
+class rpc_client<rmt::Network, Interface> {
+private:
+ own<io_stream> stream_;
+public:
+ rpc_client(own<io_stream> stream);
+
+ template<typename T>
+ remote_result<typename response<T>::type> call(typename request<T>::type req);
+};
+}