#include "io-unix.h" #include namespace saw { namespace unix { IFdOwner::IFdOwner(UnixEventPort &event_port, int file_descriptor, int fd_flags, uint32_t event_mask) : event_port{event_port}, file_descriptor{file_descriptor}, fd_flags{fd_flags}, event_mask{event_mask} { event_port.subscribe(*this, file_descriptor, event_mask); } IFdOwner::~IFdOwner() { if (file_descriptor >= 0) { event_port.unsubscribe(file_descriptor); ::close(file_descriptor); } } ssize_t unixRead(int fd, void *buffer, size_t length) { return ::recv(fd, buffer, length, 0); } ssize_t unixWrite(int fd, const void *buffer, size_t length) { return ::send(fd, buffer, length, 0); } UnixIoStream::UnixIoStream(UnixEventPort &event_port, int file_descriptor, int fd_flags, uint32_t event_mask) : IFdOwner{event_port, file_descriptor, fd_flags, event_mask | EPOLLRDHUP} { } ErrorOr UnixIoStream::read(void *buffer, size_t length) { ssize_t read_bytes = unixRead(fd(), buffer, length); if (read_bytes > 0) { return static_cast(read_bytes); } else if (read_bytes == 0) { return criticalError("Disconnected", Error::Code::Disconnected); } return recoverableError("Currently busy"); } Conveyor UnixIoStream::readReady() { auto caf = newConveyorAndFeeder(); read_ready = std::move(caf.feeder); return std::move(caf.conveyor); } Conveyor UnixIoStream::onReadDisconnected() { auto caf = newConveyorAndFeeder(); on_read_disconnect = std::move(caf.feeder); return std::move(caf.conveyor); } ErrorOr UnixIoStream::write(const void *buffer, size_t length) { ssize_t write_bytes = unixWrite(fd(), buffer, length); if (write_bytes > 0) { return static_cast(write_bytes); } int error = errno; if (error == EAGAIN || error == EWOULDBLOCK) { return recoverableError("Currently busy"); } return criticalError("Disconnected", Error::Code::Disconnected); } Conveyor UnixIoStream::writeReady() { auto caf = newConveyorAndFeeder(); write_ready = std::move(caf.feeder); return std::move(caf.conveyor); } void UnixIoStream::notify(uint32_t mask) { if (mask & EPOLLOUT) { if (write_ready) { write_ready->feed(); } } if (mask & EPOLLIN) { if (read_ready) { read_ready->feed(); } } if (mask & EPOLLRDHUP) { if (on_read_disconnect) { on_read_disconnect->feed(); } } } UnixServer::UnixServer(UnixEventPort &event_port, int file_descriptor, int fd_flags) : IFdOwner{event_port, file_descriptor, fd_flags, EPOLLIN} {} Conveyor> UnixServer::accept() { auto caf = newConveyorAndFeeder>(); accept_feeder = std::move(caf.feeder); return std::move(caf.conveyor); } void UnixServer::notify(uint32_t mask) { if (mask & EPOLLIN) { if (accept_feeder) { struct ::sockaddr_storage address; socklen_t address_length = sizeof(address); int accept_fd = ::accept4(fd(), reinterpret_cast(&address), &address_length, SOCK_NONBLOCK | SOCK_CLOEXEC); if (accept_fd < 0) { return; } auto fd_stream = heap(event_port, accept_fd, 0, EPOLLIN | EPOLLOUT); accept_feeder->feed(std::move(fd_stream)); } } } UnixDatagram::UnixDatagram(UnixEventPort &event_port, int file_descriptor, int fd_flags) : IFdOwner{event_port, file_descriptor, fd_flags, EPOLLIN | EPOLLOUT} {} namespace { ssize_t unixReadMsg(int fd, void *buffer, size_t length) { struct ::sockaddr_storage their_addr; socklen_t addr_len = sizeof(sockaddr_storage); return ::recvfrom(fd, buffer, length, 0, reinterpret_cast(&their_addr), &addr_len); } ssize_t unixWriteMsg(int fd, const void *buffer, size_t length, ::sockaddr *dest_addr, socklen_t dest_addr_len) { return ::sendto(fd, buffer, length, 0, dest_addr, dest_addr_len); } } // namespace ErrorOr UnixDatagram::read(void *buffer, size_t length) { ssize_t read_bytes = unixReadMsg(fd(), buffer, length); if (read_bytes > 0) { return static_cast(read_bytes); } return recoverableError("Currently busy"); } Conveyor UnixDatagram::readReady() { auto caf = newConveyorAndFeeder(); read_ready = std::move(caf.feeder); return std::move(caf.conveyor); } ErrorOr UnixDatagram::write(const void *buffer, size_t length, NetworkAddress &dest) { UnixNetworkAddress &unix_dest = static_cast(dest); SocketAddress &sock_addr = unix_dest.unixAddress(); socklen_t sock_addr_length = sock_addr.getRawLength(); ssize_t write_bytes = unixWriteMsg(fd(), buffer, length, sock_addr.getRaw(), sock_addr_length); if (write_bytes > 0) { return static_cast(write_bytes); } return recoverableError("Currently busy"); } Conveyor UnixDatagram::writeReady() { auto caf = newConveyorAndFeeder(); write_ready = std::move(caf.feeder); return std::move(caf.conveyor); } void UnixDatagram::notify(uint32_t mask) { if (mask & EPOLLOUT) { if (write_ready) { write_ready->feed(); } } if (mask & EPOLLIN) { if (read_ready) { read_ready->feed(); } } } namespace { bool beginsWith(const std::string_view &viewed, const std::string_view &begins) { return viewed.size() >= begins.size() && viewed.compare(0, begins.size(), begins) == 0; } std::variant translateNetworkAddressToUnixNetworkAddress(NetworkAddress &addr) { auto addr_variant = addr.representation(); std::variant os_addr = std::visit( [](auto &arg) -> std::variant { using T = std::decay_t; if constexpr (std::is_same_v) { return static_cast(arg); } auto sock_addrs = SocketAddress::resolve( std::string_view{arg->address()}, arg->port()); return UnixNetworkAddress{arg->address(), arg->port(), std::move(sock_addrs)}; }, addr_variant); return os_addr; } UnixNetworkAddress &translateToUnixAddressRef( std::variant &addr_variant) { return std::visit( [](auto &arg) -> UnixNetworkAddress & { using T = std::decay_t; if constexpr (std::is_same_v) { return arg; } else if constexpr (std::is_same_v) { return *arg; } else { static_assert(true, "Cases exhausted"); } }, addr_variant); } } // namespace Own UnixNetwork::listen(NetworkAddress &addr) { auto unix_addr_storage = translateNetworkAddressToUnixNetworkAddress(addr); UnixNetworkAddress &address = translateToUnixAddressRef(unix_addr_storage); assert(address.unixAddressSize() > 0); if (address.unixAddressSize() == 0) { return nullptr; } int fd = address.unixAddress(0).socket(SOCK_STREAM); if (fd < 0) { return nullptr; } int val = 1; int rc = ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); if (rc < 0) { ::close(fd); return nullptr; } bool failed = address.unixAddress(0).bind(fd); if (failed) { ::close(fd); return nullptr; } ::listen(fd, SOMAXCONN); return heap(event_port, fd, 0); } Conveyor> UnixNetwork::connect(NetworkAddress &addr) { auto unix_addr_storage = translateNetworkAddressToUnixNetworkAddress(addr); UnixNetworkAddress &address = translateToUnixAddressRef(unix_addr_storage); assert(address.unixAddressSize() > 0); if (address.unixAddressSize() == 0) { return Conveyor>{criticalError("No address found")}; } int fd = address.unixAddress(0).socket(SOCK_STREAM); if (fd < 0) { return Conveyor>{criticalError("Couldn't open socket")}; } Own io_stream = heap(event_port, fd, 0, EPOLLIN | EPOLLOUT); bool success = false; for (size_t i = 0; i < address.unixAddressSize(); ++i) { SocketAddress &addr_iter = address.unixAddress(i); int status = ::connect(fd, addr_iter.getRaw(), addr_iter.getRawLength()); if (status < 0) { int error = errno; /* * It's not connected yet... * But edge triggered epolling means that it'll * be ready when the signal is triggered */ /// @todo Add limit node when implemented if (error == EINPROGRESS) { /* Conveyor write_ready = io_stream->writeReady(); return write_ready.then( [ios{std::move(io_stream)}]() mutable { ios->write_ready = nullptr; return std::move(ios); }); */ success = true; break; } else if (error != EINTR) { /// @todo Push error message from return Conveyor>{ criticalError("Couldn't connect")}; } } else { success = true; break; } } if (!success) { return criticalError("Couldn't connect"); } return Conveyor>{std::move(io_stream)}; } Own UnixNetwork::datagram(NetworkAddress &addr) { auto unix_addr_storage = translateNetworkAddressToUnixNetworkAddress(addr); UnixNetworkAddress &address = translateToUnixAddressRef(unix_addr_storage); SAW_ASSERT(address.unixAddressSize() > 0) { return nullptr; } int fd = address.unixAddress(0).socket(SOCK_DGRAM); int optval = 1; int rc = ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); if (rc < 0) { ::close(fd); return nullptr; } bool failed = address.unixAddress(0).bind(fd); if (failed) { ::close(fd); return nullptr; } /// @todo return heap(event_port, fd, 0); } const std::string &UnixNetworkAddress::address() const { return path; } uint16_t UnixNetworkAddress::port() const { return port_hint; } SocketAddress &UnixNetworkAddress::unixAddress(size_t i) { assert(i < addresses.size()); /// @todo change from list to vector? return addresses.at(i); } size_t UnixNetworkAddress::unixAddressSize() const { return addresses.size(); } UnixNetwork::UnixNetwork(UnixEventPort &event) : event_port{event} {} Conveyor> UnixNetwork::resolveAddress(const std::string &path, uint16_t port_hint) { std::string_view addr_view{path}; { std::string_view begins_with = "unix:"; if (beginsWith(addr_view, begins_with)) { addr_view.remove_prefix(begins_with.size()); } } std::vector addresses = SocketAddress::resolve(addr_view, port_hint); return Conveyor>{ heap(path, port_hint, std::move(addresses))}; } UnixIoProvider::UnixIoProvider(UnixEventPort &port_ref, Own port) : event_port{port_ref}, event_loop{std::move(port)}, unix_network{ port_ref} {} Own UnixIoProvider::wrapInputFd(int fd) { return heap(event_port, fd, 0, EPOLLIN); } Network &UnixIoProvider::network() { return static_cast(unix_network); } EventLoop &UnixIoProvider::eventLoop() { return event_loop; } } // namespace unix ErrorOr setupAsyncIo() { using namespace unix; try { Own prt = heap(); UnixEventPort &prt_ref = *prt; Own io_provider = heap(prt_ref, std::move(prt)); EventLoop &loop_ref = io_provider->eventLoop(); return {{std::move(io_provider), loop_ref, prt_ref}}; } catch (std::bad_alloc &) { return criticalError("Out of memory"); } } } // namespace saw