nonblocking tls

This commit is contained in:
keldu.magnus 2021-10-07 18:09:05 +02:00
parent 9f78b64731
commit 5c475f8992
3 changed files with 93 additions and 47 deletions

View File

@ -193,6 +193,11 @@ public:
*/
[[nodiscard]] Conveyor<T> limit(size_t val = 1);
/**
*
*/
[[nodiscard]] std::pair<Conveyor<T>, MergeConveyor<T>> merge();
/**
* Moves the conveyor chain into a thread local storage point which drops
* every element. Use sink() if you want to control the lifetime of a
@ -200,12 +205,6 @@ public:
*/
template <typename ErrorFunc = PropagateError>
void detach(ErrorFunc &&err_func = PropagateError());
/**
*
*/
[[nodiscard]] std::pair<Conveyor<T>, MergeConveyor<T>> merge();
/**
* Creates a local sink which drops elements, but lifetime control remains
* in your hand.
@ -825,7 +824,7 @@ public:
~MergeConveyorNode();
// Event
void getResult(ErrorOrValue &err_or_val) override;
void getResult(ErrorOrValue &err_or_val) noexcept override;
void fire() override;
@ -850,45 +849,33 @@ public:
};
/*
class JoinConveyorNodeBase : public ConveyorStorage {
public:
virtual ~JoinConveyorNodeBase() = default;
};
template <typename T>
class JoinConveyorNode final : public JoinConveyorNodeBase {
class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage {
private:
T data;
public:
};
class JoinConveyorMergeNodeBase : public ConveyorNode, public ConveyorStorage {
public:
};
template <typename... Args>
class JoinConveyorMergerNode final : public JoinConveyorMergeNodeBase {
class JoinConveyorNode final : public JoinConveyorNodeBase {
private:
std::tuple<JoinConveyorNode<Args>...> joined;
template<typename T>
class Appendage : public ConveyorEventStorage {
private:
Maybe<T> data = std::nullopt;
public:
size_t space() const override;
size_t queued() const override;
void fire() override;
void getResult(ErrorOrValue& eov) override;
};
std::tuple<Appendage<Args>...> appendages;
public:
void getResult(ErrorOrValue &err_or_val) noexcept override {}
void fire() override;
};
class UniteConveyorNodeBase : public ConveyorNode, public ConveyorStorage {
public:
virtual ~UniteConveyorNodeBase() = default;
};
template <typename T> class UniteConveyorNode : public UniteConveyorNodeBase {
public:
virtual ~UniteConveyorNode() = default;
};
template <typename T> class
*/
} // namespace gin

View File

@ -302,7 +302,8 @@ MergeConveyorNode<T>::MergeConveyorNode(Our<MergeConveyorNodeData<T>> d)
template <typename T> MergeConveyorNode<T>::~MergeConveyorNode() {}
template <typename T> void MergeConveyorNode<T>::getResult(ErrorOrValue &eov) {
template <typename T>
void MergeConveyorNode<T>::getResult(ErrorOrValue &eov) noexcept {
ErrorOr<FixVoid<T>> &err_or_val = eov.as<FixVoid<T>>();
GIN_ASSERT(data) { return; }

View File

@ -99,6 +99,58 @@ Conveyor<Own<IoStream>> TlsServer::accept() {
});
}
namespace {
struct TlsClientStreamHelper {
public:
Own<ConveyorFeeder<Own<IoStream>>> feeder;
SinkConveyor connection_sink;
SinkConveyor stream_reader;
SinkConveyor stream_writer;
Own<TlsIoStream> stream = nullptr;
public:
TlsClientStreamHelper(Own<ConveyorFeeder<Own<IoStream>>> f):
feeder{std::move(f)}
{}
void setupTurn(){
GIN_ASSERT(stream){
return;
}
stream_reader = stream->readReady().then([this](){
turn();
}).sink();
stream_writer = stream->writeReady().then([this](){
turn();
}).sink();
}
void turn(){
if(stream){
// Guarantee that the receiving end is already setup
GIN_ASSERT(feeder){
return;
}
auto &session = stream->session();
int ret;
do {
ret = gnutls_handshake(session);
} while (ret == GNUTLS_E_AGAIN && gnutls_error_is_fatal(ret) == 0);
if(gnutls_error_is_fatal(ret) != 0){
feeder->fail(criticalError("Couldn't create Tls connection"));
}else if(ret == GNUTLS_E_SUCCESS){
feeder->feed(std::move(stream));
}
}
}
};
}
TlsNetworkAddress::TlsNetworkAddress(Own<NetworkAddress> net_addr, const std::string& host_name_, Tls &tls_)
: internal{std::move(net_addr)}, host_name{host_name_}, tls{tls_} {}
@ -109,8 +161,15 @@ Own<Server> TlsNetworkAddress::listen() {
Conveyor<Own<IoStream>> TlsNetworkAddress::connect() {
GIN_ASSERT(internal) { return Conveyor<Own<IoStream>>{nullptr, nullptr}; }
return internal->connect().then([this](
Own<IoStream> stream) -> ErrorOr<Own<IoStream>> {
// Helper setups
auto caf = newConveyorAndFeeder<Own<IoStream>>();
Own<TlsClientStreamHelper> helper = heap<TlsClientStreamHelper>(std::move(caf.feeder));
TlsClientStreamHelper* hlp_ptr = helper.get();
// Conveyor entangled structure
auto prim_conv = internal->connect().then([this, hlp_ptr](
Own<IoStream> stream) -> ErrorOr<void> {
IoStream* inner_stream = stream.get();
auto tls_stream = heap<TlsIoStream>(std::move(stream));
@ -134,17 +193,16 @@ Conveyor<Own<IoStream>> TlsNetworkAddress::connect() {
// gnutls_handshake_set_timeout(session, GNUTLS_DEFAULT_HANDSHAKE_TIMEOUT);
int ret;
do {
ret = gnutls_handshake(session);
} while (ret < 0 && gnutls_error_is_fatal(ret) == 0);
hlp_ptr->stream = std::move(tls_stream);
hlp_ptr->setupTurn();
hlp_ptr->turn();
if(ret < 0){
return criticalError("Couldn't create Tls connection");
}
return Own<IoStream>{std::move(tls_stream)};
return Void{};
});
helper->connection_sink = prim_conv.sink();
return caf.conveyor.attach(std::move(helper));
}
static ssize_t kelgin_tls_push_func(gnutls_transport_ptr_t p, const void *data,