achieved basic udp

fb-udp
Claudius Holeksa 2022-01-08 10:46:57 +01:00
parent 8c37b329dc
commit ca76ea4d24
3 changed files with 125 additions and 15 deletions

View File

@ -124,6 +124,73 @@ void UnixServer::notify(uint32_t mask) {
}
}
UnixDatagram::UnixDatagram(UnixEventPort &event_port, int file_descriptor,
int fd_flags)
: IFdOwner{event_port, file_descriptor, fd_flags, EPOLLIN | EPOLLOUT} {}
namespace {
ssize_t unixReadMsg(int fd, void *buffer, size_t length) {
struct ::sockaddr_storage their_addr;
socklen_t addr_len = sizeof(sockaddr_storage);
return ::recvfrom(fd, buffer, length, 0,
reinterpret_cast<struct ::sockaddr *>(&their_addr),
&addr_len);
}
ssize_t unixWriteMsg(int fd, void *buffer, size_t length, ::sockaddr *dest_addr,
socklen_t dest_addr_len) {
return ::sendto(fd, buffer, length, 0, dest_addr, dest_addr_len);
}
} // namespace
ErrorOr<size_t> UnixDatagram::read(void *buffer, size_t length) {
ssize_t read_bytes = unixReadMsg(fd(), buffer, length);
if (read_bytes > 0) {
return static_cast<size_t>(read_bytes);
}
return recoverableError("Currently busy");
}
Conveyor<void> UnixDatagram::readReady() {
auto caf = newConveyorAndFeeder<void>();
read_ready = std::move(caf.feeder);
return std::move(caf.conveyor);
}
ErrorOr<size_t> UnixDatagram::write(void *buffer, size_t length,
NetworkAddress &dest) {
UnixNetworkAddress &unix_dest = static_cast<UnixNetworkAddress &>(dest);
SocketAddress &sock_addr = unix_dest.unixAddress();
socklen_t sock_addr_length = sock_addr.getRawLength();
ssize_t write_bytes = unixWriteMsg(fd(), buffer, length, sock_addr.getRaw(),
sock_addr_length);
if (write_bytes > 0) {
return static_cast<size_t>(write_bytes);
}
return recoverableError("Currently busy");
}
Conveyor<void> UnixDatagram::writeReady() {
auto caf = newConveyorAndFeeder<void>();
write_ready = std::move(caf.feeder);
return std::move(caf.conveyor);
}
void UnixDatagram::notify(uint32_t mask) {
if (mask & EPOLLOUT) {
if (write_ready) {
write_ready->feed();
}
}
if (mask & EPOLLIN) {
if (read_ready) {
read_ready->feed();
}
}
}
namespace {
bool beginsWith(const std::string_view &viewed,
const std::string_view &begins) {
@ -144,11 +211,15 @@ Own<Server> UnixNetworkAddress::listen() {
}
int val = 1;
::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
int rc = ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
if (rc < 0) {
::close(fd);
return nullptr;
}
bool failed = addresses.front().bind(fd);
if (failed) {
::close(fd);
return nullptr;
}
@ -220,12 +291,16 @@ Own<Datagram> UnixNetworkAddress::datagram() {
int optval = 1;
int rc =
::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
SAW_ASSERT(rc == 0) {
if (rc < 0) {
::close(fd);
return nullptr;
}
addresses.front().bind(fd);
bool failed = addresses.front().bind(fd);
if (failed) {
::close(fd);
return nullptr;
}
/// @todo
return nullptr;
}
@ -246,6 +321,14 @@ const std::string &UnixNetworkAddress::address() const { return path; }
uint16_t UnixNetworkAddress::port() const { return port_hint; }
SocketAddress &UnixNetworkAddress::unixAddress(size_t i) {
assert(i < addresses.size());
/// @todo change from list to vector?
return addresses.at(i);
}
size_t UnixNetworkAddress::unixAddressSize() const { return addresses.size(); }
UnixNetwork::UnixNetwork(UnixEventPort &event) : event_port{event} {}
Conveyor<Own<NetworkAddress>> UnixNetwork::parseAddress(const std::string &path,
@ -258,7 +341,7 @@ Conveyor<Own<NetworkAddress>> UnixNetwork::parseAddress(const std::string &path,
}
}
std::list<SocketAddress> addresses =
std::vector<SocketAddress> addresses =
SocketAddress::parse(addr_view, port_hint);
return Conveyor<Own<NetworkAddress>>{heap<UnixNetworkAddress>(

View File

@ -312,6 +312,24 @@ public:
void notify(uint32_t mask) override;
};
class UnixDatagram final : public Datagram, public IFdOwner {
private:
Own<ConveyorFeeder<void>> read_ready = nullptr;
Own<ConveyorFeeder<void>> write_ready = nullptr;
public:
UnixDatagram(UnixEventPort &event_port, int file_descriptor, int fd_flags);
ErrorOr<size_t> read(void *buffer, size_t length) override;
Conveyor<void> readReady() override;
ErrorOr<size_t> write(void *buffer, size_t length,
NetworkAddress &dest) override;
Conveyor<void> writeReady() override;
void notify(uint32_t mask) override;
};
/**
* Helper class which provides potential addresses to NetworkAddress
*/
@ -353,13 +371,17 @@ public:
return error < 0;
}
struct ::sockaddr *getRaw() {
return &address.generic;
}
const struct ::sockaddr *getRaw() const { return &address.generic; }
socklen_t getRawLength() const { return address_length; }
static std::list<SocketAddress> parse(std::string_view str,
uint16_t port_hint) {
std::list<SocketAddress> results;
static std::vector<SocketAddress> parse(std::string_view str,
uint16_t port_hint) {
std::vector<SocketAddress> results;
struct ::addrinfo *head;
struct ::addrinfo hints;
@ -393,11 +415,11 @@ private:
UnixEventPort &event_port;
const std::string path;
uint16_t port_hint;
std::list<SocketAddress> addresses;
std::vector<SocketAddress> addresses;
public:
UnixNetworkAddress(UnixEventPort &event_port, const std::string &path,
uint16_t port_hint, std::list<SocketAddress> &&addr)
uint16_t port_hint, std::vector<SocketAddress> &&addr)
: event_port{event_port}, path{path}, port_hint{port_hint},
addresses{std::move(addr)} {}
@ -411,6 +433,10 @@ public:
const std::string &address() const override;
uint16_t port() const override;
// Custom address info
SocketAddress &unixAddress(size_t i = 0);
size_t unixAddressSize() const;
};
class UnixNetwork final : public Network {

View File

@ -74,7 +74,7 @@ private:
public:
AsyncIoStream(Own<IoStream> str);
void read(void *buffer, size_t min_length, size_t max_length) override;
void read(void *buffer, size_t length, size_t max_length) override;
Conveyor<size_t> readDone() override;
@ -102,11 +102,12 @@ class Datagram {
public:
virtual ~Datagram() = default;
virtual void write(void *buffer, size_t length, NetworkAddress &dest) = 0;
virtual Conveyor<size_t> writeDone() = 0;
virtual ErrorOr<size_t> read(void *buffer, size_t length) = 0;
virtual Conveyor<void> readReady() = 0;
virtual void read(void *buffer, size_t min_length, size_t max_length) = 0;
virtual Conveyor<size_t> readDone() = 0;
virtual ErrorOr<size_t> write(void *buffer, size_t length,
NetworkAddress &dest) = 0;
virtual Conveyor<void> writeReady() = 0;
};
class NetworkAddress {