At DerbyCon I had a conversation with Ross Wolf (@rw_access) from Endgame about the capabilities of EQL (Event Query Language) and how to integrate it into other tools. The purpose of this blog post is to share what I learned and to provide Python code that helps others integrate EQL into their own tools.
What is EQL?
See the quote below for a quick introduction to EQL. For more details and examples, see the blog posts on Endgame’s website [1, 2, 3].
EQL is a language that can match events, generate sequences, stack data, build aggregations, and perform analysis. EQL is schemaless and supports multiple database backends. It supports field lookups, boolean logic, comparisons, wildcard matching, and function calls. EQL also has a preprocessor that can perform parse and translation time evaluation, allowing for easily sharable components between queries.
Source: Endgame
At the end of 2018, Endgame made EQL open source, so you can use it within your own tools or extend its capabilities. I think it is truly great to see this being shared with the community! I also like the fact that it is written in Python, a very popular language for security tools.
When we wanted DeTT&CT (written in Python) to be able to include or exclude certain objects (e.g. to filter or highlight detections for ATT&CK techniques with a low score), I was happy that I did not have to write that code myself. Instead, I could rely mainly on EQL, which does it in a much better way.
Integrate into your code
Now, let’s look at how to integrate EQL into your Python tool to query your data. Before you continue, make sure the EQL Python library is available on your system. You can easily install it via pip:
    pip install eql
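If you want your tool to verify the dependency before using it, a check along the following lines could work. This is only a sketch: the helper name 'is_installed' is made up for this example, and it merely tests whether a module with the given name can be found.

```python
import importlib.util


def is_installed(module_name):
    """Return True when the named top-level module can be found."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # 'eql' is the module that the rest of this post imports
    print("eql available:", is_installed("eql"))
```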
EQL: Not only for JSON and security events
EQL is used mainly for security events stored in the JSON format, but it can also be used for other purposes and data formats, as long as you provide the data as a list of Python dictionaries whose key-value pairs are compatible with the JSON data format.
Within DeTT&CT we make use of several YAML files to store data. Luckily, a YAML file loaded in Python is also represented as a list of dictionaries. To make these YAML files compatible with the JSON specs, I had to serialise every value of the type ‘datetime.date’ to a Python string (after loading the YAML file into a list of dictionaries):
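A rough way to find out whether your data meets this requirement is to try serialising each record with the standard 'json' module. The sketch below assumes exactly that; the function name 'find_incompatible' and the sample records are invented for illustration.

```python
import datetime
import json


def find_incompatible(records):
    """Return the indexes of records that json.dumps cannot serialise."""
    bad = []
    for index, record in enumerate(records):
        try:
            json.dumps(record)
        except (TypeError, ValueError):
            bad.append(index)
    return bad


# made-up sample data: the second record holds a date object
records = [
    {"name": "proxy", "enabled": True},
    {"name": "endpoint", "date_connected": datetime.date(2019, 1, 1)},
]
print(find_incompatible(records))  # prints [1]
```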
    import datetime

    def _transformer(value):
        # a date has no JSON equivalent, so convert it to a string
        if isinstance(value, datetime.date):
            return str(value)
        return value
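As an alternative to the isinstance check above (and not what DeTT&CT does), you could round-trip the data through the 'json' module with 'default=str'. This is a sketch under the assumption that turning every unsupported value into a string is acceptable; 'to_json_compatible' and the sample record are hypothetical.

```python
import datetime
import json


def to_json_compatible(data):
    """Round-trip through JSON, turning unsupported values into strings."""
    # default=str is only called for objects that json cannot serialise
    # itself, such as datetime.date
    return json.loads(json.dumps(data, default=str))


sample = [{"name": "endpoint", "date_connected": datetime.date(2019, 1, 1)}]
print(to_json_compatible(sample))
```

The trade-off is that this stringifies every unsupported type, not only dates, and that tuples come back as lists. The explicit check is more selective.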
Create EQL events
The first step (or the second, if you first had to make the data compatible with the JSON specs) is to create EQL Event objects, which are what EQL performs its analytics on. Although this step is not mandatory, it gives you more control over how the Events are created:
You can set the value of the ‘event_type’ to be used within your search queries.
A well-chosen value makes it clear what kind of data your search query runs on, for example “web_proxy where …” instead of “generic where …”.
When you do not create the EQL Events yourself, EQL tries to derive the ‘event_type’ from a field in your data named ‘event_type’ or ‘event_type_full’.
If that does not yield a value, the default ‘event_type’ of ‘generic’ is used.
Similarly, you can specify which data field is used as the timestamp for your events. Otherwise, EQL tries to derive the timestamp from a data field named ‘timestamp’.
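The fallback described above can be pictured in plain Python. This is only an illustration of the described behaviour, not the library's actual implementation. The function 'derive_event_fields' is hypothetical, and using 0 when no timestamp is found is an assumption.

```python
def derive_event_fields(record, default_type="generic"):
    """Mimic the described fallback for the event type and timestamp."""
    # try the two field names mentioned above, in order
    event_type = record.get("event_type") or record.get("event_type_full")
    if not event_type:
        event_type = default_type
    # assumption: 0 stands in for "no timestamp found"
    timestamp = record.get("timestamp", 0)
    return event_type, timestamp


# made-up sample records
print(derive_event_fields({"event_type": "process", "timestamp": 1570000000, "pid": 424}))
print(derive_event_fields({"url": "example.com"}))
```

The first call yields ('process', 1570000000) and the second falls back to ('generic', 0), which is why queries on such data would have to start with “generic where …”.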
Start by creating a list of EQL Events from the list of Python dictionaries containing the data you want to query. The function ‘_create_events’ below does precisely that. When you are dealing with YAML input, for example, the function ‘_serialize_date’ first converts every ‘datetime.date’ value to a string before the EQL Events are created:
    def _create_events(self, data, data_type, event_type, timestamp_key):
        """
        Create EQL Events from the provided data.
        :param data: list of dictionaries to be transformed to EQL Events
        :param data_type: if 'yaml', serialize all 'datetime.date' key-value pairs
        :param event_type: the value to be used as event_type for the provided data
        :param timestamp_key: name of the key-value pair to be used as timestamp
        :return: EQL Events or data
        """
        eql_events = []
        if data_type == 'yaml':
            data = self._serialize_date(data)

        # without an event_type, return the data as-is; this results in EQL
        # trying to derive the event_type and timestamp from the contents of 'data'
        if not event_type:
            return data

        # create EQL Events from 'data'
        for item in data:
            eql_events.append(eql.Event(event_type, timestamp_key, item))

        return eql_events
Learn the schema
The next step is to learn the schema of the data. In the code below the variable ‘events’ is the list of EQL Event objects created previously by the function ‘_create_events’:
schema = eql.Schema.learn(events)
Learning the schema is not required to execute EQL search queries, but it does let you print a detailed error message when you make a mistake in your query.
See the picture for an example of an error message, which nicely highlights that ‘pi’ is an unrecognised field. As shown in the schema, ‘pid’ should be used instead.
The EQL library does not print this error message and the schema for you. Fortunately, this only requires a few lines of code:
    with schema:
        try:
            eql_query = eql.parse_query(query, implied_any=True, implied_base=True)
            engine.add_query(eql_query)
        except eql.EqlError as e:
            print(e, file=sys.stderr)
            print('\nTake into account the following schema:')
            pprint(schema.schema)
            return None
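To make the idea of a learned schema more concrete, here is a simplified, library-free sketch. It is not how EQL learns or validates a schema internally. The functions 'learn_fields' and 'suggest_field' and the sample record are invented for this example, and the field suggestion relies on the standard 'difflib' module.

```python
import difflib


def learn_fields(records, event_type="generic"):
    """Collect the field names and value type names seen in the records."""
    fields = {}
    for record in records:
        for key, value in record.items():
            fields.setdefault(key, type(value).__name__)
    return {event_type: fields}


def suggest_field(schema, event_type, name):
    """Return the known field closest to 'name', or None if nothing is close."""
    known = list(schema.get(event_type, {}))
    if name in known:
        return name
    matches = difflib.get_close_matches(name, known, n=1)
    return matches[0] if matches else None


# made-up sample record
records = [{"pid": 424, "process_name": "svchost.exe"}]
schema = learn_fields(records, event_type="process")
print(schema)
print(suggest_field(schema, "process", "pi"))  # prints pid
```

Knowing which fields exist per event type is what makes it possible to point out that a name such as ‘pi’ is unknown and that ‘pid’ was probably meant.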
Create the EQL engine and execute the query
After you have created a list of EQL Events from your data and optionally learned the schema, you can create the EQL Python engine and execute the query. The function below returns a list of dictionaries containing the data of the Events that match your query, which you can then process further within your own tooling.
    def _execute_eql_query(self, events, query):
        """
        Execute an EQL query on the provided events.
        :param events: events
        :param query: EQL query
        :return: the result of the query as a list of dictionaries or None when the query did not match the schema
        """
        schema = eql.Schema.learn(events)

        query_result = []

        # this function is used to store the result of the query to 'query_result'
        def store_result(result):
            for event in result.events:
                query_result.append(event.data)

        engine = eql.PythonEngine()
        with schema:
            try:
                eql_query = eql.parse_query(query, implied_any=True, implied_base=True)
                engine.add_query(eql_query)
            except eql.EqlError as e:
                print(e, file=sys.stderr)
                print('\nTake into account the following schema:')
                pprint(schema.schema)
                return None
        engine.add_output_hook(store_result)

        # execute the query
        engine.stream_events(events)

        return query_result
‘query_result = []’: create a Python list to store the results of the query.
‘store_result’: in my code, I chose to use an inner function as a callback for the EQL engine. The engine calls it with the results the query produces, and it stores the data of each matching event in the list ‘query_result’.
‘eql.PythonEngine()’: create the EQL Python engine.
‘eql.parse_query’: parse the query.
Because the parameter ‘implied_any’ is set to ‘True’, queries can be shortened by leaving out the ‘event_type’ and the ‘where’ keyword. For example, “process where pid == 424” and “pid == 424” are both valid and return the same result.
‘engine.add_query’: add the query to the engine.
‘except eql.EqlError’: the query error handling discussed earlier.
‘engine.add_output_hook’: register the callback function ‘store_result’ with the engine.
‘engine.stream_events’: execute the query by streaming the events through the engine, which calls the ‘store_result’ callback to save the results in the list ‘query_result’.
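The inner-function callback described above is a general pattern, and it can be tried out without EQL. In the sketch below, 'ToyEngine' is a hypothetical stand-in that only mirrors the two method names used in this post. A plain Python predicate takes the place of the EQL query, and the hook receives the matching dictionary directly instead of a result object.

```python
class ToyEngine:
    """Hypothetical stand-in for a query engine with output hooks."""

    def __init__(self, predicate):
        self._predicate = predicate
        self._hooks = []

    def add_output_hook(self, hook):
        self._hooks.append(hook)

    def stream_events(self, events):
        for event in events:
            if self._predicate(event):
                # hand every match to each registered callback
                for hook in self._hooks:
                    hook(event)


def run_query(events, predicate):
    query_result = []

    # the inner function closes over 'query_result', so no global is needed
    def store_result(event):
        query_result.append(event)

    engine = ToyEngine(predicate)
    engine.add_output_hook(store_result)
    engine.stream_events(events)
    return query_result


# made-up sample events
events = [{"pid": 424}, {"pid": 4}, {"pid": 424, "ppid": 4}]
print(run_query(events, lambda event: event.get("pid") == 424))
```

Keeping the result list local to the function means every call starts with an empty list, which avoids results from one query leaking into the next.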
That is all you need to start using EQL within your tools.
The complete code example
All Python code for integrating EQL in your tool(s) can be downloaded from my GitHub account: github.com/marcusbakker/EQL. This code also includes functionality that was not shared above, such as code that performs two example search queries on the files example.json and data-sources-endpoints.yaml.
The code requires Python 3 and was tested with v0.7 of EQL and v5.1.2 of PyYAML. When run successfully, it prints each of the two queries, the number of matching events, and the matching events themselves.
    import datetime
    import json
    import sys
    from pprint import pprint

    import eql
    import yaml


    class EQLSearch:

        def _create_events(self, data, data_type, event_type, timestamp_key):
            """
            Create EQL Events from the provided data.
            :param data: list of dictionaries to be transformed to EQL Events
            :param data_type: if 'yaml', serialize all 'datetime.date' key-value pairs
            :param event_type: the value to be used as event_type for the provided data
            :param timestamp_key: name of the key-value pair to be used as timestamp
            :return: EQL Events or data
            """
            eql_events = []
            if data_type == 'yaml':
                data = self._serialize_date(data)

            # without an event_type, return the data as-is; this results in EQL
            # trying to derive the event_type and timestamp from the contents of 'data'
            if not event_type:
                return data

            # create EQL Events from 'data'
            for item in data:
                eql_events.append(eql.Event(event_type, timestamp_key, item))

            return eql_events

        def _execute_eql_query(self, events, query):
            """
            Execute an EQL query on the provided events.
            :param events: events
            :param query: EQL query
            :return: the result of the query as a list of dictionaries or None when the query did not match the schema
            """
            schema = eql.Schema.learn(events)

            query_result = []

            # this function is used to store the result of the query to 'query_result'
            def store_result(result):
                for event in result.events:
                    query_result.append(event.data)

            engine = eql.PythonEngine()
            with schema:
                try:
                    eql_query = eql.parse_query(query, implied_any=True, implied_base=True)
                    engine.add_query(eql_query)
                except eql.EqlError as e:
                    print(e, file=sys.stderr)
                    print('\nTake into account the following schema:')
                    pprint(schema.schema)
                    return None
            engine.add_output_hook(store_result)

            # execute the query
            engine.stream_events(events)

            return query_result

        def search(self, data, query, data_type='json', event_type=None, timestamp_key=0):
            """
            Perform an EQL search on the provided JSON or YAML data.
            :param data: list of dictionaries
            :param query: EQL search query
            :param data_type: 'json' or 'yaml'
            :param event_type: name of the event type to use for the EQL schema. Leave empty if you want
                to derive the event type from the data itself (i.e. the key-value pair named 'event_type'
                or 'event_type_full').
            :param timestamp_key: name of the key-value pair to be used as timestamp
            :return: the result of the search query as a list of dictionaries
            """
            # check for a valid data_type
            if data_type not in ('json', 'yaml'):
                raise ValueError("data_type should be 'json' or 'yaml'")

            # transform data into a list of EQL Event objects
            eql_events = self._create_events(data, data_type, event_type, timestamp_key)

            # execute the EQL query on the provided data
            search_result = self._execute_eql_query(eql_events, query)

            return search_result

        def _traverse_dict(self, obj, callback=None):
            """
            Traverse all items in a dictionary
            :param obj: dictionary, list or value
            :param callback: the function that will be called to modify a value
            :return: value or call callback function
            """
            if isinstance(obj, dict):
                value = {k: self._traverse_dict(v, callback) for k, v in obj.items()}
            elif isinstance(obj, list):
                value = [self._traverse_dict(elem, callback) for elem in obj]
            else:
                value = obj

            # if a callback is provided, call it to get the new value
            if callback is None:
                return value
            else:
                return callback(value)

        def _serialize_date(self, obj):
            """
            Serialize a datetime.date object
            :param obj: dictionary
            :return: function call
            """
            # this gets called for every value in the dictionary
            def _transformer(value):
                if isinstance(value, datetime.date):
                    return str(value)
                else:
                    return value

            return self._traverse_dict(obj, callback=_transformer)


    if __name__ == "__main__":
        eql_search = EQLSearch()

        with open('example.json', 'r') as json_data:
            data = json.load(json_data)

        query = 'process where pid == 424'
        result = eql_search.search(data, query)

        if result:
            print('Query: ' + query + '\nResult: ' + str(len(result)) + ' event(s) ↓\n')
            pprint(result)

        print('\n' + '-' * 80 + '\n')

        with open('data-sources-endpoints.yaml', 'r') as yaml_data:
            data = yaml.safe_load(yaml_data)['data_sources']

        query = 'data_sources where date_connected >= "2019-01-01"'
        result = eql_search.search(data, query, data_type='yaml', event_type='data_sources', timestamp_key=0)

        if result:
            print('Query: ' + query + '\nResult: ' + str(len(result)) + ' event(s) ↓\n')
            pprint(result)