Contents
This is just brainstorming for now. Feedback welcome.
This proposal primarily targets providing a replacement for &synchronized. However, we could also take it further and take out all of the current communication framework for something new.
Regarding the former, Zeek uses the &synchronized attribute to transparently share state across nodes. &synchronized is a conceptually simple mechanism that hides a lot of complexity behind the scenes. While that’s good in principle, it comes with several drawbacks:
- &synchronized doesn’t offer any control over what to share with whom; it’s always all or nothing.
- &synchronized provides only very loose consistency guarantees. See http://www.icir.org/robin/papers/acsac05.pdf for the glory details.
- &synchronized adds a lot of internal complexity. However, as it turns out, Zeek’s standard scripts actually don’t use &synchronized much at all anymore, so that complexity doesn’t seem justified.
- While &synchronized can work with &persistent to keep state across restarts, the latter is a rather unreliable mechanism as it is easy to "loose" that state file.
More broadly, Zeek’s communication framework has more shortcomings:
- The "no control" argument from above applies to events as well, though to a lesser degree. Nodes can subscribe to subsets of events based on their names but there’s no mechanism that would allow limiting information to, e.g., a specific group of nodes, like in a tiered cluster setup.
- It’s inefficient as it sends information individually to each node, often duplicating the same content even if conceptually it could broadcast.
- The code still has some corner cases where it doesn’t function correctly in overload situations.
- The code and the protocol has become quite messy over time.
This proposal suggest to replace &synchronized with distributed persistent key/value stores. In the standard use case, we would normally not expose these stores to users directly but use them internally inside higher-level frameworks that provide "cluster transparency" natively. However, user can use their own key/value stores as well if they need a lower-level facility for state sharing; the stores are conceptually quite simple and match the common key/value paradigm pretty much directly.
A new key/value framework could provide the following API:
module Store; export { # Definition of a single key/value database. type Database = record { # A name to identify the database. name: string; # Master server hosting the authoritative version of the # database. server: addr; # A record defining the values' type. This works similar # to how the logging defines its columns, and supports the # same data types (but not more). schema: any; }; # Initiate access to a key/value store on a given server. # We identify a particular store by the tuple of (name, server). # Once a client has opened a store, it automatically starts # synchronizing its content with all other clients who're # connected to the same store. global open: function(store: Database) : opaque of Handle; # Retrieves a key's value. Returns an instance of the store's # ``schema``. # TODO: Returning ``any`` won't actually work here ... global get: function(h: Handle, key: string) : any; # Stores a new value for a key. This executes asynchronously, # i.e., the change may not be immediately visible. ``value`` # is an instance of the store's ``schema``. global put: function(h: Handle, key: string, value: any); # Updates the value for a key. In contrast to ``put``, which # simply overrides the current value, ``update`` passes the # given value on to the server who can use it *modify* the # current value with custom logic. See below for more. # Like ``put``, this executes asynchronously, i.e., the change # may not be immediately visible. global update: function(h: Handle, key: string, value: any); # Removes a key from a store. This executes asynchronously, # i.e., the change may not be immediately visible. global remove: function(h: Handle, key: string); # Finish access to a store. global close: function(h: Handle); }
The above shows the client side. The server side will need a bit more configuration, like when keeping data persistently on disk.
Example usage:
type Foo: record { a: count; b: string; }; local foo = Store::open([$name="my-data", $server=myserver.icir.org, $schema=Foo]) local v = Storage::get(foo, "a.b.c"); Storage::put(foo, "a.b.c", [$a=42, $b="Foo"]); Storage::update(foo, "a.b.c", [$a=42, b="Foo"]); Storage::remove(foo, "d.e.f") # Can we add iteration? for ( i in foo ) { local f = Storage::get(foo, i); print fmt("key: %s value: %s", i, f); }
More specifics about what’s happening here:
- By opening a store on a server, nodes first request a copy of the current state and then implicitly subscribe to all updates.
- All nodes opening the same store on the same server will generally see the same data. However, it’s the server which decides what nodes will get to see; the server keeps the master copy and the clients are primarily caches that provide fast local lookup.
- get() is synchronous and will reflect the node’s current view of the container.
- put()/remove()/update() are asynchronous. They trigger updates sent to the server, and the server broadcasts them back to everybody subscribed. Locally the changes take effect only once received back.
- update() is like put() but it triggers a callback on the server that receives the new value and can change it, potentially based on the old one. This allows to do, e.g., well-defined increments: the server would take the current value for a field and add the updated value to it.
- We could attach timeouts to values, either globally per store or even individually to elements written. The server would expire them when the time comes around and broadcast corresponding updates.
- The server can keep all its stores persistently in an on-disk DB across restarts.
- If we support iteration, the index set would be frozen at the time when we begin (which can mean that when we access an element it’s not there anymore).
- We can support a default value for get() to return if an element doesn’t exist.
Extensions:
We can allow external applications to participate in the data exchange. Several options:
- If the storage layer that we use for persistence allows for concurrent accesses, external apps could just modify it directly on the server. Might be nice for like configuration updates. (But we’d still need to provide a library that performs value serialization/deserialization).
- We can extend Broccoli to participate in the communication and become just another client, or even a server.
- In fact, if we implemented all the logic inside Broccoli, Zeek could just that itself and we’d have just one implementation to maintain while providing external applications with the same capabilities as Zeek has (including, e.g., running update callbacks …). Now suddenly the master node for a store wouldn’t even need to be a Zeek.
Can we use the same mechanism for events? We’d forward events to a master node who broadcasts them out to all clients subscribed to them.
Advantages:
- We can control event subscription better: in addition to subscribing by name, we can add more fine-granular mechanisms like grouping, hierarchies, etc. Also, the server could even dynamically filter what it sends out to whom.
- Being able to the real broadcasts is more efficient than the current serialize-individually-for-each-receiver scheme.
- If we transition events over and remove &sychronize, we can completely remove the current communication system.
Disadvantage:
- We’d now be limited in the type of arguments that remote events support (i.e., just simple logging-style types). Is that sufficient? We could in particular no longer send the connection record.
Implementation options:
0mq actually could be a good fit here now. See in particualr:
That pretty much describes exactly our scenario (except for the persistence).
Redis is another alternative (still need to look more closely):
© 2014 The Bro Project.