Merge branch 'master' of ssh://git.keldu.de:64730/kelgin/kelgin into master

This commit is contained in:
keldu.magnus 2020-10-13 16:54:03 +02:00
commit e3462a06bf
5 changed files with 53 additions and 9 deletions

View File

@ -30,7 +30,7 @@ def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, ta
pass
env=Environment(CPPPATH=['#source','#','#driver'],
CXX='clang++',
CXX='c++',
CPPDEFINES=['GIN_UNIX'],
CXXFLAGS=['-std=c++17','-g','-Wall','-Wextra'],
LIBS=[])

View File

@ -21,26 +21,34 @@ void UnixIoStream::readStep() {
ssize_t n = ::read(fd(), task.buffer, task.max_length);
if (n < 0) {
if (n <= 0) {
if( n == 0 ){
if(on_read_disconnect){
on_read_disconnect->feed();
}
break;
}
int error = errno;
if (error == EAGAIN) {
if (error == EAGAIN || error == EWOULDBLOCK) {
break;
} else {
if (read_done) {
read_done->fail(criticalError("Read failed"));
}
read_tasks.pop();
}
} else if (static_cast<size_t>(n) >= task.min_length &&
static_cast<size_t>(n) <= task.max_length) {
if (read_done) {
read_done->feed(static_cast<size_t>(n));
}
size_t max_len = task.max_length;
read_tasks.pop();
} else {
task.buffer = reinterpret_cast<uint8_t *>(task.buffer) + n;
task.min_length -= static_cast<size_t>(n);
task.max_length -= static_cast<size_t>(n);
}
read_tasks.pop();
}
}
@ -52,23 +60,24 @@ void UnixIoStream::writeStep() {
if (n < 0) {
int error = errno;
if (error == EAGAIN) {
if (error == EAGAIN || error == EWOULDBLOCK) {
break;
} else {
if (write_done) {
write_done->fail(criticalError("Write failed"));
}
write_tasks.pop();
}
} else if (static_cast<size_t>(n) == task.length) {
if (write_done) {
write_done->feed(static_cast<size_t>(task.length));
}
write_tasks.pop();
} else {
task.buffer = reinterpret_cast<const uint8_t *>(task.buffer) +
static_cast<size_t>(n);
task.length -= static_cast<size_t>(n);
}
write_tasks.pop();
}
}
@ -131,6 +140,7 @@ void UnixIoStream::notify(uint32_t mask) {
if (mask & EPOLLIN) {
readStep();
}
if (mask & EPOLLRDHUP) {
if (on_read_disconnect) {
on_read_disconnect->feed();
@ -175,7 +185,26 @@ bool beginsWith(const std::string_view &viewed,
}
} // namespace
Own<Server> UnixNetworkAddress::listen() { return nullptr; }
Own<Server> UnixNetworkAddress::listen() {
assert(addresses.size() > 0);
if (addresses.size() == 0) {
return nullptr;
}
int fd = addresses.front().socket(SOCK_STREAM);
if (fd < 0) {
return nullptr;
}
int val = 1;
::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
addresses.front().bind(fd);
::listen(fd, SOMAXCONN);
return heap<UnixServer>(event_port, fd, 0);
}
Own<IoStream> UnixNetworkAddress::connect() {
assert(addresses.size() > 0);

View File

@ -88,6 +88,7 @@ private:
auto equal_range = signal_conveyors.equal_range(signal);
for (auto iter = equal_range.first; iter != equal_range.second;
++iter) {
if (iter->second) {
if (iter->second->space() > 0) {
iter->second->feed();
@ -184,6 +185,18 @@ public:
void wait() override { pollImpl(-1); }
void wait(const std::chrono::steady_clock::duration &duration) override {
pollImpl(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
}
void wait(const std::chrono::steady_clock::time_point &time_point) override{
auto now = std::chrono::steady_clock::now();
if(time_point <= now){
poll();
}else{
pollImpl(std::chrono::duration_cast<std::chrono::milliseconds>(time_point-now).count());
}
}
void subscribe(IFdOwner &owner, int fd, uint32_t event_mask) {
if (epoll_fd < 0 || fd < 0) {
return;

View File

@ -194,7 +194,7 @@ bool EventLoop::turn() {
bool EventLoop::wait(const std::chrono::steady_clock::duration &duration) {
if (event_port) {
event_port->wait();
event_port->wait(duration);
}
while (head) {
@ -207,7 +207,7 @@ bool EventLoop::wait(const std::chrono::steady_clock::duration &duration) {
bool EventLoop::wait(const std::chrono::steady_clock::time_point &time_point) {
if (event_port) {
event_port->wait();
event_port->wait(time_point);
}
while (head) {

View File

@ -193,6 +193,8 @@ public:
virtual void poll() = 0;
virtual void wait() = 0;
virtual void wait(const std::chrono::steady_clock::duration &) = 0;
virtual void wait(const std::chrono::steady_clock::time_point &) = 0;
};
class SinkConveyorNode;