/*
 * epoll/signalfd based backend of the saw I/O layer; everything here is
 * compiled only when SAW_UNIX is defined.
 *
 * Overview of what the code below defines:
 *  - i_fd_owner: holds one file descriptor, subscribes it to a
 *    unix_event_port in its constructor and, if the descriptor is >= 0,
 *    unsubscribes and close()s it in its destructor. Subclasses receive the
 *    reported epoll event mask through notify().
 *  - unix_event_port: event_port implementation around epoll_wait(). The
 *    signalfd is registered with epoll data 0, the read end of the wake pipe
 *    with data 1; every other entry stores an i_fd_owner pointer in data.ptr.
 *    poll() calls poll_impl(0), wait() calls poll_impl(-1), and poll_impl()
 *    repeats epoll_wait() while a full batch of MAX_EPOLL_EVENTS came back.
 *    from_unix_signal() maps SIGUSR1 to Signal::User1 and every other signal
 *    number to Signal::Terminate.
 *  - unix_io_stream, unix_server, unix_datagram: i_fd_owner subclasses that
 *    feed their stored feeders from notify() according to the event mask.
 *  - socket_address, unix_network_address, unix_network, unix_io_provider
 *    and setup_async_io(): address resolution through getaddrinfo(), socket
 *    creation and the wiring of event port, event loop and network.
 *
 * NOTE(review): in this copy the #include targets and all template argument
 * lists appear to be missing and the original line breaks are gone (several
 * "///" comments now swallow the code that follows them on the same line).
 * Presumably an extraction artifact - restore the file from version control
 * before building or editing the code itself.
 */
#ifdef SAW_UNIX #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "io.h" namespace saw { namespace unix { constexpr int MAX_EPOLL_EVENTS = 256; class unix_event_port; class i_fd_owner { protected: unix_event_port &event_port_; private: int file_descriptor_; int fd_flags_; uint32_t event_mask_; public: i_fd_owner(unix_event_port &event_port, int file_descriptor, int fd_flags, uint32_t event_mask); virtual ~i_fd_owner(); virtual void notify(uint32_t mask) = 0; int fd() const { return file_descriptor_; } }; class unix_event_port final : public event_port { private: int epoll_fd_; int signal_fd_; sigset_t signal_fd_set_; std::unordered_multimap>> signal_conveyors_; int pipefds_[2]; std::vector to_unix_signal(Signal signal) const { switch (signal) { case Signal::User1: return {SIGUSR1}; case Signal::Terminate: default: return {SIGTERM, SIGQUIT, SIGINT}; } } Signal from_unix_signal(int signal) const { switch (signal) { case SIGUSR1: return Signal::User1; case SIGTERM: case SIGINT: case SIGQUIT: default: return Signal::Terminate; } } void notify_signal_listener(int sig) { Signal signal = from_unix_signal(sig); auto equal_range = signal_conveyors_.equal_range(signal); for (auto iter = equal_range.first; iter != equal_range.second; ++iter) { if (iter->second) { if (iter->second->space() > 0) { iter->second->feed(); } } } } bool poll_impl(int time) { epoll_event events[MAX_EPOLL_EVENTS]; int nfds = 0; do { nfds = epoll_wait(epoll_fd_, events, MAX_EPOLL_EVENTS, time); if (nfds < 0) { /// @todo error_handling return false; } for (int i = 0; i < nfds; ++i) { if (events[i].data.u64 == 0) { while (1) { struct ::signalfd_siginfo siginfo; ssize_t n = ::read(signal_fd_, &siginfo, sizeof(siginfo)); if (n < 0) { break; } assert(n == sizeof(siginfo)); notify_signal_listener(siginfo.ssi_signo); } } else if (events[i].data.u64 == 1) { uint8_t i; if (pipefds_[0] < 0) { 
/*
 * NOTE(review) on the wake-pipe branch that starts here:
 *  - the local `uint8_t i` shadows the index `i` of the enclosing for loop;
 *  - pipefds_ is created with pipe2(), but it is read with recv() here and
 *    written with send() in wake(). Those are socket calls and are expected
 *    to fail with ENOTSOCK on a pipe, so wake() presumably never interrupts
 *    epoll_wait() - read()/write() look like the intended calls, confirm;
 *  - the drain loop only stops on n < 0, so a return value of 0 would spin.
 */
