msticpy.data.core.query_provider_utils_mixin module

Query Provider mixin methods.

class msticpy.data.core.query_provider_utils_mixin.QueryParam(name: str, data_type: str, description: str | None = None, default: str | None = None)

Bases: NamedTuple

Named tuple for custom query parameters.

name and data_type are mandatory. description and default are optional.

Create new instance of QueryParam(name, data_type, description, default)

count(value, /)

Return number of occurrences of value.

data_type: str

Alias for field number 1

default: str | None

Alias for field number 3

description: str | None

Alias for field number 2

index(value, start=0, stop=9223372036854775807, /)

Return first index of value.

Raises ValueError if the value is not present.

name: str

Alias for field number 0
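
The field aliases above follow from QueryParam being a NamedTuple. As a minimal sketch, the stand-in class below mirrors the documented signature (it is a local copy for illustration, not imported from msticpy) to show how positional and named access line up:

```python
from typing import NamedTuple, Optional


class QueryParam(NamedTuple):
    """Local stand-in mirroring the documented QueryParam signature."""

    name: str
    data_type: str
    description: Optional[str] = None
    default: Optional[str] = None


# Only name and data_type are required; the other fields default to None.
start = QueryParam("start", "datetime")
host = QueryParam("host_name", "str", "Name of Host")

# Field numbers match the aliases: 0=name, 1=data_type, 2=description, 3=default.
fields = (start[0], start[1], start[2], start[3])

# count() and index() are the ordinary tuple methods.
none_count = start.count(None)
type_index = host.index("str")
```

Because the type is a plain tuple underneath, a definition can be unpacked or compared like any other tuple.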

class msticpy.data.core.query_provider_utils_mixin.QueryProviderProtocol(*args, **kwargs)

Bases: Protocol

Protocol for required properties of QueryProvider class.

query_store: QueryStore

class msticpy.data.core.query_provider_utils_mixin.QueryProviderUtilsMixin(*args, **kwargs)

Bases: QueryProviderProtocol

Mixin utility methods for QueryProvider class.
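
The protocol-plus-mixin arrangement can be pictured with the sketch below. All class names here are hypothetical stand-ins, and a plain dict takes the place of QueryStore; the point is only that the mixin's methods rely on an attribute that the concrete class is expected to supply.

```python
from typing import Dict, List, Protocol


class HasQueryStore(Protocol):
    """Hypothetical protocol: the host class must expose query_store."""

    query_store: Dict[str, str]


class QueryUtilsMixin(HasQueryStore):
    """Hypothetical mixin whose methods use only the protocol attribute."""

    def list_queries(self) -> List[str]:
        # Relies on query_store being set by the concrete class.
        return sorted(self.query_store)


class DemoProvider(QueryUtilsMixin):
    """Concrete class that supplies the attribute the mixin expects."""

    def __init__(self) -> None:
        self.query_store = {"Custom.test_host_proc": "SecurityEvent | take 1"}


names = DemoProvider().list_queries()
```

Declaring the attribute on a protocol lets a type checker verify the mixin's methods without the mixin importing the concrete provider class.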

add_custom_query(name: str, query: str, family: str | Iterable[str], description: str | None = None, parameters: Iterable[QueryParam] | None = None)

Add a custom query to the provider.

Parameters:
  • name (str) – The name of the query.

  • query (str) – The query text (optionally parameterized).

  • family (Union[str, Iterable[str]]) – The query group/family or list of families. The query will be added to attributes of the query provider with these names.

  • description (Optional[str], optional) – Optional description (for query help), by default None

  • parameters (Optional[Iterable[QueryParam]], optional) – Optional list of parameter definitions, by default None. If the query is parameterized, you must supply definitions for the parameters here, with at least a name and type for each. Parameters can be the named tuple QueryParam (also exposed as QueryProvider.Param) or a 4-value tuple.

Examples

>>> from msticpy.data import QueryProvider
>>> qp = QueryProvider("MSSentinel")
>>> qp_host = qp.create_param("host_name", "str", "Name of Host")
>>> qp_start = qp.create_param("start", "datetime")
>>> qp_end = qp.create_param("end", "datetime")
>>> qp_evt = qp.create_param("event_id", "int", None, 4688)
>>>
>>> query = '''
... SecurityEvent
... | where EventID == {event_id}
... | where TimeGenerated between (datetime({start}) .. datetime({end}))
... | where Computer has "{host_name}"
... '''
>>>
>>> qp.add_custom_query(
...     name="test_host_proc",
...     query=query,
...     family="Custom",
...     parameters=[qp_host, qp_start, qp_end, qp_evt]
... )

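
To picture what the placeholders in a parameterized query stand for, the sketch below fills the same query text with sample values. Plain str.format is used here purely as an illustration; it is an assumption, and the provider's own substitution and type-specific formatting may differ. The host name and dates are made-up values.

```python
from datetime import datetime, timezone

query = """
SecurityEvent
| where EventID == {event_id}
| where TimeGenerated between (datetime({start}) .. datetime({end}))
| where Computer has "{host_name}"
"""

# Hypothetical values; event_id uses the default from its parameter definition.
values = {
    "event_id": 4688,
    "start": datetime(2024, 1, 1, tzinfo=timezone.utc).isoformat(),
    "end": datetime(2024, 1, 2, tzinfo=timezone.utc).isoformat(),
    "host_name": "myhost",
}

# Illustration only: str.format stands in for the provider's substitution.
rendered = query.format(**values)
print(rendered)
```

Each placeholder name in the query text needs a matching parameter definition, which is why name and type are required for every parameter.
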
browse(**kwargs)

Return QueryProvider query browser.

Parameters:

kwargs – passed to SelectItem constructor.

Returns:

SelectItem browser for the provider's queries.

Return type:

SelectItem

browse_queries(**kwargs)

Return QueryProvider query browser.

Parameters:

kwargs – passed to SelectItem constructor.

Returns:

SelectItem browser for the provider's queries.

Return type:

SelectItem

property connected: bool

Return True if the provider is connected.

Returns:

True if the provider is connected.

Return type:

bool

property connection_string: str

Return provider connection string.

Returns:

Provider connection string.

Return type:

str

create_param

alias of QueryParam

driver_help()

Display help for the driver.

get_query(query_name: str) → str

Return the raw query text for query_name.

Parameters:

query_name (str) – The name of the query.

import_query_file(query_file: str)

Import a YAML data source definition.

Parameters:

query_file (str) – Path to the file to import

property instance: str | None

Return the instance name, if any, for the provider.

Returns:

The instance name or None for drivers that do not support multiple instances.

Return type:

Optional[str]

classmethod list_data_environments() → List[str]

Return list of current data environments.

Returns:

List of current data environments

Return type:

List[str]

list_queries(substring: str | None = None) → List[str]

Return the list of queries in the store, each in family.query form.

Parameters:

substring (Optional[str]) – Optional pattern - will return only queries matching the pattern, default None.

Returns:

List of queries

Return type:

List[str]

query_help(query_name: str)

Print help for query_name.

Parameters:

query_name (str) – The name of the query.

query_store: QueryStore

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns:

Data schema of current connection.

Return type:

Dict[str, Dict]

property schema_tables: List[str]

Return list of tables in the data schema of the connection.

Returns:

Tables in the schema of the current connection.

Return type:

List[str]

search(search: str | Iterable[str] | None = None, table: str | Iterable[str] | None = None, param: str | Iterable[str] | None = None, ignore_case: bool = True) → List[str]

Search queries for matching properties.

Parameters:
  • search (Union[str, Iterable[str]], optional) – String or iterable of search terms to match on any property of the query, by default None. The properties include: name, description, table, parameter names and query_text.

  • table (Union[str, Iterable[str]], optional) – String or iterable of search terms to match on the query table name, by default None

  • param (Union[str, Iterable[str]], optional) – String or iterable of search terms to match on the query parameter names, by default None

  • ignore_case (bool) – Use case-insensitive search, default is True.

Returns:

A list of matched queries

Return type:

List[str]

Notes

Search terms are treated as regular expressions. Supplying multiple parameters returns the intersection of matches for each category. For example, qry_prov.search(search="account", table="syslog") matches queries whose table name matches "syslog" AND that contain the term "account" somewhere in the query properties.
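
The intersection behavior described in the notes can be sketched over a small in-memory catalog. Everything below is a hypothetical stand-in (the catalog, the search_catalog function and its helper), not msticpy's implementation. In particular, treating several terms within one category as "any term matches" is an assumption made for this sketch.

```python
import re
from typing import Dict, Iterable, List, Optional, Union

Terms = Optional[Union[str, Iterable[str]]]

# Hypothetical catalog: query name -> searchable properties.
CATALOG: Dict[str, Dict[str, str]] = {
    "LinuxSyslog.user_logon": {
        "table": "Syslog",
        "description": "Account logon events",
    },
    "LinuxSyslog.cron_activity": {
        "table": "Syslog",
        "description": "Cron job activity",
    },
    "WindowsSecurity.account_change": {
        "table": "SecurityEvent",
        "description": "Account changes",
    },
}


def _matches(terms: Terms, text: str, flags: int) -> bool:
    """Return True if no terms were given or any term (a regex) matches."""
    if terms is None:
        return True
    term_list = [terms] if isinstance(terms, str) else list(terms)
    return any(re.search(term, text, flags) for term in term_list)


def search_catalog(
    search: Terms = None, table: Terms = None, ignore_case: bool = True
) -> List[str]:
    """Return names of queries that satisfy every supplied category."""
    flags = re.IGNORECASE if ignore_case else 0
    return [
        name
        for name, props in CATALOG.items()
        if _matches(search, " ".join([name, *props.values()]), flags)
        and _matches(table, props["table"], flags)
    ]


both = search_catalog(search="account", table="syslog")
syslog_only = search_catalog(table="Syslog$")
case_sensitive = search_catalog(search="account", table="syslog", ignore_case=False)
```

Here only the Syslog query that mentions "Account" satisfies both categories, and turning off ignore_case removes it because neither the table name nor the description matches in lower case.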