Much of this is currently complete and undergoing testing. Full usage documentation will be written soon.
The logging interface is built around three main abstractions:
- Log streams
  - A stream identifies log entries that are semantically related and share the same fields (e.g., the HTTP analyzer would log its information to an HTTP stream).
- Filters
  - A filter takes a specific log stream as its input and decides which of the received information to log and in what form.
- Writers
  - A writer defines the output format for the information being logged. We could have, e.g., a DataSeries writer, a CSV writer, a syslog writer, a database writer, etc.
Note that writers will be implemented in C++, not on the script level.
At the script level, we make the following interface available via a new script, logging.bro:
    module Logging;

    # The set of writers Zeek provides.
    type Writer: enum {
        WRITER_DEFAULT,      # See default_writer below.
        WRITER_CSV,
        WRITER_DATA_SERIES,
        WRITER_SYSLOG
    };

    # The default writer to use if a filter does not specify
    # anything else.
    const default_writer = WRITER_CSV &redef;

    # Each stream gets a unique ID. This type will be extended by
    # other scripts.
    type ID: enum {
        Unknown
    };

    # Type defining a stream.
    type Stream: record {
        id: ID;        # The ID of the stream.
        columns: any;  # A record type defining the stream's output columns.
    };

    # A filter defining what to record.
    type Filter: record {
        # A name to reference this filter.
        name: string;

        # A predicate returning True if the filter wants a log entry
        # to be recorded. If not given, an implicit True is assumed
        # for all entries. The predicate receives one parameter:
        # an instance of the log's record type with the fields to be
        # logged.
        pred: function &optional;

        # A path for outputting everything matching this
        # filter. The path is either a string, or a function
        # called with a single ``ID`` argument and returning a string.
        #
        # The specific interpretation of the string is left to the
        # Writer, but if it's referring to a file, it's assumed that no
        # extension is given; the writer will add whatever is
        # appropriate.
        path: any &optional;

        # A subset of column names to record. If not given, all
        # columns are recorded (except for those listed in "exclude").
        include: set[string] &optional;

        # A subset of column names never to record.
        exclude: set[string] &optional;

        # An event that is raised whenever the filter is applied
        # to an entry. The event receives the same parameter
        # as the predicate. It will always be generated,
        # independent of what the predicate returns.
        ev: event &optional;

        # The writer to use.
        writer: Writer &default=WRITER_DEFAULT;
    };

    # Logs the record "rec" to the stream "id". The type of
    # "rec" must match the stream's "columns" field.
    global log: function(id: ID, rec: any);

    # Returns an existing filter previously installed for stream
    # "id" under the given "name". If no such filter exists,
    # the record "NoSuchFilter" is returned.
    global get_filter: function(id: ID, name: string) : Filter;

    # Sentinel representing an unknown filter.
    const NoSuchFilter: Filter = [$name="<unknown filter>"];
Here’s how a hypothetical SSH script would use that for setting up its default logging:
    module SSH;

    @load logging

    # Create a new ID for our log stream.
    redef enum Logging::ID += { SSH };

    # Define a record with all the columns the log file can have.
    # (I'm using a subset of fields from ssh-ext for demonstration.)
    type Log: record {
        t: time;
        cid: conn_id;    # Will be rolled out into individual columns.
        status: string &optional;
        country: string &default="unknown";
    };

    event bro_init()
        {
        # Create the stream.
        Logging::create([$id=SSH, $columns=Log]);

        # Add a default filter that simply logs everything to "ssh.log"
        # using the default writer.
        Logging::add_filter(SSH, [$name="default", $path="ssh"]);
        }

    ....

    # Log something.
    Logging::log(SSH, [$t=network_time(), $cid=cid, $status="ok"]);

    ....
To customize the output, one can now add further filters:
    # Predicate returning True for locally originated SSH sessions.
    function ssh_outbound(rec: SSH::Log) : bool
        {
        return is_local_addr(rec$cid$orig_h);
        }

    event bro_init()
        {
        # Add a filter that uses the predicate to record
        # outbound sessions to a separate file, but including
        # only IP addresses.
        Logging::add_filter(SSH, [$name="ssh-outbound",
                                  $pred=ssh_outbound,
                                  $path="ssh-outbound",
                                  $include=set("orig_h", "resp_h")]);
        }
Now SSH log information will end up in two files, the default ssh.log and a subset in ssh-outbound.log.
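To make the fan-out concrete, here is a minimal C++ sketch of how one log entry could pass through all filters of a stream. It is only an illustration under assumptions: `Entry`, the C++ `Filter` struct, `select_columns` and `dispatch` are hypothetical names, entries are simplified to string maps, and the filter's event is left out.

```cpp
#include <functional>
#include <map>
#include <optional>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in: a log entry is a map from column name to value.
using Entry = std::map<std::string, std::string>;

// Hypothetical C++ mirror of the script-level Filter record.
struct Filter {
    std::string name;
    std::function<bool(const Entry&)> pred;        // Empty means "always true".
    std::string path;
    std::optional<std::set<std::string>> include;  // Unset means "all columns".
    std::set<std::string> exclude;
};

// Returns the columns of "entry" that filter "f" would record.
Entry select_columns(const Filter& f, const Entry& entry) {
    Entry out;
    for (const auto& [col, val] : entry) {
        if (f.include && f.include->count(col) == 0) continue;
        if (f.exclude.count(col) != 0) continue;
        out.emplace(col, val);
    }
    return out;
}

// Applies every filter to one entry and returns one (path, columns) pair
// per filter whose predicate accepted the entry.
std::vector<std::pair<std::string, Entry>>
dispatch(const std::vector<Filter>& filters, const Entry& entry) {
    std::vector<std::pair<std::string, Entry>> writes;
    for (const auto& f : filters) {
        if (f.pred && !f.pred(entry)) continue;
        writes.emplace_back(f.path, select_columns(f, entry));
    }
    return writes;
}
```

With a "default" filter and an "ssh-outbound" filter installed, an outbound entry yields two writes: the full entry for "ssh" and only the included columns for "ssh-outbound".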
One can also remove the default filter if one isn’t interested in its output at all:
    event bro_init()
        {
        # Remove the filter called 'default' from the SSH stream.
        Logging::remove_filter(SSH, "default");
        }
Likewise, all other filters can be removed by referencing them via their name.
The one tricky thing is that for generically applicable filters one doesn’t really want to write one predicate function per log stream. In the example above, we should be able to use the same outbound predicate with all log files that have a conn_id field. This is possible by defining the predicate with an any argument and using a (new) dynamic type check operator:
    function generic_outbound(rec: any) : bool
        {
        return rec$?<conn_id>cid && is_local_addr(rec$cid$orig_h);
        }
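As a rough analogy for what the proposed operator would have to do internally, here is a C++ sketch that checks both the presence and the runtime type of a field before accessing it. Everything in it is an assumption for illustration: `Record`, `ConnId`, `has_field_of` and the toy `is_local_addr` (which simply treats 10.0.0.0/8 as local) are hypothetical and not part of Zeek.

```cpp
#include <any>
#include <map>
#include <string>
#include <typeinfo>

// Hypothetical dynamically typed record: field name -> value of any type.
using Record = std::map<std::string, std::any>;

// Hypothetical stand-in for conn_id, reduced to two fields.
struct ConnId {
    std::string orig_h;
    std::string resp_h;
};

// Mirrors the proposed rec$?<T>field check: true if "field" exists in
// "rec" and currently holds a value of type T.
template <typename T>
bool has_field_of(const Record& rec, const std::string& field) {
    auto it = rec.find(field);
    return it != rec.end() && it->second.type() == typeid(T);
}

// Toy stand-in for is_local_addr(): treats 10.0.0.0/8 as local.
bool is_local_addr(const std::string& addr) {
    return addr.rfind("10.", 0) == 0;
}

// Generic predicate: works for any record, whatever stream it came from.
bool generic_outbound(const Record& rec) {
    return has_field_of<ConnId>(rec, "cid") &&
           is_local_addr(std::any_cast<const ConnId&>(rec.at("cid")).orig_h);
}
```

The type check guards the cast, so a record without a cid field, or with a cid of another type, is simply rejected rather than causing an error.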
Having such a generic predicate, it would also be nice to adapt the name of the output file automatically, for example by logging everything into <normal-name>-outbound.log. That works by using a function as the path that computes the actual value:
    const OutboundFilter: Logging::Filter = [
        $name = "outbound",
        $pred = generic_outbound,
        $path = function(id: Logging::ID) : string
            {
            return fmt("%s-outbound", Logging::get_filter(id, "default")$path);
            }
    ];

    event bro_init()
        {
        Logging::add_filter(SSH, OutboundFilter);
        Logging::add_filter(HTTP, OutboundFilter);
        Logging::add_filter(FTP, OutboundFilter);
        }
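On the C++ side, a path that is "either a string or a function" could be resolved along the following lines. This is a sketch under assumptions: `StreamID`, `Path`, `resolve_path` and `default_path` are hypothetical names, and `default_path` stands in for looking up the default filter's path.

```cpp
#include <functional>
#include <string>
#include <variant>

// Hypothetical stream ID type standing in for Logging::ID.
enum class StreamID { SSH, HTTP, FTP };

// A path is either a fixed string or a function computing one from the ID.
using PathFunc = std::function<std::string(StreamID)>;
using Path = std::variant<std::string, PathFunc>;

// Resolves a filter's path for the given stream.
std::string resolve_path(const Path& path, StreamID id) {
    if (const auto* fixed = std::get_if<std::string>(&path))
        return *fixed;
    return std::get<PathFunc>(path)(id);
}

// Hypothetical lookup of the default filter's path for each stream.
std::string default_path(StreamID id) {
    switch (id) {
        case StreamID::SSH:  return "ssh";
        case StreamID::HTTP: return "http";
        case StreamID::FTP:  return "ftp";
    }
    return "unknown";
}
```

A single function-valued path such as `[](StreamID id) { return default_path(id) + "-outbound"; }` can then be shared by all streams, which is the point of the OutboundFilter example.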
Sometimes it can be helpful to do additional analysis on information being logged. For these cases, a filter can specify an event that will be generated every time something is passed through it:
    global ssh_log: event(rec: SSH::Log);

    event bro_init()
        {
        Logging::create([$id=SSH, $columns=Log]);
        Logging::add_filter(SSH, [$name="default", $path="ssh", $ev=ssh_log]);
        }

    ...

    # This will now also generate the ssh_log event.
    Logging::log(SSH, [$t=network_time(), $cid=cid, $status="ok"]);
One can see this as a way to get access to the information being logged programmatically from inside Zeek, rather than externally via log post-processing.
So far, everything is written out with the default writer. The format can be changed globally simply by redef’ing the corresponding option:
    redef Logging::default_writer = Logging::WRITER_DATA_SERIES;
It can also be changed on a per-filter basis:
    event bro_init()
        {
        Logging::remove_filter(SSH, "default");
        Logging::add_filter(SSH, [$name="DataSeries", $path="ssh",
                                  $writer=Logging::WRITER_DATA_SERIES]);
        }
Or for additional syslogging:
    event bro_init()
        {
        Logging::add_filter(SSH, [$name="syslog", $writer=Logging::WRITER_SYSLOG]);
        }
Like with the OutboundFilter above, we can provide a set of predefined filters for common needs, both in terms of complete Logging::Filter instances and as pred functions to be used in custom filters. People can then use such predefined functionality without understanding the specifics of writing filters.
We could add a timestamp field implicitly to all logs and then wouldn’t need to define that in the log’s record. Logging::log would just always record network_time().
For the Writers, we create a new C++ abstract base class LogWriter that provides the interface suitable to implement the above functionality. We can then derive Writer implementations from that base class for any type of output we want to provide.
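To illustrate how little a derived writer would need to provide, here is a simplified, self-contained C++ sketch. It is not the proposed class itself: this `LogWriter` uses plain strings instead of BroString, LogField and LogVal, `CsvWriter` and its `Output()` method are hypothetical, and the toy writer collects output in memory and does not escape separators.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Simplified, hypothetical version of the abstract base class.
class LogWriter {
public:
    virtual ~LogWriter() = default;

    // Initializes the writer once; returns false on error.
    bool Init(const std::string& path, const std::vector<std::string>& fields) {
        return DoInit(path, fields);
    }

    // Writes one entry; "vals" must hold one value per field.
    bool Write(const std::vector<std::string>& vals) { return DoWrite(vals); }

protected:
    // Methods for writers to override.
    virtual bool DoInit(const std::string& path,
                        const std::vector<std::string>& fields) = 0;
    virtual bool DoWrite(const std::vector<std::string>& vals) = 0;
};

// A toy CSV writer that keeps its output in memory instead of a file.
class CsvWriter : public LogWriter {
public:
    std::string Output() const { return out_.str(); }

protected:
    bool DoInit(const std::string&,
                const std::vector<std::string>& fields) override {
        num_fields_ = fields.size();
        WriteRow(fields);  // Header line with the field names.
        return true;
    }

    bool DoWrite(const std::vector<std::string>& vals) override {
        if (vals.size() != num_fields_) return false;  // Wrong column count.
        WriteRow(vals);
        return true;
    }

private:
    void WriteRow(const std::vector<std::string>& cells) {
        for (std::size_t i = 0; i < cells.size(); ++i)
            out_ << (i ? "," : "") << cells[i];
        out_ << '\n';
    }

    std::ostringstream out_;
    std::size_t num_fields_ = 0;
};
```

The base class owns the public interface, so a new output format only has to implement the two `Do...` methods.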
We can use a global stream Info for information messages not related to any specific activity (like the famous starting incremental serialization).
It seems that long-term, we could use this interface to replace the current NOTICE() by creating a Notice log stream, plus corresponding filters implementing the notice policy. However, that’s something to add later, let’s not break everything at once. :)
(Same goes for the alarm keyword.)
Open questions:
See Binary logging for more DataSeries-specific thoughts.
See notes below for more context.
    /////////////////// Types.

    // Definition of an individual log field as passed on to a Writer.
    struct LogField {
        BroString name;  // May be a path when doing record unrolling, see below.
        TypeTag type;
    };

    // All values that can be directly logged by a Writer (subset of BroValUnion).
    union LogVal {
        bro_int_t int_val;
        bro_uint_t uint_val;
        addr_type addr_val;
        subnet_type subnet_val;
        double double_val;
        struct {
            // Specially set up so that we need only one malloc/delete per field.
            int len;
            char string[];  // A string starting right here.
        } string_val;
    };

    // We keep a registry of all Writer types we have available.
    enum WriterType {
        WRITER_TSV,
        WRITER_SYSLOG,
        WRITER_DATA_SERIES,
    };

    struct LogWriterDefinition {
        WriterType type;   // The type.
        const char *name;  // Descriptive name for error messages.
        bool (*init)();    // An optional one-time initialization function.
    };

    LogWriterDefinition log_writers[] = {
        { WRITER_TSV, "TSV", 0 },
        { WRITER_DATA_SERIES, "DataSeries", ds_initialize_library }
    };

    /////////////////// Writers.

    class LogWriter {
    public:
        // Creates a new instance.
        //
        // The instance takes ownership of "path" and "fields", and will delete
        // them when done.
        LogWriter(BroString* path, int number_fields, LogField* fields);
        virtual ~LogWriter();

        // Writes one log entry. The method takes ownership of "vals" and will
        // return immediately after queueing the write request, potentially
        // before the output has actually taken place.
        void Write(LogVal* vals);

    protected:
        //// Methods for Writers to override.

        // Called once for doing initialization of the Writer.
        virtual bool DoInit(BroString* path, int number_fields, LogField* fields) = 0;

        // Called once per entry to record. Returns False if an error occurred.
        virtual bool DoWrite(int number_fields, LogField* fields, LogVal* vals) = 0;

        //// Methods for Writers to use.

        // Reports an error and quits the writer. The caller must not do
        // anything further after calling this method.
        void Error(const char *msg);

    private:
        // Main thread loop.
        void Loop();

        // Delete values as passed into Write().
        void DeleteVals(LogVal* vals);

        BroString* path;
        int number_fields;
        LogField* fields;
    };

    ////////////////// Manager

    // A record-keeping class for managing writers and filters.
    class LogMgr {
    public:
        LogMgr();
        ~LogMgr();

        // These correspond to the BiFs visible on the scripting layer. The actual
        // BiFs just forward to these.
        void CreateLog(EnumVal* stream, RecordVal* columns);
        void AddFilter(EnumVal* stream, RecordVal* filter);
        void RemoveFilter(EnumVal* stream, StringVal* filter);
        void Write(EnumVal* stream, RecordVal* columns);

    private:
        friend class LogWriter;

        // Reports an error for the given writer.
        void Error(LogWriter* writer, const char* msg);
    };

    /////////////////
Assumption: the fields ending up in the log output are all atomic (e.g., we don’t log tables etc.).
The writers receive their information heavily preprocessed, and without further context. All the preprocessing is done by the LogMgr, such as rolling out the records. That ensures that (1) it will be almost trivial to add a new writer in terms of Zeek knowledge required; and (2) all writers will log stuff consistently.
A field name is either directly the name of a record field, or a path-like string to a record field for unrolled records ("cid.orig_h").
TODO: For unrolling, we could also just use the name of the lowest level field ("orig_h") if that’s unique. We could either enforce uniqueness, or switch to paths if not unique.
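Both naming schemes are easy to prototype. The following C++ sketch unrolls a nested record description into path-like names and then, as the TODO suggests, shortens them to leaf names where those are unique. `FieldDesc`, `unroll`, `leaf` and `shorten_if_unique` are hypothetical names chosen for this illustration.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical description of a record field: atomic if it has no children.
struct FieldDesc {
    std::string name;
    std::vector<FieldDesc> children;
};

// Appends path-like names ("cid.orig_h") of all atomic fields to "out".
void unroll(const std::vector<FieldDesc>& fields, const std::string& prefix,
            std::vector<std::string>& out) {
    for (const auto& f : fields) {
        std::string path = prefix.empty() ? f.name : prefix + "." + f.name;
        if (f.children.empty())
            out.push_back(path);
        else
            unroll(f.children, path, out);
    }
}

// Returns the part of a path after the last '.'.
std::string leaf(const std::string& path) {
    auto pos = path.rfind('.');
    return pos == std::string::npos ? path : path.substr(pos + 1);
}

// Uses the leaf name where it is unique, and the full path otherwise.
std::vector<std::string> shorten_if_unique(const std::vector<std::string>& paths) {
    std::map<std::string, int> count;
    for (const auto& p : paths) ++count[leaf(p)];

    std::vector<std::string> out;
    for (const auto& p : paths)
        out.push_back(count[leaf(p)] == 1 ? leaf(p) : p);
    return out;
}
```

For a record with fields t, cid (containing orig_h and orig_p) and status, `unroll` yields "t", "cid.orig_h", "cid.orig_p", "status", and `shorten_if_unique` reduces the two middle names to "orig_h" and "orig_p".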
The writers receive ownership of all dynamically allocated memory passed into them. That allows them to (eventually) run in separate threads without interfering with main Zeek.
The LogMgr can manage the threads; but we should probably do things single-threaded initially.
The writers however should already assume they are running in a thread, and they must not use any other Zeek functionality, as that would immediately break things.
Note: We could eventually add a more general "back-channel" to allow the writers to talk back to main Zeek. I actually have code for that already. However, I’m not convinced we want or need it.
The previous point also means that multiple writers of the same kind can be running in different threads. I.e., their implementation must likewise be thread-safe.
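The ownership rule can be expressed directly in the type system. The sketch below is a single-threaded illustration under assumptions: `Vals`, `QueueingWriter`, `Drain()` and `Written()` are hypothetical, and a threaded version would additionally need to synchronize access to the queue.

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the values of one log entry.
using Vals = std::vector<std::string>;

class QueueingWriter {
public:
    // Takes ownership of "vals" and returns immediately after queueing;
    // the caller no longer has access to the values afterwards.
    void Write(std::unique_ptr<Vals> vals) {
        pending_.push_back(std::move(vals));
    }

    // What the writer's thread loop would do: process all queued entries
    // and free them. Returns the number of entries written.
    std::size_t Drain() {
        std::size_t n = 0;
        while (!pending_.empty()) {
            // "vals" is freed at the end of each iteration, by the writer.
            std::unique_ptr<Vals> vals = std::move(pending_.front());
            pending_.pop_front();
            written_.push_back(*vals);  // Placeholder for the real output.
            ++n;
        }
        return n;
    }

    const std::vector<Vals>& Written() const { return written_; }

private:
    std::deque<std::unique_ptr<Vals>> pending_;
    std::vector<Vals> written_;
};
```

Because `Write()` takes a `std::unique_ptr` by value, the caller has to hand the values over with `std::move`, which leaves its own pointer empty and rules out accidental sharing with the main thread.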
The writers can do one-time initialization by specifying an init function in their LogWriterDefinition. This allows them, for example, to initialize an external library they may need to use. However, this initialization will take place in the main thread, so be careful. If that function returns False, an error is assumed and the writer will not be used.
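A sketch of how the registry could be walked at startup follows. It makes several assumptions: `WriterDef` is a cut-down copy of LogWriterDefinition, and `init_ok`, `init_fails` and `init_writers` are hypothetical helpers that exist only for this example.

```cpp
#include <string>
#include <vector>

enum WriterType { WRITER_TSV, WRITER_SYSLOG, WRITER_DATA_SERIES };

// Cut-down, hypothetical copy of LogWriterDefinition.
struct WriterDef {
    WriterType type;
    const char* name;  // Descriptive name for error messages.
    bool (*init)();    // Optional one-time initialization; may be null.
};

// Hypothetical init functions standing in for external library setup.
bool init_ok() { return true; }
bool init_fails() { return false; }

// Runs each writer's init function once (in the calling thread) and
// returns the types that remain usable. Failures are recorded in "errors".
std::vector<WriterType> init_writers(const std::vector<WriterDef>& defs,
                                     std::vector<std::string>& errors) {
    std::vector<WriterType> usable;
    for (const auto& d : defs) {
        if (d.init && !d.init()) {
            errors.push_back(std::string("cannot initialize writer ") + d.name);
            continue;  // This writer will not be used.
        }
        usable.push_back(d.type);
    }
    return usable;
}
```

Writers without an init function are treated as always usable, matching the 0 entry for the TSV writer in the registry above.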
One question is still not clear to me: where does the filtering/unrolling/etc. take place in the cluster setting? Ideally, on the manager I would say, in the spirit of keeping all policy there. However, that would mean potentially sending lots of data over to the manager just to be then discarded. So, I’m thinking of actually leaving that to the workers, and adding a new message to the communication protocol to send already preprocessed logging fields over to the manager, which would then hand them directly to the writers.
Decide which log lines to log.
Decide which fields out of the log line to log.
Possibly by using match with a structure like this:
    logging_filters = {
        ["http"] = {
            [$pred = function(log: log_table) { return is_local_addr(log$orig_h); },
             $result = set("ts", "orig_h", "user_agent"),
             $priority = 1],
        },
    };
*Revised variant*:
    logging_policy = {
        ["http"] = {
            function(log: string, line: any)
                {
                if ( line$?<addr>orig_h && is_local_addr(line$orig_h) )
                    return [$file = cat(log, "-outbound")];
                else
                    return [$file = cat(log, "-inbound")];
                },

            function(log: string, line: any)
                {
                if ( line$?<string>mime_type &&
                     line$mime_type == "application/x-dosexec" )
                    return [$file = cat(log, "-", line$mime_type)];
                },

            function(log: string, line: any)
                {
                if ( line$?<addr>orig_h && is_local_addr(line$orig_h) &&
                     line$?<string>user_agent &&
                     line$user_agent !in HTTP::known_user_agents[line$orig_h] )
                    {
                    add HTTP::known_user_agents[line$orig_h][line$user_agent];
                    return [$file = cat(log, "-user-agents"),
                            $columns = vector("ts", "orig_h", "user_agent")];
                    }
                },
        }
    }
Problems
~~~~~~~~
How do we handle these key-value log lines? My initial thought was to do it the same as the database logging interface I created. It would start from a function call like this:
LOG::log("http", [$orig_h=1.2.3.4, $url="//www.zeek.org/"]);
The problem is that we can’t handle "any" type records at the policy script layer, which is what the second argument would almost necessarily have to be, especially if we make it possible for users to decide which fields they actually want logged. My database logger works with an external tool that extracts all of the data from the record, looks at the value of each record field, and converts it to text along with the record field name before inserting it into the database. If the logging framework is completely implemented in policy script land, we wouldn’t be able to dynamically cope with all of the fields.
Assuming we work around the problem of dynamically working with records of "any" type, this should work fairly well because the LOG::log function would then generate an event in the background, so that this will work mostly transparently on a cluster too. In the cluster scenario, my thinking is that the manager would do all of the "match" work (as mentioned in the features section above). Workers would send the raw logging events to the manager, which would then make the decision about which ones to actually log and how to munge them before writing to disk. The only problem I see with this is that it could result in a lot more events to the manager than would be good.
Another current downside that Robin mentioned is that the type checking that happens during record construction could impose a lot of unnecessary overhead.
Usage examples coming soon!
Reading the above, I noticed three things:
So, here’s an extension of the above scheme along those lines:
Any script that wants to do logging (I’m using ssh.bro as an example) creates a new log stream at initialization time ("log stream" as an abstraction of a "log file"):
    # Create new ID for this stream.
    redef enum Logging::ID += { SSH };

    # Define a record with all the columns the log file can have.
    # (I'm using a subset of fields from ssh-ext for demonstration.)
    type LogSSH: record {
        ts: time;
        orig_h: addr;
        orig_p: port;
        resp_h: addr;
        resp_p: port;
        status: string &optional;
        country: string &default="unknown";
    };

    # At initialization time, register the stream with the logging
    # framework. To do that, the script fills the following record defined
    # by the logging framework:
    #
    #     type Logging::Stream: record {
    #         id: StreamID;     # The ID of the logging stream.
    #         columns: any;     # The record type defining the columns.
    #         name: string;     # The default name for the log file on disk.
    #         select: string_set &default=set("*");
    #                           # The default (sub-)set of columns to write out;
    #                           # '*' means all.
    #     };
    event bro_init()
        {
        Logging::register([$id=SSH, $columns=LogSSH, $name="ssh.log"]);
        }
Whenever something needs to be logged, the script does something like this:
    Logging::log(SSH, [$ts=network_time(), $orig_h=1.2.3.4, $orig_p=12345/tcp, ...]);
The prototype for that function is Logging::log(Logging::StreamID, entry: any), with the constraint that the second argument must be of the type that the record passed to Logging::register specified as columns for the given stream ID. That allows Zeek to know the type of the second parameter (Logging::log will need to be a built-in function for that).
If nothing else is configured, the logging system now simply writes all the log entries to disk (including only the columns specified in Stream$select).
Alternatively, however, one can define log filters to control the logging in a number of ways. A filter is specified by creating an instance of the following record:
    type Log::Filter: record {
        # A predicate returning True if the filter applies.
        pred: function &optional;

        # A name for the output file for everything matching this filter.
        name: string &optional;

        # A subset of fields being written out for everything matching
        # this filter.
        select: string_set &optional;
    };
One can register filters with another Logging function:
    event bro_init()
        {
        Log::add_filter(SSH, [$pred=ssh_outbound,
                              $name="ssh-outbound.log",
                              $select=set("orig_h", "resp_h")]);
        }

    function ssh_outbound(rec: LogSSH) : bool
        {
        return is_local_addr(rec$orig_h);
        }
Now the file ssh-outbound.log stores originator and responder addresses only for SSH sessions originated locally.
Internally, the set of filters is probably just something like a global table but it’s more flexible to wrap it into a function call.
The one tricky thing is that for generically applicable filters, one doesn’t really want to write one predicate function per log stream. In the example above, we should be able to use the same outbound predicate with all log files that have an orig_h column.
One can do that by defining the predicate as receiving a rec: any argument (instead of the typed rec: LogSSH in the example above); but then one can’t access the record’s fields in a type-safe manner anymore.
But to solve this, we can use the dynamic type-check operator proposed above, e.g.:
    function all_outbound(rec: any) : bool
        {
        return rec$?<addr>orig_h && is_local_addr(rec$orig_h);
        }
Notes:
Open questions:
© 2014 The Bro Project.