Broker-Enabled Communication Framework

Bro can now use the Broker Library to exchange information with other Bro processes.

Connecting to Peers

Communication via Broker must first be turned on via Broker::enable.

Bro can accept incoming connections by calling Broker::listen and then monitor connection status updates via the Broker::incoming_connection_established and Broker::incoming_connection_broken events.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
connecting-listener.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "listener";

event bro_init()
	{
	Broker::enable();
	Broker::listen(broker_port, "127.0.0.1");
	}

event Broker::incoming_connection_established(peer_name: string)
	{
	print "Broker::incoming_connection_established", peer_name;
	}

event Broker::incoming_connection_broken(peer_name: string)
	{
	print "Broker::incoming_connection_broken", peer_name;
	terminate();
	}

Bro can initiate outgoing connections by calling Broker::connect and then monitor connection status updates via the Broker::outgoing_connection_established, Broker::outgoing_connection_broken, and Broker::outgoing_connection_incompatible events.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
connecting-connector.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "connector";

event bro_init()
	{
	Broker::enable();
	Broker::connect("127.0.0.1", broker_port, 1sec);
	}

event Broker::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "Broker::outgoing_connection_established",
		  peer_address, peer_port, peer_name;
	terminate();
	}

Remote Printing

To receive remote print messages, first use the Broker::subscribe_to_prints function to advertise to peers a topic prefix of interest and then create an event handler for Broker::print_handler to handle any print messages that are received.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
printing-listener.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "listener";
global msg_count = 0;

event bro_init()
	{
	Broker::enable();
	Broker::subscribe_to_prints("bro/print/");
	Broker::listen(broker_port, "127.0.0.1");
	}

event Broker::incoming_connection_established(peer_name: string)
	{
	print "Broker::incoming_connection_established", peer_name;
	}

event Broker::print_handler(msg: string)
	{
	++msg_count;
	print "got print message", msg;

	if ( msg_count == 3 )
		terminate();
	}

To send remote print messages, just call Broker::send_print.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
printing-connector.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "connector";

event bro_init()
	{
	Broker::enable();
	Broker::connect("127.0.0.1", broker_port, 1sec);
	}

event Broker::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "Broker::outgoing_connection_established",
	      peer_address, peer_port, peer_name;
	Broker::send_print("bro/print/hi", "hello");
	Broker::send_print("bro/print/stuff", "...");
	Broker::send_print("bro/print/bye", "goodbye");
	}

event Broker::outgoing_connection_broken(peer_address: string,
                                       peer_port: port)
	{
	terminate();
	}

Notice that the subscriber only used the prefix “bro/print/”, but is able to receive messages with full topics of “bro/print/hi”, “bro/print/stuff”, and “bro/print/bye”. The model here is that the publisher of a message checks for all subscribers who advertised interest in a prefix of that message’s topic and sends it to them.

Message Format

For other applications that want to exchange print messages with Bro, the Broker message format is simply:

broker::message{std::string{}};

Remote Events

Receiving remote events is similar to remote prints. Just use the Broker::subscribe_to_events function and possibly define any new events along with handlers that peers may want to send.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
events-listener.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "listener";
global msg_count = 0;
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);

event bro_init()
	{
	Broker::enable();
	Broker::subscribe_to_events("bro/event/");
	Broker::listen(broker_port, "127.0.0.1");
	}

event Broker::incoming_connection_established(peer_name: string)
	{
	print "Broker::incoming_connection_established", peer_name;
	}

event my_event(msg: string, c: count)
	{
	++msg_count;
	print "got my_event", msg, c;

	if ( msg_count == 5 )
		terminate();
	}

event my_auto_event(msg: string, c: count)
	{
	++msg_count;
	print "got my_auto_event", msg, c;

	if ( msg_count == 5 )
		terminate();
	}

There are two different ways to send events. The first is to call the Broker::send_event function directly. The second option is to call the Broker::auto_event function where you specify a particular event that will be automatically sent to peers whenever the event is called locally via the normal event invocation syntax.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
events-connector.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "connector";
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);

event bro_init()
	{
	Broker::enable();
	Broker::connect("127.0.0.1", broker_port, 1sec);
	Broker::auto_event("bro/event/my_auto_event", my_auto_event);
	}

