updated io_peer errors

fb-snake-case
Claudius Holeksa 2022-12-08 10:23:18 +01:00
parent 92f874b3f2
commit 16298130c3
4 changed files with 34 additions and 28 deletions

View File

@ -60,7 +60,9 @@ template <typename Func> struct return_type_helper<Func, void> {
template <typename Func, typename T>
using return_type = typename return_type_helper<Func, T>::Type;
// NOLINTBEGIN
struct Void {};
// NOLINTEND
template <typename T> struct void_fix { typedef T Type; };
template <> struct void_fix<void> { typedef Void Type; };
@ -70,6 +72,8 @@ template <typename T> struct void_unfix { typedef T Type; };
template <> struct void_unfix<Void> { typedef void Type; };
template <typename T> using unfix_void = typename void_unfix<T>::Type;
// NOLINTBEGIN
template <typename... T> constexpr bool always_false = false; // NOLINT
// NOLINTEND
} // namespace saw

View File

@ -53,12 +53,14 @@ error make_error(const std::string_view &generic, error::code c);
template <typename Formatter>
error make_error(const Formatter &formatter, error::code code,
const std::string_view &generic) {
// NOLINTBEGIN
try {
std::string error_msg = formatter();
return error{std::move(error_msg), code};
} catch (std::bad_alloc &) {
return error{generic, code};
}
// NOLINTEND
}
error critical_error(const std::string_view &generic,

View File

@ -9,20 +9,20 @@ namespace saw {
template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer = MessageContainer<Incoming>,
typename OutContainer = MessageContainer<Outgoing>,
typename BufferT = RingBuffer>
typename BufferT = ring_buffer>
class StreamingIoPeer {
public:
/**
*
*/
StreamingIoPeer(
own<conveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed,
own<conveyor_feeder<HeapMessageRoot<Incoming, InContainer>>> feed,
own<AsyncIoStream> stream, Codec codec, BufferT in, BufferT out);
/**
*
*/
StreamingIoPeer(
own<conveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed,
own<conveyor_feeder<HeapMessageRoot<Incoming, InContainer>>> feed,
own<AsyncIoStream> stream);
/**
@ -39,22 +39,22 @@ public:
/**
* A phantom conveyor feeder. Meant for interfacing with other components
*/
conveyorFeeder<HeapMessageRoot<Outgoing, OutContainer>> &feeder();
conveyor_feeder<HeapMessageRoot<Outgoing, OutContainer>> &feeder();
conveyor<void> onReadDisconnected();
private:
class PeerconveyorFeeder final
: public conveyorFeeder<HeapMessageRoot<Outgoing, OutContainer>> {
class Peerconveyor_feeder final
: public conveyor_feeder<HeapMessageRoot<Outgoing, OutContainer>> {
public:
PeerconveyorFeeder(
Peerconveyor_feeder(
StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
OutContainer, BufferT> &peer_)
: peer{peer_} {}
void feed(T &&data) override { (void)data; }
void fail(Error &&error) override { (void)error; }
void fail(error &&error) override { (void)error; }
size_t space() const override { return 0; }
@ -66,7 +66,7 @@ private:
};
private:
own<conveyorFeeder<HeapMessageRoot<Incoming, InContainer>>>
own<conveyor_feeder<HeapMessageRoot<Incoming, InContainer>>>
incoming_feeder = nullptr;
own<AsyncIoStream> io_stream;
@ -79,7 +79,7 @@ private:
sink_conveyor sink_read;
sink_conveyor sink_write;
PeerconveyorFeeder conveyor_feeder;
Peerconveyor_feeder conveyor_feeder;
};
/**
@ -89,7 +89,7 @@ private:
template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer = MessageContainer<Incoming>,
typename OutContainer = MessageContainer<Outgoing>,
typename BufferT = RingBuffer>
typename BufferT = ring_buffer>
std::pair<own<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
OutContainer, BufferT>>,
conveyor<HeapMessageRoot<Incoming, InContainer>>>

View File

@ -4,25 +4,25 @@ template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer, typename OutContainer, typename BufferT>
StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
StreamingIoPeer(
Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed,
Own<AsyncIoStream> str)
own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed,
own<AsyncIoStream> str)
: StreamingIoPeer{std::move(feed), std::move(str), {}, {}, {}} {}
template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer, typename OutContainer, typename BufferT>
StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
StreamingIoPeer(
Own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed_,
Own<AsyncIoStream> stream_, Codec codec_, BufferT in_, BufferT out_)
own<ConveyorFeeder<HeapMessageRoot<Incoming, InContainer>>> feed_,
own<AsyncIoStream> stream_, Codec codec_, BufferT in_, BufferT out_)
: incoming_feeder{std::move(feed_)}, io_stream{std::move(stream_)},
codec{std::move(codec_)}, in_buffer{std::move(in_)}, out_buffer{std::move(
out_)},
sink_read{io_stream->readDone()
.then([this](size_t bytes) -> ErrorOr<void> {
.then([this](size_t bytes) -> error_or<void> {
in_buffer.writeAdvance(bytes);
if (in_buffer.writeSegmentLength() == 0) {
return criticalError("Message too long");
return critical_error("Message too long");
}
io_stream->read(&in_buffer.write(), 1,
@ -33,10 +33,10 @@ StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
heapMessageRoot<Incoming, InContainer>();
auto builder = root.build();
Error error =
error error =
codec.template decode<Incoming, InContainer>(
builder, in_buffer);
if (error.isCritical()) {
if (error.is_critical()) {
return error;
}
@ -49,13 +49,13 @@ StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
return Void{};
})
.sink([this](Error error) {
incoming_feeder->fail(error.copyError());
.sink([this](error error) {
incoming_feeder->fail(error.copy_error());
return error;
})},
sink_write{io_stream->writeDone()
.then([this](size_t bytes) -> ErrorOr<void> {
.then([this](size_t bytes) -> error_or<void> {
out_buffer.readAdvance(bytes);
if (out_buffer.readCompositeLength() > 0) {
io_stream->write(&out_buffer.read(),
@ -75,17 +75,17 @@ void StreamingIoPeer<Codec, Incoming, Outgoing, InContainer, OutContainer,
msg) {
bool restart_write = out_buffer.readSegmentLength() == 0;
Error error =
error err =
codec.template encode<Outgoing, OutContainer>(msg.read(), out_buffer);
if (error.failed()) {
return error;
if (err.failed()) {
return err;
}
if (restart_write) {
io_stream->write(&out_buffer.read(), out_buffer.readSegmentLength());
}
return noError();
return no_error();
}
template <typename Codec, typename Incoming, typename Outgoing,
@ -97,10 +97,10 @@ Conveyor<void> StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer, typename OutContainer, typename BufferT>
std::pair<Own<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
std::pair<own<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,
OutContainer, BufferT>>,
Conveyor<HeapMessageRoot<Incoming, InContainer>>>
newStreamingIoPeer(Own<AsyncIoStream> stream) {
newStreamingIoPeer(own<AsyncIoStream> stream) {
auto caf = newConveyorAndFeeder<HeapMessageRoot<Incoming, InContainer>>();
return {heap<StreamingIoPeer<Codec, Incoming, Outgoing, InContainer,