summaryrefslogtreecommitdiff
path: root/modules/remote-thread/c++/remote.hpp
blob: 280101aefd740334137b0691efd48ce57b669370 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#pragma once

#include <atomic>
#include <chrono>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

namespace saw {
/**
 * ### Device ###
 * Device acts as a launcher?
 * 
 * Acts as a logical device.
 * Though logical devices are not findable by an address as of now.
 *
 * Generally a device represents some info about the remote object?
 * But what exactly? Does it store capabilities?
 *
 * In that case I'd prefer it has compile time information about
 * those.
 * 
 * ### RpcServer ###
 * Should always be created on the side which it is run on.
 * For SYCL it's kernel launches, so it's created on the local
 * thread.
 * For Threads it's supposed to be created on the remote thread.
 * How do I solve this cleanly?
 * Technically the server shouldn't know about the device.
 * It should register with an authority, so it gets requests
 * though.
 */
namespace rmt {

/**
 * Empty tag type. Within this file it is only used as a template
 * argument to select the thread flavoured specialisations
 * (rpc_server, rpc_client, device, remote).
 */
struct Thread {
};

} // namespace rmt

namespace impl {
template<typename Iface, typename Encoding, typename Storage>
class thread_rpc_communication_handler final {
private:
	std::mutex mut_;

	using FunctionT = std::function<error_or<void>(rpc_server<Iface, Encoding, rmt::Thread>&)>;
	std::deque<FunctionT> dispatches_;

	// TODO Need a send + receive + erase request queue
	// std::deque<int>;
public:
	thread_rpc_communication_handler() = default;

	template<string_literal Lit>
	error_or<void> call(id<Void> dat_id){
		std::lock_guard lock{mut_};

		dispatches_.emplace_back([dat_id](rpc_server<Iface, Encoding, rmt::Thread>& srv){
			srv.template call<Lit>(dat_id);
		});
	}

	error_or<void> run_next_dispatch(rpc_server<Iface, Encoding, rmt::Thread>& srv){
		std::lock_guard lock{mut_};
		if(dispatches_.empty()){
			return make_error<err::recoverable>("Dispatch Queue is empty");
		}
		
		ref<FunctionT> front{dispatches_.front()};

		front()();
	}
};

}

/**
 * rpc_server specialisation selected by the rmt::Thread tag.
 *
 * Currently it only holds the communication handler and has no
 * public members yet.
 */
template<typename Iface, typename Encoding>
class rpc_server<Iface, Encoding, rmt::Thread> {
private:
	// NOTE(review): impl::thread_rpc_communication_handler is declared
	// with three template parameters (Iface, Encoding, Storage) but is
	// named with two here - confirm which Storage should be passed.
	our<impl::thread_rpc_communication_handler<Iface, Encoding>> comms_;
public:
};

/**
 * rpc_client specialisation selected by the rmt::Thread tag.
 *
 * Currently it only stores the communication handler it is
 * constructed with.
 */
template<typename Iface, typename Encoding>
class rpc_client<Iface, Encoding, rmt::Thread> {
private:
	// NOTE(review): impl::thread_rpc_communication_handler is declared
	// with three template parameters (Iface, Encoding, Storage) but is
	// named with two here - confirm which Storage should be passed.
	our<impl::thread_rpc_communication_handler<Iface, Encoding>> comms_;
public:
	/**
	 * @param comms__ communication handler, moved into this client
	 */
	rpc_client(our<impl::thread_rpc_communication_handler<Iface, Encoding>> comms__):
		comms_{std::move(comms__)}
	{}
};

/**
 * A device representing a remote thread. Technically it's
 * a logical distinction and not a physical one.
 */
template<>
class device<rmt::Thread> final {
private:
	event_loop ev_loop_;
	bool keep_running_;
	std::function<void()> run_func_;

	// std::vector<std::function<error_or<void>()>> func_calls_;

	std::thread thread_;

	void run(){
		wait_scope wait{ev_loop_};

		while(keep_running_){
			run_func_();
			wait.wait(std::chrono::seconds{16u});
		}

		wait.poll();
	}
public:
	template<typename Func>
	device(Func func):
		ev_loop_{},
		keep_running_{true},
		run_func_{std::move(func)}
		thread_{&device<rmt::Thread>::run, this},
	{}

	void stop(){
		keep_running_ = false;
	}
};

/**
 * remote specialisation selected by the rmt::Thread tag.
 *
 * Provides address resolution and device connection for the thread
 * remote. Holds no state.
 */
template<>
class remote<rmt::Thread> final {
	private:
public:
		remote() = default;

		/**
		 * Builds a remote_address<rmt::Thread> from this remote via
		 * heap<>() and returns it as a conveyor.
		 * NOTE(review): presumably heap<>() allocates and yields an
		 * own<> - confirm against its declaration.
		 */
		conveyor<own<remote_address<rmt::Thread>>> resolve_address(){
			return heap<remote_address<rmt::Thread>>(*this);
		}

		/**
		 * Returns a device for the given address. The address argument
		 * is currently unnamed and ignored.
		 *
		 * NOTE(review): device<rmt::Thread> as declared in this file
		 * only has a constructor taking a callable, so `return {}`
		 * looks like it cannot compile. Which function the device
		 * should run needs to be decided here.
		 */
		device<rmt::Thread> connect_device(const remote_address<rmt::Thread>& ){
			return {};
		}
};
}