diff --git a/driver/io-unix.cpp b/driver/io-unix.cpp index b84ac88..3767a8c 100644 --- a/driver/io-unix.cpp +++ b/driver/io-unix.cpp @@ -370,7 +370,8 @@ size_t UnixNetworkAddress::unixAddressSize() const { return addresses.size(); } UnixNetwork::UnixNetwork(UnixEventPort &event) : event_port{event} {} -Conveyor> UnixNetwork::resolveAddress(const std::string &path,uint16_t port_hint) { +Conveyor> +UnixNetwork::resolveAddress(const std::string &path, uint16_t port_hint) { std::string_view addr_view{path}; { std::string_view begins_with = "unix:"; @@ -388,7 +389,7 @@ Conveyor> UnixNetwork::resolveAddress(const std::string &pat UnixIoProvider::UnixIoProvider(UnixEventPort &port_ref, Own port) : event_port{port_ref}, event_loop{std::move(port)}, unix_network{ - port_ref} {} + port_ref} {} Own UnixIoProvider::wrapInputFd(int fd) { return heap(event_port, fd, 0, EPOLLIN); diff --git a/driver/io-unix.h b/driver/io-unix.h index 550e8dc..ff6ee27 100644 --- a/driver/io-unix.h +++ b/driver/io-unix.h @@ -380,7 +380,7 @@ public: socklen_t getRawLength() const { return address_length; } static std::vector resolve(std::string_view str, - uint16_t port_hint) { + uint16_t port_hint) { std::vector results; struct ::addrinfo *head; @@ -437,7 +437,8 @@ private: public: UnixNetwork(UnixEventPort &event_port); - Conveyor> resolveAddress(const std::string &address, uint16_t port_hint = 0) override; + Conveyor> + resolveAddress(const std::string &address, uint16_t port_hint = 0) override; Own listen(NetworkAddress &addr) override; diff --git a/source/forstio/async.h b/source/forstio/async.h index dd0ae06..2e95d44 100644 --- a/source/forstio/async.h +++ b/source/forstio/async.h @@ -220,9 +220,10 @@ public: ErrorOr> take(); /** @todo implement - * Specifically pump elements through this chain with the provided wait_scope + * Specifically pump elements through this chain with the provided + * wait_scope */ - void poll(WaitScope& wait_scope); + void poll(WaitScope &wait_scope); // helper static Conveyor toConveyor(Own node, diff --git a/source/forstio/io.h b/source/forstio/io.h index 135f838..9fae5fa 100644 --- a/source/forstio/io.h +++ b/source/forstio/io.h @@ -160,13 +160,14 @@ public: */ virtual Conveyor> resolveAddress(const std::string &addr, uint16_t port_hint = 0) = 0; - + /** * Parse the provided string and uint16 to the preferred storage method * Since no dns request is made here, no async conveyors have to be used. */ /// @todo implement - //virtual Own parseAddress(const std::string& addr, uint16_t port_hint = 0) = 0; + // virtual Own parseAddress(const std::string& addr, + // uint16_t port_hint = 0) = 0; /** * Set up a listener on this address diff --git a/source/forstio/io_peer.h b/source/forstio/io_peer.h index c3b7873..19593cc 100644 --- a/source/forstio/io_peer.h +++ b/source/forstio/io_peer.h @@ -1,15 +1,19 @@ #pragma once #include "async.h" -#include "message.h" #include "io.h" +#include "message.h" namespace saw { -template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +template , + typename OutContainer = MessageContainer, + typename BufferT = RingBuffer> class StreamingIoPeer { private: - Own>> incoming_feeder = nullptr; + Own>> + incoming_feeder = nullptr; Own io_stream; @@ -22,11 +26,15 @@ private: SinkConveyor sink_write; public: - StreamingIoPeer(Own>> feed, Own stream, Codec codec, BufferT in, BufferT out); - StreamingIoPeer(Own>> feed, Own stream); + StreamingIoPeer( + Own>> feed, + Own stream, Codec codec, BufferT in, BufferT out); + StreamingIoPeer( + Own>> feed, + Own stream); SAW_FORBID_COPY(StreamingIoPeer); - SAW_FORBID_MOVE(StreamingIoPeer); + SAW_FORBID_MOVE(StreamingIoPeer); void send(HeapMessageRoot builder); @@ -35,10 +43,16 @@ public: /** * Setup new streaming io peer with the provided network protocols. - * This is a convenience wrapper intended for a faster setup of + * This is a convenience wrapper intended for a faster setup of */ -template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> -std::pair>, Conveyor>> newStreamingIoPeer(Own stream); +template , + typename OutContainer = MessageContainer, + typename BufferT = RingBuffer> +std::pair>, + Conveyor>> +newStreamingIoPeer(Own stream); } // namespace saw diff --git a/source/forstio/io_peer.tmpl.h b/source/forstio/io_peer.tmpl.h index 773e1f1..f20922f 100644 --- a/source/forstio/io_peer.tmpl.h +++ b/source/forstio/io_peer.tmpl.h @@ -1,96 +1,112 @@ namespace saw { -template -StreamingIoPeer::StreamingIoPeer( -Own>> feed, -Own str -): - StreamingIoPeer{std::move(feed), std::move(str), {}, {}, {}} -{ -} +template +StreamingIoPeer:: + StreamingIoPeer( + Own>> feed, + Own str) + : StreamingIoPeer{std::move(feed), std::move(str), {}, {}, {}} {} -template -StreamingIoPeer::StreamingIoPeer( -Own>> feed_, -Own stream_, Codec codec_, BufferT in_, BufferT out_): - incoming_feeder{std::move(feed_)}, - io_stream{std::move(stream_)}, - codec{std::move(codec_)}, - in_buffer{std::move(in_)}, - out_buffer{std::move(out_)}, - sink_read{ - io_stream->readDone().then([this](size_t bytes) -> ErrorOr { - in_buffer.writeAdvance(bytes); +template +StreamingIoPeer:: + StreamingIoPeer( + Own>> feed_, + Own stream_, Codec codec_, BufferT in_, BufferT out_) + : incoming_feeder{std::move(feed_)}, io_stream{std::move(stream_)}, + codec{std::move(codec_)}, in_buffer{std::move(in_)}, out_buffer{std::move( + out_)}, + sink_read{io_stream->readDone() + .then([this](size_t bytes) -> ErrorOr { + in_buffer.writeAdvance(bytes); - if(in_buffer.writeSegmentLength() == 0){ - return criticalError("Message too long"); - } + if (in_buffer.writeSegmentLength() == 0) { + return criticalError("Message too long"); + } - io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength()); + io_stream->read(&in_buffer.write(), 1, + in_buffer.writeSegmentLength()); - while(true){ - auto root = heapMessageRoot(); - auto builder = root.build(); + while (true) { + auto root = + heapMessageRoot(); + auto builder = root.build(); - Error error = codec.template decode(builder, in_buffer); - if(error.isCritical()){ - return error; - } + Error error = + codec.template decode( + builder, in_buffer); + if (error.isCritical()) { + return error; + } - if(!error.failed()){ - incoming_feeder->feed(std::move(root)); - }else{ - break; - } - } + if (!error.failed()) { + incoming_feeder->feed(std::move(root)); + } else { + break; + } + } - return Void{}; - }).sink([this](Error error){ - incoming_feeder->fail(error.copyError()); + return Void{}; + }) + .sink([this](Error error) { + incoming_feeder->fail(error.copyError()); - return error; - }) - }, - sink_write{ - io_stream->writeDone().then([this](size_t bytes) -> ErrorOr { - out_buffer.readAdvance(bytes); - if(out_buffer.readCompositeLength() > 0){ - io_stream->write(&out_buffer.read(), out_buffer.readSegmentLength()); - } + return error; + })}, + sink_write{io_stream->writeDone() + .then([this](size_t bytes) -> ErrorOr { + out_buffer.readAdvance(bytes); + if (out_buffer.readCompositeLength() > 0) { + io_stream->write(&out_buffer.read(), + out_buffer.readSegmentLength()); + } - return Void{}; - }).sink() - } -{ + return Void{}; + }) + .sink()} { io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength()); } -template -void StreamingIoPeer::send(HeapMessageRoot msg){ +template +void StreamingIoPeer::send(HeapMessageRoot + msg) { bool restart_write = out_buffer.readSegmentLength() == 0; - - Error error = codec.template encode(msg.read(), out_buffer); - if(error.failed()){ + + Error error = + codec.template encode(msg.read(), out_buffer); + if (error.failed()) { return error; } - if(restart_write){ + if (restart_write) { io_stream->write(&out_buffer.read(), out_buffer.readSegmentLength()); } return noError(); } -template -Conveyor StreamingIoPeer::onReadDisconnected(){ +template +Conveyor StreamingIoPeer::onReadDisconnected() { return io_stream->onReadDisconnected(); } -template -std::pair>, Conveyor>> newStreamingIoPeer(Own stream){ +template +std::pair>, + Conveyor>> +newStreamingIoPeer(Own stream) { auto caf = newConveyorAndFeeder>(); - return {heap>(std::move(caf.feeder), std::move(stream)), std::move(caf.conveyor)}; + return {heap>(std::move(caf.feeder), + std::move(stream)), + std::move(caf.conveyor)}; } -} +} // namespace saw