PeerMgr

Establishes new connections either by dial or accept, and associates active connections with a public key. Most operations are asynchronous and, when completed, will send a message to the calling thread.

@safe
struct PeerMgr {
nng_stream_listener* listener;
string address;
size_t bufsize;
uint compression_level;
Dialer*[uint] dialers;
LRU!(uint, Peer*) all_peers;
nng_aio* aio_conn;
string task_name;
}

Constructors

this
this(immutable(NodeInterfaceOptions) opts)

Destructor

~this
~this()

free nng memory

Postblit

this(this)
this(this)

copy/postblit disabled

Members

Functions

dial
void dial(string address, uint id)

Connect to an address and associate it with a public key

listen
void listen()

Listen on the specified address. This should be called before doing anything else. Finishes immediately.

send
void send(uint id, Buffer buf)

Send to an active connection with a known public key

Examples

1 uint last_id;
2 void unit_handler(ref PeerMgr sender, ref PeerMgr receiver) {
3     receiveOnlyTimeout(1.seconds, 
4             (NodeAction a, uint id) {
5                 if(a is NodeAction.accepted) {
6                     receiver.update(a, id);
7                     receiver.recv(id);
8                 }
9                 else if(a is NodeAction.sent) {
10                     sender.update(a, id);
11                     sender.recv(id);
12                 }
13                 else {
14                     sender.update(a, id);
15                 }
16             },
17             (NodeAction a, uint id, Buffer buf) {
18                 last_id = id;
19                 receiver.update(a, id);
20             }
21     );
22 }
23 
24 thisActor.task_name = "jens";
25 
26 auto net1 = new StdSecureNet();
27 net1.generateKeyPair("me1");
28 
29 auto net2 = new StdSecureNet();
30 net2.generateKeyPair("me2");
31 
32 auto dialer = PeerMgr(NodeInterfaceOptions(node_address: "abstract://whomisam" ~ generateId.to!string, bufsize: 256, pool_size: 2));
33 auto listener = PeerMgr(NodeInterfaceOptions(node_address: "abstract://whomisam" ~ generateId.to!string, bufsize: 256, pool_size: 2));
34 
35 dialer.listen();
36 listener.listen();
37 
38 dialer.dial(listener.address, 1);
39 listener.accept();
40 
41 unit_handler(dialer, listener);
42 unit_handler(dialer, listener);
43 
44 assert(dialer.isActive(1));
45 
46 assert(dialer.all_peers.length == 1);
47 assert(listener.all_peers.length == 1);
48 assert(dialer.all_peers[].all!(e => e.value.state is Peer.State.ready));
49 assert(listener.all_peers[].all!(e => e.value.state is Peer.State.receive));
50 
51 {
52     // listener.recv
53     Buffer send_payload_p1 = HiRPC(net1).action("manythanks").serialize;
54 
55     dialer.send(1, send_payload_p1);
56 
57     unit_handler(dialer, listener);
58     unit_handler(dialer, listener);
59 
60     assert(listener.all_peers.length == 1);
61 }
62 
63 {
64     // dialer.recv
65     Buffer send_payload_p2 = HiRPC(net2).action("manythanks").serialize;
66     listener.send(last_id, send_payload_p2);
67 
68     unit_handler(listener, dialer);
69     unit_handler(listener, dialer);
70 }
71 
72 {
73     // listener.recv
74     Buffer send_payload_p1 = HiRPC(net1).action("manythanks").serialize;
75 
76     dialer.send(1, send_payload_p1);
77 
78     unit_handler(dialer, listener);
79     unit_handler(dialer, listener);
80 
81     assert(listener.all_peers.length == 1);
82 }
83 
84 dialer.stop();
85 listener.stop();