sink bug which didn't add the nodes into the sink

This commit is contained in:
keldu 2020-08-29 19:43:03 +02:00
parent 2a3a6407c0
commit 1b877f9127
3 changed files with 40 additions and 12 deletions

View File

@ -22,6 +22,7 @@
#include <unistd.h>
#include <unordered_map>
#include <vector>
#include "io.h"
@ -64,14 +65,26 @@ private:
std::unordered_multimap<Signal, Own<ConveyorFeeder<void>>> signal_conveyors;
void notifySignalListener(int sig) {
Signal signal;
switch (sig) {
case SIGTERM:
std::vector<int> toUnixSignal(Signal signal) const {
switch (signal) {
case Signal::Terminate:
default:
signal = Signal::Terminate;
break;
return {SIGTERM, SIGQUIT, SIGINT};
}
}
Signal fromUnixSignal(int signal) const {
switch (signal) {
case SIGTERM:
case SIGINT:
case SIGQUIT:
default:
return Signal::Terminate;
}
}
void notifySignalListener(int sig) {
Signal signal = fromUnixSignal(sig);
auto equal_range = signal_conveyors.equal_range(signal);
for (auto iter = equal_range.first; iter != equal_range.second;
@ -84,7 +97,7 @@ private:
}
}
bool poll(int time) {
bool pollImpl(int time) {
epoll_event events[MAX_EPOLL_EVENTS];
int nfds = 0;
do {
@ -154,12 +167,22 @@ public:
signal_conveyors.insert(std::make_pair(signal, std::move(caf.feeder)));
std::vector<int> sig = toUnixSignal(signal);
for (auto iter = sig.begin(); iter != sig.end(); ++iter) {
::sigaddset(&signal_fd_set, *iter);
}
::sigprocmask(SIG_BLOCK, &signal_fd_set, nullptr);
::signalfd(signal_fd, &signal_fd_set, SFD_NONBLOCK | SFD_CLOEXEC);
auto node_and_storage =
Conveyor<void>::fromConveyor(std::move(caf.conveyor));
return Conveyor<void>::toConveyor(std::move(node_and_storage.first),
node_and_storage.second);
}
void poll() override { pollImpl(0); }
void subscribe(IFdOwner &owner, int fd, uint32_t event_mask) {
if (epoll_fd < 0 || fd < 0) {
return;

View File

@ -3,8 +3,6 @@
#include <algorithm>
#include <cassert>
#include <iostream>
namespace gin {
namespace {
thread_local EventLoop *local_loop = nullptr;
@ -193,6 +191,9 @@ bool EventLoop::wait(const std::chrono::steady_clock::time_point &time_point) {
bool EventLoop::wait() { return false; }
bool EventLoop::poll() {
if (event_port) {
event_port->poll();
}
while (head) {
if (!turn()) {
return false;
@ -245,6 +246,8 @@ void ConveyorSink::add(Conveyor<void> &&sink) {
if (nas.second) {
nas.second->setParent(sink_node.get());
}
sink_nodes.push_back(std::move(sink_node));
}
void ConveyorSink::fire() {

View File

@ -171,6 +171,8 @@ public:
virtual ~EventPort() = default;
virtual Conveyor<void> onSignal(Signal signal) = 0;
virtual void poll() = 0;
};
class SinkConveyorNode;
@ -544,7 +546,7 @@ public:
};
} // namespace gin
#include <cassert>
// Template inlining
namespace gin {
template <typename T> T reduceErrorOrType(T *);
@ -592,13 +594,13 @@ void detachConveyor(Conveyor<void> &&conveyor);
template <typename T>
template <typename ErrorFunc>
void Conveyor<T>::detach(ErrorFunc &&func) {
detachConveyor(then([](T &&) {}, std::move(func)));
detachConveyor(std::move(then([](T &&) {}, std::move(func))));
}
template <>
template <typename ErrorFunc>
void Conveyor<void>::detach(ErrorFunc &&func) {
detachConveyor(then([]() {}, std::move(func)));
detachConveyor(std::move(then([]() {}, std::move(func))));
}
template <typename T>