msticpy.data.drivers.local_osquery_driver module

Local Osquery Data Driver class - osquery.{results,snapshots}.log.

class msticpy.data.drivers.local_osquery_driver.OSQueryLogDriver(connection_str: str | None = None, **kwargs)

Bases: DriverBase

OSQueryLogDriver class to execute kql queries.

Instantiate OSQueryLogDriver and optionally connect.

Parameters:
  • connection_str (str, optional) – Connection string (not used)

  • data_paths (List[str], optional) – Paths from which to load data files

  • cache_file (str, optional) – Store extracted data to cache_file path, or read from this file, if it exists.

  • progress (bool, optional) – Show progress with tqdm, by default, True

OS_QUERY_DATEIME_COLS = {'columns_atime', 'columns_ctime', 'columns_mtime', 'columns_time', 'unixTime'}

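
As an added illustration (not msticpy's own code), the following parses osquery results.log lines in the JSON-lines format this driver reads, grouping events by query name and converting the datetime columns listed in OS_QUERY_DATEIME_COLS from epoch seconds. It assumes that the driver flattens each event's nested "columns" object into columns_<key> fields, which the column names above imply, and it uses constructed sample log lines (including a corrupt line and a bad timestamp) rather than a real log.

```python
"""Added example, not msticpy's own code: parse osquery results.log lines into
per-query tables, assuming "columns" is flattened to "columns_<key>" fields."""
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

# Datetime columns listed on the page; osquery writes them as epoch seconds.
OS_QUERY_DATEIME_COLS = {"columns_atime", "columns_ctime", "columns_mtime",
                         "columns_time", "unixTime"}

# Constructed sample of osquery's results.log (one JSON event per line),
# including a corrupt line and an unparsable timestamp to exercise error paths.
SAMPLE_RESULTS_LOG = """\
{"name": "pack_osquery-monitoring_osquery_info", "hostIdentifier": "host-a", "calendarTime": "Tue Mar  5 10:00:00 2024 UTC", "unixTime": 1709632800, "epoch": 0, "counter": 1, "columns": {"pid": "4242", "version": "5.11.0"}, "action": "added"}
{"name": "pack_incident-response_process_events", "hostIdentifier": "host-a", "calendarTime": "Tue Mar  5 10:00:05 2024 UTC", "unixTime": 1709632805, "epoch": 0, "counter": 1, "columns": {"pid": "5150", "path": "/usr/bin/curl", "cmdline": "curl https://updates.example.invalid/", "time": "1709632804"}, "action": "added"}
this line is not JSON
{"name": "pack_incident-response_process_events", "hostIdentifier": "host-b", "calendarTime": "Tue Mar  5 10:00:09 2024 UTC", "unixTime": 1709632809, "epoch": 0, "counter": 2, "columns": {"pid": "777", "path": "/bin/sh", "cmdline": "sh -c id", "time": "bogus"}, "action": "added"}
"""


def flatten_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Lift "columns" into top-level columns_<key> fields and convert
    the known epoch-seconds columns to timezone-aware datetimes."""
    row = {key: value for key, value in event.items() if key != "columns"}
    for key, value in event.get("columns", {}).items():
        row[f"columns_{key}"] = value
    for col in OS_QUERY_DATEIME_COLS:
        if col in row:
            try:
                row[col] = datetime.fromtimestamp(int(row[col]), tz=timezone.utc)
            except (TypeError, ValueError):
                row[col] = None  # keep the row, blank the unparsable timestamp
    return row


def load_results_log(text: str) -> Dict[str, List[Dict[str, Any]]]:
    """Group flattened events by their osquery query name; the driver
    exposes one query per distinct name found in the log."""
    tables: Dict[str, List[Dict[str, Any]]] = {}
    for line in text.splitlines():
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip corrupt lines instead of aborting the whole load
        if isinstance(event, dict) and event.get("name"):
            tables.setdefault(event["name"], []).append(flatten_event(event))
    return tables
```

Each key of the returned dict corresponds to one query name the driver would expose, and each value is the list of flattened rows that a DataFrame would be built from.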
add_query_filter(name: str, query_filter: str | Iterable)

Add an expression to the query attach filter.

connect(connection_str: str | None = None, **kwargs)

Connect to data source.

Parameters:

connection_str (str) – Connect to a data source

property connected: bool

Return true if at least one connection has been made.

Returns:

True if a successful connection has been made.

Return type:

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: List[Dict[str, Any]]

Return dynamic queries available on connection to data.

Returns:

List of queries with properties: “name”, “query”, “container” and (optionally) “description”

Return type:

List[Dict[str, Any]]

Raises:

MsticpyNotConnectedError – If called before driver is connected.

get_driver_property(name: str) -> Any

Return value or KeyError from driver properties.

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property instance: str | None

Return instance name, if one is set.

Returns:

The name of driver instance or None if the driver does not support multiple instances

Return type:

Optional[str]

property loaded: bool

Return true if the provider is loaded.

Returns:

True if the provider is loaded.

Return type:

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: QuerySource | None = None, **kwargs) -> DataFrame | Any

Execute query string and return DataFrame of results.

Parameters:
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns:

A DataFrame (if successful) or the underlying provider result if an error.

Return type:

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_usable(query_source: QuerySource) -> bool

Return True if query should be exposed for this driver.

query_with_results(query, **kwargs)

Return query with fake results.

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns:

Data schema of current connection.

Return type:

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns:

Dictionary of query_name, query_text. Name of container to add queries to.

Return type:

Tuple[Dict[str, str], str]

set_driver_property(name: str, value: Any)

Set an item in driver properties.