diff options
Diffstat (limited to 'src/io_codec')
-rw-r--r-- | src/io_codec/SConscript | 38 | ||||
-rw-r--r-- | src/io_codec/SConstruct | 66 | ||||
-rw-r--r-- | src/io_codec/io_peer.h | 104 | ||||
-rw-r--r-- | src/io_codec/io_peer.tmpl.h | 117 |
4 files changed, 325 insertions, 0 deletions
diff --git a/src/io_codec/SConscript b/src/io_codec/SConscript new file mode 100644 index 0000000..0afd6d6 --- /dev/null +++ b/src/io_codec/SConscript @@ -0,0 +1,38 @@ +#!/bin/false + +import os +import os.path +import glob + + +Import('env') + +dir_path = Dir('.').abspath + +# Environment for base library +io_env = env.Clone(); + +io_env.sources = sorted(glob.glob(dir_path + "/*.cpp")) +io_env.headers = sorted(glob.glob(dir_path + "/*.h")) + +env.sources += io_env.sources; +env.headers += io_env.headers; + +## Shared lib +objects_shared = [] +io_env.add_source_files(objects_shared, io_env.sources, shared=True); +io_env.library_shared = io_env.SharedLibrary('#build/forstio-io_codec', [objects_shared]); + +## Static lib +objects_static = [] +io_env.add_source_files(objects_static, io_env.sources, shared=False); +io_env.library_static = io_env.StaticLibrary('#build/forstio-io_codec', [objects_static]); + +# Set Alias +env.Alias('library_io_codec', [io_env.library_shared, io_env.library_static]); + +env.targets += ['library_io_codec']; + +# Install +env.Install('$prefix/lib/', [io_env.library_shared, io_env.library_static]); +env.Install('$prefix/include/forstio/io_codec/', [io_env.headers]); diff --git a/src/io_codec/SConstruct b/src/io_codec/SConstruct new file mode 100644 index 0000000..4e6e150 --- /dev/null +++ b/src/io_codec/SConstruct @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 + +import sys +import os +import os.path +import glob +import re + + +if sys.version_info < (3,): + def isbasestring(s): + return isinstance(s,basestring) +else: + def isbasestring(s): + return isinstance(s, (str,bytes)) + +def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""): + + if isbasestring(filetype): + dir_path = self.Dir('.').abspath + filetype = sorted(glob.glob(dir_path+"/"+filetype)) + + for path in filetype: + target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path ) + if shared: + target_name+='.os' + sources.append( self.SharedObject( target=target_name, source=path ) ) + else: + target_name+='.o' + sources.append( self.StaticObject( target=target_name, source=path ) ) + pass + +def isAbsolutePath(key, dirname, env): + assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,) + +env_vars = Variables( + args=ARGUMENTS +) + +env_vars.Add('prefix', + help='Installation target location of build results and headers', + default='/usr/local/', + validator=isAbsolutePath +) + +env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[], + CPPDEFINES=['SAW_UNIX'], + CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'], + LIBS=['forstio-io']) +env.__class__.add_source_files = add_kel_source_files +env.Tool('compilation_db'); +env.cdb = env.CompilationDatabase('compile_commands.json'); + +env.objects = []; +env.sources = []; +env.headers = []; +env.targets = []; + +Export('env') +SConscript('SConscript') + +env.Alias('cdb', env.cdb); +env.Alias('all', [env.targets]); +env.Default('all'); + +env.Alias('install', '$prefix') diff --git a/src/io_codec/io_peer.h b/src/io_codec/io_peer.h new file mode 100644 index 0000000..b9a4b34 --- /dev/null +++ b/src/io_codec/io_peer.h @@ -0,0 +1,104 @@ +#pragma once + +#include <forstio/async/async.h> +#include <forstio/buffer.h> +#include <forsto/io/io.h> +#include <forstio/schema/message.h> + +namespace saw { + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer = message_container<Incoming>, + typename OutContainer = message_container<Outgoing>, + typename BufferT = ring_buffer> +class streaming_io_peer { +public: + /** + * + */ + streaming_io_peer( + own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed, + own<async_io_stream> stream, Codec codec, BufferT in, BufferT out); + /** + * + */ + streaming_io_peer( + own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed, + own<async_io_stream> stream); + + /** + * Deleted copy and move constructors + */ + SAW_FORBID_COPY(streaming_io_peer); + SAW_FORBID_MOVE(streaming_io_peer); + + /** + * Send a message to the remote peer + */ + error send(heap_message_root<Outgoing, OutContainer> builder); + + /** + * A phantom conveyor feeder. Meant for interfacing with other components + */ + conveyor_feeder<heap_message_root<Outgoing, OutContainer>> &feeder(); + + conveyor<void> on_read_disconnected(); + +private: + /// @unimplemented + class peer_conveyor_feeder final + : public conveyor_feeder<heap_message_root<Outgoing, OutContainer>> { + public: + peer_conveyor_feeder( + streaming_io_peer<Codec, Incoming, Outgoing, InContainer, + OutContainer, BufferT> &peer_) + : peer_{peer_} {} + + void feed(heap_message_root<Outgoing, OutContainer> &&data) override { + (void)data; + } + + void fail(error &&error) override { (void)error; } + + size_t space() const override { return 0; } + + size_t queued() const override { return 0; } + + private: + streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT> &peer_; + }; + +private: + own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> + incoming_feeder_ = nullptr; + + own<async_io_stream> io_stream_; + + Codec codec_; + + BufferT in_buffer_; + BufferT out_buffer_; + + conveyor_sink sink_read_; + conveyor_sink sink_write_; + + peer_conveyor_feeder conveyor_feeder_; +}; + +/** + * Setup new streaming io peer with the provided network protocols. + * This is a convenience wrapper intended for a faster setup of this class + */ +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer = message_container<Incoming>, + typename OutContainer = message_container<Outgoing>, + typename BufferT = ring_buffer> +std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer, + OutContainer, BufferT>>, + conveyor<heap_message_root<Incoming, InContainer>>> +new_streaming_io_peer(own<async_io_stream> stream); + +} // namespace saw + +#include "io_peer.tmpl.h" diff --git a/src/io_codec/io_peer.tmpl.h b/src/io_codec/io_peer.tmpl.h new file mode 100644 index 0000000..880a58a --- /dev/null +++ b/src/io_codec/io_peer.tmpl.h @@ -0,0 +1,117 @@ +namespace saw { + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT>:: + streaming_io_peer( + own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed, + own<async_io_stream> str) + : streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}} {} + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT>:: + streaming_io_peer( + own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed, + own<async_io_stream> stream, Codec codec, BufferT in, BufferT out) + : incoming_feeder_{std::move(feed)}, + io_stream_{std::move(stream)}, codec_{std::move(codec)}, + in_buffer_{std::move(in)}, out_buffer_{std::move(out)}, + sink_read_{ + io_stream_->read_done() + .then([this](size_t bytes) -> error_or<void> { + in_buffer_.write_advance(bytes); + + if (in_buffer_.write_segment_length() == 0) { + return critical_error("Message too long"); + } + + io_stream_->read(&in_buffer_.write(), 1, + in_buffer_.write_segment_length()); + + while (true) { + auto root = heap_message_root<Incoming, InContainer>(); + auto builder = root.build(); + + error err = codec_.template decode<Incoming, InContainer>( + builder, in_buffer_); + if (err.is_critical()) { + return err; + } + + if (!err.failed()) { + incoming_feeder_->feed(std::move(root)); + } else { + break; + } + } + + return void_t{}; + }) + .sink([this](error err) { + incoming_feeder_->fail(err.copy_error()); + + return err; + })}, + sink_write_{io_stream_->write_done() + .then([this](size_t bytes) -> error_or<void> { + out_buffer_.read_advance(bytes); + if (out_buffer_.readCompositeLength() > 0) { + io_stream_->write( + &out_buffer_.read(), + out_buffer_.read_segment_length()); + } + + return void_t{}; + }) + .sink()} { + io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length()); +} + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +error streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT>::send(heap_message_root<Outgoing, OutContainer> + msg) { + bool restart_write = out_buffer_.read_segment_length() == 0; + + error err = + codec_.template encode<Outgoing, OutContainer>(msg.read(), out_buffer_); + if (err.failed()) { + return err; + } + + if (restart_write) { + io_stream_->write(&out_buffer_.read(), + out_buffer_.read_segment_length()); + } + + return no_error(); +} + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +conveyor<void> +streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT>::on_read_disconnected() { + return io_stream_->on_read_disconnected(); +} + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer, + OutContainer, BufferT>>, + conveyor<heap_message_root<Incoming, InContainer>>> +newstreaming_io_peer(own<async_io_stream> stream) { + auto caf = + new_conveyor_and_feeder<heap_message_root<Incoming, InContainer>>(); + + return {heap<streaming_io_peer<Codec, Incoming, Outgoing, InContainer, + OutContainer, BufferT>>( + std::move(caf.feeder), std::move(stream)), + std::move(caf.conveyor)}; +} + +} // namespace saw |