Bro can now use the Broker Library to exchange information with other Bro processes.
Contents
Communication via Broker must first be turned on via
Broker::enable
.
Bro can accept incoming connections by calling Broker::listen
and then monitor connection status updates via the
Broker::incoming_connection_established
and
Broker::incoming_connection_broken
events.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | connecting-listener.bro
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "listener";
event bro_init()
{
Broker::enable();
Broker::listen(broker_port, "127.0.0.1");
}
event Broker::incoming_connection_established(peer_name: string)
{
print "Broker::incoming_connection_established", peer_name;
}
event Broker::incoming_connection_broken(peer_name: string)
{
print "Broker::incoming_connection_broken", peer_name;
terminate();
}
|
Bro can initiate outgoing connections by calling Broker::connect
and then monitor connection status updates via the
Broker::outgoing_connection_established
,
Broker::outgoing_connection_broken
, and
Broker::outgoing_connection_incompatible
events.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | connecting-connector.bro
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "connector";
event bro_init()
{
Broker::enable();
Broker::connect("127.0.0.1", broker_port, 1sec);
}
event Broker::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
print "Broker::outgoing_connection_established",
peer_address, peer_port, peer_name;
terminate();
}
|
To receive remote print messages, first use the
Broker::subscribe_to_prints
function to advertise to peers a
topic prefix of interest and then create an event handler for
Broker::print_handler
to handle any print messages that are
received.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | printing-listener.bro
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "listener";
global msg_count = 0;
event bro_init()
{
Broker::enable();
Broker::subscribe_to_prints("bro/print/");
Broker::listen(broker_port, "127.0.0.1");
}
event Broker::incoming_connection_established(peer_name: string)
{
print "Broker::incoming_connection_established", peer_name;
}
event Broker::print_handler(msg: string)
{
++msg_count;
print "got print message", msg;
if ( msg_count == 3 )
terminate();
}
|
To send remote print messages, just call Broker::send_print
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | printing-connector.bro
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "connector";
event bro_init()
{
Broker::enable();
Broker::connect("127.0.0.1", broker_port, 1sec);
}
event Broker::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
print "Broker::outgoing_connection_established",
peer_address, peer_port, peer_name;
Broker::send_print("bro/print/hi", "hello");
Broker::send_print("bro/print/stuff", "...");
Broker::send_print("bro/print/bye", "goodbye");
}
event Broker::outgoing_connection_broken(peer_address: string,
peer_port: port)
{
terminate();
}
|
Notice that the subscriber only used the prefix “bro/print/”, but is able to receive messages with full topics of “bro/print/hi”, “bro/print/stuff”, and “bro/print/bye”. The model here is that the publisher of a message checks for all subscribers who advertised interest in a prefix of that message’s topic and sends it to them.
For other applications that want to exchange print messages with Bro, the Broker message format is simply:
broker::message{std::string{}};
Receiving remote events is similar to remote prints. Just use the
Broker::subscribe_to_events
function and possibly define any
new events along with handlers that peers may want to send.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | events-listener.bro
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "listener";
global msg_count = 0;
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);
event bro_init()
{
Broker::enable();
Broker::subscribe_to_events("bro/event/");
Broker::listen(broker_port, "127.0.0.1");
}
event Broker::incoming_connection_established(peer_name: string)
{
print "Broker::incoming_connection_established", peer_name;
}
event my_event(msg: string, c: count)
{
++msg_count;
print "got my_event", msg, c;
if ( msg_count == 5 )
terminate();
}
event my_auto_event(msg: string, c: count)
{
++msg_count;
print "got my_auto_event", msg, c;
if ( msg_count == 5 )
terminate();
}
|
There are two different ways to send events. The first is to call the
Broker::send_event
function directly. The second option is to call
the Broker::auto_event
function where you specify a
particular event that will be automatically sent to peers whenever the
event is called locally via the normal event invocation syntax.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | events-connector.bro
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "connector";
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);
event bro_init()
{
Broker::enable();
Broker::connect("127.0.0.1", broker_port, 1sec);
Broker::auto_event("bro/event/my_auto_event", my_auto_event);
}
event Broker::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
print "Broker::outgoing_connection_established",
peer_address, peer_port, peer_name;
Broker::send_event("bro/event/my_event", Broker::event_args(my_event, "hi", 0));
event my_auto_event("stuff", 88);
Broker::send_event("bro/event/my_event", Broker::event_args(my_event, "...", 1));
event my_auto_event("more stuff", 51);
Broker::send_event("bro/event/my_event", Broker::event_args(my_event, "bye", 2));
}
event Broker::outgoing_connection_broken(peer_address: string,
peer_port: port)
{
terminate();
}
|
Again, the subscription model is prefix-based.
For other applications that want to exchange event messages with Bro, the Broker message format is:
broker::message{std::string{}, ...};
The first parameter is the name of the event and the remaining ...
are its arguments, which are any of the supported Broker data types as
they correspond to the Bro types for the event named in the first
parameter of the message.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | testlog.bro
module Test;
export {
redef enum Log::ID += { LOG };
type Info: record {
msg: string &log;
num: count &log;
};
global log_test: event(rec: Test::Info);
}
event bro_init() &priority=5
{
Broker::enable();
Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test, $path="test"]);
}
|
Use the Broker::subscribe_to_logs
function to advertise interest
in logs written by peers. The topic names that Bro uses are implicitly of the
form “bro/log/<stream-name>”.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | logs-listener.bro
@load ./testlog
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "listener";
event bro_init()
{
Broker::enable();
Broker::subscribe_to_logs("bro/log/Test::LOG");
Broker::listen(broker_port, "127.0.0.1");
}
event Broker::incoming_connection_established(peer_name: string)
{
print "Broker::incoming_connection_established", peer_name;
}
event Test::log_test(rec: Test::Info)
{
print "wrote log", rec;
if ( rec$num == 5 )
terminate();
}
|
To send remote logs either redef Log::enable_remote_logging
or
use the Broker::enable_remote_logs
function. The former
allows any log stream to be sent to peers while the latter enables remote
logging for particular streams.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | logs-connector.bro
@load ./testlog
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef Broker::endpoint_name = "connector";
redef Log::enable_local_logging = F;
redef Log::enable_remote_logging = F;
global n = 0;
event bro_init()
{
Broker::enable();
Broker::enable_remote_logs(Test::LOG);
Broker::connect("127.0.0.1", broker_port, 1sec);
}
event do_write()
{
if ( n == 6 )
return;
Log::write(Test::LOG, [$msg = "ping", $num = n]);
++n;
event do_write();
}
event Broker::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
print "Broker::outgoing_connection_established",
peer_address, peer_port, peer_name;
event do_write();
}
event Broker::outgoing_connection_broken(peer_address: string,
peer_port: port)
{
terminate();
}
|
For other applications that want to exchange log messages with Bro, the Broker message format is:
broker::message{broker::enum_value{}, broker::record{}};
The enum value corresponds to the stream’s Log::ID
value, and
the record corresponds to a single entry of that log’s columns record,
in this case a Test::Info
value.
By default, endpoints do not restrict the message topics that it sends
to peers and do not restrict what message topics and data store
identifiers get advertised to peers. These are the default
Broker::EndpointFlags
supplied to Broker::enable
.
If not using the auto_publish
flag, one can use the
Broker::publish_topic
and Broker::unpublish_topic
functions to manipulate the set of message topics (must match exactly)
that are allowed to be sent to peer endpoints. These settings take
precedence over the per-message peers
flag supplied to functions
that take a Broker::SendFlags
such as Broker::send_print
,
Broker::send_event
, Broker::auto_event
or
Broker::enable_remote_logs
.
If not using the auto_advertise
flag, one can use the
Broker::advertise_topic
and
Broker::unadvertise_topic
functions
to manipulate the set of topic prefixes that are allowed to be
advertised to peers. If an endpoint does not advertise a topic prefix, then
the only way peers can send messages to it is via the unsolicited
flag of Broker::SendFlags
and choosing a topic with a matching
prefix (i.e. full topic may be longer than receivers prefix, just the
prefix needs to match).
There are three flavors of key-value data store interfaces: master, clone, and frontend.
A frontend is the common interface to query and modify data stores. That is, a clone is a specific type of frontend and a master is also a specific type of frontend, but a standalone frontend can also exist to e.g. query and modify the contents of a remote master store without actually “owning” any of the contents itself.
A master data store can be cloned from remote peers which may then perform lightweight, local queries against the clone, which automatically stays synchronized with the master store. Clones cannot modify their content directly, instead they send modifications to the centralized master store which applies them and then broadcasts them to all clones.
Master and clone stores get to choose what type of storage backend to use. E.g. In-memory versus SQLite for persistence. Note that if clones are used, then data store sizes must be able to fit within memory regardless of the storage backend as a single snapshot of the master store is sent in a single chunk to initialize the clone.
Data stores also support expiration on a per-key basis either using an absolute point in time or a relative amount of time since the entry’s last modification time.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | stores-listener.bro
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
global h: opaque of Broker::Handle;
global expected_key_count = 4;
global key_count = 0;
function do_lookup(key: string)
{
when ( local res = Broker::lookup(h, Broker::data(key)) )
{
++key_count;
print "lookup", key, res;
if ( key_count == expected_key_count )
terminate();
}
timeout 10sec
{ print "timeout", key; }
}
event ready()
{
h = Broker::create_clone("mystore");
when ( local res = Broker::keys(h) )
{
print "clone keys", res;
do_lookup(Broker::refine_to_string(Broker::vector_lookup(res$result, 0)));
do_lookup(Broker::refine_to_string(Broker::vector_lookup(res$result, 1)));
do_lookup(Broker::refine_to_string(Broker::vector_lookup(res$result, 2)));
do_lookup(Broker::refine_to_string(Broker::vector_lookup(res$result, 3)));
}
timeout 10sec
{ print "timeout"; }
}
event bro_init()
{
Broker::enable();
Broker::subscribe_to_events("bro/event/ready");
Broker::listen(broker_port, "127.0.0.1");
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | stores-connector.bro
const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
global h: opaque of Broker::Handle;
function dv(d: Broker::Data): Broker::DataVector
{
local rval: Broker::DataVector;
rval[0] = d;
return rval;
}
global ready: event();
event Broker::outgoing_connection_broken(peer_address: string,
peer_port: port)
{
terminate();
}
event Broker::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
local myset: set[string] = {"a", "b", "c"};
local myvec: vector of string = {"alpha", "beta", "gamma"};
h = Broker::create_master("mystore");
Broker::insert(h, Broker::data("one"), Broker::data(110));
Broker::insert(h, Broker::data("two"), Broker::data(223));
Broker::insert(h, Broker::data("myset"), Broker::data(myset));
Broker::insert(h, Broker::data("myvec"), Broker::data(myvec));
Broker::increment(h, Broker::data("one"));
Broker::decrement(h, Broker::data("two"));
Broker::add_to_set(h, Broker::data("myset"), Broker::data("d"));
Broker::remove_from_set(h, Broker::data("myset"), Broker::data("b"));
Broker::push_left(h, Broker::data("myvec"), dv(Broker::data("delta")));
Broker::push_right(h, Broker::data("myvec"), dv(Broker::data("omega")));
when ( local res = Broker::size(h) )
{
print "master size", res;
event ready();
}
timeout 10sec
{ print "timeout"; }
}
event bro_init()
{
Broker::enable();
Broker::connect("127.0.0.1", broker_port, 1secs);
Broker::auto_event("bro/event/ready", ready);
}
|
In the above example, if a local copy of the store contents isn’t
needed, just replace the Broker::create_clone
call with
Broker::create_frontend
. Queries will then be made against
the remote master store instead of the local clone.
Note that all data store queries must be made within Bro’s asynchronous
when
statements and must specify a timeout block.