continue; } while (1) { ssize_t n = ::recv(pipefds_[0], &i, sizeof(i), 0); if (n < 0) { break; } } } else { i_fd_owner *owner = reinterpret_cast(events[i].data.ptr); if (owner) { owner->notify(events[i].events); } } } } while (nfds == MAX_EPOLL_EVENTS); return true; } public: unix_event_port() : epoll_fd_{-1}, signal_fd_{-1} { ::signal(SIGPIPE, SIG_IGN); epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); if (epoll_fd_ < 0) { return; } ::sigemptyset(&signal_fd_set_); signal_fd_ = ::signalfd(-1, &signal_fd_set_, SFD_NONBLOCK | SFD_CLOEXEC); if (signal_fd_ < 0) { return; } struct epoll_event event; memset(&event, 0, sizeof(event)); event.events = EPOLLIN; event.data.u64 = 0; ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, signal_fd_, &event); int rc = ::pipe2(pipefds_, O_NONBLOCK | O_CLOEXEC); if (rc < 0) { return; } memset(&event, 0, sizeof(event)); event.events = EPOLLIN; event.data.u64 = 1; ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, pipefds_[0], &event); } ~unix_event_port() { ::close(epoll_fd_); ::close(signal_fd_); ::close(pipefds_[0]); ::close(pipefds_[1]); } conveyor on_signal(Signal signal) override { auto caf = new_conveyor_and_feeder(); signal_conveyors_.insert(std::make_pair(signal, std::move(caf.feeder))); std::vector sig = to_unix_signal(signal); for (auto iter = sig.begin(); iter != sig.end(); ++iter) { ::sigaddset(&signal_fd_set_, *iter); } ::sigprocmask(SIG_BLOCK, &signal_fd_set_, nullptr); ::signalfd(signal_fd_, &signal_fd_set_, SFD_NONBLOCK | SFD_CLOEXEC); auto node = conveyor::from_conveyor(std::move(caf.conveyor)); return conveyor::to_conveyor(std::move(node)); } void poll() override { poll_impl(0); } void wait() override { poll_impl(-1); } void wait(const std::chrono::steady_clock::duration &duration) override { poll_impl( std::chrono::duration_cast(duration) .count()); } void wait(const std::chrono::steady_clock::time_point &time_point) override { auto now = std::chrono::steady_clock::now(); if (time_point <= now) { poll(); } else { 
/*
 * NOTE(review) on the constructor/destructor above: pipefds_ has no
 * initializer and the constructor returns before pipe2() when epoll_create1()
 * or signalfd() fails, yet the destructor close()s both entries and
 * poll_impl()/wake() compare them against 0. The results of epoll_ctl(),
 * sigprocmask() and the signalfd() update in on_signal() are not checked.
 */
poll_impl(std::chrono::duration_cast( time_point - now) .count()); } } void wake() override { /// @todo pipe() in the beginning and write something minor into it like /// uint8_t or sth the value itself doesn't matter if (pipefds_[1] < 0) { return; } uint8_t i = 0; ::send(pipefds_[1], &i, sizeof(i), MSG_DONTWAIT); } void subscribe(i_fd_owner &owner, int fd, uint32_t event_mask) { if (epoll_fd_ < 0 || fd < 0) { return; } ::epoll_event event; memset(&event, 0, sizeof(event)); event.events = event_mask | EPOLLET; event.data.ptr = &owner; if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) < 0) { /// @todo error_handling return; } } void unsubscribe(int fd) { if (epoll_fd_ < 0 || fd < 0) { return; } if (::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) < 0) { /// @todo error_handling return; } } }; ssize_t unix_read(int fd, void *buffer, size_t length); ssize_t unix_write(int fd, const void *buffer, size_t length); class unix_io_stream final : public io_stream, public i_fd_owner { private: own> read_ready_ = nullptr; own> on_read_disconnect_ = nullptr; own> write_ready_ = nullptr; public: unix_io_stream(unix_event_port &event_port, int file_descriptor, int fd_flags, uint32_t event_mask); error_or read(void *buffer, size_t length) override; conveyor read_ready() override; conveyor on_read_disconnected() override; error_or write(const void *buffer, size_t length) override; conveyor write_ready() override; /* void read(void *buffer, size_t min_length, size_t max_length) override; Conveyor readDone() override; Conveyor readReady() override; Conveyor onReadDisconnected() override; void write(const void *buffer, size_t length) override; Conveyor writeDone() override; Conveyor writeReady() override; */ void notify(uint32_t mask) override; }; class unix_server final : public server, public i_fd_owner { private: own>> accept_feeder_ = nullptr; public: unix_server(unix_event_port &event_port, int file_descriptor, int fd_flags); conveyor> accept() override; void 
/*
 * NOTE(review): subscribe() above ORs EPOLLET into every event mask, so all
 * i_fd_owner descriptors are registered edge-triggered; subscribe() and
 * unsubscribe() silently return when epoll_ctl() fails (see their @todo).
 */
notify(uint32_t mask) override; }; class unix_datagram final : public datagram, public i_fd_owner { private: own> read_ready_ = nullptr; own> write_ready_ = nullptr; public: unix_datagram(unix_event_port &event_port, int file_descriptor, int fd_flags); error_or read(void *buffer, size_t length) override; conveyor read_ready() override; error_or write(const void *buffer, size_t length, network_address &dest) override; conveyor write_ready() override; void notify(uint32_t mask) override; }; /** * Helper class which provides potential addresses to NetworkAddress */ class socket_address { private: union { struct sockaddr generic; struct sockaddr_un unix; struct sockaddr_in inet; struct sockaddr_in6 inet6; struct sockaddr_storage storage; } address_; socklen_t address_length_; bool wildcard_; socket_address() : wildcard_{false} {} public: socket_address(const void *sockaddr, socklen_t len, bool wildcard) : address_length_{len}, wildcard_{wildcard} { assert(len <= sizeof(address_)); memcpy(&address_.generic, sockaddr, len); } int socket(int type) const { type |= SOCK_NONBLOCK | SOCK_CLOEXEC; int result = ::socket(address_.generic.sa_family, type, 0); return result; } bool bind(int fd) const { if (wildcard_) { int value = 0; ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)); } int error = ::bind(fd, &address_.generic, address_length_); return error < 0; } struct ::sockaddr *get_raw() { return &address_.generic; } const struct ::sockaddr *get_raw() const { return &address_.generic; } socklen_t get_raw_length() const { return address_length_; } static std::vector resolve(std::string_view str, uint16_t port_hint) { std::vector results; struct ::addrinfo *head; struct ::addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; std::string port_string = std::to_string(port_hint); bool wildcard = str == "*" || str == "::"; std::string address_string{str}; int error = ::getaddrinfo(address_string.c_str(), port_string.c_str(), &hints, &head); 
/*
 * NOTE(review) on socket_address: bind() returns true when ::bind() FAILED
 * (callers store it in a variable named `failed`); the private default
 * constructor leaves address_length_ unset; resolve() returns an empty vector
 * when getaddrinfo() reports an error and skips results whose ai_addrlen
 * does not fit into address_.
 */
if (error) { return {}; } for (struct ::addrinfo *it = head; it != nullptr; it = it->ai_next) { if (it->ai_addrlen > sizeof(socket_address::address_)) { continue; } results.push_back({it->ai_addr, it->ai_addrlen, wildcard}); } ::freeaddrinfo(head); return results; } }; class unix_network_address final : public os_network_address { private: const std::string path_; uint16_t port_hint_; std::vector addresses_; public: unix_network_address(const std::string &path, uint16_t port_hint, std::vector &&addr) : path_{path}, port_hint_{port_hint}, addresses_{std::move(addr)} {} const std::string &address() const override; uint16_t port() const override; // Custom address info socket_address &unix_address(size_t i = 0); size_t unix_address_size() const; }; class unix_network final : public network { private: unix_event_port &event_port_; public: unix_network(unix_event_port &event_port); conveyor> resolve_address(const std::string &address, uint16_t port_hint = 0) override; own listen(network_address &addr) override; conveyor> connect(network_address &addr) override; own datagram(network_address &addr) override; }; class unix_io_provider final : public io_provider { private: unix_event_port &event_port_; class event_loop event_loop_; unix_network unix_network_; public: unix_io_provider(unix_event_port &port_ref, own port); class network &network() override; own wrap_input_fd(int fd) override; class event_loop &event_loop(); }; i_fd_owner::i_fd_owner(unix_event_port &event_port, int file_descriptor, int fd_flags, uint32_t event_mask) : event_port_{event_port}, file_descriptor_{file_descriptor}, fd_flags_{fd_flags}, event_mask_{event_mask} { event_port_.subscribe(*this, file_descriptor, event_mask); } i_fd_owner::~i_fd_owner() { if (file_descriptor_ >= 0) { event_port_.unsubscribe(file_descriptor_); ::close(file_descriptor_); } } ssize_t unix_read(int fd, void *buffer, size_t length) { return ::recv(fd, buffer, length, 0); } ssize_t unix_write(int fd, const void *buffer, size_t 
/*
 * unix_read()/unix_write() are thin wrappers around recv()/send() with flags
 * 0. NOTE(review): i_fd_owner stores fd_flags_ and event_mask_, but nothing
 * in this file reads them back.
 */
length) { return ::send(fd, buffer, length, 0); } unix_io_stream::unix_io_stream(unix_event_port &event_port, int file_descriptor, int fd_flags, uint32_t event_mask) : i_fd_owner{event_port, file_descriptor, fd_flags, event_mask | EPOLLRDHUP} {} error_or unix_io_stream::read(void *buffer, size_t length) { ssize_t read_bytes = unix_read(fd(), buffer, length); if (read_bytes > 0) { return static_cast(read_bytes); } else if (read_bytes == 0) { return make_error(); } return make_error(); } conveyor unix_io_stream::read_ready() { auto caf = new_conveyor_and_feeder(); read_ready_ = std::move(caf.feeder); return std::move(caf.conveyor); } conveyor unix_io_stream::on_read_disconnected() { auto caf = new_conveyor_and_feeder(); on_read_disconnect_ = std::move(caf.feeder); return std::move(caf.conveyor); } error_or unix_io_stream::write(const void *buffer, size_t length) { ssize_t write_bytes = unix_write(fd(), buffer, length); if (write_bytes > 0) { return static_cast(write_bytes); } int error = errno; if (error == EAGAIN || error == EWOULDBLOCK) { return make_error(); } return make_error(); } conveyor unix_io_stream::write_ready() { auto caf = new_conveyor_and_feeder(); write_ready_ = std::move(caf.feeder); return std::move(caf.conveyor); } void unix_io_stream::notify(uint32_t mask) { if (mask & EPOLLOUT) { if (write_ready_) { write_ready_->feed(); } } if (mask & EPOLLIN) { if (read_ready_) { read_ready_->feed(); } } if (mask & EPOLLRDHUP) { if (on_read_disconnect_) { on_read_disconnect_->feed(); } } } unix_server::unix_server(unix_event_port &event_port, int file_descriptor, int fd_flags) : i_fd_owner{event_port, file_descriptor, fd_flags, EPOLLIN} {} conveyor> unix_server::accept() { auto caf = new_conveyor_and_feeder>(); accept_feeder_ = std::move(caf.feeder); return std::move(caf.conveyor); } void unix_server::notify(uint32_t mask) { if (mask & EPOLLIN) { if (accept_feeder_) { struct ::sockaddr_storage address; socklen_t address_length = sizeof(address); int accept_fd = 
/*
 * NOTE(review):
 *  - unix_io_stream::write() reads errno after any result <= 0; when send()
 *    returned 0 errno may hold a stale value;
 *  - unix_io_stream::read() treats every negative result the same way,
 *    without looking at errno (the error kinds themselves are not visible
 *    in this copy);
 *  - unix_server::notify() accepts at most one connection per notification
 *    and none while accept_feeder_ is null. With the edge-triggered
 *    registration done by subscribe(), further pending connections may not
 *    be reported again - TODO confirm.
 */
::accept4(fd(), reinterpret_cast(&address), &address_length, SOCK_NONBLOCK | SOCK_CLOEXEC); if (accept_fd < 0) { return; } auto fd_stream = heap(event_port_, accept_fd, 0, EPOLLIN | EPOLLOUT); accept_feeder_->feed(std::move(fd_stream)); } } } unix_datagram::unix_datagram(unix_event_port &event_port, int file_descriptor, int fd_flags) : i_fd_owner{event_port, file_descriptor, fd_flags, EPOLLIN | EPOLLOUT} {} namespace { ssize_t unix_read_msg(int fd, void *buffer, size_t length) { struct ::sockaddr_storage their_addr; socklen_t addr_len = sizeof(sockaddr_storage); return ::recvfrom(fd, buffer, length, 0, reinterpret_cast(&their_addr), &addr_len); } ssize_t unix_write_msg(int fd, const void *buffer, size_t length, ::sockaddr *dest_addr, socklen_t dest_addr_len) { return ::sendto(fd, buffer, length, 0, dest_addr, dest_addr_len); } } // namespace error_or unix_datagram::read(void *buffer, size_t length) { ssize_t read_bytes = unix_read_msg(fd(), buffer, length); if (read_bytes > 0) { return static_cast(read_bytes); } return make_error(); } conveyor unix_datagram::read_ready() { auto caf = new_conveyor_and_feeder(); read_ready_ = std::move(caf.feeder); return std::move(caf.conveyor); } error_or unix_datagram::write(const void *buffer, size_t length, network_address &dest) { unix_network_address &unix_dest = static_cast(dest); socket_address &sock_addr = unix_dest.unix_address(); socklen_t sock_addr_length = sock_addr.get_raw_length(); ssize_t write_bytes = unix_write_msg(fd(), buffer, length, sock_addr.get_raw(), sock_addr_length); if (write_bytes > 0) { return static_cast(write_bytes); } return make_error(); } conveyor unix_datagram::write_ready() { auto caf = new_conveyor_and_feeder(); write_ready_ = std::move(caf.feeder); return std::move(caf.conveyor); } void unix_datagram::notify(uint32_t mask) { if (mask & EPOLLOUT) { if (write_ready_) { write_ready_->feed(); } } if (mask & EPOLLIN) { if (read_ready_) { read_ready_->feed(); } } } namespace { bool begins_with(const 
/*
 * NOTE(review): unix_read_msg() receives the sender address into a local and
 * discards it; unix_datagram::read()/write() report an error for any result
 * that is not > 0, which would include a zero-length datagram.
 */
