PeerMgr

Establishes new connections, either by dial or by accept, and associates active connections with a public key. Most operations are asynchronous and, when completed, will send a message to the calling thread.

/// Manager for peer connections: establishes connections by dial or accept
/// and keeps track of the active ones.
@safe
struct PeerMgr {
/// NNG stream listener handle (presumably the one started by listen() — confirm).
nng_stream_listener* listener;
/// Address of this node; the usage example passes it to another manager's dial().
string address;
/// Buffer size (presumably in bytes, taken from the node options — confirm).
size_t bufsize;
/// Compression level setting; how it is applied is not visible here.
uint compression_level;
/// Dialers keyed by a uint (presumably the id given to dial() — confirm).
Dialer*[uint] dialers;
/// LRU container of peers keyed by a uint; the usage example checks its length and each peer's state.
LRU!(uint, Peer*) all_peers;
/// NNG asynchronous I/O handle (presumably used for connection events — confirm).
nng_aio* aio_conn;
/// Task name (presumably the task that receives completion messages — confirm).
string task_name;
}

Constructors

this
this(immutable(NodeInterfaceOptions) opts)

Destructor

~this
~this()

free nng memory

Postblit

this(this)
this(this)

copy/postblit disabled

Members

Functions

dial
void dial(string address, uint id)

Connect to an address and associate it with a public key.

listen
void listen()

Listen on the specified address. This should be called before doing anything else. Finishes immediately.

send
void send(uint id, Buffer buf)

Send to an active connection with a known public key

Examples

1 uint last_id; // id taken from the most recent (action, id, buffer) message; used later to address the reply
2 void unit_handler(ref PeerMgr sender, ref PeerMgr receiver) { // handles one message sent to this thread (presumably waits at most 1 second — confirm)
3     receiveOnlyTimeout(1.seconds, 
4             (NodeAction a, uint id) { // status message without a payload
5                 if(a is NodeAction.accepted) {
6                     receiver.update(a, id);
7                     receiver.recv(id); // presumably starts receiving on the accepted connection — confirm
8                 }
9                 else if(a is NodeAction.sent) {
10                     sender.update(a, id);
11                     sender.recv(id); // after a completed send, the sender side calls recv for the same id
12                 }
13                 else {
14                     sender.update(a, id); // any other action is only passed to the sender's update
15                 }
16             },
17             (NodeAction a, uint id, Buffer buf) { // message carrying a buffer (presumably a completed receive — confirm)
18                 last_id = id;
19                 receiver.update(a, id);
20             }
21     );
22 }
23 
24 thisActor.task_name = "jens"; // name the current actor (presumably so completion messages can reach it — confirm)
25 
26 import std.stdio;
27 
28 auto net1 = new StdSecureNet(); // key pair used below to build the dialer's HiRPC payloads
29 net1.generateKeyPair("me1");
30 
31 auto net2 = new StdSecureNet(); // key pair used below to build the listener's HiRPC payload
32 net2.generateKeyPair("me2");
33 
34 auto dialer = PeerMgr(NodeInterfaceOptions(node_address: "abstract://whomisam" ~ generateId.to!string, bufsize: 256, pool_size: 2)); // each manager gets its own generated abstract:// address
35 auto listener = PeerMgr(NodeInterfaceOptions(node_address: "abstract://whomisam" ~ generateId.to!string, bufsize: 256, pool_size: 2));
36 
37 dialer.listen();
38 listener.listen();
39 
40 dialer.dial(listener.address, 1); // connect to the listener's address under id 1
41 listener.accept();
42 
43 unit_handler(dialer, listener); // each call handles at most one pending message; two calls are made per step
44 unit_handler(dialer, listener);
45 
46 assert(dialer.isActive(1)); // the connection with id 1 must be active on the dialer
47 
48 assert(dialer.all_peers.length == 1);
49 assert(listener.all_peers.length == 1);
50 assert(dialer.all_peers[].all!(e => e.value.state is Peer.State.ready)); // every dialer-side peer is expected in state ready
51 assert(listener.all_peers[].all!(e => e.value.state is Peer.State.receive)); // every listener-side peer is expected in state receive
52 
53 {
54     // listener.recv: the dialer sends, the listener receives
55     Buffer send_payload_p1 = HiRPC(net1).action("manythanks").serialize;
56 
57     dialer.send(1, send_payload_p1);
58 
59     unit_handler(dialer, listener);
60     unit_handler(dialer, listener);
61 
62     assert(listener.all_peers.length == 1); // still a single peer on the listener
63 }
64 
65 {
66     // dialer.recv: the listener replies to the id captured in last_id
67     Buffer send_payload_p2 = HiRPC(net2).action("manythanks").serialize;
68     listener.send(last_id, send_payload_p2);
69 
70     unit_handler(listener, dialer); // roles swapped: the listener is now the sender
71     unit_handler(listener, dialer);
72 }
73 
74 {
75     // listener.recv again: a second send from the dialer using id 1
76     Buffer send_payload_p1 = HiRPC(net1).action("manythanks").serialize;
77 
78     dialer.send(1, send_payload_p1);
79 
80     unit_handler(dialer, listener);
81     unit_handler(dialer, listener);
82 
83     assert(listener.all_peers.length == 1); // still a single peer on the listener
84 }
85 
86 dialer.stop(); // stop both managers at the end of the example
87 listener.stop();