This commit is contained in:
keldu 2022-03-28 01:19:00 +02:00
parent f44b6a1dc8
commit 0808db94ee
6 changed files with 116 additions and 82 deletions

View File

@ -370,7 +370,8 @@ size_t UnixNetworkAddress::unixAddressSize() const { return addresses.size(); }
UnixNetwork::UnixNetwork(UnixEventPort &event) : event_port{event} {} UnixNetwork::UnixNetwork(UnixEventPort &event) : event_port{event} {}
Conveyor<Own<NetworkAddress>> UnixNetwork::resolveAddress(const std::string &path,uint16_t port_hint) { Conveyor<Own<NetworkAddress>>
UnixNetwork::resolveAddress(const std::string &path, uint16_t port_hint) {
std::string_view addr_view{path}; std::string_view addr_view{path};
{ {
std::string_view begins_with = "unix:"; std::string_view begins_with = "unix:";
@ -388,7 +389,7 @@ Conveyor<Own<NetworkAddress>> UnixNetwork::resolveAddress(const std::string &pat
UnixIoProvider::UnixIoProvider(UnixEventPort &port_ref, Own<EventPort> port) UnixIoProvider::UnixIoProvider(UnixEventPort &port_ref, Own<EventPort> port)
: event_port{port_ref}, event_loop{std::move(port)}, unix_network{ : event_port{port_ref}, event_loop{std::move(port)}, unix_network{
port_ref} {} port_ref} {}
Own<InputStream> UnixIoProvider::wrapInputFd(int fd) { Own<InputStream> UnixIoProvider::wrapInputFd(int fd) {
return heap<UnixIoStream>(event_port, fd, 0, EPOLLIN); return heap<UnixIoStream>(event_port, fd, 0, EPOLLIN);

View File

@ -380,7 +380,7 @@ public:
socklen_t getRawLength() const { return address_length; } socklen_t getRawLength() const { return address_length; }
static std::vector<SocketAddress> resolve(std::string_view str, static std::vector<SocketAddress> resolve(std::string_view str,
uint16_t port_hint) { uint16_t port_hint) {
std::vector<SocketAddress> results; std::vector<SocketAddress> results;
struct ::addrinfo *head; struct ::addrinfo *head;
@ -437,7 +437,8 @@ private:
public: public:
UnixNetwork(UnixEventPort &event_port); UnixNetwork(UnixEventPort &event_port);
Conveyor<Own<NetworkAddress>> resolveAddress(const std::string &address, uint16_t port_hint = 0) override; Conveyor<Own<NetworkAddress>>
resolveAddress(const std::string &address, uint16_t port_hint = 0) override;
Own<Server> listen(NetworkAddress &addr) override; Own<Server> listen(NetworkAddress &addr) override;

View File

@ -220,9 +220,10 @@ public:
ErrorOr<FixVoid<T>> take(); ErrorOr<FixVoid<T>> take();
/** @todo implement /** @todo implement
* Specifically pump elements through this chain with the provided wait_scope * Specifically pump elements through this chain with the provided
* wait_scope
*/ */
void poll(WaitScope& wait_scope); void poll(WaitScope &wait_scope);
// helper // helper
static Conveyor<T> toConveyor(Own<ConveyorNode> node, static Conveyor<T> toConveyor(Own<ConveyorNode> node,

View File

@ -160,13 +160,14 @@ public:
*/ */
virtual Conveyor<Own<NetworkAddress>> virtual Conveyor<Own<NetworkAddress>>
resolveAddress(const std::string &addr, uint16_t port_hint = 0) = 0; resolveAddress(const std::string &addr, uint16_t port_hint = 0) = 0;
/** /**
* Parse the provided string and uint16 to the preferred storage method * Parse the provided string and uint16 to the preferred storage method
* Since no dns request is made here, no async conveyors have to be used. * Since no dns request is made here, no async conveyors have to be used.
*/ */
/// @todo implement /// @todo implement
//virtual Own<NetworkAddress> parseAddress(const std::string& addr, uint16_t port_hint = 0) = 0; // virtual Own<NetworkAddress> parseAddress(const std::string& addr,
// uint16_t port_hint = 0) = 0;
/** /**
* Set up a listener on this address * Set up a listener on this address

View File

@ -1,15 +1,19 @@
#pragma once #pragma once
#include "async.h" #include "async.h"
#include "message.h"
#include "io.h" #include "io.h"
#include "message.h"
namespace saw { namespace saw {
template <typename Codec, typename Incoming, typename Outgoing, typename InContainer = MessageContainer<Incoming>, typename OutContainer = MessageContainer<Outgoing>, typename BufferT = RingBuffer> template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer = MessageContainer<Incoming>,
typename OutContainer = MessageContainer<Outgoing>,
typename BufferT = RingBuffer>
class StreamingIoPeer { class StreamingIoPeer {
private: private:
Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> incoming_feeder = nullptr; Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>>
incoming_feeder = nullptr;
Own<AsyncIoStream> io_stream; Own<AsyncIoStream> io_stream;
@ -22,11 +26,15 @@ private:
SinkConveyor sink_write; SinkConveyor sink_write;
public: public:
StreamingIoPeer(Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed, Own<AsyncIoStream> stream, Codec codec, BufferT in, BufferT out); StreamingIoPeer(
StreamingIoPeer(Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed, Own<AsyncIoStream> stream); Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed,
Own<AsyncIoStream> stream, Codec codec, BufferT in, BufferT out);
StreamingIoPeer(
Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed,
Own<AsyncIoStream> stream);
SAW_FORBID_COPY(StreamingIoPeer); SAW_FORBID_COPY(StreamingIoPeer);
SAW_FORBID_MOVE(StreamingIoPeer); SAW_FORBID_MOVE(StreamingIoPeer);
void send(HeapMessageRoot<Outgoing, OutContainer> builder); void send(HeapMessageRoot<Outgoing, OutContainer> builder);
@ -35,10 +43,16 @@ public:
/** /**
* Setup new streaming io peer with the provided network protocols. * Setup new streaming io peer with the provided network protocols.
* This is a convenience wrapper intended for a faster setup of * This is a convenience wrapper intended for a faster setup of
*/ */
template <typename Codec, typename Incoming, typename Outgoing, typename InContainer = MessageContainer<Incoming>, typename OutContainer = MessageContainer<Outgoing>, typename BufferT = RingBuffer> template <typename Codec, typename Incoming, typename Outgoing,
std::pair<Own<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>>, Conveyor<HeapMessageRoot<Incoming, InContainer>>> newStreamingIoPeer(Own<AsyncIoStream> stream); typename InContainer = MessageContainer<Incoming>,
typename OutContainer = MessageContainer<Outgoing>,
typename BufferT = RingBuffer>
std::pair<Own<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
OutContainer, BufferT>>,
Conveyor<HeapMessageRoot<Incoming, InContainer>>>
newStreamingIoPeer(Own<AsyncIoStream> stream);
} // namespace saw } // namespace saw

View File

@ -1,96 +1,112 @@
namespace saw { namespace saw {
template <typename Codec, typename Incoming, typename Outgoing, typename InContainer, typename OutContainer, typename BufferT> template <typename Codec, typename Incoming, typename Outgoing,
StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::StreamingIoPeer( typename InContainer, typename OutContainer, typename BufferT>
Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed, StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
Own<AsyncIoStream> str StreamingIoPeer(
): Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed,
StreamingIoPeer{std::move(feed), std::move(str), {}, {}, {}} Own<AsyncIoStream> str)
{ : StreamingIoPeer{std::move(feed), std::move(str), {}, {}, {}} {}
}
template <typename Codec, typename Incoming, typename Outgoing, typename InContainer, typename OutContainer, typename BufferT> template <typename Codec, typename Incoming, typename Outgoing,
StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::StreamingIoPeer( typename InContainer, typename OutContainer, typename BufferT>
Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed_, StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
Own<AsyncIoStream> stream_, Codec codec_, BufferT in_, BufferT out_): StreamingIoPeer(
incoming_feeder{std::move(feed_)}, Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed_,
io_stream{std::move(stream_)}, Own<AsyncIoStream> stream_, Codec codec_, BufferT in_, BufferT out_)
codec{std::move(codec_)}, : incoming_feeder{std::move(feed_)}, io_stream{std::move(stream_)},
in_buffer{std::move(in_)}, codec{std::move(codec_)}, in_buffer{std::move(in_)}, out_buffer{std::move(
out_buffer{std::move(out_)}, out_)},
sink_read{ sink_read{io_stream->readDone()
io_stream->readDone().then([this](size_t bytes) -> ErrorOr<void> { .then([this](size_t bytes) -> ErrorOr<void> {
in_buffer.writeAdvance(bytes); in_buffer.writeAdvance(bytes);
if(in_buffer.writeSegmentLength() == 0){ if (in_buffer.writeSegmentLength() == 0) {
return criticalError("Message too long"); return criticalError("Message too long");
} }
io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength()); io_stream->read(&in_buffer.write(), 1,
in_buffer.writeSegmentLength());
while(true){ while (true) {
auto root = heapMessageRoot<Incoming, InContainer>(); auto root =
auto builder = root.build(); heapMessageRoot<Incoming, InContainer>();
auto builder = root.build();
Error error = codec.template decode<Incoming, InContainer>(builder, in_buffer); Error error =
if(error.isCritical()){ codec.template decode<Incoming, InContainer>(
return error; builder, in_buffer);
} if (error.isCritical()) {
return error;
}
if(!error.failed()){ if (!error.failed()) {
incoming_feeder->feed(std::move(root)); incoming_feeder->feed(std::move(root));
}else{ } else {
break; break;
} }
} }
return Void{}; return Void{};
}).sink([this](Error error){ })
incoming_feeder->fail(error.copyError()); .sink([this](Error error) {
incoming_feeder->fail(error.copyError());
return error; return error;
}) })},
}, sink_write{io_stream->writeDone()
sink_write{ .then([this](size_t bytes) -> ErrorOr<void> {
io_stream->writeDone().then([this](size_t bytes) -> ErrorOr<void> { out_buffer.readAdvance(bytes);
out_buffer.readAdvance(bytes); if (out_buffer.readCompositeLength() > 0) {
if(out_buffer.readCompositeLength() > 0){ io_stream->write(&out_buffer.read(),
io_stream->write(&out_buffer.read(), out_buffer.readSegmentLength()); out_buffer.readSegmentLength());
} }
return Void{}; return Void{};
}).sink() })
} .sink()} {
{
io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength()); io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength());
} }
template <typename Codec, typename Incoming, typename Outgoing, typename InContainer, typename OutContainer, typename BufferT> template <typename Codec, typename Incoming, typename Outgoing,
void StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::send(HeapMessageRoot<Outgoing, OutContainer> msg){ typename InContainer, typename OutContainer, typename BufferT>
void StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer,
BufferT>::send(HeapMessageRoot<Outgoing, OutContainer>
msg) {
bool restart_write = out_buffer.readSegmentLength() == 0; bool restart_write = out_buffer.readSegmentLength() == 0;
Error error = codec.template encode<Outgoing, OutContainer>(msg.read(), out_buffer); Error error =
if(error.failed()){ codec.template encode<Outgoing, OutContainer>(msg.read(), out_buffer);
if (error.failed()) {
return error; return error;
} }
if(restart_write){ if (restart_write) {
io_stream->write(&out_buffer.read(), out_buffer.readSegmentLength()); io_stream->write(&out_buffer.read(), out_buffer.readSegmentLength());
} }
return noError(); return noError();
} }
template <typename Codec, typename Incoming, typename Outgoing, typename InContainer, typename OutContainer, typename BufferT> template <typename Codec, typename Incoming, typename Outgoing,
Conveyor<void> StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::onReadDisconnected(){ typename InContainer, typename OutContainer, typename BufferT>
Conveyor<void> StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
OutContainer, BufferT>::onReadDisconnected() {
return io_stream->onReadDisconnected(); return io_stream->onReadDisconnected();
} }
template <typename Codec, typename Incoming, typename Outgoing, typename InContainer, typename OutContainer, typename BufferT> template <typename Codec, typename Incoming, typename Outgoing,
std::pair<Own<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>>, Conveyor<HeapMessageRoot<Incoming, InContainer>>> newStreamingIoPeer(Own<AsyncIoStream> stream){ typename InContainer, typename OutContainer, typename BufferT>
std::pair<Own<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
OutContainer, BufferT>>,
Conveyor<HeapMessageRoot<Incoming, InContainer>>>
newStreamingIoPeer(Own<AsyncIoStream> stream) {
auto caf = newConveyorAndFeeder<HeapMessageRoot<Incoming, InContainer>>(); auto caf = newConveyorAndFeeder<HeapMessageRoot<Incoming, InContainer>>();
return {heap<StreamingIoPeer<Codec,Incoming, Outgoing, InContainer, OutContainer, BufferT>>(std::move(caf.feeder), std::move(stream)), std::move(caf.conveyor)}; return {heap<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
OutContainer, BufferT>>(std::move(caf.feeder),
std::move(stream)),
std::move(caf.conveyor)};
} }
} } // namespace saw