Bro now uses the Broker Library to exchange information with other Bro processes. Broker itself uses CAF (C++ Actor Framework) internally for connecting nodes and exchanging arbitrary data over networks. On top of CAF, Broker introduces a topic-based publish/subscribe communication pattern using a data model that is compatible with Bro's. Broker itself can be utilized outside the context of Bro, with Bro itself making use of only a few predefined Broker message formats that represent Bro events, log entries, etc.
In summary, Bro's Broker framework provides basic facilities for connecting broker-enabled peers (e.g. Bro instances) to each other and exchanging messages (e.g. events and logs). With this come changes in how clusters operate, and, since Broker differs significantly from the previous communication framework, several of the scripts that Bro ships with have changed in ways that may break your own customizations. This document describes those changes to make it easier to port your own scripts. It also gives examples of Broker and the new cluster framework that show off the new features and capabilities.
Review and use the points below as a guide to port your own scripts to the latest version of Bro, which uses the new cluster and Broker communication framework.
- @load policy/frameworks/communication/listen and @load base/frameworks/communication indicate use of the old communication framework; consider porting to @load base/frameworks/broker and using the Broker API: base/frameworks/broker/main.bro
- The &synchronized and &persistent attributes are deprecated; consider using Data Stores instead.
- The old communication framework is still available for now, but it is deprecated; set the old_comm_usage_is_ok flag in this case.
- Instead of events like Cluster::manager2worker_events (and all the permutations for every node type), use either Broker::publish or Broker::auto_publish with either the topic associated with a specific node or a class of nodes, like Cluster::node_topic or Cluster::worker_topic.
- Instead of the send_id BIF, use Broker::publish_id.
- Use terminate instead of terminate_communication. The latter refers to the old communication system and no longer affects the new Broker-based system.
- Instead of remote_connection_established and remote_connection_closed, consider Broker::peer_added and Broker::peer_lost. There are also Cluster::node_up and Cluster::node_down.
- Software::tracked is now partitioned among proxy nodes instead of synchronized in its entirety to all nodes.
- Known::known_hosts is renamed to Known::host_store and implemented via the new Broker data store interface.
- Known::known_services is renamed to Known::service_store and implemented via the new Broker data store interface.
- Known::certs is renamed to Known::cert_store and implemented via the new Broker data store interface.
- The cluster topology has changed.
Some general suggestions as to the purpose/utilization of each node type:
There’s maybe no single, best approach or pattern to use when you need a
Bro script to store or share long-term state and data. The two
approaches that were previously used were either using the &synchronized
attribute on tables/sets or by explicitly sending events to specific
nodes on which you wanted data to be stored. The former is no longer
possible, though there are several new possibilities that the new
Broker/Cluster framework offer, namely distributed data store and data
partitioning APIs.
Broker provides a distributed key-value store interface with an optional persistent backend. For more detail, see the data store examples later in this document.
Some ideas/considerations/scenarios when deciding whether to use a data store for your use-case:
New data partitioning strategies are available using the API in base/frameworks/cluster/pools.bro. Using that API, developers of custom Bro scripts can define a custom pool of nodes that best fits the needs of their script.
One example strategy is to use Highest Random Weight (HRW) hashing to partition data tables amongst the pool of all proxy nodes, e.g. using Cluster::publish_hrw. This could allow clusters to be scaled more easily than the approach of "the entire data set gets synchronized to all nodes", since the solution to memory limitations becomes "just add another proxy node". It may also remove some of the messaging load that used to be required to synchronize data sets across all nodes.
The tradeoff of this approach is that nodes leaving the pool (due to crashing, etc.) cause a temporary gap in the total data set until workers start hashing those keys to a remaining proxy node, at which point the data gets located and updated there.
If the developer of a script expects its workload to be particularly intensive, wants to ensure that its operations get exclusive access to nodes, or otherwise wants to constrain the number of nodes within the pool their script utilizes, the Cluster::PoolSpec structure will allow them to do that while still allowing users of the script to override the default suggestions made by the original developer.
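A minimal sketch of registering such a pool, assuming a made-up module name and topic (check base/frameworks/cluster/pools.bro for the exact PoolSpec fields and defaults):

module PretendAnalysis;

export {
    # Users of this script may redef the spec, e.g. to cap the number of
    # proxy nodes the script is allowed to use.
    const pool_spec = Cluster::PoolSpec(
        $topic = "bro/cluster/pool/pretend-analysis",
        $node_type = Cluster::PROXY,
        $max_nodes = 2) &redef;

    global pool: Cluster::Pool;
}

event bro_init() &priority=10
    {
    pool = Cluster::register_pool(pool_spec);
    }

Events can then be distributed over that pool with functions like Cluster::publish_hrw or Cluster::rr_topic, just as with the default proxy pool.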
The broker framework provides basic facilities for connecting Bro instances to each other and exchanging messages, like events or logs.
See base/frameworks/broker/main.bro for an overview of the main Broker API.
All Broker-based messaging involves two components: the information you want to send (e.g. an event with its arguments) along with an associated topic name string. The topic strings are used as a filtering mechanism: Broker uses a publish/subscribe communication pattern where peers advertise interest in topic prefixes and only receive messages which match one of their prefix subscriptions.
Broker itself supports arbitrary topic strings; however, Bro generally follows certain conventions in choosing these topics to help avoid conflicts and make them easier to remember.
As a reminder of how topic subscriptions work, subscribers advertise interest in a topic prefix and then receive any messages published by a peer to a topic name that starts with that prefix. E.g. if Alice subscribes to the "alice/dogs" prefix, she would receive messages published by Bob to topics such as:

- alice/dogs/corgis
- alice/dogs
- alice/dogsarecute
Alice would not receive messages published by Bob to topics such as:

- alice/cats/siamese
- alice/dog
- bob/dogs
Note that the topics aren't required to form a slash-delimited hierarchy; the subscription matching is purely a byte-by-byte prefix comparison.
However, Bro scripts generally follow a topic naming hierarchy, and any given script makes the topic names it uses apparent via some redef'able constant in its export section. Topics that Bro scripts use are generally along the lines of "bro/<namespace>/<specifics>", with "<namespace>" being the script's module name in all lowercase. For example, you might expect an imaginary "Pretend" framework to publish/subscribe using topic names like "bro/pretend/my_cool_event". For scripts that use Broker as a means of cluster-aware analysis, it's usually sufficient to make use of the topics declared by the cluster framework. Scripts that are meant to establish communication flows unrelated to a Bro cluster declare new topics (examples being the NetControl and Control frameworks).
For cluster operation, see base/frameworks/cluster/main.bro for a list of topics that are useful for steering published events to the various node classes. E.g. you have the ability to broadcast to all nodes of a given class (e.g. just workers) or just send to a specific node within a class.
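For example, assuming a hypothetical fancy_event declared by your script, a node could broadcast to all workers or address a single worker by its cluster node name (the event, handler, and node name below are made up for illustration):

global fancy_event: event(desc: string);

event some_analysis_done()
    {
    # Broadcast to every node of the worker class.
    Broker::publish(Cluster::worker_topic, fancy_event, "to all workers");

    # Send to one specific node, addressed by its cluster node name.
    Broker::publish(Cluster::node_topic("worker-1"), fancy_event, "to worker-1");
    }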
The topic names that logs get published under are a bit nuanced. In the
default cluster configuration, they are round-robin published to
explicit topic names that identify a single logger. In standalone Bro
processes, logs get published to the topic indicated by
Broker::default_log_topic_prefix.
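If the default prefix does not suit a deployment, it can be redefined; a minimal sketch (the replacement prefix is made up):

redef Broker::default_log_topic_prefix = "mysite/logs/";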
For those writing their own scripts which need new topic names, a suggestion would be to avoid prefixing any new topics/prefixes with “bro/” as any changes in scripts shipping with Bro will use that prefix and it’s better to not risk unintended conflicts. Again, it’s often less confusing to just re-use existing topic names instead of introducing new topic names. The typical use case is writing a cluster-enabled script, which usually just needs to route events based upon node classes, and that already has usable topics in the cluster framework.
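For example, a new script might declare its topic prefix as a redef'able constant in its export section and subscribe to it; the module and topic names below are made up:

module Pretend;

export {
    # Topic prefix used by this script; note that it avoids the "bro/" prefix.
    const topic_prefix = "pretend/cool_events/" &redef;
}

event bro_init()
    {
    Broker::subscribe(topic_prefix);
    }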
Bro can accept incoming connections by calling Broker::listen.
connecting-listener.bro
redef exit_only_after_terminate = T;
event bro_init()
{
Broker::listen("127.0.0.1");
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer added", endpoint;
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer lost", endpoint;
terminate();
}
Bro can initiate outgoing connections by calling Broker::peer.
connecting-connector.bro
redef exit_only_after_terminate = T;
event bro_init()
{
Broker::peer("127.0.0.1");
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer added", endpoint;
terminate();
}
In either case, connection status updates are monitored via the
Broker::peer_added
and Broker::peer_lost
events.
To receive remote events, you need to first subscribe to a “topic” to which the events are being sent. A topic is just a string chosen by the sender, and named in a way that helps organize events into various categories. See the topic naming conventions section for more on how topics work and are chosen.
Use the Broker::subscribe
function to subscribe to topics and
define any event handlers for events that peers will send.
events-listener.bro
redef exit_only_after_terminate = T;
global msg_count = 0;
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);
event bro_init()
{
Broker::subscribe("bro/event/");
Broker::listen("127.0.0.1");
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer added", endpoint;
}
event my_event(msg: string, c: count)
{
++msg_count;
print "got my_event", msg, c;
if ( msg_count == 5 )
terminate();
}
event my_auto_event(msg: string, c: count)
{
++msg_count;
print "got my_auto_event", msg, c;
if ( msg_count == 5 )
terminate();
}
There are two different ways to send events.
The first is to call the Broker::publish
function which you can
supply directly with the event and its arguments or give it the return value of
Broker::make_event
in case you need to send the same event/args
multiple times. When publishing events like this, local event handlers for
the event are not called.
The second option is to call the Broker::auto_publish
function where
you specify a particular event that will be automatically sent to peers
whenever the event is called locally via the normal event invocation syntax.
When auto-publishing events, local event handlers for the event are called
in addition to sending the event to any subscribed peers.
events-connector.bro
redef exit_only_after_terminate = T;
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);
event bro_init()
{
Broker::peer("127.0.0.1");
Broker::auto_publish("bro/event/my_auto_event", my_auto_event);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer added", endpoint;
Broker::publish("bro/event/my_event", my_event, "hi", 0);
event my_auto_event("stuff", 88);
Broker::publish("bro/event/my_event", my_event, "...", 1);
event my_auto_event("more stuff", 51);
local e = Broker::make_event(my_event, "bye", 2);
Broker::publish("bro/event/my_event", e);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
terminate();
}
event my_event(msg: string, c: count)
{
print "got my_event", msg, c;
}
event my_auto_event(msg: string, c: count)
{
print "got my_auto_event", msg, c;
}
Note that the subscription model is prefix-based, meaning that if you subscribe to the “bro/events” topic prefix you would receive events that are published to topic names “bro/events/foo” and “bro/events/bar” but not “bro/misc”.
testlog.bro
module Test;
export {
redef enum Log::ID += { LOG };
type Info: record {
msg: string &log;
num: count &log;
};
global log_test: event(rec: Test::Info);
}
event bro_init() &priority=5
{
Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test, $path="test"]);
}
To toggle remote logs, redef Log::enable_remote_logging.
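For example, to stop a node from forwarding its log writes to peers:

# Disable sending of Log::write entries to remote peers from this node.
redef Log::enable_remote_logging = F;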
Use the Broker::subscribe
function to advertise interest
in logs written by peers. The topic names that Bro uses are determined by
Broker::log_topic.
logs-listener.bro
@load ./testlog
redef exit_only_after_terminate = T;
event bro_init()
{
Broker::subscribe("bro/logs");
Broker::listen("127.0.0.1");
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer added", endpoint;
}
event Test::log_test(rec: Test::Info)
{
print "got log event", rec;
if ( rec$num == 5 )
terminate();
}
logs-connector.bro
@load ./testlog
redef exit_only_after_terminate = T;
global n = 0;
event bro_init()
{
Broker::peer("127.0.0.1");
}
event do_write()
{
if ( n == 6 )
return;
Log::write(Test::LOG, [$msg = "ping", $num = n]);
++n;
event do_write();
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer added", endpoint;
event do_write();
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
terminate();
}
event Test::log_test(rec: Test::Info)
{
print "wrote log", rec;
Broker::publish("bro/logs/forward/test", Test::log_test, rec);
}
Note that logging events are only raised locally on the node that performs
the Log::write
and not automatically published to peers.
See base/frameworks/broker/store.bro for an overview of the Broker data store API.
There are two flavors of key-value data store interfaces: master and clone.
A master data store can be cloned from remote peers, which may then perform lightweight, local queries against the clone; the clone automatically stays synchronized with the master store. Clones cannot modify their content directly; instead, they send modifications to the centralized master store, which applies them and then broadcasts them to all clones.
Master stores choose what type of storage backend to use, e.g. in-memory versus SQLite for persistence.
Data stores also support expiration on a per-key basis using an amount of time relative to the entry’s last modification time.
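For example, a master store can be created with a persistent SQLite backend, and individual entries can be given an expiry interval when inserted; a minimal sketch (the store and file names are arbitrary):

global h: opaque of Broker::Store;

event bro_init()
    {
    # Use SQLite so the store's content survives restarts.
    local opts = Broker::BackendOptions($sqlite = Broker::SQLiteOptions(
        $path = "mystore.sqlite"));
    h = Broker::create_master("mystore", Broker::SQLITE, opts);

    # This entry expires 10 seconds after its last modification.
    Broker::put(h, "one", 110, 10sec);
    }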
stores-listener.bro
redef exit_only_after_terminate = T;
global h: opaque of Broker::Store;
global expected_key_count = 4;
global key_count = 0;
# Lookup a value in the store based on an arbitrary key string.
function do_lookup(key: string)
{
when ( local res = Broker::get(h, key) )
{
++key_count;
print "lookup", key, res;
# End after we iterated over looking up each key in the store twice.
if ( key_count == expected_key_count * 2 )
terminate();
}
# All data store queries must specify a timeout
timeout 3sec
{ print "timeout", key; }
}
event check_keys()
{
# Here we just query for the list of keys in the store, and show how to
# look up each one's value.
when ( local res = Broker::keys(h) )
{
print "clone keys", res;
if ( res?$result )
{
# Since we know that the keys we are storing are all strings,
# we can conveniently cast the result of Broker::keys to
# a native Bro type, namely 'set[string]'.
for ( k in res$result as string_set )
do_lookup(k);
# Alternatively, we can use a generic iterator to iterate
# over the results (which we know is of the 'set' type because
# that's what Broker::keys() always returns). If the keys
# we stored were not all of the same type, then you would
# likely want to use this method of inspecting the store's keys.
local i = Broker::set_iterator(res$result);
while ( ! Broker::set_iterator_last(i) )
{
do_lookup(Broker::set_iterator_value(i) as string);
Broker::set_iterator_next(i);
}
}
}
# All data store queries must specify a timeout.
# You also might see timeouts on connecting/initializing a clone since
# it hasn't had time to get fully set up yet.
timeout 1sec
{
print "timeout";
schedule 1sec { check_keys() };
}
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print "peer added";
# We could create a clone early, like in bro_init and it will periodically
# try to synchronize with its master once it connects, however, we just
# create it now since we know the peer w/ the master store has just
# connected.
h = Broker::create_clone("mystore");
event check_keys();
}
event bro_init()
{
Broker::listen("127.0.0.1");
}
stores-connector.bro
redef exit_only_after_terminate = T;
global h: opaque of Broker::Store;
global ready: event();
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
terminate();
}
event bro_init()
{
h = Broker::create_master("mystore");
local myset: set[string] = {"a", "b", "c"};
local myvec: vector of string = {"alpha", "beta", "gamma"};
Broker::put(h, "one", 110);
Broker::put(h, "two", 223);
Broker::put(h, "myset", myset);
Broker::put(h, "myvec", myvec);
Broker::increment(h, "one");
Broker::decrement(h, "two");
Broker::insert_into_set(h, "myset", "d");
Broker::remove_from(h, "myset", "b");
Broker::push(h, "myvec", "delta");
Broker::peer("127.0.0.1");
}
Note that all data store queries must be made within Bro’s asynchronous
when
statements and must specify a timeout block.
This section contains a few brief examples of various communication patterns one might use when developing Bro scripts that are to operate in the context of a cluster.
For simplicity, the following examples do not use any modules/namespaces.
If you choose to use them within your own code, it’s important to
remember that the event
and schedule
dispatching statements
should always use the fully-qualified event name.
For example, this will likely not work as expected:
module MyModule;

export {
    global my_event: event();
}

event my_event()
    {
    print "got my event";
    }

event bro_init()
    {
    event my_event();
    schedule 10sec { my_event() };
    }
This code runs without errors; however, the local my_event handler will never be called, and neither will any remote handlers, even if Broker::auto_publish was used elsewhere for it. Instead, at a minimum you would need to change the bro_init() handler:
event bro_init()
    {
    event MyModule::my_event();
    schedule 10sec { MyModule::my_event() };
    }
An easy rule of thumb to remember: always use explicit module namespace scoping and you can't go wrong:
module MyModule;

export {
    global MyModule::my_event: event();
}

event MyModule::my_event()
    {
    print "got my event";
    }

event bro_init()
    {
    event MyModule::my_event();
    schedule 10sec { MyModule::my_event() };
    }
Note that other identifiers in Bro do not have this inconsistency related to module namespacing; it's just events that require the explicitness.
This is fairly straightforward: we just need a topic name to which we know all workers are subscribed, combined with the event we want to send them.
event manager_to_workers(s: string)
    {
    print "got event from manager", s;
    }

event some_event_handled_on_manager()
    {
    Broker::publish(Cluster::worker_topic, manager_to_workers, "hello v0");

    # If you know this event is only handled on the manager, you don't
    # need any of the following conditions, they're just here as an
    # example of how you can further discriminate based on node identity.

    # Can check based on the name of the node.
    if ( Cluster::node == "manager" )
        Broker::publish(Cluster::worker_topic, manager_to_workers, "hello v1");

    # Can check based on the type of the node.
    if ( Cluster::local_node_type() == Cluster::MANAGER )
        Broker::publish(Cluster::worker_topic, manager_to_workers, "hello v2");

    # The run-time overhead of the above conditions can even be
    # eliminated by using the following conditional directives.
    # It's evaluated once per node at parse-time and, if false,
    # any code within is just ignored / treated as not existing at all.
    @if ( Cluster::local_node_type() == Cluster::MANAGER )
    Broker::publish(Cluster::worker_topic, manager_to_workers, "hello v3");
    @endif
    }
This should look almost identical to the previous case of sending an event from the manager to workers, except it simply changes the topic name to one to which the manager is subscribed.
event worker_to_manager(worker_name: string)
    {
    print "got event from worker", worker_name;
    }

event some_event_handled_on_worker()
    {
    Broker::publish(Cluster::manager_topic, worker_to_manager, Cluster::node);
    }
Since workers are not directly connected to each other in the cluster topology, this type of communication is a bit different from the previous examples: we have to manually relay the event via some node that is connected to all workers. The manager or a proxy satisfies that requirement:
event worker_to_workers(worker_name: string)
    {
@if ( Cluster::local_node_type() == Cluster::MANAGER ||
      Cluster::local_node_type() == Cluster::PROXY )
    Broker::publish(Cluster::worker_topic, worker_to_workers, worker_name);
@else
    print "got event from worker", worker_name;
@endif
    }

event some_event_handled_on_worker()
    {
    # We know the manager is connected to all workers, so we could
    # choose to relay the event across it.
    Broker::publish(Cluster::manager_topic, worker_to_workers,
                    Cluster::node + " (via manager)");

    # We also know that any given proxy is connected to all workers,
    # though now we have a choice of which proxy to use. If we
    # want to distribute the work associated with relaying uniformly,
    # we can use a round-robin strategy. The key used here is simply
    # used by the cluster framework internally to keep track of
    # which node is up next in the round-robin.
    local pt = Cluster::rr_topic(Cluster::proxy_pool, "example_key");
    Broker::publish(pt, worker_to_workers, Cluster::node + " (via a proxy)");
    }
If you want to offload some data/work from a worker to your proxies, you can make use of a Highest Random Weight (HRW) hashing distribution strategy to uniformly map an arbitrary key space across all available proxies.
event worker_to_proxies(worker_name: string)
    {
    print "got event from worker", worker_name;
    }

global my_counter = 0;

event some_event_handled_on_worker()
    {
    # The key here is used to choose which proxy shall receive
    # the event. Different keys may map to different nodes, but
    # any given key always maps to the same node provided the
    # pool of nodes remains consistent. If a proxy goes offline,
    # that key maps to a different node until the original comes
    # back up.
    Cluster::publish_hrw(Cluster::proxy_pool,
                         cat("example_key", ++my_counter),
                         worker_to_proxies, Cluster::node);
    }