msticpy.data.core.data_providers module

Data provider loader.

class msticpy.data.core.data_providers.QueryProvider(data_environment: str | DataEnvironment, driver: DriverBase | None = None, query_paths: List[str] | None = None, **kwargs)

Bases: QueryProviderConnectionsMixin, QueryProviderUtilsMixin

Container for query store and query execution provider.

Instances of this class hold the query set and execution methods for a specific data environment.

Query provider interface to queries.

Parameters:
  • data_environment (Union[str, DataEnvironment]) – Name or Enum of environment for the QueryProvider

  • driver (DriverBase, optional) – Override the builtin driver (query execution class) and use your own driver (must inherit from DriverBase)

  • query_paths (List[str]) – Additional paths to look for query definitions.

  • kwargs – Other arguments are passed to the data provider driver.

Notes

Additional keyword arguments are passed to the data provider driver. The driver may support its own keyword arguments; use the QueryProvider.driver_help() method to list them.

See also

DataProviderBase

base class for data query providers.

add_connection(connection_str: str | None = None, alias: str | None = None, **kwargs)

Add an additional connection for the query provider.

Parameters:
  • connection_str (Optional[str], optional) – Connection string for the provider, by default None

  • alias (Optional[str], optional) – Alias to use for the connection, by default None

  • kwargs (Dict[str, Any]) – Other connection parameters passed to the driver.

Notes

Some drivers may accept types other than strings for the connection_str parameter.
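As a minimal sketch of how several connections might be registered, the helper below calls add_connection once per alias. The provider argument is assumed to be a QueryProvider-like object. The names add_named_connections and _StubProvider are hypothetical, and the connection strings are placeholders rather than a real format for any driver.

```python
from typing import Dict, List


def add_named_connections(provider, connections: Dict[str, str]) -> List[str]:
    """Register each alias/connection-string pair on the provider."""
    added = []
    for alias, connection_str in connections.items():
        provider.add_connection(connection_str=connection_str, alias=alias)
        added.append(alias)
    return added


class _StubProvider:
    """Hypothetical stand-in so the sketch runs without a live data source."""

    def __init__(self):
        self.calls = []

    def add_connection(self, connection_str=None, alias=None, **kwargs):
        # Record the call instead of opening a real connection.
        self.calls.append((alias, connection_str))


stub = _StubProvider()
aliases = add_named_connections(
    stub, {"prod": "placeholder-conn-1", "test": "placeholder-conn-2"}
)
```

With a real provider, the same helper would be called as add_named_connections(qry_prov, {...}) after the initial connect().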

add_custom_query(name: str, query: str, family: str | Iterable[str], description: str | None = None, parameters: Iterable[QueryParam] | None = None)

Add a custom query to the provider.

Parameters:
  • name (str) – The name of the query.

  • query (str) – The query text (optionally parameterized).

  • family (Union[str, Iterable[str]]) – The query group/family or list of families. The query will be added to attributes of the query provider with these names.

  • description (Optional[str], optional) – Optional description (for query help), by default None

  • parameters (Optional[Iterable[QueryParam]], optional) – Optional list of parameter definitions, by default None. If the query is parameterized you must supply definitions for the parameters here: at least name and type. Parameters can be the named tuple QueryParam (also exposed as QueryProvider.create_param) or a 4-value tuple (name, data type, description, default value).

Examples

>>> from msticpy.data.core.data_providers import QueryProvider
>>>
>>> qp = QueryProvider("MSSentinel")
>>> qp_host = qp.create_param("host_name", "str", "Name of Host")
>>> qp_start = qp.create_param("start", "datetime")
>>> qp_end = qp.create_param("end", "datetime")
>>> qp_evt = qp.create_param("event_id", "int", None, 4688)
>>>
>>> query = '''
... SecurityEvent
... | where EventID == {event_id}
... | where TimeGenerated between (datetime({start}) .. datetime({end}))
... | where Computer has "{host_name}"
... '''
>>>
>>> qp.add_custom_query(
...     name="test_host_proc",
...     query=query,
...     family="Custom",
...     parameters=[qp_host, qp_start, qp_end, qp_evt],
... )
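
To illustrate how the placeholders in the query text relate to the parameter definitions, here is a minimal sketch that fills {name} placeholders from supplied values and falls back to parameter defaults. Param and render_query are hypothetical names; the sketch uses plain str.format and does not describe how msticpy performs substitution internally.

```python
from collections import namedtuple

# Hypothetical stand-in mirroring the four positional values used above:
# name, data type, description, default value.
Param = namedtuple("Param", ["name", "data_type", "description", "default"])


def render_query(template, params, **values):
    """Fill placeholders from values, falling back to parameter defaults."""
    resolved = {}
    for param in params:
        if param.name in values:
            resolved[param.name] = values[param.name]
        elif param.default is not None:
            resolved[param.name] = param.default
        else:
            raise ValueError(f"Missing value for parameter '{param.name}'")
    return template.format(**resolved)


template = (
    "SecurityEvent | where EventID == {event_id} "
    '| where Computer has "{host_name}"'
)
params = [
    Param("host_name", "str", "Name of Host", None),
    Param("event_id", "int", None, 4688),
]
# event_id is omitted, so its default (4688) is used.
rendered = render_query(template, params, host_name="srv01")
```

A parameter with neither a supplied value nor a default raises ValueError in this sketch, which is one reasonable way to surface a missing required parameter.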
browse(**kwargs)

Return QueryProvider query browser.

Parameters:

kwargs – passed to SelectItem constructor.

Returns:

SelectItem browser for the provider's queries.

Return type:

SelectItem

browse_queries(**kwargs)

Return QueryProvider query browser.

Parameters:

kwargs – passed to SelectItem constructor.

Returns:

SelectItem browser for the provider's queries.

Return type:

SelectItem

connect(connection_str: str | None = None, **kwargs)

Connect to data source.

Parameters:

connection_str (Optional[str], optional) – Connection string for the data source, by default None

property connected: bool

Return True if the provider is connected.

Returns:

True if the provider is connected.

Return type:

bool
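
The connected property can be used to avoid reconnecting an already connected provider. The following is a minimal sketch under the assumption that provider behaves like a QueryProvider; ensure_connected and _StubProvider are hypothetical names, and the connection string is a placeholder.

```python
def ensure_connected(provider, connection_str=None, **kwargs):
    """Call connect() only when the provider reports it is not connected.

    Returns True if a connection attempt was made, False otherwise.
    """
    if provider.connected:
        return False
    provider.connect(connection_str=connection_str, **kwargs)
    return True


class _StubProvider:
    """Hypothetical stand-in that tracks connect() calls."""

    def __init__(self):
        self.connected = False
        self.connect_calls = 0

    def connect(self, connection_str=None, **kwargs):
        self.connect_calls += 1
        self.connected = True


stub = _StubProvider()
first = ensure_connected(stub, connection_str="placeholder")
second = ensure_connected(stub, connection_str="placeholder")
```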

property connection_string: str

Return provider connection string.

Returns:

Provider connection string.

Return type:

str

create_param

alias of QueryParam

driver_class: Any

driver_help()

Display help for the driver.

property environment: str

Return the environment name.

exec_query(query: str, **kwargs) -> DataFrame | Any

Execute simple query string.

Parameters:
  • query (str) – The query string to execute.

  • use_connections (Union[str, List[str]])

  • query_options (Dict[str, Any]) – Additional options passed to query driver.

  • kwargs (Dict[str, Any]) – Additional options passed to query driver.

Returns:

Query results - a DataFrame if successful or a KqlResult if unsuccessful.

Return type:

Union[pd.DataFrame, Any]
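
Because the return value is either a DataFrame or some other result object, callers may want to check what they received before using it. The sketch below shows one way to do that without importing pandas, by inspecting class names in the result's type hierarchy. The names is_dataframe, run_and_check, and the stub classes are hypothetical.

```python
def is_dataframe(result) -> bool:
    """Return True if any class in the result's MRO is named DataFrame."""
    return any(cls.__name__ == "DataFrame" for cls in type(result).__mro__)


def run_and_check(provider, query, **kwargs):
    """Run the query and report whether the result looks like a DataFrame."""
    result = provider.exec_query(query, **kwargs)
    return result, is_dataframe(result)


class DataFrame:
    """Hypothetical stand-in for pandas.DataFrame, used only in this sketch."""


class _StubProvider:
    """Hypothetical provider that returns a fixed result."""

    def __init__(self, result):
        self._result = result

    def exec_query(self, query, **kwargs):
        return self._result


_, ok = run_and_check(_StubProvider(DataFrame()), "SecurityEvent | take 1")
_, failed = run_and_check(_StubProvider(object()), "SecurityEvent | take 1")
```

When pandas is available, isinstance(result, pd.DataFrame) is the more direct check.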

get_query(query_name: str) -> str

Return the raw query text for query_name.

Parameters:

query_name (str) – The name of the query.

import_query_file(query_file: str)

Import a YAML query definition file.

Parameters:

query_file (str) – Path to the file to import

property instance: str | None

Return the instance name, if any, for the provider.

Returns:

The instance name or None for drivers that do not support multiple instances.

Return type:

Optional[str]

list_connections() -> List[str]

Return a list of current connections.

Returns:

The alias and connection string for each connection.

Return type:

List[str]

classmethod list_data_environments() -> List[str]

Return list of current data environments.

Returns:

List of current data environments

Return type:

List[str]

list_queries(substring: str | None = None) -> List[str]

Return a list of the queries in the store, each in family.query form.

Parameters:

substring (Optional[str]) – Optional pattern - will return only queries matching the pattern, default None.

Returns:

List of queries

Return type:

List[str]
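
Since each entry has the form family.query, the returned list can be regrouped by family. The sketch below assumes the query name itself contains no dot, so the text after the last dot is treated as the query name. group_by_family is a hypothetical helper, and the sample names other than Custom.test_host_proc are invented for illustration.

```python
from collections import defaultdict


def group_by_family(query_names):
    """Group 'family.query' strings into {family: [query, ...]}."""
    grouped = defaultdict(list)
    for full_name in query_names:
        # Split on the last dot: everything before it is the family.
        family, _, query = full_name.rpartition(".")
        grouped[family].append(query)
    return dict(grouped)


names = ["Custom.test_host_proc", "Custom.other_query", "Network.list_hosts"]
grouped = group_by_family(names)
```

With a real provider this would be called as group_by_family(qry_prov.list_queries()).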

query_help(query_name: str)

Print help for query_name.

Parameters:

query_name (str) – The name of the query.

query_store: QueryStore

property query_time

Return the default QueryTime control for queries.

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns:

Data schema of current connection.

Return type:

Dict[str, Dict]

property schema_tables: List[str]

Return list of tables in the data schema of the connection.

Returns:

Tables in the schema of the current connection.

Return type:

List[str]
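
The schema property returns a nested dictionary keyed by table name. As a minimal sketch, assuming the inner dictionary maps column names to column types (the exact content depends on the driver), the helper below finds the tables that contain a given column. tables_with_column and the sample schema are hypothetical.

```python
def tables_with_column(schema, column_name):
    """Return sorted table names whose column mapping contains column_name."""
    return sorted(
        table for table, columns in schema.items() if column_name in columns
    )


# Hypothetical schema in the Dict[str, Dict] shape described above.
schema = {
    "SecurityEvent": {"TimeGenerated": "datetime", "Computer": "string"},
    "Syslog": {"TimeGenerated": "datetime", "HostName": "string"},
}
matches = tables_with_column(schema, "Computer")
all_time = tables_with_column(schema, "TimeGenerated")
```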

search(search: str | Iterable[str] | None = None, table: str | Iterable[str] | None = None, param: str | Iterable[str] | None = None, ignore_case: bool = True) -> List[str]

Search queries for matching properties.

Parameters:
  • search (Union[str, Iterable[str]], optional) – String or iterable of search terms to match on any property of the query, by default None. The properties include: name, description, table, parameter names and query_text.

  • table (Union[str, Iterable[str]], optional) – String or iterable of search terms to match on the query table name, by default None

  • param (Union[str, Iterable[str]], optional) – String or iterable of search terms to match on the query parameter names, by default None

  • ignore_case (bool) – Use case-insensitive search, default is True.

Returns:

A list of matched queries

Return type:

List[str]

Notes

Search terms are treated as regular expressions. Supplying multiple parameters returns the intersection of the matches for each category. For example, qry_prov.search(search="account", table="syslog") will match queries whose table name matches "syslog" AND that have the term "account" somewhere in the query properties.
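
The intersection behavior described in these notes can be sketched in plain Python. This is an illustration, not msticpy's implementation: search_queries, _as_terms, and the sample query records are hypothetical, only the search and table categories are modeled, and treating multiple terms within one category as alternatives is an assumption.

```python
import re


def _as_terms(value):
    """Normalize None, a string, or an iterable of strings into a list."""
    if value is None:
        return []
    return [value] if isinstance(value, str) else list(value)


def search_queries(queries, search=None, table=None, ignore_case=True):
    """Return names of queries that match every supplied category."""
    flags = re.IGNORECASE if ignore_case else 0
    results = []
    for qry in queries:
        all_text = " ".join([qry["name"], qry["description"], qry["table"]])
        # A category that was not supplied does not restrict the result.
        search_ok = not search or any(
            re.search(term, all_text, flags) for term in _as_terms(search)
        )
        table_ok = not table or any(
            re.search(term, qry["table"], flags) for term in _as_terms(table)
        )
        if search_ok and table_ok:
            results.append(qry["name"])
    return results


queries = [
    {"name": "list_account_logons", "description": "Account logon events", "table": "Syslog"},
    {"name": "list_account_changes", "description": "Account changes", "table": "SecurityEvent"},
    {"name": "list_cron_jobs", "description": "Cron activity", "table": "Syslog"},
]
found = search_queries(queries, search="account", table="syslog")
strict = search_queries(queries, search="account", table="syslog", ignore_case=False)
```

Only the first record satisfies both categories. With ignore_case=False the lowercase term "syslog" no longer matches the table name "Syslog", so nothing is returned.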