std::string_view &viewed, const std::string_view &begins) { return viewed.size() >= begins.size() && viewed.compare(0, begins.size(), begins) == 0; } std::variant translate_network_address_to_unix_network_address(network_address &addr) { auto addr_variant = addr.representation(); std::variant os_addr = std::visit( [](auto &arg) -> std::variant { using T = std::decay_t; if constexpr (std::is_same_v) { return static_cast(arg); } auto sock_addrs = socket_address::resolve( std::string_view{arg->address()}, arg->port()); return unix_network_address{arg->address(), arg->port(), std::move(sock_addrs)}; }, addr_variant); return os_addr; } unix_network_address &translate_to_unix_address_ref( std::variant &addr_variant) { return std::visit( [](auto &arg) -> unix_network_address & { using T = std::decay_t; if constexpr (std::is_same_v) { return arg; } else if constexpr (std::is_same_v) { return *arg; } else { static_assert(true, "Cases exhausted"); } }, addr_variant); } } // namespace own unix_network::listen(network_address &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = translate_to_unix_address_ref(unix_addr_storage); assert(address.unix_address_size() > 0); if (address.unix_address_size() == 0) { return nullptr; } int fd = address.unix_address(0).socket(SOCK_STREAM); if (fd < 0) { return nullptr; } int val = 1; int rc = ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); if (rc < 0) { ::close(fd); return nullptr; } bool failed = address.unix_address(0).bind(fd); if (failed) { ::close(fd); return nullptr; } ::listen(fd, SOMAXCONN); return heap(event_port_, fd, 0); } conveyor> unix_network::connect(network_address &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = translate_to_unix_address_ref(unix_addr_storage); assert(address.unix_address_size() > 0); if (address.unix_address_size() == 0) { return 
/*
 * NOTE(review):
 *  - `static_assert(true, "Cases exhausted")` in translate_to_unix_address_ref()
 *    can never fire; a type-dependent false condition was presumably meant;
 *  - unix_network::listen() ignores the result of ::listen();
 *  - unix_network::connect() creates a single socket from the family of
 *    address 0 and reuses that descriptor for every resolved address.
 */