event Broker::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "Broker::outgoing_connection_established",
	      peer_address, peer_port, peer_name;
	Broker::send_event("bro/event/my_event", Broker::event_args(my_event, "hi", 0));
	event my_auto_event("stuff", 88);
	Broker::send_event("bro/event/my_event", Broker::event_args(my_event, "...", 1));
	event my_auto_event("more stuff", 51);
	Broker::send_event("bro/event/my_event", Broker::event_args(my_event, "bye", 2));
	}

event Broker::outgoing_connection_broken(peer_address: string,
                                       peer_port: port)
	{
	terminate();
	}

Again, the subscription model is prefix-based.

Message Format

For other applications that want to exchange event messages with Bro, the Broker message format is:

broker::message{std::string{}, ...};

The first parameter is the name of the event and the remaining ... are its arguments, which are any of the supported Broker data types as they correspond to the Bro types for the event named in the first parameter of the message.

Remote Logging

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
testlog.bro

module Test;

export {
	redef enum Log::ID += { LOG };

	type Info: record {
		msg: string &log;
		num: count &log;
	};

	global log_test: event(rec: Test::Info);
}

event bro_init() &priority=5
	{
	Broker::enable();
	Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test, $path="test"]);
	}

Use the Broker::subscribe_to_logs function to advertise interest in logs written by peers. The topic names that Bro uses are implicitly of the form “bro/log/<stream-name>”.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
logs-listener.bro

@load ./testlog

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "listener";

event bro_init()
	{
	Broker::enable();
	Broker::subscribe_to_logs("bro/log/Test::LOG");
	Broker::listen(broker_port, "127.0.0.1");
	}

event Broker::incoming_connection_established(peer_name: string)
	{
	print "Broker::incoming_connection_established", peer_name;
	}

event Test::log_test(rec: Test::Info)
	{
	print "wrote log", rec;

	if ( rec$num == 5 )
		terminate();
	}

To send remote logs either redef Log::enable_remote_logging or use the Broker::enable_remote_logs function. The former allows any log stream to be sent to peers while the latter enables remote logging for particular streams.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
logs-connector.bro

@load ./testlog

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "connector";
redef Log::enable_local_logging = F;
redef Log::enable_remote_logging = F;
global n = 0;

event bro_init()
	{
	Broker::enable();
	Broker::enable_remote_logs(Test::LOG);
	Broker::connect("127.0.0.1", broker_port, 1sec);
	}

event do_write()
	{
	if ( n == 6 )
		return;

	Log::write(Test::LOG, [$msg = "ping", $num = n]);
	++n;
	event do_write();
	}

event Broker::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "Broker::outgoing_connection_established",
	      peer_address, peer_port, peer_name;
	event do_write();
	}

event Broker::outgoing_connection_broken(peer_address: string,
                                       peer_port: port)
	{
	terminate();
	}

Message Format

For other applications that want to exchange log messages with Bro, the Broker message format is:

broker::message{broker::enum_value{}, broker::record{}};

The enum value corresponds to the stream’s Log::ID value, and the record corresponds to a single entry of that log’s columns record, in this case a Test::Info value.

Tuning Access Control

By default, endpoints do not restrict the message topics that it sends to peers and do not restrict what message topics and data store identifiers get advertised to peers. These are the default Broker::EndpointFlags supplied to Broker::enable.

If not using the auto_publish flag, one can use the Broker::publish_topic and Broker::unpublish_topic functions to manipulate the set of message topics (must match exactly) that are allowed to be sent to peer endpoints. These settings take precedence over the per-message peers flag supplied to functions that take a Broker::SendFlags such as Broker::send_print, Broker::send_event, Broker::auto_event or Broker::enable_remote_logs.

If not using the auto_advertise flag, one can use the Broker::advertise_topic and Broker::unadvertise_topic functions to manipulate the set of topic prefixes that are allowed to be advertised to peers. If an endpoint does not advertise a topic prefix, then the only way peers can send messages to it is via the unsolicited flag of Broker::SendFlags and choosing a topic with a matching prefix (i.e. full topic may be longer than receivers prefix, just the prefix needs to match).

