changed io interfaces to conveyor/async versions

This commit is contained in:
keldu.magnus 2021-05-01 15:08:59 +02:00
parent fcddc368a7
commit 044f2b4fb0
3 changed files with 26 additions and 26 deletions

View File

@ -214,15 +214,15 @@ Own<Server> UnixNetworkAddress::listen() {
return heap<UnixServer>(event_port, fd, 0);
}
Own<IoStream> UnixNetworkAddress::connect() {
Conveyor<Own<IoStream>> UnixNetworkAddress::connect() {
assert(addresses.size() > 0);
if (addresses.size() == 0) {
return nullptr;
return Conveyor<Own<IoStream>>{criticalError("No address found")};
}
int fd = addresses.front().socket(SOCK_STREAM);
if (fd < 0) {
return nullptr;
return Conveyor<Own<IoStream>>{criticalError("Couldn't open socket")};
}
Own<UnixIoStream> io_stream =
@ -237,28 +237,29 @@ Own<IoStream> UnixNetworkAddress::connect() {
* But edge triggered epolling means that it'll
* be ready when the signal is triggered
*/
/// @todo Add limit node when implemented
if (error == EINPROGRESS) {
Conveyor<void> write_ready = io_stream->writeReady();
break;
/*
* Future function return
Conveyor<void> write_ready = io_stream->writeReady();
return write_ready.then(
[io_stream{std::move(io_stream)}]() mutable {
io_stream->write_ready = nullptr;
return std::move(io_stream);
[ios{std::move(io_stream)}]() mutable {
ios->write_ready = nullptr;
return std::move(ios);
});
*/
break;
} else if (error != EINTR) {
return nullptr;
/// @todo Push error message from
return Conveyor<Own<IoStream>>{
criticalError("Some error happened.")};
}
} else {
break;
}
}
return io_stream;
// @todo change function into a safe return type.
// return Conveyor<Own<IoStream>>{std::move(io_stream)};
return Conveyor<Own<IoStream>>{std::move(io_stream)};
}
std::string UnixNetworkAddress::toString() const {
@ -278,12 +279,11 @@ UnixAsyncIoProvider::UnixAsyncIoProvider(UnixEventPort &port_ref,
Own<EventPort> &&port)
: event_port{port_ref}, event_loop{std::move(port)} {}
Own<NetworkAddress> UnixAsyncIoProvider::parseAddress(const std::string &path,
uint16_t port_hint) {
UnixEventPort *port =
reinterpret_cast<UnixEventPort *>(event_loop.eventPort());
Conveyor<Own<NetworkAddress>>
UnixAsyncIoProvider::parseAddress(const std::string &path, uint16_t port_hint) {
UnixEventPort *port = dynamic_cast<UnixEventPort *>(event_loop.eventPort());
if (!port) {
return nullptr;
return Conveyor<Own<NetworkAddress>>{criticalError("No event port")};
}
std::string_view addr_view{path};
@ -297,8 +297,8 @@ Own<NetworkAddress> UnixAsyncIoProvider::parseAddress(const std::string &path,
std::list<SocketAddress> addresses =
SocketAddress::parse(addr_view, port_hint);
return heap<UnixNetworkAddress>(*port, *this, path, port_hint,
std::move(addresses));
return Conveyor<Own<NetworkAddress>>{heap<UnixNetworkAddress>(
*port, *this, path, port_hint, std::move(addresses))};
}
Own<InputStream> UnixAsyncIoProvider::wrapInputFd(int fd) {

View File

@ -405,7 +405,7 @@ public:
port_hint{port_hint}, addresses{std::move(addr)} {}
Own<Server> listen() override;
Own<IoStream> connect() override;
Conveyor<Own<IoStream>> connect() override;
std::string toString() const override;
};
@ -418,8 +418,8 @@ private:
public:
UnixAsyncIoProvider(UnixEventPort &port_ref, Own<EventPort> &&port);
Own<NetworkAddress> parseAddress(const std::string &,
uint16_t port_hint = 0) override;
Conveyor<Own<NetworkAddress>> parseAddress(const std::string &,
uint16_t port_hint = 0) override;
Own<InputStream> wrapInputFd(int fd) override;

View File

@ -66,7 +66,7 @@ public:
* Listen on this address
*/
virtual Own<Server> listen() = 0;
virtual Own<IoStream> connect() = 0;
virtual Conveyor<Own<IoStream>> connect() = 0;
virtual std::string toString() const = 0;
};
@ -75,8 +75,8 @@ class AsyncIoProvider {
public:
virtual ~AsyncIoProvider() = default;
virtual Own<NetworkAddress> parseAddress(const std::string &,
uint16_t port_hint = 0) = 0;
virtual Conveyor<Own<NetworkAddress>>
parseAddress(const std::string &, uint16_t port_hint = 0) = 0;
virtual Own<InputStream> wrapInputFd(int fd) = 0;
};