conveyor>{make_error()}; } int fd = address.unix_address(0).socket(SOCK_STREAM); if (fd < 0) { return conveyor>{make_error()}; } own io_str = heap(event_port_, fd, 0, EPOLLIN | EPOLLOUT); bool success = false; for (size_t i = 0; i < address.unix_address_size(); ++i) { socket_address &addr_iter = address.unix_address(i); int status = ::connect(fd, addr_iter.get_raw(), addr_iter.get_raw_length()); if (status < 0) { int error = errno; /* * It's not connected yet... * But edge triggered epolling means that it'll * be ready when the signal is triggered */ /// @todo Add limit node when implemented if (error == EINPROGRESS) { /* Conveyor write_ready = io_stream->writeReady(); return write_ready.then( [ios{std::move(io_stream)}]() mutable { ios->write_ready = nullptr; return std::move(ios); }); */ success = true; break; } else if (error != EINTR) { /// @todo Push error message from return conveyor>{make_error()}; } } else { success = true; break; } } if (!success) { return conveyor>{make_error()}; } return conveyor>{std::move(io_str)}; } own unix_network::datagram(network_address &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = translate_to_unix_address_ref(unix_addr_storage); SAW_ASSERT(address.unix_address_size() > 0) { return nullptr; } int fd = address.unix_address(0).socket(SOCK_DGRAM); int optval = 1; int rc = ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); if (rc < 0) { ::close(fd); return nullptr; } bool failed = address.unix_address(0).bind(fd); if (failed) { ::close(fd); return nullptr; } /// @todo return heap(event_port_, fd, 0); } const std::string &unix_network_address::address() const { return path_; } uint16_t unix_network_address::port() const { return port_hint_; } socket_address &unix_network_address::unix_address(size_t i) { assert(i < addresses_.size()); /// @todo change from list to vector? 
/*
 * NOTE(review): unix_network::datagram() does not test the socket() result
 * itself; a negative fd is only caught because the following setsockopt()
 * fails. SAW_ASSERT is a macro that is not defined in this file.
 * resolve_address() below strips a leading "unix:" prefix and then hands the
 * remainder to socket_address::resolve(), i.e. to getaddrinfo().
 * setup_async_io() converts std::bad_alloc into an error return.
 */
return addresses_.at(i); } size_t unix_network_address::unix_address_size() const { return addresses_.size(); } unix_network::unix_network(unix_event_port &event) : event_port_{event} {} conveyor> unix_network::resolve_address(const std::string &path, uint16_t port_hint) { std::string_view addr_view{path}; { std::string_view str_begins_with = "unix:"; if (begins_with(addr_view, str_begins_with)) { addr_view.remove_prefix(str_begins_with.size()); } } std::vector addresses = socket_address::resolve(addr_view, port_hint); return conveyor>{ heap(path, port_hint, std::move(addresses))}; } unix_io_provider::unix_io_provider(unix_event_port &port_ref, own port) : event_port_{port_ref}, event_loop_{std::move(port)}, unix_network_{ port_ref} {} own unix_io_provider::wrap_input_fd(int fd) { return heap(event_port_, fd, 0, EPOLLIN); } class network &unix_io_provider::network() { return static_cast(unix_network_); } class event_loop &unix_io_provider::event_loop() { return event_loop_; } } // namespace unix error_or setup_async_io() { using namespace unix; try { own prt = heap(); unix_event_port &prt_ref = *prt; own io_provider = heap(prt_ref, std::move(prt)); event_loop &loop_ref = io_provider->event_loop(); return {{std::move(io_provider), loop_ref, prt_ref}}; } catch (std::bad_alloc &) { return make_error(); } } } // namespace saw #endif