summaryrefslogtreecommitdiff
path: root/modules/io_codec/examples/peer_echo_client.cpp
blob: 7cd6fe7648f62188cf02e316ed5081954169692c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include "echo.hpp"

#include "../c++/io_peer.hpp"

#include <forstio/codec/transport.hpp>
#include <forstio/codec/args.hpp>

#include <iostream>

namespace sch {
using namespace saw::schema;

using EchoArgs = Args<
	Struct<>,
	Tuple<String>
>;
}

int main(int argc, char** argv){
	using namespace saw;

	data<sch::EchoArgs, encode::Args> dat_args{
		argc,
		argv
	};

	codec<sch::EchoArgs, encode::Args> codec_args;

	data<sch::EchoArgs> dat_nat;
	auto eov = codec_args.decode(dat_args, dat_nat);
	if(eov.is_error()){
		auto& err = eov.get_error();
		std::cout<<err.get_category();
		auto err_msg = err.get_message();
		if(err_msg.size()>0u){
			std::cout<<" - "<<err_msg;
		}
		std::cout<<std::endl;
		return -4;
	}

	auto& tup = dat_nat.template get<"positionals">();
	auto& nat_echo = tup.template get<0>();

	auto eo_aio = saw::setup_async_io();
	if(eo_aio.is_error()){
		auto& err = eo_aio.get_error();
		std::cerr<<err.get_message()<<std::endl;
		return err.get_id();
	}
	auto& aio = eo_aio.get_value();
	/**
	 * Make the event loop the current event loop on this thread
	 */
	saw::wait_scope wait{aio.event_loop};

	bool keep_running = true;
	aio.event_port.on_signal(saw::Signal::Terminate).then([&keep_running](){
		keep_running = false;
	}).detach();

	auto& network = aio.io->get_network();

	auto eo_addr = network.resolve_address(saw::echo_address, saw::echo_port).take();
	if(eo_addr.is_error()){
		return -1;
	}
	auto& addr = eo_addr.get_value();

	codec<sch::Echo, encode::KelSimple> simple_codec;

	network.connect(*addr).then([&](saw::own<saw::io_stream> client){
		if(!client){
			return;
		}

		auto echo_stream = saw::heap<saw::async_io_stream>(std::move(client));
		auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple, ring_buffer>(std::move(echo_stream));

		std::cout<<"Connected"<<std::endl;

		data<sch::Echo, encode::KelSimple> simple_echo;
		{
			auto eov = simple_codec.encode(nat_echo, simple_echo);
			if(eov.is_error()){
				return;
			}
		}
		{
			auto eo_send = echo_peer_stream_p.first->send(std::move(simple_echo));
			if(eo_send.is_error()){
				return;
			}
		}
		std::cout<<"Sent data"<<std::endl;
		
		echo_peer_stream_p.second.then([&](auto simp_resp){
			data<sch::Echo> nat_resp;
			auto eov = simple_codec.decode(simp_resp, nat_resp);
			if(nat_resp.size() != nat_echo.size()){
				exit(-2);
			}
			std::cout<<"Answer:\n";
			for(uint64_t i = 0u; i < nat_resp.size(); ++i){
				if (nat_resp.at(i) != nat_echo.at(i)){
					exit(-3);
				}
				std::cout<<nat_resp.at(i);
			}
			std::cout<<std::endl;

			keep_running = false;
			return eov;
		}).detach();
		
		auto peer_str = echo_peer_stream_p.first.get();
		peer_str->on_disconnected().attach(std::move(echo_peer_stream_p.first)).then([&]() -> error_or<void>{
			keep_running = false;
			std::cout<<"Disconnected"<<std::endl;
			return make_error<err::critical>("Destroy pipeline on purpose :>");
		}).detach();
		
	}).detach();

	while(keep_running){
		wait.wait_for(1000*1000);
	}

	return 0;
}