This currently focuses on a new C library implementing the desired functionality for both Zeek itself and external clients. It is an initial proposal for now; feedback is welcome.
Note
There is an older document proposing a script-level API for the distributed key/value store. That document is outdated, but much of it should transfer well to working on top of the API described here.
This proposal primarily targets providing a replacement for the current communication system, including the exchange of events and synchronizing state across nodes. Specifically regarding the latter, Zeek currently uses the &synchronized attribute to transparently share state across nodes. &synchronized is a conceptually simple mechanism that hides a lot of complexity behind the scenes. While that is good in principle, it comes with several drawbacks:
- &synchronized does not offer any control over what to share with whom; it is always all or nothing.
- &synchronized provides only very loose consistency guarantees. See http://www.icir.org/robin/papers/acsac05.pdf for the gory details.
- &synchronized adds a lot of internal complexity. However, as it turns out, Zeek’s standard scripts actually don’t use &synchronized much at all anymore, so that complexity does not seem justified.
- While &synchronized can work with &persistent to keep state across restarts, the latter is a rather unreliable mechanism, as it is easy to lose the state file.
More broadly, the current communication framework has more shortcomings:
- The "no control" argument from above applies to events as well, though to a lesser degree. Nodes can subscribe to subsets of events based on their names but there is no mechanism that would allow limiting information to, e.g., a specific group of nodes, like in a tiered cluster setup.
- It is inefficient, as it sends information individually to each node, often duplicating the same content even where it could conceptually be broadcast.
- The code still has some corner cases where it does not function correctly in overload situations.
- The code and the protocol have become quite messy over time.
- We currently have to maintain two independent implementations of the communication protocol: the internal version in Zeek, and Broccoli. Furthermore, Broccoli does not support all features of Zeek (no &synchronized in particular).
The proposal is to develop a new standalone C library that implements communication and distributed state for use by both Zeek and external clients. The library will be used for Zeek-to-Zeek, Zeek-to-Client, Client-to-Zeek, and Client-to-Client communication. (In the following, a "node" can be either a Zeek instance or an external client using the same library.)
The library needs to support the following three areas:
- Exchanging events.
- Maintaining global persistent state.
- Remote logging and printing.
For the global state, we simplify the current model by replacing &synchronized with distributed persistent key/value stores. In the standard use case, we would normally not expose these stores to users directly but use them internally inside higher-level frameworks that provide "cluster transparency" natively. However, users can use their own key/value stores as well if they need a lower-level facility for state sharing; the stores are conceptually quite simple and match the common key/value paradigm pretty much directly.
This proposal defines a C API and protocol based on the following structure:
- We define a transport layer of point-to-point connections between nodes. In a connection, the two endpoints are "equal": we do not distinguish between client and server. The transport layer can be an actual network connection (TCP), inter-process communication on a single host, or intra-process communication within a single process (including between threads).
- On top of the transport layer, we define a message layer for exchanging messages. All communication is publish/subscribe-based: a node expresses interest in a set of topics and will then receive all messages associated with any of them. By associating semantics with topics, the application can fine-tune quite precisely who gets what.
- Messages may originate directly from one of the peers that a node has a direct transport-layer connection to; or those peers may be forwarding messages originating elsewhere at other nodes that they are connected to (in other words, messages may be routed across hops).
- All communication is asynchronous. While some exchanges have a request/reply structure, replies may come in at any time and be interleaved with other activity. This also generally enables pipelining.
- We fully trust all participating nodes. There are no protection measures against injecting wrong data or subscribing to something one should not see. We may add support for encryption and authentication via SSL, but we do not bake anything further into our own protocols/APIs beyond that.
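To illustrate what interleaved replies mean in practice, here is a minimal sketch of how a node might track outstanding requests so that replies can be matched up in whatever order they arrive. It assumes that each request carries an ID that the reply echoes back; the proposal does not define such IDs, and all names here (`pending_table`, `pending_add`, `pending_complete`, `record_reply`) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical sketch: request IDs and this table are not part of the proposal. */

#define MAX_PENDING 16

typedef void (*reply_fn)(uint64_t id, int reply, void *cookie);

struct pending {
    bool in_use;
    uint64_t id;
    reply_fn cb;
    void *cookie;
};

struct pending_table {
    struct pending slots[MAX_PENDING];
    uint64_t next_id;   /* last ID handed out; 0 is never used as an ID */
};

/* Registers an outstanding request and returns its ID, or 0 if the table is full. */
uint64_t pending_add(struct pending_table *t, reply_fn cb, void *cookie) {
    assert(cb != NULL);
    for (size_t i = 0; i < MAX_PENDING; i++) {
        if (!t->slots[i].in_use) {
            uint64_t id = ++t->next_id;
            t->slots[i] = (struct pending){ true, id, cb, cookie };
            return id;
        }
    }
    return 0;
}

/* Dispatches a reply to whichever request it belongs to, regardless of the
 * order in which requests were sent. Returns false for unknown IDs. */
bool pending_complete(struct pending_table *t, uint64_t id, int reply) {
    for (size_t i = 0; i < MAX_PENDING; i++) {
        struct pending *p = &t->slots[i];
        if (p->in_use && p->id == id) {
            p->in_use = false;   /* release the slot before running the callback */
            p->cb(id, reply, p->cookie);
            return true;
        }
    }
    return false;
}

/* Example callback: stores the reply in the int that the cookie points to. */
void record_reply(uint64_t id, int reply, void *cookie) {
    (void)id;
    *(int *)cookie = reply;
}
```

Because each slot is released as soon as its reply is dispatched, any number of requests (up to the table size) can be in flight at once, which is what makes pipelining possible.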
The following breaks down the proposed API into various parts. The idea is to eventually expose this API in pretty much the same way at both the C level and as a Zeek-script level framework; the latter will basically just map the C data types to Zeek types.
These types are used abstractly below and will need to be fleshed out further. Most of them will need their own sub-APIs for instantiation and management (and for some they are drafted below).
// Type that can represent all supported Zeek types.
struct bro_type;

// Type representing a transport-layer connection to a peer.
struct bro_connection;

// Type representing an event instance.
// Includes name, timestamp, and arguments.
struct bro_event;

// Type representing a list of Zeek values.
// Used for event arguments.
struct bro_value_list;

// Type representing a list of Zeek types.
// Used for event arguments.
struct bro_type_list;

// Type representing a single data store, i.e., one
// instance of a key/value table.
struct bro_dstore;

// Type representing a key in a data store.
// This will probably be just a blob in the form of length and bytes.
struct bro_dstore_key;

// Type representing a value in a data store.
// This will probably be just a blob in the form of length and bytes.
struct bro_dstore_value;
To use the library, one needs to instantiate a context object that holds the global application state.
// Initializes the library and creates a context object.
//
// flags: A set of flags tuning the library's behavior. Currently,
//        none are defined.
bro_context* bro_context_create(int flags);

// Finishes using the context/library.
void bro_context_destroy(bro_context* ctx);

// Returns an error code for the most recent operation.
int bro_context_errno(bro_context* ctx);

// Returns a textual error message for the most recent operation.
const char* bro_context_strerrno(bro_context* ctx);
// Opens a connection to a peer node.
//
// name: A descriptive name for ourselves. By convention, we use a
//       hierarchical DNS-style naming scheme. A node will always
//       subscribe to the special topic ".node.<name>".
//
// location: A URL-style location string describing where to connect to.
//
//     Supported schemes:
//
//     tcp://<addr>:<port> - Contact remote host on the given port.
//     ipc://<path>        - Contact local process via given socket.
//     inproc://<id>       - Contact same process using given ID for identification.
//
// flags: Flags for tuning the behavior of the connection:
//
//     AUTO_PUBLISH:   - Automatically publish topics that we generate (see below).
//     AUTO_SUBSCRIBE: - Automatically subscribe to topics that we have a handler for (see below).
bro_connection* bro_connection_open(bro_context* ctx, const char* name, const char* location, uint64_t flags);

// Closes an existing connection.
int bro_connection_close(bro_context* ctx, bro_connection* conn);

// Begins listening for incoming connections from remote nodes.
//
// location: The URL-style location that peers can use to connect.
//
//     Supported schemes:
//
//     tcp://<interface>:<port> - Opens a TCP socket on the given interface/port.
//     ipc://<path>             - Opens an IPC socket at the given path.
//     inproc://<id>            - Make available within process using given ID for identification.
//
// flags: Flags for tuning the behavior of connections established via this location.
//
//     AUTO_PUBLISH:   - Automatically publish topics that we generate (see below).
//     AUTO_SUBSCRIBE: - Automatically subscribe to topics that we have a handler for (see below).
//
// cb: A callback that will be called after an incoming connection has
//     been established but before it is used. The callback receives
//     the connection object and the cookie, and returns true or false
//     depending on whether it wants the connection to proceed.
//
// cookie: A cookie to pass to the callback.
int bro_connection_listen(bro_context* ctx, const char* location, int flags, listen_callback* cb, void* cookie);
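The location strings accepted by bro_connection_open() and bro_connection_listen() could be split into scheme and target roughly as follows. This is a hypothetical helper (`parse_location`, `struct location`), not part of the proposed API; it assumes the port is whatever follows the last ':' in a tcp:// location and performs only basic validation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper, not part of the proposed API. */
enum loc_scheme { LOC_TCP, LOC_IPC, LOC_INPROC };

struct location {
    enum loc_scheme scheme;
    char target[256];  /* address (tcp), socket path (ipc), or ID (inproc) */
    int port;          /* only meaningful for LOC_TCP */
};

static bool has_prefix(const char *s, const char *prefix) {
    return strncmp(s, prefix, strlen(prefix)) == 0;
}

/* Splits a location such as "tcp://10.0.0.1:47000" into scheme, target,
 * and port. Returns false for unknown schemes or malformed input. */
bool parse_location(const char *loc, struct location *out) {
    assert(loc != NULL && out != NULL);
    const char *rest;
    if (has_prefix(loc, "tcp://")) {
        out->scheme = LOC_TCP;
        rest = loc + strlen("tcp://");
    } else if (has_prefix(loc, "ipc://")) {
        out->scheme = LOC_IPC;
        rest = loc + strlen("ipc://");
    } else if (has_prefix(loc, "inproc://")) {
        out->scheme = LOC_INPROC;
        rest = loc + strlen("inproc://");
    } else {
        return false;
    }

    size_t len = strlen(rest);
    out->port = 0;
    if (out->scheme == LOC_TCP) {
        /* The port is whatever follows the last ':'. */
        const char *colon = strrchr(rest, ':');
        if (colon == NULL || colon == rest || colon[1] == '\0')
            return false;
        char *end;
        long port = strtol(colon + 1, &end, 10);
        if (*end != '\0' || port <= 0 || port > 65535)
            return false;
        out->port = (int)port;
        len = (size_t)(colon - rest);
    }

    if (len == 0 || len >= sizeof(out->target))
        return false;
    memcpy(out->target, rest, len);
    out->target[len] = '\0';
    return true;
}
```

Keeping the scheme explicit in the string lets the same call site switch between TCP, IPC, and in-process transports without code changes.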
All exchanged activity is associated with a particular topic. To receive activity, one needs to subscribe to the corresponding topic from the relevant peer nodes; to send out activity, one needs to publish the topic. If a connection is opened with AUTO_PUBLISH and/or AUTO_SUBSCRIBE, that will happen mostly automatically as the API functions below get called (see there for details). If not, one can control topics in a more fine-grained way with these functions:
// Makes all activity related to a topic available over a connection. The
// peer node can then request the activity by subscribing to the topic.
//
// topic: The topic to make available. By convention, we use a
//        hierarchical dotted naming scheme (e.g.,
//        "topic.subtopic.subsubtopic"). Topics starting with a "." are
//        reserved and should not be used for application data.
//
// conn: The connection to the remote node to publish on. If set to null,
//       applies to all connections (including future ones).
int bro_topic_publish(bro_context* ctx, const char* topic, bro_connection* conn);

// Subscribes to all of a topic's activity from a peer node. Once
// subscribed, receiving corresponding activity will trigger the
// corresponding callbacks.
//
// topic: The topic to subscribe to. The topic may include '*' as a
//        wildcard, which will then subscribe to all activity matching
//        that pattern (including any matching future topics that may not
//        yet be known).
//
// conn: The connection to the remote node to subscribe on. If set to
//       null, applies to all connections (including future ones).
int bro_topic_subscribe(bro_context* ctx, const char* topic, bro_connection* conn);

// Subscribes to all locally generated activity for a topic. By default,
// local activity (e.g., events we raise ourselves) will only be sent
// out and will not trigger our own callbacks. Subscribing to a topic
// locally changes that: all local activity will then be treated the same
// as if it had come in from remote.
//
// topic: The topic to subscribe to. The topic may include '*' as a
//        wildcard, which will then subscribe to all activity matching that
//        pattern (including any matching future topics that may not yet be
//        known).
int bro_topic_subscribe_locally(bro_context* ctx, const char* topic);

// Stops publishing a topic on a connection.
int bro_topic_unpublish(bro_context* ctx, const char* topic, bro_connection* conn);

// Unsubscribes from a topic. *topic* must match exactly what was used to
// subscribe (including any wildcards).
int bro_topic_unsubscribe(bro_context* ctx, const char* topic, bro_connection* conn);

// Locally unsubscribes from a topic. *topic* must match exactly what was
// used to subscribe (including any wildcards).
int bro_topic_unsubscribe_locally(bro_context* ctx, const char* topic);

// Forwards all of a topic's activity received from one peer to another.
// This basically turns the node into a router.
//
// topic: The topic to forward. The topic may include '*' as a wildcard,
//        which will then forward activity matching that pattern (including
//        any matching future topics that may not yet be known).
//
// from: The connection to forward the topic's activity from. If null,
//       forward from all connections (including future ones).
//
// to: The connection to forward the topic's activity to. If null,
//     forward to all connections (including future ones).
int bro_topic_forward(bro_context* ctx, const char* topic, bro_connection* from, bro_connection* to);
// Registers a callback for all incoming events that are associated with
// a given topic. Multiple handlers can be registered per topic; all will
// be executed.
//
// This automatically subscribes to the topic on all AUTO_SUBSCRIBE
// connections.
//
// topic: The topic the handler is responsible for; may include wildcards.
//
// cb: The callback function to execute for each matching event. The
//     callback will receive the event, the actual topic, and the
//     cookie.
//
// cookie: A cookie value to pass to the callback.
void bro_event_register_handler(bro_context* ctx, const char* topic, event_callback* cb, void* cookie);

// Unregisters a handler for a topic.
//
// This automatically unsubscribes from the topic on all AUTO_SUBSCRIBE
// connections.
void bro_event_unregister_handler(bro_context* ctx, const char* topic);

// Creates a new event instance to raise.
//
// name: Name of the event. By convention, we use a namespace-style scheme
//       (i.e., "[<module>::]<name>").
//
// t: The time of the event.
//
// args: The event arguments.
bro_event* bro_event_new(bro_context* ctx, const char* name, time_t t, bro_value_list* args);

// Deletes an event instance.
void bro_event_delete(bro_context* ctx, bro_event* ev);

// Raises an event, associating it with a topic. All remote nodes that
// have subscribed to the topic will receive the event. If we have
// subscribed to the topic locally, we will also receive it ourselves via
// the registered handler.
//
// This automatically publishes the topic on all AUTO_PUBLISH
// connections.
//
// topic: The topic to associate with the event.
//
// ev: The event to raise. Takes ownership.
void bro_event_raise(bro_context* ctx, const char* topic, bro_event* ev);

// A higher-level event pattern for surveying a group of nodes for each
// node's result. Like bro_event_raise(), this sends out an event, but then
// it waits for answers to come in associated with the same topic. It
// collects the first such answer from each node and passes them to a
// given callback. A survey answer has the same structure as an event,
// but it is handled outside of the normal event processing (i.e., it
// won't trigger event callbacks).
//
// This automatically publishes the topic on all AUTO_PUBLISH
// connections, and automatically subscribes to the topic on all
// AUTO_SUBSCRIBE connections.
//
// topic: The topic to associate with the survey.
//
// ev: The survey event to send out.
//
// num: The number of nodes that we expect to answer (-1 if unknown; in
//      that case, the function always waits for the full timeout interval
//      to pass).
//
// timeout: Time in seconds to wait for answers to come in. Must be
//          larger than zero.
//
// cb: The callback to trigger once all answers are available (i.e.,
//     either *num* answers have come in, or the timeout has been exceeded).
//     The callback receives the topic, an array of all answers received,
//     and the cookie.
//
// cookie: Cookie value to pass to the callback.
void bro_event_survey(bro_context* ctx, const char* topic, bro_event* ev, int num, double timeout, survey_callback* cb, void* cookie);
Distributed data stores are a bit more complex in order to guarantee consistency. One data store corresponds to a single, globally distributed key/value table. We identify a store by a topic name: all updates will be propagated as activity associated with that topic, and all updates associated with the same topic will be applied to the same store. For each store/topic, one node is authoritative: all updates are funneled through it and then broadcast out again from there; a node applies an update locally only once it receives that broadcast from the authoritative node. The authoritative node also keeps a persistent copy on disk if requested.
All write operations to a data store happen asynchronously, i.e., they may not be immediately visible (because they go through the authoritative node). Read operations can be either synchronous or asynchronous, as specified by a flag when opening a store. In the former case, the local node keeps an in-memory copy of the whole store to answer operations immediately; in the latter case, we do not keep a copy (except potentially a small cache) but send requests over to the authoritative node (i.e., latency can potentially be high).
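The funneling scheme can be illustrated with a toy, single-process simulation. The authoritative node assigns each update a sequence number and appends it to a broadcast log. A non-authoritative node submits its writes to the authority instead of applying them locally, and sees them only once the broadcast arrives (modeled here by `replica_receive()`). All names are hypothetical, and keys and values are small integers rather than blobs.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model: keys are small integers and values are ints. */
#define STORE_SLOTS 8
#define LOG_MAX 64

struct update { uint64_t seq; int key; int value; };

/* The authoritative node: sequences every update and records it for broadcast. */
struct authority {
    int data[STORE_SLOTS];
    struct update log[LOG_MAX];  /* broadcast updates, in sequence order */
    size_t nlog;
};

/* A non-authoritative node holding a SYNC-style local copy. */
struct replica {
    int data[STORE_SLOTS];
    uint64_t applied_seq;        /* sequence number of the last applied update */
};

/* Every write, whichever node issues it, is funneled through here. */
void authority_submit(struct authority *a, int key, int value) {
    assert(key >= 0 && key < STORE_SLOTS && a->nlog < LOG_MAX);
    struct update u = { (uint64_t)a->nlog + 1, key, value };
    a->data[key] = value;
    a->log[a->nlog++] = u;
}

/* Stands in for the broadcast reaching a replica: applies, in order, all
 * updates it has not seen yet. Only now do the writes become visible. */
void replica_receive(struct replica *r, const struct authority *a) {
    for (size_t i = (size_t)r->applied_seq; i < a->nlog; i++) {
        const struct update *u = &a->log[i];
        r->data[u->key] = u->value;
        r->applied_seq = u->seq;
    }
}
```

Since every replica applies the same totally ordered sequence, all copies converge to the authority's state, at the price of writes not being visible until the broadcast comes back.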
TODOs:
- Like for events we need to decide if typing for store content is static or dynamic; and if the former, where it is defined and enforced.
- Can we add redundancy, so that a spare takes over if an authoritative node fails?
- Can we detect when two nodes declare themselves authoritative for the same store?
// Opens a data store that is stored remotely, i.e., for which the local
// node is not authoritative.
//
// This automatically publishes the topic on all AUTO_PUBLISH connections
// and automatically subscribes to the topic on all AUTO_SUBSCRIBE
// connections.
//
// topic: The topic associated with the data store.
//
// flags: A set of flags to tune the behavior of the store.
//
//     SYNC:       We support synchronous read operations by keeping a
//                 complete local in-memory copy.
//
//     ASYNC:      We do not keep a local copy (except maybe some cached
//                 content), and hence only support asynchronous operations.
//
//     READ_ONLY:  Only read operations are supported; no writes.
//
//     READ_WRITE: Read and write operations are supported.
bro_dstore* bro_dstore_open_non_authoritative(bro_context* ctx, const char* topic, int flags);

// Opens a data store for which the local node is authoritative, i.e., it
// moderates all updates and optionally keeps the persistent copy.
//
// There must be only one node that is authoritative for a topic.
//
// This automatically publishes the topic on all AUTO_PUBLISH connections
// and automatically subscribes to the topic on all AUTO_SUBSCRIBE
// connections.
//
// topic: The topic associated with the data store.
//
// flags: A set of flags to tune the behavior of the store. This includes
//        the same flags as bro_dstore_open_non_authoritative(), plus the
//        following:
//
//     PERSISTENT: Keep a persistent copy on disk.
//
//     CLEAR:      If a persistent copy already exists, clear it.
bro_dstore* bro_dstore_open_authoritative(bro_context* ctx, const char* topic, int flags);

// Inserts a value into a data store. Any existing value for the same key
// will be overwritten.
//
// The operation takes place asynchronously and its effect may not be
// immediately visible after the function returns.
//
// ds: The store to insert into.
//
// key: The key to insert.
//
// value: The value to store with the key.
//
// expire: A time at which the key will automatically be removed from the
//         store. 0 to disable.
int bro_dstore_insert(bro_context* ctx, bro_dstore* ds, bro_dstore_key key, bro_dstore_value value, time_t expire);

// Removes a key from a data store.
//
// The operation takes place asynchronously and its effect may not be
// immediately visible after the function returns.
//
// ds: The store to remove the key from.
//
// key: The key to remove.
int bro_dstore_remove(bro_context* ctx, bro_dstore* ds, bro_dstore_key key);

// Increments the value for a key in a data store. Increments are only
// defined for some data types, in particular integers. If a key does not
// yet exist, the current value is assumed to be null.
//
// The operation takes place asynchronously and its effect may not be
// immediately visible after the function returns.
//
// ds: The store to use.
//
// key: The key to increment.
//
// by: The value to increment the current value by. The value must be of
//     the same type as the store's values.
int bro_dstore_increment(bro_context* ctx, bro_dstore* ds, bro_dstore_key key, bro_dstore_value by);

// Decrements the value for a key in a data store. Decrements are only
// defined for some data types, in particular integers. If a key does not
// yet exist, the current value is assumed to be null.
//
// The operation takes place asynchronously and its effect may not be
// immediately visible after the function returns.
//
// ds: The store to use.
//
// key: The key to decrement.
//
// by: The value to decrement the current value by. The value must be of
//     the same type as the store's values.
int bro_dstore_decrement(bro_context* ctx, bro_dstore* ds, bro_dstore_key key, bro_dstore_value by);

// Clears a data store.
//
// The operation takes place asynchronously and its effect may not be
// immediately visible after the function returns.
//
// ds: The store to clear.
int bro_dstore_clear(bro_context* ctx, bro_dstore* ds);

// Retrieves the value for a key synchronously. Returns a null sentinel
// if the key does not exist. The function is only supported for stores
// opened with the SYNC flag.
//
// ds: The store to look up the key in.
//
// key: The key to look up.
bro_dstore_value bro_dstore_lookup_sync(bro_context* ctx, bro_dstore* ds, bro_dstore_key key);

// Retrieves the value for a key asynchronously. The function returns
// immediately and triggers the callback once the result is available.
// The function is supported for stores opened with either the SYNC or
// ASYNC flag.
//
// ds: The store to look up the key in.
//
// key: The key to look up.
//
// timeout: Timeout in seconds after which to abort the lookup.
//
// callback: A function receiving a data store and a key/value pair with
//           the result; the value will be a null sentinel if the key does
//           not exist, and a different sentinel if the timeout hit.
//
// cookie: A cookie value to pass to the callback.
int bro_dstore_lookup_async(bro_context* ctx, bro_dstore* ds, bro_dstore_key key, double timeout, lookup_callback* callback, void* cookie);

// Checks whether a given key exists in a data store, synchronously or
// asynchronously, respectively. Parameters are similar to the
// bro_dstore_lookup_{sync,async}() functions.
{bool,int} bro_dstore_has_key_{sync,async}(bro_context* ctx, bro_dstore* ds, bro_dstore_key key [, double timeout, size_callback* callback, void* cookie]);

// Returns the number of keys in a data store, synchronously or
// asynchronously, respectively. Parameters are similar to the
// bro_dstore_lookup_{sync,async}() functions.
{size_t,int} bro_dstore_size_{sync,async}(bro_context* ctx, bro_dstore* ds [, double timeout, size_callback* callback, void* cookie]);

// Returns all keys in a data store, synchronously or asynchronously,
// respectively. Parameters are similar to the
// bro_dstore_lookup_{sync,async}() functions.
{set_of_all_keys,int} bro_dstore_all_keys_{sync,async}(bro_context* ctx, bro_dstore* ds [, double timeout, size_callback* callback, void* cookie]);

// Closes a data store.
//
// This automatically unpublishes the topic on all AUTO_PUBLISH
// connections and automatically unsubscribes from the topic on all
// AUTO_SUBSCRIBE connections.
int bro_dstore_close(bro_context* ctx, bro_dstore* ds);
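The local effect of the write operations above on a SYNC copy of a store might look like the following sketch. It assumes string keys and 64-bit integer values instead of the blob types listed earlier, drops expired entries lazily on access, and treats a missing key as 0 for increments, which is one possible reading of "null". A decrement would be an increment by the negated amount. In the proposed design, these updates would only be applied once they arrive from the authoritative node. All names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <time.h>

#define DS_MAX 32

/* Hypothetical local copy of a store: string keys, int64 values. */
struct ds_entry {
    bool used;
    char key[32];
    int64_t value;
    time_t expire;   /* 0 means the entry never expires */
};

struct ds_replica { struct ds_entry e[DS_MAX]; };

/* Finds a live entry, dropping it first if it has expired. */
static struct ds_entry *ds_find(struct ds_replica *ds, const char *key, time_t now) {
    for (int i = 0; i < DS_MAX; i++) {
        struct ds_entry *e = &ds->e[i];
        if (!e->used || strcmp(e->key, key) != 0)
            continue;
        if (e->expire != 0 && e->expire <= now) {
            e->used = false;
            return NULL;
        }
        return e;
    }
    return NULL;
}

/* Inserts or overwrites a key. Returns false if the key is too long or the store is full. */
bool ds_insert(struct ds_replica *ds, const char *key, int64_t value, time_t expire, time_t now) {
    assert(key != NULL);
    if (strlen(key) >= sizeof(ds->e[0].key))
        return false;
    struct ds_entry *e = ds_find(ds, key, now);
    for (int i = 0; e == NULL && i < DS_MAX; i++)
        if (!ds->e[i].used)
            e = &ds->e[i];
    if (e == NULL)
        return false;
    e->used = true;
    strcpy(e->key, key);
    e->value = value;
    e->expire = expire;
    return true;
}

void ds_remove(struct ds_replica *ds, const char *key, time_t now) {
    struct ds_entry *e = ds_find(ds, key, now);
    if (e != NULL)
        e->used = false;
}

/* Assumption: a missing key counts as 0. Decrementing is ds_increment(..., -by, ...). */
bool ds_increment(struct ds_replica *ds, const char *key, int64_t by, time_t now) {
    struct ds_entry *e = ds_find(ds, key, now);
    if (e != NULL) {
        e->value += by;
        return true;
    }
    return ds_insert(ds, key, by, 0, now);
}

/* Synchronous lookup; returns false (the "null sentinel") if the key is absent. */
bool ds_lookup(struct ds_replica *ds, const char *key, time_t now, int64_t *out) {
    struct ds_entry *e = ds_find(ds, key, now);
    if (e == NULL)
        return false;
    *out = e->value;
    return true;
}
```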
// Opens a remote Zeek log for writing information to.
//
// This automatically publishes the topic on all AUTO_PUBLISH
// connections.
//
// topic: The topic to associate with the log activity.
//
// id: The name of the log file, corresponding to the Zeek-level enum.
//
// writer: The name of the log writer to use, corresponding to the
//         Zeek-level enum.
bro_log* bro_log_open(bro_context* ctx, const char* topic, const char* id, const char* writer);

// Writes one line into a remote Zeek log.
//
// log: The log to write to.
//
// vals: The values to log.
int bro_log_write(bro_context* ctx, bro_log* log, bro_value_list* vals);

// Closes a remote Zeek log.
int bro_log_close(bro_context* ctx, bro_log* log);

// Registers a callback to trigger for incoming log writes.
//
// This automatically subscribes to the topic on all AUTO_SUBSCRIBE
// connections.
//
// topic: The topic of the writes to trigger the callback for.
//
// cb: The callback to trigger, which will receive the topic, the bro_log
//     object, the logged values, and the cookie.
//
// cookie: Cookie to pass to the callback.
int bro_log_register_handler(bro_context* ctx, const char* topic, log_callback* cb, void* cookie);

// Unregisters a log callback.
//
// This automatically unsubscribes from the topic on all AUTO_SUBSCRIBE
// connections.
int bro_log_unregister_handler(bro_context* ctx, const char* topic);

// Sends out a chunk of raw data for appending to a file at the remote
// end.
//
// This automatically publishes the topic on all AUTO_PUBLISH
// connections.
//
// topic: The topic to associate with the write operation. All peers
//        subscribed to the topic will see it.
//
// path: A logical path to write the data to at the remote end; the
//       interpretation is left to the receiver.
//
// len: The number of bytes to write.
//
// data: The data itself.
int bro_write(bro_context* ctx, const char* topic, const char* path, size_t len, const char* data);

// Registers a callback to handle remote write operations.
//
// This automatically subscribes to the topic on all AUTO_SUBSCRIBE
// connections.
//
// topic: The topic the callback is to trigger for.
//
// cb: The callback, which will receive the topic, path, len, data, and
//     cookie.
//
// cookie: Cookie to pass to the callback.
int bro_write_register_handler(bro_context* ctx, const char* topic, writer_callback* cb, void* cookie);
TODO: Flesh out.
We still need to devise a protocol that can carry out the API above. A crucial piece will be ensuring that messages can be transparently forwarded across hops. The protocol also needs to allow for the dynamic arrival and departure of nodes.
TODO: Flesh out.
We need to define a serialization format that captures all the Zeek types we want to exchange. Unlike the current format, we are going to fully flatten all nested data types, and we are not going to restore any pointer structures on the receiving end. That should make things much simpler; even more importantly, the serialized representation will be well-defined and independent of the nodes involved (and thus can, e.g., be broadcast and forwarded arbitrarily).
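As an illustration of a fully flattened representation, the sketch below encodes a nested value into a single byte buffer. Each value is written as a one-byte type tag followed by an 8-byte big-endian integer, a 4-byte length plus the bytes for a string, or a 4-byte element count followed by the elements for a list. The in-memory value uses pointers, but the encoded form contains none, so the same bytes can be broadcast or forwarded unchanged. The tags, widths, and names are assumptions for illustration, not a proposed wire format.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

enum tag { TAG_INT = 1, TAG_STRING = 2, TAG_LIST = 3 };

/* In-memory value: may point to nested values. */
struct value {
    enum tag tag;
    int64_t i;                  /* TAG_INT */
    const char *s;              /* TAG_STRING */
    const struct value *items;  /* TAG_LIST */
    size_t nitems;
};

struct buf { uint8_t *data; size_t len, cap; };

static int put(struct buf *b, const void *p, size_t n) {
    if (b->len + n > b->cap)
        return -1;  /* out of space */
    memcpy(b->data + b->len, p, n);
    b->len += n;
    return 0;
}

/* Writes x as n big-endian bytes, so the encoding is independent of the
 * sender's byte order. */
static int put_be(struct buf *b, uint64_t x, size_t n) {
    uint8_t tmp[8];
    for (size_t i = 0; i < n; i++)
        tmp[i] = (uint8_t)(x >> (8 * (n - 1 - i)));
    return put(b, tmp, n);
}

/* Serializes v (and everything nested in it) in pre-order. Returns 0 on
 * success, -1 if the buffer is too small. */
int serialize(struct buf *b, const struct value *v) {
    assert(b != NULL && v != NULL);
    uint8_t tag = (uint8_t)v->tag;
    if (put(b, &tag, 1) < 0)
        return -1;
    switch (v->tag) {
    case TAG_INT:
        return put_be(b, (uint64_t)v->i, 8);
    case TAG_STRING: {
        size_t n = strlen(v->s);
        if (put_be(b, n, 4) < 0)
            return -1;
        return put(b, v->s, n);
    }
    case TAG_LIST:
        if (put_be(b, v->nitems, 4) < 0)
            return -1;
        for (size_t k = 0; k < v->nitems; k++)
            if (serialize(b, &v->items[k]) < 0)
                return -1;
        return 0;
    }
    return -1;
}
```

A receiver can rebuild the value with a matching pre-order walk, allocating its own nodes instead of restoring any of the sender's pointers.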
We can investigate using an existing library. Candidates:
(Note we’ll need a C interface, and having interfaces to scripting languages will make writing bindings easier.)
If at all possible, we should leverage an external library to do the low-level groundwork of sending data around; the more functionality the library takes over, the better. A good candidate seems to be nanomsg: it comes with a simple but powerful API and has the right license.
© 2014 The Bro Project.