msticpy.data.drivers.local_osquery_driver module
Local Osquery Data Driver class - osquery.{results,snapshots}.log.
- class msticpy.data.drivers.local_osquery_driver.OSQueryLogDriver(connection_str: str | None = None, **kwargs)
Bases:
DriverBase
OSQueryLogDriver class to execute kql queries.
Instantiate OSQueryLogDriver and optionally connect.
- Parameters:
connection_str (str, optional) – Connection string (not used)
data_paths (List[str], optional) – Paths from which to load data files
cache_file (str, optional) – Store extracted data to cache_file path, or read from this file, if it exists.
progress (bool, optional) – Show progress with tqdm, by default, True
- OS_QUERY_DATEIME_COLS = {'columns_atime', 'columns_ctime', 'columns_mtime', 'columns_time', 'unixTime'}
- add_query_filter(name: str, query_filter: str | Iterable)
Add an expression to the query attach filter.
- connect(connection_str: str | None = None, **kwargs)
Connect to data source.
- Parameters:
connection_str (str) – Connect to a data source
- property connected: bool
Return true if at least one connection has been made.
- Returns:
True if a successful connection has been made.
- Return type:
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: List[Dict[str, Any]]
Return dynamic queries available on connection to data.
- Returns:
List of queries with properties: “name”, “query”, “container” and (optionally) “description”
- Return type:
List[Dict[str, Any]]
- Raises:
MsticpyNotConnectedError – If called before driver is connected.
- get_driver_property(name: str) → Any
Return value or KeyError from driver properties.
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property instance: str | None
Return instance name, if one is set.
- Returns:
The name of driver instance or None if the driver does not support multiple instances
- Return type:
Optional[str]
- property loaded: bool
Return true if the provider is loaded.
- Returns:
True if the provider is loaded.
- Return type:
bool
Notes
This is not relevant for some providers.
- query(query: str, query_source: QuerySource | None = None, **kwargs) → DataFrame | Any
Execute query string and return DataFrame of results.
- Parameters:
query (str) – The query to execute
query_source (QuerySource) – The query definition object
- Returns:
A DataFrame (if successful) or the underlying provider result if an error.
- Return type:
Union[pd.DataFrame, results.ResultSet]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_usable(query_source: QuerySource) → bool
Return True if query should be exposed for this driver.
- query_with_results(query, **kwargs)
Return query with fake results.
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns:
Data schema of current connection.
- Return type:
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns:
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type:
Tuple[Dict[str, str], str]
- set_driver_property(name: str, value: Any)
Set an item in driver properties.
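The driver above reads osquery's line-delimited JSON results log and treats the columns listed in OS_QUERY_DATEIME_COLS as timestamps. As an added, stand-alone illustration (not msticpy's own code), the following flattens constructed results.log lines into "columns_<name>" keys, converts those epoch columns to UTC datetimes, and skips a truncated trailing line of the kind left behind when a host stops mid-write; the sample records and the function names are assumptions made for this example.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List

# Same column set the driver documents (the driver's own constant name is spelled DATEIME).
OS_QUERY_DATETIME_COLS = {"columns_atime", "columns_ctime", "columns_mtime", "columns_time", "unixTime"}

# Constructed sample osquery results.log content: one JSON object per line (differential format).
# The third line is deliberately truncated to exercise the error path.
SAMPLE_RESULTS_LOG = """
{"name": "pack_ir_process_events", "hostIdentifier": "host01", "calendarTime": "Mon Jan  8 12:00:00 2024 UTC", "unixTime": 1704715200, "epoch": 0, "counter": 3, "decorations": {"username": "alice"}, "columns": {"pid": "4242", "path": "/usr/bin/curl", "time": "1704715199"}, "action": "added"}
{"name": "pack_ir_file_events", "hostIdentifier": "host01", "calendarTime": "Mon Jan  8 12:00:00 2024 UTC", "unixTime": 1704715200, "epoch": 0, "counter": 3, "decorations": {"username": "alice"}, "columns": {"target_path": "/etc/cron.d/job", "mtime": "1704715100"}, "action": "removed"}
{"name": "pack_ir_process_events", "hostIdentifier": "host01", "unixTime": 17047
"""


def flatten_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one level of nesting: {"columns": {"pid": ..}} -> {"columns_pid": ..}."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        if isinstance(value, dict):
            for sub_key, sub_value in value.items():
                flat[f"{key}_{sub_key}"] = sub_value
        else:
            flat[key] = value
    return flat


def convert_datetimes(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Turn the epoch-seconds columns (stored as int or string by osquery) into aware UTC datetimes."""
    for col in OS_QUERY_DATETIME_COLS:
        if flat.get(col) not in (None, ""):
            flat[col] = datetime.fromtimestamp(int(flat[col]), tz=timezone.utc)
    return flat


def load_osquery_results(lines: Iterable[str]) -> List[Dict[str, Any]]:
    """Parse results.log lines into flat, timestamp-normalised rows; malformed lines are skipped."""
    rows: List[Dict[str, Any]] = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            # A partial last line is common when the agent or host died mid-write;
            # drop it rather than abort the whole load.
            continue
        rows.append(convert_datetimes(flatten_record(record)))
    return rows


if __name__ == "__main__":
    for row in load_osquery_results(SAMPLE_RESULTS_LOG.splitlines()):
        print(row["name"], row["action"], row["unixTime"].isoformat())
```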