PeerMgr

Establishes new connections either by dial or accept And associates active connections with a public key Most operations are asynchronous and when completed will send a message to the calling thread.

@safe
struct PeerMgr {
nng_stream_listener* listener;
string address;
size_t bufsize;
uint compression_level;
Dialer*[uint] dialers;
LRU!(uint, Peer*) all_peers;
nng_aio* aio_conn;
string task_name;
}

Constructors

this
this(immutable(NodeInterfaceOptions) opts)

Destructor

~this
~this()

free nng memory

Postblit

this(this)
this(this)

copy/postblit disabled

Members

Functions

dial
void dial(string address, uint id)

Connect to an address an associate it with a public key

listen
void listen()

* Listen on the specified address. * This should be called before doing anything else. * Finishes immediately

send
void send(uint id, Buffer buf)

Send to an active connection with a known public key

Examples

1 uint last_id;
2 void unit_handler(ref PeerMgr sender, ref PeerMgr receiver) {
3     receiveOnlyTimeout(1.seconds,
4             (NodeAction a, uint id) {
5         if (a is NodeAction.accepted) {
6             receiver.update(a, id);
7             receiver.recv(id);
8         }
9         else if (a is NodeAction.sent) {
10             sender.update(a, id);
11             sender.recv(id);
12         }
13         else {
14             sender.update(a, id);
15         }
16     },
17             (NodeAction a, uint id, Buffer buf) { last_id = id; receiver.update(a, id); }
18     );
19 }
20 
21 thisActor.task_name = "jens";
22 
23 auto net1 = createSecureNet;
24 net1.generateKeyPair("me1");
25 
26 auto net2 = createSecureNet;
27 net2.generateKeyPair("me2");
28 
29 auto dialer = PeerMgr(NodeInterfaceOptions(node_address: "abstract://whomisam" ~ generateId.to!string, bufsize: 256, pool_size: 2));
30 auto listener = PeerMgr(NodeInterfaceOptions(node_address: "abstract://whomisam" ~ generateId.to!string, bufsize: 256, pool_size: 2));
31 
32 dialer.listen();
33 listener.listen();
34 
35 dialer.dial(listener.address, 1);
36 listener.accept();
37 
38 unit_handler(dialer, listener);
39 unit_handler(dialer, listener);
40 
41 assert(dialer.isActive(1));
42 
43 assert(dialer.all_peers.length == 1);
44 assert(listener.all_peers.length == 1);
45 assert(dialer.all_peers[].all!(e => e.value.state is Peer.State.ready));
46 assert(listener.all_peers[].all!(e => e.value.state is Peer.State.receive));
47 
48 {
49     // listener.recv
50     Buffer send_payload_p1 = HiRPC(net1).action("manythanks").serialize;
51 
52     dialer.send(1, send_payload_p1);
53 
54     unit_handler(dialer, listener);
55     unit_handler(dialer, listener);
56 
57     assert(listener.all_peers.length == 1);
58 }
59 
60 {
61     // dialer.recv
62     Buffer send_payload_p2 = HiRPC(net2).action("manythanks").serialize;
63     listener.send(last_id, send_payload_p2);
64 
65     unit_handler(listener, dialer);
66     unit_handler(listener, dialer);
67 }
68 
69 {
70     // listener.recv
71     Buffer send_payload_p1 = HiRPC(net1).action("manythanks").serialize;
72 
73     dialer.send(1, send_payload_p1);
74 
75     unit_handler(dialer, listener);
76     unit_handler(dialer, listener);
77 
78     assert(listener.all_peers.length == 1);
79 }
80 
81 dialer.stop();
82 listener.stop();