From db5b1b8aa05e00dca83126602bcc1814412a1515 Mon Sep 17 00:00:00 2001 From: "keldu.magnus" Date: Wed, 9 Jun 2021 14:00:03 +0200 Subject: [PATCH] changing error to something with more generic codes --- driver/io-unix.cpp | 14 +++--- source/kelgin/common.h | 4 +- source/kelgin/error.cpp | 36 +++++++++------- source/kelgin/error.h | 35 +++++++++------ source/kelgin/io.cpp | 33 +++++++++++++- source/kelgin/io.h | 19 ++++++--- source/kelgin/io_helpers.cpp | 83 ++++++++++++++++++++---------------- source/kelgin/io_helpers.h | 6 ++- 8 files changed, 147 insertions(+), 83 deletions(-) diff --git a/driver/io-unix.cpp b/driver/io-unix.cpp index 3c21b58..c51ef03 100644 --- a/driver/io-unix.cpp +++ b/driver/io-unix.cpp @@ -32,11 +32,11 @@ UnixIoStream::UnixIoStream(UnixEventPort &event_port, int file_descriptor, } ErrorOr UnixIoStream::read(void *buffer, size_t length) { - ssize_t read_bytes = unixRead(fd(), buffer, length); - if( read_bytes > 0 ){ + ssize_t read_bytes = unixRead(fd(), buffer, length); + if (read_bytes > 0) { return static_cast(read_bytes); - }else if(read_bytes == 0){ - return criticalError("Disconnected", static_cast(Error::Type::Disconnected)); + } else if (read_bytes == 0) { + return criticalError("Disconnected", Error::Code::Disconnected); } return recoverableError("Currently busy"); @@ -56,17 +56,17 @@ Conveyor UnixIoStream::onReadDisconnected() { ErrorOr UnixIoStream::write(const void *buffer, size_t length) { ssize_t write_bytes = unixWrite(fd(), buffer, length); - if( write_bytes > 0){ + if (write_bytes > 0) { return static_cast(write_bytes); } int error = errno; - if( error == EAGAIN || error == EWOULDBLOCK ){ + if (error == EAGAIN || error == EWOULDBLOCK) { return recoverableError("Currently busy"); } - return criticalError("Disconnected", static_cast(Error::Type::Disconnected)); + return criticalError("Disconnected", Error::Code::Disconnected); } Conveyor UnixIoStream::writeReady() { diff --git a/source/kelgin/common.h b/source/kelgin/common.h index 9611db6..1420131 100644 --- a/source/kelgin/common.h +++ b/source/kelgin/common.h @@ -15,7 +15,9 @@ namespace gin { classname(const classname &) = delete; \ classname &operator=(const classname &) = delete -#define GIN_ASSERT(expression) assert(expression) if (!expression) +#define GIN_ASSERT(expression) \ + assert(expression); \ + if (!expression) template using Maybe = std::optional; diff --git a/source/kelgin/error.cpp b/source/kelgin/error.cpp index 70bb2b3..2cc83e3 100644 --- a/source/kelgin/error.cpp +++ b/source/kelgin/error.cpp @@ -1,13 +1,13 @@ #include "error.h" namespace gin { -Error::Error() : error_{static_cast(0)} {} +Error::Error() : error_{static_cast(0)} {} -Error::Error(const std::string_view &msg, int8_t code) - : error_message{msg}, error_{static_cast(code)} {} +Error::Error(const std::string_view &msg, Error::Code code) + : error_message{msg}, error_{static_cast(code)} {} -Error::Error(std::string &&msg, int8_t code) - : error_message{std::move(msg)}, error_{static_cast(code)} {} +Error::Error(std::string &&msg, Error::Code code) + : error_message{std::move(msg)}, error_{static_cast(code)} {} Error::Error(Error &&error) : error_message{std::move(error.error_message)}, error_{std::move( @@ -30,11 +30,17 @@ const std::string_view Error::message() const { error_message); } -bool Error::failed() const { return static_cast(error_) != 0; } +bool Error::failed() const { + return static_cast>(error_) != 0; +} -bool Error::isCritical() const { return static_cast(error_) < 0; } +bool Error::isCritical() const { + return static_cast>(error_) < 0; +} -bool Error::isRecoverable() const { return static_cast(error_) > 0; } +bool Error::isRecoverable() const { + return static_cast>(error_) > 0; +} Error Error::copyError() const { Error error; @@ -48,16 +54,14 @@ Error Error::copyError() const { return error; } -Error::Type Error::code() const { - return static_cast +Error::Code Error::code() const { return static_cast(error_); } + +Error criticalError(const std::string_view &generic, Error::Code c) { + return Error{generic, c}; } -Error criticalError(const std::string_view &generic) { - return Error{generic, -1}; -} - -Error recoverableError(const std::string_view &generic) { - return Error{generic, 1}; +Error recoverableError(const std::string_view &generic, Error::Code c) { + return Error{generic, c}; } Error noError() { return Error{}; } diff --git a/source/kelgin/error.h b/source/kelgin/error.h index c993a71..2462624 100644 --- a/source/kelgin/error.h +++ b/source/kelgin/error.h @@ -14,17 +14,21 @@ namespace gin { */ class Error { public: - enum class Type : int8_t { - Disconnected = -99 + enum class Code : int16_t { + GenericCritical = -1, + GenericRecoverable = 1, + Disconnected = -99, + Exhausted = 99 }; + private: std::variant error_message; - Type error_; + Code error_; public: Error(); - Error(const std::string_view &msg, int8_t code); - Error(std::string &&msg, int8_t code); + Error(const std::string_view &msg, Error::Code code); + Error(std::string &&msg, Error::Code code); Error(Error &&error); GIN_FORBID_COPY(Error); @@ -39,11 +43,11 @@ public: Error copyError() const; - Type code() const; + Code code() const; }; template -Error makeError(const Formatter &formatter, int8_t code, +Error makeError(const Formatter &formatter, Error::Code code, const std::string_view &generic) { try { std::string error_msg = formatter(); @@ -53,20 +57,23 @@ Error makeError(const Formatter &formatter, int8_t code, } } -Error criticalError(const std::string_view &generic); +Error criticalError(const std::string_view &generic, + Error::Code c = Error::Code::GenericCritical); template -Error criticalError(const Formatter &formatter, - const std::string_view &generic) { - return makeError(formatter, -1, generic); +Error criticalError(const Formatter &formatter, const std::string_view &generic, + Error::Code c = Error::Code::GenericCritical) { + return makeError(formatter, c, generic); } -Error recoverableError(const std::string_view &generic); +Error recoverableError(const std::string_view &generic, + Error::Code c = Error::Code::GenericRecoverable); template Error recoverableError(const Formatter &formatter, - const std::string_view &generic) { - return makeError(formatter, -1, generic); + const std::string_view &generic, + Error::Code c = Error::Code::GenericRecoverable) { + return makeError(formatter, c, generic); } Error noError(); diff --git a/source/kelgin/io.cpp b/source/kelgin/io.cpp index e1ab289..809ac8d 100644 --- a/source/kelgin/io.cpp +++ b/source/kelgin/io.cpp @@ -1,3 +1,34 @@ #include "io.h" -namespace gin {} // namespace gin \ No newline at end of file +namespace gin { + +AsyncIoStream::AsyncIoStream(Own str) + : stream{std::move(str)}, + read_ready{stream->readReady().then([this]() {}).sink()}, + write_ready{stream->writeReady() + .then([this]() { + + }) + .sink()}, + read_disconnected{stream->onReadDisconnected() + .then([this]() { + + }) + .sink()} {} + +void AsyncIoStream::read(void *buffer, size_t min_length, size_t max_length) {} + +Conveyor AsyncIoStream::readDone() { + auto caf = newConveyorAndFeeder(); + read_stepper.read_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +void AsyncIoStream::write(const void *buffer, size_t length) {} + +Conveyor AsyncIoStream::writeDone() { + auto caf = newConveyorAndFeeder(); + write_stepper.write_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} +} // namespace gin \ No newline at end of file diff --git a/source/kelgin/io.h b/source/kelgin/io.h index 0cb5030..7c63c2c 100644 --- a/source/kelgin/io.h +++ b/source/kelgin/io.h @@ -2,6 +2,7 @@ #include "async.h" #include "common.h" +#include "io_helpers.h" #include @@ -40,12 +41,11 @@ public: virtual ~IoStream() = default; }; -/* class AsyncInputStream { public: virtual ~AsyncInputStream() = default; - virtual void read(void* buffer, size_t min_length, size_t max_length) = 0; + virtual void read(void *buffer, size_t min_length, size_t max_length) = 0; virtual Conveyor readDone() = 0; }; @@ -54,7 +54,7 @@ class AsyncOutputStream { public: virtual ~AsyncOutputStream() = default; - virtual void write(const void* buffer, size_t length) = 0; + virtual void write(const void *buffer, size_t length) = 0; virtual Conveyor writeDone() = 0; }; @@ -62,18 +62,25 @@ public: class AsyncIoStream : public AsyncInputStream, public AsyncOutputStream { private: Own stream; + + SinkConveyor read_ready; + SinkConveyor write_ready; + SinkConveyor read_disconnected; + + ReadTaskAndStepHelper read_stepper; + WriteTaskAndStepHelper write_stepper; + public: AsyncIoStream(Own str); - void read(void* buffer, size_t min_length, size_t max_length) override; + void read(void *buffer, size_t min_length, size_t max_length) override; Conveyor readDone() override; - void write(const void* buffer, size_t length) override; + void write(const void *buffer, size_t length) override; Conveyor writeDone() override; }; -*/ class Server { public: diff --git a/source/kelgin/io_helpers.cpp b/source/kelgin/io_helpers.cpp index 2b719b1..c4cb6f8 100644 --- a/source/kelgin/io_helpers.cpp +++ b/source/kelgin/io_helpers.cpp @@ -2,40 +2,43 @@ #include "io.h" +#include + namespace gin { void ReadTaskAndStepHelper::readStep(InputStream &reader) { if (read_task.has_value()) { ReadIoTask &task = *read_task; - ssize_t n = reader.read(task.buffer, task.max_length); - - if (n <= 0) { - if (n == 0) { - if (on_read_disconnect) { - on_read_disconnect->feed(); - } - return; - } - int error = errno; - if (error == EAGAIN || error == EWOULDBLOCK) { - return; - } else { + ErrorOr n_err = reader.read(task.buffer, task.max_length); + if (n_err.isError()) { + const Error &error = n_err.error(); + if (error.isCritical()) { if (read_done) { - read_done->fail(criticalError("Read failed")); + read_done->fail(error.copyError()); } read_task = std::nullopt; } - } else if (static_cast(n) >= task.min_length && - static_cast(n) <= task.max_length) { - if (read_done) { - read_done->feed(static_cast(n)); + } else if (n_err.isValue()) { + size_t n = n_err.value(); + if (static_cast(n) >= task.min_length && + static_cast(n) <= task.max_length) { + if (read_done) { + // Accumulated bytes are not pushed + read_done->feed(n + task.already_read); + } + read_task = std::nullopt; + } else { + task.buffer = reinterpret_cast(task.buffer) + n; + task.min_length -= static_cast(n); + task.max_length -= static_cast(n); + task.already_read += n; } - size_t max_len = task.max_length; - read_task = std::nullopt; + } else { - task.buffer = reinterpret_cast(task.buffer) + n; - task.min_length -= static_cast(n); - task.max_length -= static_cast(n); + if (read_done) { + read_done->fail(criticalError("Read failed")); + } + read_task = std::nullopt; } } } @@ -44,27 +47,35 @@ void WriteTaskAndStepHelper::writeStep(OutputStream &writer) { if (write_task.has_value()) { WriteIoTask &task = *write_task; - ssize_t n = writer.write(task.buffer, task.length); + ErrorOr n_err = writer.write(task.buffer, task.length); - if (n < 0) { - int error = errno; - if (error == EAGAIN || error == EWOULDBLOCK) { - return; - } else { + if (n_err.isValue()) { + size_t n = n_err.value(); + assert(n <= task.length); + if (n == task.length) { if (write_done) { - write_done->fail(criticalError("Write failed")); + write_done->feed(n + task.already_written); + } + write_task = std::nullopt; + } else { + task.buffer = reinterpret_cast(task.buffer) + + static_cast(n); + task.length -= n; + task.already_written += n; + } + } else if (n_err.isError()) { + const Error &error = n_err.error(); + if (error.isCritical()) { + if (write_done) { + write_done->fail(error.copyError()); } write_task = std::nullopt; } - } else if (static_cast(n) == task.length) { + } else { if (write_done) { - write_done->feed(static_cast(task.length)); + write_done->fail(criticalError("Write failed")); } write_task = std::nullopt; - } else { - task.buffer = reinterpret_cast(task.buffer) + - static_cast(n); - task.length -= static_cast(n); } } } diff --git a/source/kelgin/io_helpers.h b/source/kelgin/io_helpers.h index ea13d7a..130b72a 100644 --- a/source/kelgin/io_helpers.h +++ b/source/kelgin/io_helpers.h @@ -1,7 +1,7 @@ #pragma once -#include -#include +#include "async.h" +#include "common.h" #include #include @@ -24,6 +24,7 @@ public: void *buffer; size_t min_length; size_t max_length; + size_t already_read = 0; }; std::optional read_task; Own> read_done = nullptr; @@ -41,6 +42,7 @@ public: struct WriteIoTask { const void *buffer; size_t length; + size_t already_written = 0; }; std::optional write_task; Own> write_done = nullptr;