diff --git a/.gitignore b/.gitignore index 9077e63..1078054 100644 --- a/.gitignore +++ b/.gitignore @@ -59,7 +59,7 @@ Mkfile.old dkms.conf # binary files -bin/ +build/ # test files assets/ # custom build tracking diff --git a/SConstruct b/SConstruct index b5c1a34..8bd2030 100644 --- a/SConstruct +++ b/SConstruct @@ -29,7 +29,20 @@ def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, ta sources.append( self.StaticObject( target=target_name, source=path ) ) pass -env=Environment(ENV=os.environ, CPPPATH=['#source/forstio','#source','#','#driver'], +def isAbsolutePath(key, dirname, env): + assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,) + +env_vars = Variables( + args=ARGUMENTS +) + +env_vars.Add('prefix', + help='Installation target location of build results and headers', + default='/usr/local/', + validator=isAbsolutePath +) + +env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=['#source/forstio','#source','#','#driver'], CXX='clang++', CPPDEFINES=['SAW_UNIX'], CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'], @@ -56,11 +69,11 @@ env_library = env.Clone() env.objects_shared = [] env_library.add_source_files(env.objects_shared, env.sources + env.driver_sources + env.tls_sources, shared=True) -env.library_shared = env_library.SharedLibrary('#bin/forstio', [env.objects_shared]) +env.library_shared = env_library.SharedLibrary('#build/forstio', [env.objects_shared]) env.objects_static = [] env_library.add_source_files(env.objects_static, env.sources + env.driver_sources + env.tls_sources) -env.library_static = env_library.StaticLibrary('#bin/forstio', [env.objects_static]) +env.library_static = env_library.StaticLibrary('#build/forstio', [env.objects_static]) env.Alias('library', [env.library_shared, env.library_static]) env.Alias('library_shared', env.library_shared) @@ -87,9 +100,9 @@ env.Alias('format', env.format_actions) env.Alias('all', ['format', 'library_shared', 'library_static', 'test']) -env.Install('/usr/local/lib/', [env.library_shared, env.library_static]) -env.Install('/usr/local/include/forstio/', [env.headers]) -env.Install('/usr/local/include/forstio/tls/', [env.tls_headers]) +env.Install('$prefix/lib/', [env.library_shared, env.library_static]) +env.Install('$prefix/include/forstio/', [env.headers]) +env.Install('$prefix/include/forstio/tls/', [env.tls_headers]) -env.Install('/usr/local/include/forstio/test/', [env.test_headers]) -env.Alias('install', '/usr/local/') +env.Install('$prefix/include/forstio/test/', [env.test_headers]) +env.Alias('install', '$prefix') diff --git a/driver/io-unix.cpp b/driver/io-unix.cpp index f83d27a..4bd5c2b 100644 --- a/driver/io-unix.cpp +++ b/driver/io-unix.cpp @@ -124,31 +124,145 @@ void UnixServer::notify(uint32_t mask) { } } +UnixDatagram::UnixDatagram(UnixEventPort &event_port, int file_descriptor, + int fd_flags) + : IFdOwner{event_port, file_descriptor, fd_flags, EPOLLIN | EPOLLOUT} {} + +namespace { +ssize_t unixReadMsg(int fd, void *buffer, size_t length) { + struct ::sockaddr_storage their_addr; + socklen_t addr_len = sizeof(sockaddr_storage); + return ::recvfrom(fd, buffer, length, 0, + reinterpret_cast(&their_addr), + &addr_len); +} + +ssize_t unixWriteMsg(int fd, const void *buffer, size_t length, + ::sockaddr *dest_addr, socklen_t dest_addr_len) { + + return ::sendto(fd, buffer, length, 0, dest_addr, dest_addr_len); +} +} // namespace + +ErrorOr UnixDatagram::read(void *buffer, size_t length) { + ssize_t read_bytes = unixReadMsg(fd(), buffer, length); + if (read_bytes > 0) { + return static_cast(read_bytes); + } + return recoverableError("Currently busy"); +} + +Conveyor UnixDatagram::readReady() { + auto caf = newConveyorAndFeeder(); + read_ready = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +ErrorOr UnixDatagram::write(const void *buffer, size_t length, + NetworkAddress &dest) { + UnixNetworkAddress &unix_dest = static_cast(dest); + SocketAddress &sock_addr = unix_dest.unixAddress(); + socklen_t sock_addr_length = sock_addr.getRawLength(); + ssize_t write_bytes = unixWriteMsg(fd(), buffer, length, sock_addr.getRaw(), + sock_addr_length); + if (write_bytes > 0) { + return static_cast(write_bytes); + } + return recoverableError("Currently busy"); +} + +Conveyor UnixDatagram::writeReady() { + auto caf = newConveyorAndFeeder(); + write_ready = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +void UnixDatagram::notify(uint32_t mask) { + if (mask & EPOLLOUT) { + if (write_ready) { + write_ready->feed(); + } + } + + if (mask & EPOLLIN) { + if (read_ready) { + read_ready->feed(); + } + } +} + namespace { bool beginsWith(const std::string_view &viewed, const std::string_view &begins) { return viewed.size() >= begins.size() && viewed.compare(0, begins.size(), begins) == 0; } + +std::variant +translateNetworkAddressToUnixNetworkAddress(NetworkAddress &addr) { + auto addr_variant = addr.representation(); + std::variant os_addr = std::visit( + [](auto &arg) + -> std::variant { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + return static_cast(arg); + } + + auto sock_addrs = SocketAddress::parse( + std::string_view{arg->address()}, arg->port()); + + return UnixNetworkAddress{arg->address(), arg->port(), + std::move(sock_addrs)}; + }, + addr_variant); + return os_addr; +} + +UnixNetworkAddress &translateToUnixAddressRef( + std::variant &addr_variant) { + return std::visit( + [](auto &arg) -> UnixNetworkAddress & { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + return arg; + } else if constexpr (std::is_same_v) { + return *arg; + } else { + static_assert(true, "Cases exhausted"); + } + }, + addr_variant); +} + } // namespace -Own UnixNetworkAddress::listen() { - assert(addresses.size() > 0); - if (addresses.size() == 0) { +Own UnixNetwork::listen(NetworkAddress &addr) { + auto unix_addr_storage = translateNetworkAddressToUnixNetworkAddress(addr); + UnixNetworkAddress &address = translateToUnixAddressRef(unix_addr_storage); + + assert(address.unixAddressSize() > 0); + if (address.unixAddressSize() == 0) { return nullptr; } - int fd = addresses.front().socket(SOCK_STREAM); + int fd = address.unixAddress(0).socket(SOCK_STREAM); if (fd < 0) { return nullptr; } int val = 1; + int rc = ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); + if (rc < 0) { + ::close(fd); + return nullptr; + } - ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); - - bool failed = addresses.front().bind(fd); + bool failed = address.unixAddress(0).bind(fd); if (failed) { + ::close(fd); return nullptr; } @@ -157,13 +271,16 @@ Own UnixNetworkAddress::listen() { return heap(event_port, fd, 0); } -Conveyor> UnixNetworkAddress::connect() { - assert(addresses.size() > 0); - if (addresses.size() == 0) { +Conveyor> UnixNetwork::connect(NetworkAddress &addr) { + auto unix_addr_storage = translateNetworkAddressToUnixNetworkAddress(addr); + UnixNetworkAddress &address = translateToUnixAddressRef(unix_addr_storage); + + assert(address.unixAddressSize() > 0); + if (address.unixAddressSize() == 0) { return Conveyor>{criticalError("No address found")}; } - int fd = addresses.front().socket(SOCK_STREAM); + int fd = address.unixAddress(0).socket(SOCK_STREAM); if (fd < 0) { return Conveyor>{criticalError("Couldn't open socket")}; } @@ -172,8 +289,10 @@ Conveyor> UnixNetworkAddress::connect() { heap(event_port, fd, 0, EPOLLIN | EPOLLOUT); bool success = false; - for (auto iter = addresses.begin(); iter != addresses.end(); ++iter) { - int status = ::connect(fd, iter->getRaw(), iter->getRawLength()); + for (size_t i = 0; i < address.unixAddressSize(); ++i) { + SocketAddress &addr_iter = address.unixAddress(i); + int status = + ::connect(fd, addr_iter.getRaw(), addr_iter.getRawLength()); if (status < 0) { int error = errno; /* @@ -212,22 +331,43 @@ Conveyor> UnixNetworkAddress::connect() { return Conveyor>{std::move(io_stream)}; } -std::string UnixNetworkAddress::toString() const { - try { - std::ostringstream oss; - oss << "Address: " << path; - if (port_hint > 0) { - oss << "\nPort: " << port_hint; - } - return oss.str(); - } catch (std::bad_alloc &) { - return {}; +Own UnixNetwork::datagram(NetworkAddress &addr) { + auto unix_addr_storage = translateNetworkAddressToUnixNetworkAddress(addr); + UnixNetworkAddress &address = translateToUnixAddressRef(unix_addr_storage); + + SAW_ASSERT(address.unixAddressSize() > 0) { return nullptr; } + + int fd = address.unixAddress(0).socket(SOCK_DGRAM); + + int optval = 1; + int rc = + ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); + if (rc < 0) { + ::close(fd); + return nullptr; } + + bool failed = address.unixAddress(0).bind(fd); + if (failed) { + ::close(fd); + return nullptr; + } + /// @todo + return heap(event_port, fd, 0); } + const std::string &UnixNetworkAddress::address() const { return path; } uint16_t UnixNetworkAddress::port() const { return port_hint; } +SocketAddress &UnixNetworkAddress::unixAddress(size_t i) { + assert(i < addresses.size()); + /// @todo change from list to vector? + return addresses.at(i); +} + +size_t UnixNetworkAddress::unixAddressSize() const { return addresses.size(); } + UnixNetwork::UnixNetwork(UnixEventPort &event) : event_port{event} {} Conveyor> UnixNetwork::parseAddress(const std::string &path, @@ -240,29 +380,11 @@ Conveyor> UnixNetwork::parseAddress(const std::string &path, } } - std::list addresses = + std::vector addresses = SocketAddress::parse(addr_view, port_hint); - return Conveyor>{heap( - event_port, path, port_hint, std::move(addresses))}; -} - -ErrorOr UnixNetwork::socketPair() { - int sv[2]; - - int rc = ::socketpair(AF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, - 0, sv); - if (rc < 0) { - return criticalError("Failed to create socket pair"); - } - - SocketPair socket_pair; - socket_pair.stream[0] = - heap(event_port, sv[0], 0, EPOLLIN | EPOLLOUT); - socket_pair.stream[1] = - heap(event_port, sv[1], 0, EPOLLIN | EPOLLOUT); - - return socket_pair; + return Conveyor>{ + heap(path, port_hint, std::move(addresses))}; } UnixIoProvider::UnixIoProvider(UnixEventPort &port_ref, Own port) diff --git a/driver/io-unix.h b/driver/io-unix.h index 94d6c4a..a6baa50 100644 --- a/driver/io-unix.h +++ b/driver/io-unix.h @@ -312,6 +312,27 @@ public: void notify(uint32_t mask) override; }; +class UnixDatagram final : public Datagram, public IFdOwner { +private: + Own> read_ready = nullptr; + Own> write_ready = nullptr; + +public: + UnixDatagram(UnixEventPort &event_port, int file_descriptor, int fd_flags); + + ErrorOr read(void *buffer, size_t length) override; + Conveyor readReady() override; + + ErrorOr write(const void *buffer, size_t length, + NetworkAddress &dest) override; + Conveyor writeReady() override; + + void notify(uint32_t mask) override; +}; + +/** + * Helper class which provides potential addresses to NetworkAddress + */ class SocketAddress { private: union { @@ -335,8 +356,6 @@ public: } int socket(int type) const { - bool is_stream = type & SOCK_STREAM; - type |= SOCK_NONBLOCK | SOCK_CLOEXEC; int result = ::socket(address.generic.sa_family, type, 0); @@ -352,13 +371,17 @@ public: return error < 0; } + struct ::sockaddr *getRaw() { + return &address.generic; + } + const struct ::sockaddr *getRaw() const { return &address.generic; } socklen_t getRawLength() const { return address_length; } - static std::list parse(std::string_view str, - uint16_t port_hint) { - std::list results; + static std::vector parse(std::string_view str, + uint16_t port_hint) { + std::vector results; struct ::addrinfo *head; struct ::addrinfo hints; @@ -387,27 +410,24 @@ public: } }; -class UnixNetworkAddress final : public NetworkAddress { +class UnixNetworkAddress final : public OsNetworkAddress { private: - UnixEventPort &event_port; const std::string path; uint16_t port_hint; - std::list addresses; + std::vector addresses; public: - UnixNetworkAddress(UnixEventPort &event_port, const std::string &path, - uint16_t port_hint, std::list &&addr) - : event_port{event_port}, path{path}, port_hint{port_hint}, - addresses{std::move(addr)} {} - - Own listen() override; - Conveyor> connect() override; - - std::string toString() const override; + UnixNetworkAddress(const std::string &path, uint16_t port_hint, + std::vector &&addr) + : path{path}, port_hint{port_hint}, addresses{std::move(addr)} {} const std::string &address() const override; uint16_t port() const override; + + // Custom address info + SocketAddress &unixAddress(size_t i = 0); + size_t unixAddressSize() const; }; class UnixNetwork final : public Network { @@ -420,7 +440,11 @@ public: Conveyor> parseAddress(const std::string &address, uint16_t port_hint = 0) override; - ErrorOr socketPair() override; + Own listen(NetworkAddress &addr) override; + + Conveyor> connect(NetworkAddress &addr) override; + + Own datagram(NetworkAddress &addr) override; }; class UnixIoProvider final : public IoProvider { diff --git a/source/forstio/error.h b/source/forstio/error.h index e2fda88..dc4ae96 100644 --- a/source/forstio/error.h +++ b/source/forstio/error.h @@ -91,11 +91,11 @@ public: virtual ~ErrorOrValue() = default; template ErrorOr> &as() { - return dynamic_cast> &>(*this); + return static_cast> &>(*this); } template const ErrorOr> &as() const { - return dynamic_cast> &>(*this); + return static_cast> &>(*this); } }; diff --git a/source/forstio/io.cpp b/source/forstio/io.cpp index e835617..ce84fc0 100644 --- a/source/forstio/io.cpp +++ b/source/forstio/io.cpp @@ -58,4 +58,14 @@ Conveyor AsyncIoStream::writeDone() { write_stepper.write_done = std::move(caf.feeder); return std::move(caf.conveyor); } -} // namespace saw \ No newline at end of file + +StringNetworkAddress::StringNetworkAddress(const std::string &address, + uint16_t port) + : address_value{address}, port_value{port} {} + +const std::string &StringNetworkAddress::address() const { + return address_value; +} + +uint16_t StringNetworkAddress::port() const { return port_value; } +} // namespace saw diff --git a/source/forstio/io.h b/source/forstio/io.h index d230368..190ca53 100644 --- a/source/forstio/io.h +++ b/source/forstio/io.h @@ -5,6 +5,7 @@ #include "io_helpers.h" #include +#include namespace saw { /* @@ -74,7 +75,7 @@ private: public: AsyncIoStream(Own str); - void read(void *buffer, size_t min_length, size_t max_length) override; + void read(void *buffer, size_t length, size_t max_length) override; Conveyor readDone() override; @@ -92,38 +93,85 @@ public: virtual Conveyor> accept() = 0; }; -class SocketPair { +class NetworkAddress; +/** + * Datagram class. Bound to a local address it is able to receive inbound + * datagram messages and send them as well as long as an address is provided as + * well + */ +class Datagram { public: - std::array, 2> stream; + virtual ~Datagram() = default; + + virtual ErrorOr read(void *buffer, size_t length) = 0; + virtual Conveyor readReady() = 0; + + virtual ErrorOr write(const void *buffer, size_t length, + NetworkAddress &dest) = 0; + virtual Conveyor writeReady() = 0; }; +class OsNetworkAddress; +class StringNetworkAddress; + class NetworkAddress { public: + using ChildVariant = + std::variant; + virtual ~NetworkAddress() = default; - /* - * Listen on this address - */ - virtual Own listen() = 0; - virtual Conveyor> connect() = 0; - - virtual std::string toString() const = 0; + virtual NetworkAddress::ChildVariant representation() = 0; virtual const std::string &address() const = 0; virtual uint16_t port() const = 0; }; +class OsNetworkAddress : public NetworkAddress { +public: + virtual ~OsNetworkAddress() = default; + + NetworkAddress::ChildVariant representation() override { return this; } +}; + +class StringNetworkAddress final : public NetworkAddress { +private: + std::string address_value; + uint16_t port_value; + +public: + StringNetworkAddress(const std::string &address, uint16_t port); + + const std::string &address() const override; + uint16_t port() const override; + + NetworkAddress::ChildVariant representation() override { return this; } +}; + class Network { public: virtual ~Network() = default; + /** + * Parse the provided string and uint16 to the preferred storage method + */ virtual Conveyor> parseAddress(const std::string &addr, uint16_t port_hint = 0) = 0; /** - * Creates an unnamed pair of bidirectional fds + * Set up a listener on this address */ - virtual ErrorOr socketPair() = 0; + virtual Own listen(NetworkAddress &bind_addr) = 0; + + /** + * Connect to a remote address + */ + virtual Conveyor> connect(NetworkAddress &address) = 0; + + /** + * Bind a datagram socket at this address. + */ + virtual Own datagram(NetworkAddress &address) = 0; }; class IoProvider { diff --git a/source/forstio/io_wrapper.h b/source/forstio/io_wrapper.h index a6b4d16..66588fb 100644 --- a/source/forstio/io_wrapper.h +++ b/source/forstio/io_wrapper.h @@ -1,25 +1,27 @@ #pragma once #include "async.h" +#include "message.h" #include "io.h" namespace saw { -template +template , class OutContainer = MessageContainer> class StreamingIoPeer { private: Codec codec; Own io_stream; - Own> incoming_feeder = nullptr; - + Own>> incoming_feeder = nullptr; public: StreamingIoPeer(Own stream); - void send(Outgoing outgoing, Own builder); + void send(HeapMessageRoot builder); - Conveyor startReadPump(); + Conveyor startReadPump(); }; -} // namespace saw \ No newline at end of file + + +} // namespace saw diff --git a/source/forstio/tls/tls.cpp b/source/forstio/tls/tls.cpp index 2801c74..b1f3d2d 100644 --- a/source/forstio/tls/tls.cpp +++ b/source/forstio/tls/tls.cpp @@ -155,24 +155,18 @@ public: }; } -TlsNetworkAddress::TlsNetworkAddress(Own net_addr, const std::string& host_name_, Tls &tls_) - : internal{std::move(net_addr)}, host_name{host_name_}, tls{tls_} {} - -Own TlsNetworkAddress::listen() { - SAW_ASSERT(internal) { return nullptr; } - return heap(internal->listen()); +Own TlsNetwork::listen(NetworkAddress& address) { + return heap(internal.listen(address)); } -Conveyor> TlsNetworkAddress::connect() { - SAW_ASSERT(internal) { return Conveyor>{nullptr, nullptr}; } - +Conveyor> TlsNetwork::connect(NetworkAddress& address) { // Helper setups auto caf = newConveyorAndFeeder>(); Own helper = heap(std::move(caf.feeder)); TlsClientStreamHelper* hlp_ptr = helper.get(); // Conveyor entangled structure - auto prim_conv = internal->connect().then([this, hlp_ptr]( + auto prim_conv = internal.connect(address).then([this, hlp_ptr, addr = address.address()]( Own stream) -> ErrorOr { IoStream* inner_stream = stream.get(); auto tls_stream = heap(std::move(stream)); @@ -181,8 +175,6 @@ Conveyor> TlsNetworkAddress::connect() { gnutls_init(&session, GNUTLS_CLIENT); - const std::string &addr = this->address(); - gnutls_server_name_set(session, GNUTLS_NAME_DNS, addr.c_str(), addr.size()); @@ -209,6 +201,11 @@ Conveyor> TlsNetworkAddress::connect() { return caf.conveyor.attach(std::move(helper)); } +Own TlsNetwork::datagram(NetworkAddress& address){ + ///@unimplemented + return nullptr; +} + static ssize_t forst_tls_push_func(gnutls_transport_ptr_t p, const void *data, size_t size) { IoStream *stream = reinterpret_cast(p); @@ -238,30 +235,13 @@ static ssize_t forst_tls_pull_func(gnutls_transport_ptr_t p, void *data, size_t return static_cast(length.value()); } -const std::string &TlsNetworkAddress::address() const { - assert(internal); - return internal->address(); -} -uint16_t TlsNetworkAddress::port() const { - assert(internal); - return internal->port(); } - -std::string TlsNetworkAddress::toString() const { return internal->toString(); } - TlsNetwork::TlsNetwork(Network &network) : internal{network} {} Conveyor> TlsNetwork::parseAddress(const std::string &addr, uint16_t port) { - return internal.parseAddress(addr, port) - .then( - [this, addr, port](Own net) -> Own { - assert(net); - return heap(std::move(net), addr, tls); - }); -} - -ErrorOr TlsNetwork::socketPair(){ - return criticalError("Unimplemented"); + /// @todo tls server name needed. Check validity. Won't matter later on, because gnutls should fail anyway. But + /// it's better to find the error source sooner rather than later + return internal.parseAddress(addr, port); } std::optional> setupTlsNetwork(Network &network) { diff --git a/source/forstio/tls/tls.h b/source/forstio/tls/tls.h index 4730666..9c39c67 100644 --- a/source/forstio/tls/tls.h +++ b/source/forstio/tls/tls.h @@ -33,25 +33,6 @@ public: Conveyor> accept() override; }; -class TlsNetworkAddress final : public NetworkAddress { -private: - Own internal; - std::string host_name; - Tls &tls; - -public: - TlsNetworkAddress(Own net_addr, const std::string& host_name_, Tls &tls_); - - Own listen() override; - - Conveyor> connect() override; - - std::string toString() const override; - - const std::string &address() const override; - uint16_t port() const override; -}; - class TlsNetwork final : public Network { private: Tls tls; @@ -60,10 +41,13 @@ private: public: TlsNetwork(Network &network); - Conveyor> parseAddress(const std::string &addr, - uint16_t port = 0) override; + Conveyor> parseAddress(const std::string &addr, uint16_t port = 0) override; + + Own listen(NetworkAddress& address) override; - ErrorOr socketPair() override; + Conveyor> connect(NetworkAddress& address) override; + + Own datagram(NetworkAddress& address) override; }; std::optional> setupTlsNetwork(Network &network); diff --git a/test/SConscript b/test/SConscript index 0d52054..8d9d3a6 100644 --- a/test/SConscript +++ b/test/SConscript @@ -15,4 +15,4 @@ env.test_objects = [] env.test_sources.append(dir_path+'/suite/suite.cpp') env.test_headers = [dir_path + '/suite/suite.h'] -env.test_program = env_test.Program('#bin/test', [env.test_sources, env.library_static]) +env.test_program = env_test.Program('#build/test', [env.test_sources, env.library_static]) diff --git a/test/io.cpp b/test/io.cpp index 248f8ee..46d135e 100644 --- a/test/io.cpp +++ b/test/io.cpp @@ -3,6 +3,7 @@ #include "source/forstio/io.h" namespace { +/* SAW_TEST("Io Socket Pair"){ using namespace saw; @@ -40,5 +41,5 @@ SAW_TEST("Io Socket Pair"){ SAW_EXPECT(buffer_out[5] == 0, "Element 6 failed"); SAW_EXPECT(buffer_out[6] == 0, "Element 7 failed"); } - +*/ } diff --git a/tools/message_analyzer/SConscript b/tools/message_analyzer/SConscript new file mode 100644 index 0000000..e69de29