summaryrefslogtreecommitdiff
path: root/modules/remote-thread/c++/remote.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/remote-thread/c++/remote.hpp')
-rw-r--r--modules/remote-thread/c++/remote.hpp140
1 files changed, 140 insertions, 0 deletions
diff --git a/modules/remote-thread/c++/remote.hpp b/modules/remote-thread/c++/remote.hpp
new file mode 100644
index 0000000..39c64a6
--- /dev/null
+++ b/modules/remote-thread/c++/remote.hpp
@@ -0,0 +1,140 @@
+#pragma once
+
+#include <thread>
+
+namespace saw {
+/**
+ * ### Device ###
+ * Device acts a launcher?
+ *
+ * Acts a logical device.
+ * Though logical devices are not findable by an address as of now.
+ *
+ * Generally a device represents some info about the remote object?
+ * But what exactly? Does it store capabilities?
+ *
+ * In that case I'd prefer it has compile time information about
+ * those.
+ *
+ * ### RpcServer ###
+ * Should always be created on the side which it is run on.
+ * For SYCL it's kernel launches, so it's created on the local
+ * thread.
+ * For Threads it's supposed to be created on the remote thread.
+ * How do I solve this cleanly?
+ * Technically the server shouldn't know about the device.
+ * It should register with an authority, so it gets requests
+ * though.
+ */
+namespace rmt {
+struct Thread {};
+}
+
+namespace impl {
+template<typename Iface, typename Encoding, typename Storage>
+class thread_rpc_communication_handler final {
+private:
+ std::mutex mut_;
+
+ using FunctionT = std::function<error_or<void>(rpc_server<Iface, Encoding, Storage, rmt::Thread>&)>;
+ std::deque<FunctionT> dispatches_;
+
+ // TODO Need a send + receive + erase request queue
+ // std::deque<int>;
+public:
+ thread_rpc_communication_handler() = default;
+
+ template<string_literal Lit>
+ error_or<void> call(id<Void> dat_id){
+ std::lock_guard lock{mut_};
+
+ dispatches_.emplace_back([dat_id](rpc_server<Iface, Encoding, Storage, rmt::Thread>& srv){
+ srv.template call<Lit>(dat_id);
+ });
+ }
+
+ error_or<void> run_next_dispatch(rpc_server<Iface, Encoding, Storage, rmt::Thread>& srv){
+ std::lock_guard lock{mut_};
+ if(dispatches_.empty()){
+ return make_error<err::recoverable>("Dispatch Queue is empty");
+ }
+
+ ref<FunctionT> front{dispatches_.front()};
+
+ front()();
+ }
+};
+
+}
+
+template<Iface, Encoding, Storage>
+class rpc_server<Iface, Encoding, Storage, rmt::Thread> {
+private:
+ our<impl::thread_rpc_communication_handler<Iface, Encoding, Storage>> comms_;
+public:
+};
+
+template<Iface, Encoding, Storage>
+class rpc_client<Iface, Encoding, Storage, rmt::Thread> {
+private:
+ our<impl::thread_rpc_communication_handler<Iface, Encoding, Storage>> comms_;
+public:
+ rpc_client(our<impl::thread_rpc_communication_handler<Iface, Encoding, Storage>> comms__):
+ comms_{std::move(comms__)}
+ {}
+};
+
+/**
+ * A device representing a remote thread. Technically it's
+ * a logical distinction and not a physical.
+ */
+template<>
+class device<rmt::Thread> final {
+private:
+ event_loop ev_loop_;
+ bool keep_running_;
+ std::function<void()> run_func_;
+
+ // std::vector<std::function<error_or<void>()>> func_calls_;
+
+ std::thread thread_;
+
+ void run(){
+ wait_scope wait{ev_loop_};
+
+ while(keep_running_){
+ run_func_();
+ wait.wait(std::chrono::seconds{16u});
+ }
+
+ wait.poll();
+ }
+public:
+ template<typename Func>
+ device(Func func):
+ ev_loop_{},
+ keep_running_{true},
+ run_func_{std::move(func)}
+ thread_{&device<rmt::Thread>::run, this},
+ {}
+
+ void stop(){
+ keep_running_ = false;
+ }
+};
+
+template<>
+class remote<rmt::Thread> final {
+ private:
+public:
+ remote() = default;
+
+ conveyor<own<remote_address<rmt::Thread>>> resolve_address(){
+ return heap<remote_address<rmt::Thread>>(*this);
+ }
+
+ device<rmt::Thread> connect_device(const remote_address<rmt::Thread>& ){
+ return {};
+ }
+};
+}