How to integrate EQL into your tooling

At DerbyCon I had a conversation with Ross Wolf (@rw_access) from Endgame about the capabilities of EQL (Event Query Language) and how to integrate it into other tools. The purpose of this blog post is to share what I learned, along with Python code to help others integrate EQL into their own tools.

What is EQL?

See the quotation below for a quick introduction to EQL. For more details and examples, see the blogs on Endgame’s website [1, 2, 3].

EQL is a language that can match events, generate sequences, stack data, build aggregations, and perform analysis. EQL is schemaless and supports multiple database backends. It supports field lookups, boolean logic, comparisons, wildcard matching, and function calls. EQL also has a preprocessor that can perform parse and translation time evaluation, allowing for easily sharable components between queries.

Source: Endgame

At the end of 2018, Endgame made EQL open source, so you can use it within your own tools or extend its capabilities. I think it is truly great to see this being shared with the community! I also like the fact that it is written in Python: a very popular language for security tools.

When we wanted DeTT&CT (written in Python) to be able to include or exclude certain objects (e.g. to filter or highlight detections for ATT&CK techniques with a low score), I was happy that I did not have to write that code myself and could instead rely mainly on EQL, which does it in a much better way.

Integrate EQL into your code

Now, let’s look at how to integrate EQL into your Python tool to query your data. Before you continue, make sure the EQL Python library is available on your system. It can easily be installed via pip: pip install eql

EQL: Not only for JSON and security events

EQL is mainly used for security events stored in the JSON format, but it can also be used for other purposes and data formats, as long as you provide the data as a list of Python dictionaries whose key-value pairs are compatible with the JSON data format.

Within DeTT&CT we make use of several YAML files to store data. Luckily, in Python, YAML data can also be loaded as a list of dictionaries. To make these YAML files compatible with the JSON specs, I had to serialise every value of the type ‘datetime.date’ to a Python string (after loading the YAML file into a list of dictionaries):

    import datetime

    # called for every value in the loaded YAML data
    def _transformer(value):
        if isinstance(value, datetime.date):
            return str(value)
        else:
            return value
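
The check above only handles a single value. Below is a minimal sketch of applying it to every value in nested data, assuming the data consists only of dictionaries, lists and scalar values. The helper name `serialize_dates` and the sample record are made up for illustration and are not part of EQL or DeTT&CT:

```python
import datetime
import json


def serialize_dates(obj):
    """Return a copy of obj in which every datetime.date is a string."""
    if isinstance(obj, dict):
        return {key: serialize_dates(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [serialize_dates(item) for item in obj]
    # datetime.datetime is a subclass of datetime.date, so it is covered too
    if isinstance(obj, datetime.date):
        return str(obj)
    return obj


# made-up sample record, similar to what a YAML loader could return
record = {
    'data_source_name': 'Process monitoring',
    'date_connected': datetime.date(2019, 3, 1),
    'products': [{'name': 'example', 'date_registered': datetime.date(2019, 1, 10)}],
}

serialized = serialize_dates(record)

# json.dumps(record) would raise a TypeError; the serialized copy works
print(json.dumps(serialized))
```

Returning a new structure instead of modifying the input in place keeps the originally loaded YAML data untouched, which can be convenient when the same data is used elsewhere in the tool.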

Create EQL events

The first step (or the second, if you first had to make the data compatible with the JSON specs) is to create EQL Event objects, on which EQL performs its data analytics. Although this step is not mandatory, it gives you more control over how the Events are created.

  • Set the value for the ‘event_type’ to be used within your search queries:

    • Having a good value makes it very clear what kind of data your search query runs against:

      • For example: “web_proxy where …” instead of “generic where …”

    • When not creating the EQL Events yourself, EQL will try to derive the ‘event_type’ based on any field within your data named ‘event_type’ or ‘event_type_full’.

    • If it does not result in a value for the ‘event_type’, the default value ‘generic’ will be used.

  • Similar to the above, you can specify which data field will be used as the timestamp for your events. Otherwise, it tries to derive the timestamp based on a data field named ‘timestamp’.
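
The fallback behaviour described in the list above can be sketched in plain Python. This only mimics the description and is not EQL's actual implementation; the function name `derive_event_fields` is hypothetical, and the default timestamp of 0 is an assumption:

```python
def derive_event_fields(record, default_type='generic'):
    """Mimic the described fallback for the event type and the timestamp."""
    # first 'event_type', then 'event_type_full', otherwise the default
    event_type = (record.get('event_type')
                  or record.get('event_type_full')
                  or default_type)
    # assumption: use 0 when the record has no 'timestamp' field
    timestamp = record.get('timestamp', 0)
    return event_type, timestamp


print(derive_event_fields({'event_type': 'process', 'timestamp': 5, 'pid': 424}))
print(derive_event_fields({'pid': 424}))
```

Creating the Events yourself avoids this guessing: you decide the event type once, and every query can then start with that name.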

Start by creating a list of EQL Events from a list of Python dictionaries containing the data you want to query. The function ‘_create_events’ below does precisely that. When you are dealing with YAML as input, for example, the function ‘_serialize_date’ will serialise every ‘datetime.date’ value to a string before the EQL Events are created:

    def _create_events(self, data, data_type, event_type, timestamp_key):
        """
        Create EQL Events from the provided data.
        :param data: list of dictionaries to be transformed to EQL Events
        :param data_type: if 'yaml', serialize all 'datetime.date' key-value pairs
        :param event_type: the value to be used as event_type for the provided data
        :param timestamp_key: name of the key-value pair to be used as timestamp
        :return: EQL Events or data
        """
        eql_events = []

        if data_type == 'yaml':
            data = self._serialize_date(data)

        # returning the raw data results in EQL trying to derive the
        # event_type and timestamp from the contents of 'data'
        if not event_type:
            return data

        # create EQL Events from 'data'
        for item in data:
            eql_events.append(eql.Event(event_type, timestamp_key, item))

        return eql_events

Learn the schema

The next step is to learn the schema of the data. In the code below the variable ‘events’ is the list of EQL Event objects created previously by the function ‘_create_events’:

    schema = eql.Schema.learn(events)

Example EQL syntax error message.

Learning the schema is not required to execute EQL search queries, but it does let you print a detailed error message when you make a mistake in your query.

See the picture for an example of an error message, which nicely highlights that ‘pi’ is an unrecognised field. As shown in the schema, ‘pid’ should be used instead.

The EQL library does not print this error message and the schema for you. Fortunately, this only requires a few lines of code:

    with schema:
        try:
            eql_query = eql.parse_query(query, implied_any=True,
                                        implied_base=True)
            engine.add_query(eql_query)
        except eql.EqlError as e:
            print(e, file=sys.stderr)
            print('\nTake into account the following schema:')
            pprint(schema.schema)
            return None
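
To make the idea of a learned schema more concrete, here is a rough, EQL-independent sketch of what learning a schema and checking a field name against it could look like. It is a conceptual illustration only and says nothing about how ‘eql.Schema.learn’ works internally. The function names and the sample records are made up:

```python
def learn_field_types(records):
    """Collect the type names observed for every top-level field."""
    fields = {}
    for record in records:
        for key, value in record.items():
            fields.setdefault(key, set()).add(type(value).__name__)
    return fields


def unknown_fields(query_fields, learned):
    """Return the queried field names that never occur in the data."""
    return [name for name in query_fields if name not in learned]


# made-up sample records
records = [
    {'pid': 424, 'process_name': 'cmd.exe'},
    {'pid': 512, 'process_name': 'powershell.exe', 'parent_pid': 424},
]

learned = learn_field_types(records)

# 'pi' is a typo for 'pid', so it is reported as unknown
missing = unknown_fields(['pi'], learned)
print(missing)
```

With such a mapping available, a tool can tell the user which fields do exist whenever a query refers to one that does not.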

Create the EQL engine and execute the query

After you have created a list of EQL Events from your data and optionally learned the schema, you can continue with creating the EQL Python engine to execute the query. The function below will return a list of dictionaries containing the Event data that match your query, which you can use within your own tooling to do whatever you want.

   1. def _execute_eql_query(self, events, query):
   2.     """
   3.     Execute an EQL query on the provided events.
   4.     :param events: events
   5.     :param query: EQL query
   6.     :return: the result of the query as a list of dictionaries or
   7.     None when the query did not match the schema
   8.     """
   9.     schema = eql.Schema.learn(events)
  10.
  11.     query_result = []
  12.
  13.     # this function is used to store the result of the query to 'query_result'
  14.     def store_result(result):
  15.         for event in result.events:
  16.             query_result.append(event.data)
  17.
  18.     engine = eql.PythonEngine()
  19.     with schema:
  20.         try:
  21.             eql_query = eql.parse_query(query, implied_any=True,
  22.                                         implied_base=True)
  23.             engine.add_query(eql_query)
  24.         except eql.EqlError as e:
  25.             print(e, file=sys.stderr)
  26.             print('\nTake into account the following schema:')
  27.             pprint(schema.schema)
  28.             return None
  29.         engine.add_output_hook(store_result)
  30.
  31.     # execute the query
  32.     engine.stream_events(events)
  33.
  34.     return query_result

  • 11: create a Python list to store the results of the query.

  • 14-16: in my code, I chose to use an inner function ‘store_result’ as a callback for the EQL engine. This function will be called after the query has finished and stores the result in the list ‘query_result’.

  • 18: create the EQL Python engine.

  • 21-22: parse the query using the function: ‘parse_query’.

    • Because the parameter ‘implied_any’ is set to ‘True’, queries can be shortened by leaving out the ‘event_type’ and the ‘where’ keyword. For example: “process where pid == 424” and “pid == 424” are both valid and return the same result.

  • 23: add the query to the engine.

  • 24-28: the query syntax error handling as discussed earlier.

  • 29: add the callback function ‘store_result’ to the engine using ‘add_output_hook’.

  • 32: execute the query by calling the function ‘stream_events’ from the engine. When finished, it will call the ‘store_result’ callback function to save the result in the list ‘query_result’.
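
The callback pattern from the walkthrough above (an inner function that appends to a list defined in the enclosing scope) can be tried out without EQL. The class ‘TinyEngine’ below is a hypothetical stand-in and not ‘eql.PythonEngine’: it only borrows the method names, matches events with a plain Python function, and calls the hooks once per matching event, which may differ from the timing of the real engine:

```python
class TinyEngine:
    """Hypothetical stand-in for an engine with output hooks."""

    def __init__(self, predicate):
        self._predicate = predicate
        self._hooks = []

    def add_output_hook(self, hook):
        self._hooks.append(hook)

    def stream_events(self, events):
        # pass every matching event to all registered hooks
        for event in events:
            if self._predicate(event):
                for hook in self._hooks:
                    hook(event)


def run_query(events, predicate):
    query_result = []

    # inner function used as callback; it has access to 'query_result'
    def store_result(event):
        query_result.append(event)

    engine = TinyEngine(predicate)
    engine.add_output_hook(store_result)
    engine.stream_events(events)
    return query_result


# made-up sample events
events = [{'pid': 424, 'name': 'a'}, {'pid': 1, 'name': 'b'}]
matches = run_query(events, lambda event: event.get('pid') == 424)
print(matches)
```

Because the callback is a closure over ‘query_result’, no global state or extra class attribute is needed to get the results out of the engine.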

That is all you need to start using EQL within your tools.

The complete code example

All Python code for integrating EQL in your tool(s) can be downloaded from my GitHub account: github.com/marcusbakker/EQL. This code also includes functionality that was not shared above, such as code that performs two example search queries on the files example.json and data-sources-endpoints.yaml.

The code requires Python version 3 and was tested with v0.7 of EQL and v5.1.2 of PyYAML. When run successfully, it should produce the following output:

[Image: output of eql_demo_tool.py]

eql_demo_tool.py
    import datetime
    import json
    import sys
    from pprint import pprint

    import eql
    import yaml


    class EQLSearch:
        def _create_events(self, data, data_type, event_type, timestamp_key):
            """
            Create EQL Events from the provided data.
            :param data: list of dictionaries to be transformed to EQL Events
            :param data_type: if 'yaml', serialize all 'datetime.date' key-value pairs
            :param event_type: the value to be used as event_type for the provided data
            :param timestamp_key: name of the key-value pair to be used as timestamp
            :return: EQL Events or data
            """
            eql_events = []

            if data_type == 'yaml':
                data = self._serialize_date(data)

            # returning the raw data results in EQL trying to derive the
            # event_type and timestamp from the contents of 'data'
            if not event_type:
                return data

            # create EQL Events from 'data'
            for item in data:
                eql_events.append(eql.Event(event_type, timestamp_key, item))

            return eql_events

        def _execute_eql_query(self, events, query):
            """
            Execute an EQL query on the provided events.
            :param events: events
            :param query: EQL query
            :return: the result of the query as a list of dictionaries or
            None when the query did not match the schema
            """
            schema = eql.Schema.learn(events)

            query_result = []

            # this function is used to store the result of the query to 'query_result'
            def store_result(result):
                for event in result.events:
                    query_result.append(event.data)

            engine = eql.PythonEngine()
            with schema:
                try:
                    eql_query = eql.parse_query(query, implied_any=True,
                                                implied_base=True)
                    engine.add_query(eql_query)
                except eql.EqlError as e:
                    print(e, file=sys.stderr)
                    print('\nTake into account the following schema:')
                    pprint(schema.schema)
                    return None
                engine.add_output_hook(store_result)

            # execute the query
            engine.stream_events(events)

            return query_result

        def search(self, data, query, data_type='json',
                   event_type=None, timestamp_key=0):
            """
            Perform an EQL search on the provided JSON or YAML data.
            :param data: list of dictionaries
            :param query: EQL search query
            :param data_type: 'json' or 'yaml'
            :param event_type: name of the event type to use for the EQL schema.
            Leave empty if you want to derive the event type from the data itself
            (i.e. the key-value pair named 'event_type' or 'event_type_full').
            :param timestamp_key: name of the key-value pair to be used as timestamp
            :return: the result of the search query as a list of dictionaries
            """
            # check for a valid data_type
            if data_type not in ('json', 'yaml'):
                raise ValueError("data_type should be 'json' or 'yaml'")

            # transform data into a list of EQL Event objects
            eql_events = self._create_events(data, data_type, event_type, timestamp_key)

            # execute the EQL query on the provided data
            search_result = self._execute_eql_query(eql_events, query)

            return search_result

        def _traverse_dict(self, obj, callback=None):
            """
            Traverse all items in a dictionary
            :param obj: dictionary, list or value
            :param callback: the function that will be called to modify a value
            :return: value or call callback function
            """
            if isinstance(obj, dict):
                value = {k: self._traverse_dict(v, callback)
                         for k, v in obj.items()}
            elif isinstance(obj, list):
                value = [self._traverse_dict(elem, callback)
                         for elem in obj]
            else:
                value = obj

            # if a callback is provided, call it to get the new value
            if callback is None:
                return value
            else:
                return callback(value)

        def _serialize_date(self, obj):
            """
            Serialize a datetime.date object
            :param obj: dictionary
            :return: function call
            """

            # this gets called for every value in the dictionary
            def _transformer(value):
                if isinstance(value, datetime.date):
                    return str(value)
                else:
                    return value

            return self._traverse_dict(obj, callback=_transformer)


    if __name__ == "__main__":
        eql_search = EQLSearch()

        with open('example.json', 'r') as json_data:
            data = json.load(json_data)

        query = 'process where pid == 424'
        result = eql_search.search(data, query)
        if result:
            print('Query: ' + query + '\nResult: ' +
                  str(len(result)) + ' event(s) ↓\n')
            pprint(result)

        print('\n' + '-' * 80 + '\n')

        with open('data-sources-endpoints.yaml', 'r') as yaml_data:
            data = yaml.safe_load(yaml_data)['data_sources']

        query = 'data_sources where date_connected >= "2019-01-01"'
        result = eql_search.search(data, query, data_type='yaml',
                                   event_type='data_sources', timestamp_key=0)
        if result:
            print('Query: ' + query + '\nResult: ' +
                  str(len(result)) + ' event(s) ↓\n')
            pprint(result)