Distributed Data Stores

There are three flavors of key-value data store interfaces: master, clone, and frontend.

A frontend is the common interface to query and modify data stores. That is, a clone is a specific type of frontend and a master is also a specific type of frontend, but a standalone frontend can also exist to e.g. query and modify the contents of a remote master store without actually “owning” any of the contents itself.

A master data store can be cloned from remote peers which may then perform lightweight, local queries against the clone, which automatically stays synchronized with the master store. Clones cannot modify their content directly, instead they send modifications to the centralized master store which applies them and then broadcasts them to all clones.

Master and clone stores get to choose what type of storage backend to use. E.g. In-memory versus SQLite for persistence. Note that if clones are used, then data store sizes must be able to fit within memory regardless of the storage backend as a single snapshot of the master store is sent in a single chunk to initialize the clone.

Data stores also support expiration on a per-key basis either using an absolute point in time or a relative amount of time since the entry’s last modification time.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
stores-listener.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;

global h: opaque of Broker::Handle;
global expected_key_count = 4;
global key_count = 0;

function do_lookup(key: string)
	{
	when ( local res = Broker::lookup(h, Broker::data(key)) )
		{
		++key_count;
		print "lookup", key, res;

		if ( key_count == expected_key_count )
			terminate();
		}
	timeout 10sec
		{ print "timeout", key; }
	}

event ready()
	{
	h = Broker::create_clone("mystore");

	when ( local res = Broker::keys(h) )
		{
		print "clone keys", res;
		do_lookup(Broker::refine_to_string(Broker::vector_lookup(res$result, 0)));
		do_lookup(Broker::refine_to_string(Broker::vector_lookup(res$result, 1)));
		do_lookup(Broker::refine_to_string(Broker::vector_lookup(res$result, 2)));
		do_lookup(Broker::refine_to_string(Broker::vector_lookup(res$result, 3)));
		}
	timeout 10sec
		{ print "timeout"; }
	}

event bro_init()
	{
	Broker::enable();
	Broker::subscribe_to_events("bro/event/ready");
	Broker::listen(broker_port, "127.0.0.1");
	}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
stores-connector.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;

global h: opaque of Broker::Handle;

function dv(d: Broker::Data): Broker::DataVector
	{
	local rval: Broker::DataVector;
	rval[0] = d;
	return rval;
	}

global ready: event();

event Broker::outgoing_connection_broken(peer_address: string,
                                       peer_port: port)
	{
	terminate();
	}

event Broker::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	local myset: set[string] = {"a", "b", "c"};
	local myvec: vector of string = {"alpha", "beta", "gamma"};
	h = Broker::create_master("mystore");
	Broker::insert(h, Broker::data("one"), Broker::data(110));
	Broker::insert(h, Broker::data("two"), Broker::data(223));
	Broker::insert(h, Broker::data("myset"), Broker::data(myset));
	Broker::insert(h, Broker::data("myvec"), Broker::data(myvec));
	Broker::increment(h, Broker::data("one"));
	Broker::decrement(h, Broker::data("two"));
	Broker::add_to_set(h, Broker::data("myset"), Broker::data("d"));
	Broker::remove_from_set(h, Broker::data("myset"), Broker::data("b"));
	Broker::push_left(h, Broker::data("myvec"), dv(Broker::data("delta")));
	Broker::push_right(h, Broker::data("myvec"), dv(Broker::data("omega")));

	when ( local res = Broker::size(h) )
		{
		print "master size", res;
		event ready();
		}
	timeout 10sec
		{ print "timeout"; }
	}

event bro_init()
	{
	Broker::enable();
	Broker::connect("127.0.0.1", broker_port, 1secs);
	Broker::auto_event("bro/event/ready", ready);
	}

In the above example, if a local copy of the store contents isn’t needed, just replace the Broker::create_clone call with Broker::create_frontend. Queries will then be made against the remote master store instead of the local clone.

Note that all data store queries must be made within Bro’s asynchronous when statements and must specify a timeout block.

Copyright 2016, The Bro Project. Last updated on December 07, 2018. Created using Sphinx 1.8.2.