1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
#pragma once
#include "remote.hpp"
#include <forstio/buffer.hpp>
#include <forstio/remote/transfer.hpp>
#include <fstream>
#include <cstring>
namespace saw {
/**
 * data_server specialization for the rmt::File remote.
 *
 * The encoded value is kept in the file named by the stored remote address.
 * Only id 0 is accepted; any other id yields an invalid_state error.
 */
template<typename Schema, typename Encoding>
class data_server<Schema, Encoding, rmt::File> final : public i_data_server<rmt::File> {
private:
	ptr<remote<rmt::File>> remote_;
	remote_address<rmt::File> addr_;

	/// Error returned by every operation that is called with an id other than 0.
	static error generate_invalid_id_error() {
		return make_error<err::invalid_state>("rmt::File data_server only supports Id 0.");
	}
public:
	/// Pair of (schema hash, encoding hash) identifying this server type.
	static constexpr std::pair<uint32_t,uint32_t> class_id{schema_hash<Schema>::apply(), schema_hash<Encoding>::apply()};

	/**
	 * @param remote__ remote this server belongs to; it is notified on destruction.
	 * @param addr__   address whose path names the backing file.
	 */
	data_server(ptr<remote<rmt::File>> remote__, ref<remote_address<rmt::File>> addr__):
		remote_{remote__},
		addr_{addr__()}
	{}

	/// Deregisters this server from its remote.
	~data_server(){
		remote_().deregister_data_server(*this);
	}

	SAW_FORBID_MOVE(data_server);
	SAW_FORBID_COPY(data_server);

	std::pair<uint32_t,uint32_t> get_class_id() const override {
		return class_id;
	}

	/**
	 * Writes the buffer of `dat` to the backing file, replacing previous content.
	 *
	 * @param dat      encoded data whose buffer is written out.
	 * @param store_id must be 0.
	 * @return void on success; invalid_state for a non-zero id or a failed
	 *         write; not_found if the file could not be opened.
	 */
	error_or<void> send(data<Schema, Encoding>& dat, id<Schema> store_id){
		if(store_id.get_value() > 0u){
			return generate_invalid_id_error();
		}

		std::fstream s{addr_.get_path(), std::ios::binary | std::ios::out | std::ios::trunc};
		if(!s.is_open()){
			return make_error<err::not_found>("Couldn't open file");
		}

		auto& buff = dat.get_buffer();
		buffer_view buff_v{buff};

		// Write one readable segment at a time until the view is drained.
		while(buff_v.read_segment_length() > 0u){
			const auto seg_len = buff_v.read_segment_length();
			s.write(reinterpret_cast<char*>(&buff_v.read()), static_cast<std::streamsize>(seg_len));
			buff_v.read_advance(seg_len);
		}

		s.flush();
		if(s.bad()){
			return make_error<err::invalid_state>("Stream failed to write.");
		}
		if(s.fail()){
			// failbit on an output stream means a write operation did not complete.
			return make_error<err::invalid_state>("Stream reported a failed write operation.");
		}

		return make_void();
	}

	/**
	 * Not implemented for the file remote.
	 *
	 * @return invalid_state for a non-zero id, otherwise not_implemented.
	 */
	error_or<void> allocate(data<typename meta_schema<Schema>::MetaSchema, Encoding> meta, id<Schema> store_id){
		(void)meta;
		if(store_id.get_value() > 0u){
			return generate_invalid_id_error();
		}
		return make_error<err::not_implemented>();
	}

	/**
	 * Not implemented for the file remote.
	 *
	 * @return invalid_state for a non-zero id, otherwise not_implemented.
	 */
	error_or<void> erase(id<Schema> store_id){
		if(store_id.get_value() > 0u){
			return generate_invalid_id_error();
		}
		return make_error<err::not_implemented>();
	}

	/**
	 * Reads the whole backing file into a freshly created data object.
	 *
	 * @param store_id must be 0.
	 * @return the data on success; invalid_state for a non-zero id or a failed
	 *         read; not_found if the file could not be opened; or the error
	 *         reported by the buffer when a byte cannot be pushed.
	 */
	error_or<data<Schema,Encoding>> receive(id<Schema> store_id){
		if(store_id.get_value() > 0u){
			return generate_invalid_id_error();
		}

		data<Schema, Encoding> dat{heap<array_buffer>(1u)};

		std::fstream s{addr_.get_path(), std::ios::binary | std::ios::in};
		if(!s.is_open()){
			return make_error<err::not_found>("Couldn't open file");
		}

		// Use a blocking get() rather than readsome(): readsome() only hands out
		// characters already available in the stream buffer and may return 0 on
		// a freshly opened file, which would yield empty or truncated data.
		char raw{};
		while(s.get(raw)){
			const uint8_t ele = static_cast<uint8_t>(raw);
			auto push_err = dat.get_buffer().push(ele,1u);
			if(push_err.failed()){
				return push_err;
			}
		}

		// Reaching end-of-file sets eofbit/failbit only; badbit signals a real I/O error.
		if(s.bad()){
			return make_error<err::invalid_state>("Stream failed to read.");
		}

		return dat;
	}
};
/**
 * data_client specialization for the rmt::File remote.
 *
 * Forwards every request to the given file data_server, always using id 0.
 */
template <typename Schema, typename Encoding>
class data_client<Schema, Encoding, rmt::File> {
private:
	ptr<remote<rmt::File>> remote_;
	ptr<data_server<Schema, Encoding, rmt::File>> srv_;
public:
	/**
	 * @param remote__ remote this client belongs to.
	 * @param srv__    server that performs the actual file access.
	 */
	data_client(ptr<remote<rmt::File>> remote__, ptr<data_server<Schema, Encoding,rmt::File>> srv__):
		remote_{remote__},
		srv_{srv__}
	{}

	/**
	 * Sends `dat` to the server under id 0.
	 *
	 * @return id 0 on success, otherwise the error reported by the server.
	 */
	error_or<id<Schema>> send(data<Schema, Encoding>& dat){
		// Pass the parameter `dat`; the original passed `data`, the class template name.
		auto eov = srv_().send(dat, id<Schema>{0u});
		if(eov.is_error()){
			return std::move(eov.get_error());
		}
		return id<Schema>{0u};
	}

	/**
	 * Fetches the data stored under `id_` from the server.
	 *
	 * @param id_ id to read; defaults to 0.
	 * @return whatever the server's receive() returns for `id_`.
	 */
	error_or<data<Schema, Encoding>> receive(id<Schema> id_ = {0u}){
		return srv_().receive(id_);
	}
};
}
|