msticpy.data package

msticpy.nbtools.entityschema module

msticpy.nbtools.security_alert module

Module for SecurityAlert class.

class msticpy.nbtools.security_alert.SecurityAlert(*args, **kwargs)

Bases: msticpy.nbtools.security_base.SecurityBase

Security Alert Class.

Instantiates a security alert from a pandas Series and provides convenience access methods to retrieve properties.

Deprecated since version 1.7.0: Replaced by Alert entity in datamodel

Instantiate a security alert from a pandas Series.

property computer: Optional[str]

Return the Computer name of the host associated with the alert.

(host FQDN, if available)

property data_environment: msticpy.data.query_defns.DataEnvironment

Return the data environment of the alert for subsequent queries.

property data_family: msticpy.datamodel.entities.entity_enums.OSFamily

Return the data family of the alert for subsequent queries.

property entities: List[msticpy.datamodel.entities.entity.Entity]

Return a list of the Security Alert entities.

get_all_entities() pandas.core.frame.DataFrame

Return a DataFrame of the Alert or Event entities.

Returns

Pandas DataFrame of the Alert or Event entities.

Return type

DataFrame

get_entities_of_type(entity_type: str) List[msticpy.datamodel.entities.entity.Entity]

Return the entity collection for a given entity type.

Parameters

entity_type (str, optional) – The entity type.

Returns

The entities matching entity_type.

Return type

List[Entity]

get_logon_id(account: Optional[msticpy.datamodel.entities.account.Account] = None) Optional[Union[str, int]]

Get the logon Id for the alert or the account, if supplied.

If account is not supplied, return the logon id of the first host-logon-session or account entity.

Parameters

account (Account, optional) – Account object to use (the default is None)

Returns

The logon Id for the primary account

Return type

Optional[Union[str, int]]

host_filter(operator='==')

Return a KQL host filter clause derived from the alert properties.

Parameters

operator (str, optional) – The operator to use in the filter clause, typically '==' or '!='. The default is '=='.

property hostname: str

Return the Hostname (not FQDN) of the host associated with the alert.

property ids: Dict[str, str]

Return a collection of Identity properties for the alert.

property is_in_azure_sub: bool

Return True if the alert originates from an Azure Security Center host.

property is_in_log_analytics: bool

Return True if the alert originates from a Log Analytics Workspace host.

property is_in_workspace: bool

Return True if the alert has a Log Analytics WorkspaceID.

property origin_time: datetime.datetime

Return the datetime of the event.

property primary_account: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]

Return the primary account entity (if any) associated with this object.

Returns

primary account entity (if any)

Return type

Optional[Process]

property primary_host: Optional[Union[msticpy.datamodel.entities.host.Host, msticpy.datamodel.entities.entity.Entity]]

Return the primary host entity (if any) associated with this object.

Returns

primary host entity (if any)

Return type

Optional[Host]

property primary_process: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]

Return the primary process entity (if any) associated with this object.

Returns

primary process entity (if any)

Return type

Optional[Process]

property properties: Dict[str, Any]

Return a dictionary of the Alert or Event properties.

Returns

dictionary of the Alert or Event properties.

Return type

Dict[str, Any]

property query_params: Dict[str, Any]

Query parameters derived from alert.

Returns

Dictionary of parameter names/values

Return type

Dict[str, Any]

subscription_filter(operator='==')

Return a KQL subscription filter clause derived from the alert properties.

to_html(show_entities=False) str

Return the item as HTML string.

msticpy.nbtools.security_alert_graph module

security_alert_graph.

Creates an entity graph for the alert, adds related alerts to the graph and links each to the entity that is common to both alerts.

msticpy.nbtools.security_alert_graph.create_alert_graph(alert: msticpy.nbtools.security_alert.SecurityAlert)

Create a networkx graph from the alert and contained entities.

msticpy.nbtools.security_base module

Module for SecurityAlert class.

class msticpy.nbtools.security_base.SecurityBase(*args, **kwargs)

Bases: msticpy.data.query_defns.QueryParamProvider

Security Base Class for alerts and events.

Instantiates a security event or alert from a pandas Series and provides convenience access methods to retrieve properties.

Deprecated since version 1.7.0: Replaced by Alert entity in datamodel

Instantiate a security alert from a pandas Series.

property computer: Optional[str]

Return the Computer name of the host associated with the alert.

(host FQDN, if available)

property data_environment: msticpy.data.query_defns.DataEnvironment

Return the data environment of the alert for subsequent queries.

property data_family: msticpy.datamodel.entities.entity_enums.OSFamily

Return the data family of the alert for subsequent queries.

property entities: List[msticpy.datamodel.entities.entity.Entity]

Return a list of the Alert or Event entities.

Returns

List of the Alert or Event entities.

Return type

List[Entity]

get_all_entities() pandas.core.frame.DataFrame

Return a DataFrame of the Alert or Event entities.

Returns

Pandas DataFrame of the Alert or Event entities.

Return type

DataFrame

get_entities_of_type(entity_type: str) List[msticpy.datamodel.entities.entity.Entity]

Return the entity collection for a given entity type.

Parameters

entity_type (str, optional) – The entity type.

Returns

The entities matching entity_type.

Return type

List[Entity]

get_logon_id(account: Optional[msticpy.datamodel.entities.account.Account] = None) Optional[Union[str, int]]

Get the logon Id for the alert or the account, if supplied.

If account is not supplied, return the logon id of the first host-logon-session or account entity.

Parameters

account (Account, optional) – Account object to use (the default is None)

Returns

The logon Id for the primary account

Return type

Optional[Union[str, int]]

host_filter(operator='==')

Return a KQL host filter clause derived from the alert properties.

Parameters

operator (str, optional) – The operator to use in the filter clause, typically '==' or '!='. The default is '=='.

property hostname: str

Return the Hostname (not FQDN) of the host associated with the alert.

property ids: Dict[str, str]

Return a collection of Identity properties for the alert.

property is_in_azure_sub: bool

Return True if the alert originates from an Azure Security Center host.

property is_in_log_analytics: bool

Return True if the alert originates from a Log Analytics Workspace host.

property is_in_workspace: bool

Return True if the alert has a Log Analytics WorkspaceID.

property origin_time: datetime.datetime

Return the datetime of the event.

property primary_account: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]

Return the primary account entity (if any) associated with this object.

Returns

primary account entity (if any)

Return type

Optional[Process]

property primary_host: Optional[Union[msticpy.datamodel.entities.host.Host, msticpy.datamodel.entities.entity.Entity]]

Return the primary host entity (if any) associated with this object.

Returns

primary host entity (if any)

Return type

Optional[Host]

property primary_process: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]

Return the primary process entity (if any) associated with this object.

Returns

primary process entity (if any)

Return type

Optional[Process]

property properties: Dict[str, Any]

Return a dictionary of the Alert or Event properties.

Returns

dictionary of the Alert or Event properties.

Return type

Dict[str, Any]

property query_params: Dict[str, Any]

Query parameters derived from alert.

Returns

Dictionary of parameter names/values

Return type

Dict[str, Any]

subscription_filter(operator='==')

Return a KQL subscription filter clause derived from the alert properties.

to_html(show_entities: bool = False) str

Return the item as HTML string.

msticpy.nbtools.security_event module

Module for SecurityEvent class.

class msticpy.nbtools.security_event.SecurityEvent(*args, **kwargs)

Bases: msticpy.nbtools.security_base.SecurityBase

SecurityEvent class.

Deprecated since version 1.7.0: Replaced by datamodel entities

Instantiate new instance of SecurityEvent.

Parameters

src_row – Pandas series containing a single security event

property computer: Optional[str]

Return the Computer name of the host associated with the alert.

(host FQDN, if available)

property data_environment: msticpy.data.query_defns.DataEnvironment

Return the data environment of the alert for subsequent queries.

property data_family: msticpy.datamodel.entities.entity_enums.OSFamily

Return the data family of the alert for subsequent queries.

property entities: List[msticpy.datamodel.entities.entity.Entity]

Return the list of entities extracted from the event.

Returns

The list of entities extracted from the event.

Return type

List[Entity]

get_all_entities() pandas.core.frame.DataFrame

Return a DataFrame of the Alert or Event entities.

Returns

Pandas DataFrame of the Alert or Event entities.

Return type

DataFrame

get_entities_of_type(entity_type: str) List[msticpy.datamodel.entities.entity.Entity]

Return the entity collection for a given entity type.

Parameters

entity_type (str, optional) – The entity type.

Returns

The entities matching entity_type.

Return type

List[Entity]

get_logon_id(account: Optional[msticpy.datamodel.entities.account.Account] = None) Optional[Union[str, int]]

Get the logon Id for the alert or the account, if supplied.

If account is not supplied, return the logon id of the first host-logon-session or account entity.

Parameters

account (Account, optional) – Account object to use (the default is None)

Returns

The logon Id for the primary account

Return type

Optional[Union[str, int]]

host_filter(operator='==')

Return a KQL host filter clause derived from the alert properties.

Parameters

operator (str, optional) – The operator to use in the filter clause, typically '==' or '!='. The default is '=='.
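As an added illustration of the host_filter and subscription_filter methods documented for SecurityAlert, SecurityBase and SecurityEvent on this page, the following stand-alone Python (not msticpy's own code) builds the KQL clauses from a plain alert record. It restricts the operator to '==' and '!=' and renders the host name as a quoted KQL string literal, so a value taken from alert data can only ever appear as a literal inside the clause it is spliced into. The record keys 'Computer', 'HostName' and 'SubscriptionId' are assumptions of this example.

```python
"""Derive KQL host and subscription filter clauses from an alert record.

Added example (not msticpy code) for the host_filter/subscription_filter methods
of SecurityAlert, SecurityBase and SecurityEvent. The alert keys 'Computer',
'HostName' and 'SubscriptionId' are assumptions of this example.
"""
from typing import Any, Dict, Optional

# The page names '==' and '!=' as the usual operators; anything else is rejected
# so that a caller-supplied operator cannot add extra KQL to the clause.
ALLOWED_OPERATORS = ("==", "!=")


def kql_string_literal(value: str) -> str:
    """Render a Python string as a double-quoted KQL literal (escape backslash and quote)."""
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _filter_clause(column: str, value: Optional[str], operator: str) -> str:
    if operator not in ALLOWED_OPERATORS:
        raise ValueError(f"operator must be one of {ALLOWED_OPERATORS}, got {operator!r}")
    if not value:
        raise ValueError(f"alert carries no value for {column}")
    return f"{column} {operator} {kql_string_literal(value)}"


def host_filter(alert: Dict[str, Any], operator: str = "==") -> str:
    """KQL clause selecting ('==') or excluding ('!=') the alert's host."""
    host = alert.get("Computer") or alert.get("HostName")   # assumed record keys
    return _filter_clause("Computer", host, operator)


def subscription_filter(alert: Dict[str, Any], operator: str = "==") -> str:
    """KQL clause on the Azure subscription the alert came from (assumed key)."""
    return _filter_clause("SubscriptionId", alert.get("SubscriptionId"), operator)


# Constructed sample alerts (not real data); the second shows the escaping.
SAMPLE_ALERT = {
    "AlertName": "Suspicious process executed",
    "Computer": "web01.corp.example",
    "SubscriptionId": "00000000-0000-0000-0000-000000000000",
}
ODD_ALERT = {"HostName": 'lab"01'}

if __name__ == "__main__":
    print(host_filter(SAMPLE_ALERT))          # Computer == "web01.corp.example"
    print(host_filter(SAMPLE_ALERT, "!="))    # Computer != "web01.corp.example"
    print(subscription_filter(SAMPLE_ALERT))
    print(host_filter(ODD_ALERT))             # the quote stays inside the literal
```

The returned string is meant to be appended to a query after a where keyword; because the value is always emitted as one quoted literal, a host name containing a quote or backslash cannot terminate the literal early.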

property hostname: str

Return the Hostname (not FQDN) of the host associated with the alert.

property ids: Dict[str, str]

Return a collection of Identity properties for the alert.

property is_in_azure_sub: bool

Return True if the alert originates from an Azure Security Center host.

property is_in_log_analytics: bool

Return True if the alert originates from a Log Analytics Workspace host.

property is_in_workspace: bool

Return True if the alert has a Log Analytics WorkspaceID.

property origin_time: datetime.datetime

Return the datetime of the event.

property primary_account: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]

Return the primary account entity (if any) associated with this object.

Returns

primary account entity (if any)

Return type

Optional[Process]

property primary_host: Optional[Union[msticpy.datamodel.entities.host.Host, msticpy.datamodel.entities.entity.Entity]]

Return the primary host entity (if any) associated with this object.

Returns

primary host entity (if any)

Return type

Optional[Host]

property primary_process: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]

Return the primary process entity (if any) associated with this object.

Returns

primary process entity (if any)

Return type

Optional[Process]

property properties: Dict[str, Any]

Return a dictionary of the Alert or Event properties.

Returns

dictionary of the Alert or Event properties.

Return type

Dict[str, Any]

property query_params: Dict[str, Any]

Query parameters derived from alert.

Returns

Dictionary of parameter names/values

Return type

Dict[str, Any]

subscription_filter(operator='==')

Return a KQL subscription filter clause derived from the alert properties.

to_html(show_entities: bool = False) str

Return the item as HTML string.

msticpy.data.data_providers module

Data provider loader.

class msticpy.data.data_providers.QueryProvider(data_environment: Union[str, msticpy.data.query_defns.DataEnvironment], driver: Optional[msticpy.data.drivers.driver_base.DriverBase] = None, query_paths: Optional[List[str]] = None, **kwargs)

Bases: object

Container for query store and query execution provider.

Instances of this class hold the query set and execution methods for a specific data environment.

Query provider interface to queries.

Parameters
  • data_environment (Union[str, DataEnvironment]) – Name or Enum of environment for the QueryProvider

  • driver (DriverBase, optional) – Override the builtin driver (query execution class) and use your own driver (must inherit from DriverBase)

  • query_paths (List[str]) – Additional paths to look for query definitions.

  • kwargs – Other arguments are passed to the data provider driver.

See also

DataProviderBase

base class for data query providers.

add_connection(connection_str: Optional[str] = None, alias: Optional[str] = None, **kwargs)

Add an additional connection for the query provider.

Parameters
  • connection_str (Optional[str], optional) – Connection string for the provider, by default None

  • alias (Optional[str], optional) – Alias to use for the connection, by default None

  • kwargs (Dict[str, Any]) – Other parameters passed to the driver constructor.

Notes

Some drivers may accept types other than strings for the connection_str parameter.

browse(**kwargs)

Return QueryProvider query browser.

Parameters

kwargs – passed to SelectItem constructor.

Returns

SelectItem browser for TI Data.

Return type

SelectItem

browse_queries(**kwargs)

Return QueryProvider query browser.

Parameters

kwargs – passed to SelectItem constructor.

Returns

SelectItem browser for TI Data.

Return type

SelectItem

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters

connection_str (str) – Connection string for the data source

property connected: bool

Return True if the provider is connected.

Returns

True if the provider is connected.

Return type

bool

property connection_string: str

Return provider connection string.

Returns

Provider connection string.

Return type

str

exec_query(query: str, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute simple query string.

Parameters
  • query (str) – The query string to execute.

  • use_connections (Union[str, List[str]]) –

  • query_options (Dict[str, Any]) – Additional options passed to query driver.

  • kwargs (Dict[str, Any]) – Additional options passed to query driver.

Returns

Query results - a DataFrame if successful or a KqlResult if unsuccessful.

Return type

Union[pd.DataFrame, Any]

get_query(query_name: str) str

Return the raw query text for query_name.

Parameters

query_name (str) – The name of the query.

import_query_file(query_file: str)

Import a yaml data source definition.

Parameters

query_file (str) – Path to the file to import

list_connections() List[str]

Return a list of current connections or the default connection.

Returns

The alias and connection string for each connection.

Return type

List[str]

classmethod list_data_environments() List[str]

Return list of current data environments.

Returns

List of current data environments

Return type

List[str]

list_queries(substring: Optional[str] = None) List[str]

Return list of family.query in the store.

Parameters

substring (Optional[str]) – Optional pattern - will return only queries matching the pattern, default None.

Returns

List of queries

Return type

List[str]

query_help(query_name: str)

Print help for query_name.

Parameters

query_name (str) – The name of the query.

property query_time

Return the default QueryTime control for queries.

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property schema_tables: List[str]

Return list of tables in the data schema of the connection.

Returns

Tables in the schema of the current connection.

Return type

List[str]

msticpy.data.data_query_reader module

Data query definition reader.

msticpy.data.data_query_reader.find_yaml_files(source_path: str, recursive: bool = False) Iterable[pathlib.Path]

Return iterable of yaml files found in source_path.

Parameters
  • source_path (str) – The source path to search in.

  • recursive (bool, optional) – Whether to recurse through subfolders. By default False

Returns

File paths of yaml files found.

Return type

Iterable[str]

msticpy.data.data_query_reader.read_query_def_file(query_file: str) Tuple[Dict, Dict, Dict]

Read a yaml data query definition file.

Parameters

query_file (str) – Path to the yaml query definition file

Returns

Tuple of three dictionaries:
  • sources – dictionary of query definitions
  • defaults – the default parameters from the file
  • metadata – the global metadata from the file

Return type

Tuple[Dict, Dict, Dict]

msticpy.data.data_query_reader.validate_query_defs(query_def_dict: Dict[str, Any]) bool

Validate content of query definition.

Parameters

query_def_dict (dict) – Dictionary of query definition yaml file contents.

Returns

True if validation succeeds.

Return type

bool

Raises

ValueError – The validation failure reason is returned in the exception message (arg[0])

msticpy.data.param_extractor module

Parameter extractor helper functions for use with IPython/Jupyter queries.

msticpy.data.param_extractor.extract_query_params(query_source: msticpy.data.query_source.QuerySource, *args, **kwargs) Tuple[Dict[str, Any], List[str]]

Get the parameters needed for the query.

Parameters
  • query_source (QuerySource) – Query source

  • args (Tuple[QueryParamProvider]) – objects that implement QueryParamProvider (from which query parameters can be extracted).

  • kwargs (Dict[str, Any]) – custom parameter list to populate queries (override default values and values extracted from QueryParamProviders).

Returns

Dictionary of parameter names and values to be used in the query, and a list of any missing parameters.

Return type

Tuple[Dict[str, Any], List[str]]

msticpy.data.query_container module

Query hierarchy attribute class.

class msticpy.data.query_container.QueryContainer

Bases: object

Empty class used to create hierarchical attributes.

msticpy.data.query_defns module

Query helper definitions.

class msticpy.data.query_defns.DataEnvironment(value)

Bases: enum.Enum

Enumeration of data environments.

Used to identify which queries are relevant for which data sources.

AzureSecurityCenter = 3
AzureSentinel = 1
Cybereason = 12
Kusto = 2
LocalData = 6
LogAnalytics = 1
M365D = 11
MDATP = 5
MDE = 5
MSSentinel = 1
Mordor = 8
ResourceGraph = 9
SecurityGraph = 4
Splunk = 7
Sumologic = 10
Unknown = 0
classmethod parse(value: Union[str, int]) msticpy.data.query_defns.DataEnvironment

Convert string or int to enum.

Parameters

value (Union[str, int]) – value to parse

class msticpy.data.query_defns.DataFamily(value)

Bases: enum.Enum

Enumeration of data families.

Used to identify which queries are relevant for which data sources.

AzureNetwork = 6
Cybereason = 11
LinuxSecurity = 2
LinuxSyslog = 5
MDATP = 7
ResourceGraph = 9
SecurityAlert = 3
SecurityGraphAlert = 4
Splunk = 8
Sumologic = 10
Unknown = 0
WindowsSecurity = 1
classmethod parse(value: Union[str, int]) msticpy.data.query_defns.DataFamily

Convert string or int to enum.

Parameters

value (Union[str, int]) – value to parse

class msticpy.data.query_defns.QueryParamProvider

Bases: abc.ABC

Abstract type for QueryParamProvider.

Method query_params must be overridden by derived classes.

abstract property query_params

Return dict of query parameters.

These parameters are sourced in the object implementing this method.

Returns

Dictionary of query parameter values.

Return type

dict

msticpy.data.query_defns.ensure_df_datetimes(data: pandas.core.frame.DataFrame, columns: Optional[Union[str, List[str]]] = None, add_utc_tz: bool = True) pandas.core.frame.DataFrame

Return dataframe with converted TZ-aware timestamps.

Parameters
  • data (pd.DataFrame) – Input dataframe

  • columns (Union[str, List[str], None], optional) – column (str) or list of columns to convert, by default None. If this parameter is not supplied then any column containing the substring “time” is used as a candidate for conversion.

  • add_utc_tz (bool, optional) – If True, any datetime columns in the columns parameter (or the default '.*time.*' columns) that are timezone-naive will be converted to timezone-aware timestamps marked as UTC.

Returns

Converted DataFrame.

Return type

pd.DataFrame
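The contract above (explicit columns or any column containing "time", naive timestamps tagged as UTC) matters because Python refuses to order a naive datetime against an aware one, so a column that mixes the two breaks any timeline or time-window join. The following added example, written here with the standard library only and not taken from msticpy, applies the same rule to a list of dict records instead of a DataFrame; the case-insensitive "time" match and the ISO-8601 input format are assumptions of this example.

```python
"""Make timestamp fields timezone-aware before comparing events.

Added example, stdlib only, not msticpy code: it reproduces the contract of
ensure_df_datetimes on a list of dict records rather than a DataFrame. The
case-insensitive "time" substring rule and ISO-8601 input format are assumptions.
"""
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Union

Record = Dict[str, Any]


def _to_datetime(value: Any) -> Any:
    """Parse ISO-8601 strings (including a trailing Z); leave other values untouched."""
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return value
    return value


def ensure_datetimes(rows: List[Record], columns: Optional[Union[str, List[str]]] = None,
                     add_utc_tz: bool = True) -> List[Record]:
    """Convert the chosen columns to datetimes; tag naive ones as UTC when add_utc_tz."""
    if isinstance(columns, str):
        columns = [columns]
    if columns is None:
        # Default: any column whose name contains "time" (assumed case-insensitive).
        names = {key for row in rows for key in row}
        columns = sorted(key for key in names if "time" in key.lower())
    converted: List[Record] = []
    for row in rows:
        new_row = dict(row)
        for col in columns:
            if col not in new_row:
                continue
            value = _to_datetime(new_row[col])
            if add_utc_tz and isinstance(value, datetime) and value.tzinfo is None:
                value = value.replace(tzinfo=timezone.utc)
            new_row[col] = value
        converted.append(new_row)
    return converted


def span_seconds(rows: List[Record], column: str) -> float:
    """Seconds between earliest and latest value; a naive/aware mix raises TypeError."""
    values = [row[column] for row in rows]
    return (max(values) - min(values)).total_seconds()


# Constructed sample events with the three timestamp shapes seen in exported logs:
# naive, offset-bearing and Z-suffixed.
SAMPLE_EVENTS = [
    {"TimeGenerated": "2023-01-05T10:00:00", "Computer": "web01", "EventID": 4624},
    {"TimeGenerated": "2023-01-05T11:00:00+01:00", "Computer": "web02", "EventID": 4624},
    {"TimeGenerated": "2023-01-05T10:00:30Z", "Computer": "web01", "EventID": 4625},
]

if __name__ == "__main__":
    print(span_seconds(ensure_datetimes(SAMPLE_EVENTS), "TimeGenerated"))   # 30.0
```

With add_utc_tz left at False the first record stays naive and span_seconds fails with a TypeError, which is the failure mode the UTC tagging exists to prevent; the design choice of marking naive values as UTC rather than local time mirrors the documented behaviour and keeps the conversion independent of the analyst's machine.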

msticpy.data.query_source module

Intake kql driver.

class msticpy.data.query_source.QuerySource(name: str, source: Dict[str, Any], defaults: Dict[str, Any], metadata: Dict[str, Any])

Bases: object

Query definition class for templated queries.

name

The query name

Type

str

metadata

The consolidated metadata for the query

Type

Dict[str, Any]

params

The dictionary of parameter definitions for the query.

Type

dict[str, Any]

query_store

The query store object that the query belongs to

Type

QueryStore

Initialize query source definition.

Parameters
  • name (str) – The query name

  • source (dict) – The data source definition settings

  • defaults (dict) – The default settings (if source-specific setting not supplied)

  • metadata (dict) – The global metadata from the source file.

Notes

A data source can belong to multiple families (e.g. a query that joins data from several sources).

create_doc_string() str

Return a doc string for the query.

Returns

New-line delimited docstring dynamically created from query definition properties.

Return type

str

create_query(formatters: Optional[Dict[str, Callable]] = None, **kwargs) str

Return query with values from kwargs and defaults substituted.

Parameters
  • formatters (Dict[str, Callable]) – Dictionary of custom parameter formatters indexed by data type

  • kwargs (Mapping[str, Any]) – Set of parameter name, value pairs used to populate the template query.

Returns

The populated query

Return type

str

Raises

ValueError – If one or more parameters with no default values are not supplied.

Notes

Parameters supplied as arguments will override any parameter defaults (see default_params property).

property data_families: List[str]

Return the list of data families used by the query.

Returns

The list of data families. A data family is usually equivalent to a table or entity set.

Return type

List[str]

property default_params: Dict[str, dict]

Return the set of parameters with default values.

Returns

List of parameters

Return type

Iterable[dict]

property description: str

Return description of the query.

Returns

Query description.

Return type

str

help()

Print help for query.

property query: str

Return the query template.

Returns

The template query.

Return type

str

property required_params: Dict[str, dict]

Return the set of parameters with no default values.

Returns

List of parameters

Return type

Iterable[dict]

resolve_param_aliases(param_dict: Dict[str, Any]) Dict[str, Any]

Try to resolve any parameters in param_dict that are aliases.

validate() Tuple[bool, List[str]]

Validate the source to ensure that all required properties are present.

Returns

True if validation is successful.

Return type

bool
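To make the QuerySource contract concrete (parameter definitions with and without defaults, create_query raising ValueError when a required parameter is absent, custom formatters keyed by type), together with QueryParamProvider and extract_query_params from earlier on this page, here is an added stand-alone Python example. It is not msticpy's implementation: the parameter types ('str', 'identifier', 'datetime', 'list', 'int'), the quoting of string values and the identifier allow-list for table names are design choices of this example, made so that a value supplied by a caller or pulled from an alert can only ever appear in the rendered query as a literal.

```python
"""Templated queries with defaults, required-parameter checks and typed formatting.

Added example (stdlib only), not msticpy's own code: it illustrates the contracts this
page describes for QuerySource (create_query, required_params, default_params),
QueryParamProvider and extract_query_params. The parameter types, formatters and
identifier rule are this example's assumptions, not msticpy's.
"""
import re
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple

PLACEHOLDER = re.compile(r"\{(\w+)\}")              # {param_name} in a template
IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def fmt_str(value: Any) -> str:
    """Quote as a KQL string literal so a value cannot alter the query's structure."""
    return '"' + str(value).replace("\\", "\\\\").replace('"', '\\"') + '"'

def fmt_identifier(value: Any) -> str:
    """Table and column names go in unquoted, so restrict them to a safe charset."""
    if not IDENTIFIER.match(str(value)):
        raise ValueError(f"invalid identifier: {value!r}")
    return str(value)

def fmt_datetime(value: datetime) -> str:
    if value.tzinfo is None:                        # treat naive timestamps as UTC
        value = value.replace(tzinfo=timezone.utc)
    return "datetime(" + value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + ")"

DEFAULT_FORMATTERS: Dict[str, Callable[[Any], str]] = {
    "str": fmt_str, "identifier": fmt_identifier, "datetime": fmt_datetime,
    "list": lambda vs: "(" + ", ".join(fmt_str(v) for v in vs) + ")",  # for KQL `in`
    "int": lambda v: str(int(v)),
}

class QueryParamProvider:
    """Abstract contract: derived classes override query_params."""
    @property
    def query_params(self) -> Dict[str, Any]:
        raise NotImplementedError

class QuerySource:
    def __init__(self, query: str, params: Dict[str, dict], defaults: Optional[Dict[str, dict]] = None):
        self._query = query
        self.params = {**(defaults or {}), **params}  # source-specific overrides file defaults

    @property
    def required_params(self) -> Dict[str, dict]:
        return {n: d for n, d in self.params.items() if "default" not in d}

    @property
    def default_params(self) -> Dict[str, dict]:
        return {n: d for n, d in self.params.items() if "default" in d}

    def create_query(self, formatters: Optional[Dict[str, Callable]] = None, **kwargs) -> str:
        """Substitute kwargs and defaults into the template; raise if anything is missing."""
        fmts = {**DEFAULT_FORMATTERS, **(formatters or {})}
        values, missing = extract_query_params(self, **kwargs)
        if missing:
            raise ValueError(f"missing required parameters: {missing}")
        return PLACEHOLDER.sub(
            lambda m: fmts[self.params[m.group(1)]["type"]](values[m.group(1)]), self._query)

def extract_query_params(source: QuerySource, *providers: QueryParamProvider,
                         **kwargs) -> Tuple[Dict[str, Any], List[str]]:
    """Merge defaults < provider values < explicit kwargs; report what is still missing."""
    values = {n: d["default"] for n, d in source.default_params.items()}
    for provider in providers:
        values.update({k: v for k, v in provider.query_params.items() if k in source.params})
    values.update({k: v for k, v in kwargs.items() if k in source.params})
    return values, [n for n in source.params if n not in values]

class AlertParams(QueryParamProvider):
    """Derives the host and a +/-1h time window from an alert record."""
    def __init__(self, alert: Dict[str, Any]):
        self._alert = alert

    @property
    def query_params(self) -> Dict[str, Any]:
        when = self._alert["TimeGenerated"]
        return {"host_name": self._alert["Computer"],
                "start": when - timedelta(hours=1), "end": when + timedelta(hours=1)}

# Constructed sample definition and alert (not taken from msticpy's query files).
PROCESS_QUERY = QuerySource(
    query="{table} | where TimeGenerated between ({start} .. {end}) | where Computer has "
          "{host_name} | where ProcessName in {process_names} | take {limit}",
    params={"table": {"type": "identifier"}, "host_name": {"type": "str"},
            "start": {"type": "datetime"}, "end": {"type": "datetime"},
            "process_names": {"type": "list", "default": ["powershell.exe", "cmd.exe"]}},
    defaults={"limit": {"type": "int", "default": 100}},
)
SAMPLE_ALERT = {"Computer": "web01.corp.example", "TimeGenerated": datetime(2023, 1, 5, 10, 0, tzinfo=timezone.utc)}

def build_sample_query() -> str:
    """Pull parameters from the alert, override the row limit, and render the query."""
    values, missing = extract_query_params(PROCESS_QUERY, AlertParams(SAMPLE_ALERT),
                                           table="SecurityEvent", limit=10)
    return PROCESS_QUERY.create_query(**values) if not missing else f"missing: {missing}"
```

The precedence in extract_query_params (file defaults, then values drawn from the alert, then explicit keyword arguments) is what lets a notebook pass an alert object and only override the one or two parameters it cares about; the formatter table is where a caller plugs in a replacement for a given type, which is the role the formatters argument of create_query plays.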

msticpy.data.query_store module

QueryStore class - holds a collection of QuerySources.

class msticpy.data.query_store.QueryStore(environment: str)

Bases: object

Repository for query definitions for a data environment.

environment

The data environment for the queries.

Type

str

data_families

The set of data families and associated queries for each.

Type

Dict[str, Dict[str, QuerySource]]

Initialize a QueryStore for a new environment.

Parameters

environment (str) – The data environment

add_data_source(source: msticpy.data.query_source.QuerySource)

Add a datasource/query to the store.

Parameters

source (QuerySource) – The source to add. An existing item with the same name will be overwritten

add_query(name: str, query: str, query_paths: Union[str, List[str]], description: Optional[str] = None)

Add a query from name/query text.

Parameters
  • name (str) – name of the query

  • query (str) – The query string

  • query_paths (Union[str, List[str]]) – The path/data_family to categorize. Multiple paths can be specified. If the path is dotted, this will cause the query to be displayed in the corresponding hierarchy.

  • description (str, optional) – Query description

find_query(query_name: str) Set[Optional[msticpy.data.query_source.QuerySource]]

Return set of queries with name query_name.

Parameters

query_name (str) – Name of the query

Returns

Set (distinct) queries matching name.

Return type

Set[QuerySource]

get_query(query_name: str, query_path: Optional[Union[str, msticpy.data.query_defns.DataFamily]] = None) msticpy.data.query_source.QuerySource

Return the query named query_name from the data family query_path.

Parameters
  • query_name (str) – Name of the query

  • query_path (Union[str, DataFamily]) – The data family for the query

Returns

Query matching name and family.

Return type

QuerySource

import_file(query_file: str)

Import a yaml data source definition.

Parameters

query_file (str) – Path to the file to import

Raises

ImportError – File read error, or a syntax or semantic error found in the source file.

classmethod import_files(source_path: list, recursive: bool = False, driver_query_filter: Optional[Dict[str, Set[str]]] = None) Dict[str, msticpy.data.query_store.QueryStore]

Import multiple query definition files from directory path.

Parameters
  • source_path (str) – The folder containing the yaml definition files.

  • recursive (bool, optional) – True to recurse sub-directories (the default is False, which only reads from the top level)

  • driver_query_filter (Dict[str, Set[str]]) – A dictionary of query metadata keys and values. This is used to test each read query to see if it is relevant to the driver and should be returned in the created QueryStore dictionary.

Returns

Dictionary of one or more environments and the QueryStore containing the queries for each environment.

Return type

Dict[str, 'QueryStore']

Raises

FileNotFoundError – File read error, or a syntax or semantic error found in a source file.

property query_names: Iterable[str]

Return list of family.query in the store.

Returns

List of queries

Return type

Iterable[str]

msticpy.data.azure_data module

Deprecated path for data.azure.

msticpy.data.azure_sentinel module

Deprecated path for data.azure.

msticpy.data.azure_blob_storage module

Deprecated path for data.azure.

msticpy.data.sql_to_kql module

Module for SQL to KQL Conversion.

This is an experimental conversion utility built to support a limited subset of ANSI SQL. It relies on moz_sql_parser (https://github.com/mozilla/moz-sql-parser) to parse the SQL syntax tree. Some hacky additions have been made to allow table renaming and to support a few SparkSQL operators such as RLIKE.

For more complete help translating SQL to KQL, see https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sqlcheatsheet

Known limitations

  • Does not support aggregate functions in SELECT with no GROUP BY clause

  • Does not support IN, EXISTS, HAVING operators

  • Only partial support for AS naming (should work in SELECT expressions)

msticpy.data.sql_to_kql.sql_to_kql(sql: str, target_tables: Optional[Dict[str, str]] = None) str

Parse SQL and return KQL equivalent.

msticpy.data.drivers.driver_base module

Data driver base class.

class msticpy.data.drivers.driver_base.DriverBase(**kwargs)

Bases: abc.ABC

Base class for data providers.

Initialize new instance.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

abstract connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters

connection_str (Optional[str]) – Connection string for the data source

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

A list of dictionaries of query_name/query_text pairs, and the name of the container to add the queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

abstract query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

  • kwargs – Are passed to the underlying provider query method, if supported.

Returns

A DataFrame (if successful) or the underlying provider result if an error occurred.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

abstract query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame plus native results.

Parameters

query (str) – The query to execute

Returns

A DataFrame and native results.

Return type

Tuple[pd.DataFrame, Any]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

A dictionary of query_name/query_text pairs, and the name of the container to add the queries to.

Return type

Tuple[Dict[str, str], str]
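The DriverBase contract above (connect, a best-effort connected flag, query, and query_with_results returning the rows together with the native result) and the QueryProvider methods add_connection and exec_query with use_connections are illustrated together in this added Python example. It is not msticpy's code: the driver runs over constructed in-memory tables with a deliberately tiny query syntax (a table name followed by where clauses of the form Col == "value"), and the provider tags each merged row with the alias of the connection it came from so that results gathered from several workspaces stay attributable.

```python
"""Minimal data driver and query provider with named connections.

Added example, not msticpy code: a DriverBase-style contract (connect, connected,
query, query_with_results) over constructed in-memory tables, plus the
add_connection/exec_query(use_connections=...) pattern of QueryProvider on top.
The tiny query syntax, Table | where Col == "value", is an assumption here.
"""
import re
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple

Rows = List[Dict[str, Any]]
WHERE = re.compile(r'^where\s+(\w+)\s*(==|!=)\s*"([^"]*)"$')

# Constructed sample workspaces (not real telemetry).
WORKSPACES: Dict[str, Dict[str, Rows]] = {
    "ws-prod": {"SecurityEvent": [
        {"Computer": "web01", "EventID": 4624, "Account": "svc_web"},
        {"Computer": "web02", "EventID": 4625, "Account": "admin"},
    ]},
    "ws-dev": {"SecurityEvent": [{"Computer": "dev01", "EventID": 4625, "Account": "admin"}]},
}

class DriverBase(ABC):
    def __init__(self, **kwargs):
        self._connected = False

    @property
    def connected(self) -> bool:
        """Best-effort flag: True once any connect() call has succeeded."""
        return self._connected

    @abstractmethod
    def connect(self, connection_str: Optional[str] = None, **kwargs): ...

    @abstractmethod
    def query_with_results(self, query: str, **kwargs) -> Tuple[Rows, Any]: ...

    def query(self, query: str, **kwargs) -> Rows:
        return self.query_with_results(query, **kwargs)[0]   # rows only, native result dropped

class InMemoryDriver(DriverBase):
    """The connection string names one of the WORKSPACES above."""

    def connect(self, connection_str: Optional[str] = None, **kwargs):
        if connection_str not in WORKSPACES:
            raise ConnectionError(f"unknown workspace {connection_str!r}")
        self._tables = WORKSPACES[connection_str]
        self._connected = True

    def query_with_results(self, query: str, **kwargs) -> Tuple[Rows, Any]:
        if not self._connected:
            raise RuntimeError("call connect() before query()")
        table, *clauses = [part.strip() for part in query.split("|")]
        rows = list(self._tables.get(table, []))
        for clause in clauses:
            match = WHERE.match(clause)
            if not match:
                raise ValueError(f"unsupported clause: {clause!r}")
            col, op, val = match.groups()
            rows = [r for r in rows if (str(r.get(col)) == val) == (op == "==")]
        return rows, {"query": query, "row_count": len(rows)}  # dict stands in for the raw reply

class QueryProvider:
    """One default driver plus optional aliased connections of the same driver type."""

    def __init__(self, driver: DriverBase):
        self._default = driver
        self._connections: Dict[str, DriverBase] = {}

    def connect(self, connection_str: str, **kwargs):
        self._default.connect(connection_str, **kwargs)

    def add_connection(self, connection_str: str, alias: str, **kwargs):
        driver = type(self._default)(**kwargs)
        driver.connect(connection_str)
        self._connections[alias] = driver

    def exec_query(self, query: str, use_connections: Optional[List[str]] = None) -> Rows:
        """Run on the default connection, or fan out to the aliases and tag each row."""
        if not use_connections:
            return self._default.query(query)
        merged: Rows = []
        for alias in use_connections:
            for row in self._connections[alias].query(query):
                merged.append({**row, "Connection": alias})
        return merged

def failed_logons_everywhere() -> Rows:
    """Collect failed-logon events (EventID 4625) from both workspaces in one call."""
    provider = QueryProvider(InMemoryDriver())
    provider.connect("ws-prod")
    provider.add_connection("ws-prod", alias="prod")
    provider.add_connection("ws-dev", alias="dev")
    return provider.exec_query('SecurityEvent | where EventID == "4625"', use_connections=["prod", "dev"])
```

Two points carried over from the documented contract: connected is only a record that some connect() succeeded, not a liveness check on the last one, and the driver refuses to run a query before a connection exists rather than silently returning nothing.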

msticpy.data.drivers.kql_driver module

KQL Driver class.

class msticpy.data.drivers.kql_driver.KqlDriver(connection_str: Optional[str] = None, **kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

KqlDriver class to execute kql queries.

Instantiate KqlDriver and optionally connect.

Parameters
  • connection_str (str, optional) – Connection string

  • debug (bool) – Print out additional diagnostic information.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters
  • connection_str (str) – Connection string for the data source

  • kqlmagic_args (str, optional) – Additional string of parameters to be passed to KqlMagic

  • mp_az_auth (Union[bool, str, list, None], optional) – Optional parameter directing KqlMagic to use MSTICPy Azure authentication. Values can be:
      • True or "default": use the settings in the msticpyconfig.yaml 'Azure' section
      • str: a single auth method name ('msi', 'cli', 'env', 'vscode', 'powershell', 'cache' or 'interactive')
      • List[str]: a list of acceptable auth methods from ('msi', 'cli', 'env', 'vscode', 'powershell', 'cache' or 'interactive')

  • mp_az_tenant_id (str, optional) – Optional parameter specifying a Tenant ID for use by MSTICPy Azure authentication.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successful) or the underlying provider result if an error occurred.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Kqlmagic.kql_proxy.KqlResponse]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successful) and the Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.local_data_driver module

Local Data Driver class - for testing and demos.

class msticpy.data.drivers.local_data_driver.LocalDataDriver(connection_str: Optional[str] = None, **kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

LocalDataDriver class that serves query results from local data files (for testing and demos).

Instantiate LocalDataDriver and optionally connect.

Parameters
  • connection_str (str, optional) – Connection string (not used)

  • data_paths (List[str], optional) – Paths from which to load data files

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters

connection_str (str) – Connect to a data source

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successful) or the underlying provider result if an error occurred.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query, **kwargs)

Return query with fake results.

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.mdatp_driver module

MDATP OData Driver class.

class msticpy.data.drivers.mdatp_driver.MDATPDriver(connection_str: Optional[str] = None, **kwargs)

Bases: msticpy.data.drivers.odata_driver.OData

MDATPDriver class to retrieve data from the Microsoft Defender APIs.

Instantiate MDATPDriver and optionally connect.

Parameters

connection_str (str, optional) – Connection string

CONFIG_NAME = 'MicrosoftDefender'
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to oauth data source.

Parameters
  • connection_str (Optional[str], optional) – Connect to a data source

  • instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml

Notes

Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successful) or the underlying provider result if an error occurred.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successful) and the Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.mordor_driver module

Mordor/OTRF Security datasets driver.

class msticpy.data.drivers.mordor_driver.MitreAttack(attack: Optional[Dict[str, Any]] = None, technique: Optional[str] = None, sub_technique: Optional[str] = None, tactics: Optional[List[str]] = None)

Bases: object

MitreAttack container for techniques and tactics.

Create instance of MitreAttack.

Parameters
  • attack (Dict[str, Any], optional) – attack data as dictionary, by default None

  • technique (str, optional) – technique ID, by default None

  • sub_technique (str, optional) – sub-technique ID, by default None

  • tactics (List[str], optional) – List of associated tactics, by default None

MTR_TAC_URI = 'https://attack.mitre.org/tactics/{tactic_id}/'
MTR_TECH_URI = 'https://attack.mitre.org/techniques/{technique_id}/'
property tactics_full: List[Tuple[str, str, str, str]]

Return full listing of Mitre tactics.

Returns

List of tuples of: (ID, Name, Description, URI)

Return type

List[Tuple[str, str, str, str]]

property technique_desc: Optional[str]

Return Mitre technique description.

Returns

Technique description

Return type

Optional[str]

property technique_name: Optional[str]

Return Mitre Technique full name.

Returns

Name of the Mitre technique

Return type

Optional[str]

property technique_uri: str

Return Mitre Technique URI.

Returns

URI of the Mitre technique

Return type

Optional[str]

class msticpy.data.drivers.mordor_driver.MordorDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Mordor data driver.

Initialize the Mordor driver.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters

connection_str (Optional[str]) – Connect to a data source

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return generator of Mordor query definitions.

Yields

Iterable[Dict[str, Any]] – Iterable of Dictionaries containing query definitions.

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

  • kwargs – Are passed to the underlying provider query method, if supported.

Returns

A DataFrame (if successful) or the underlying provider result if an error occurred.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame plus native results.

Parameters

query (str) – The query to execute

Returns

A DataFrame and native results.

Return type

Tuple[pd.DataFrame,Any]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

search_queries(search: str) Iterable[str]

Search queries for matching attributes.

Parameters

search (str) – Search string. Substrings separated by commas will be treated as OR terms - e.g. “a, b” == “a” or “b”. Substrings separated by “+” will be treated as AND terms - e.g. “a + b” == “a” and “b”

Returns

Iterable of matching query names.

Return type

Iterable[str]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

class msticpy.data.drivers.mordor_driver.MordorEntry(title: str, id: str, type: str, creation_date, modification_date, contributors: List[str] = NOTHING, author: Optional[str] = None, platform: Optional[str] = None, description: Optional[str] = None, tags: List[str] = NOTHING, files: List[Dict[str, Any]] = NOTHING, datasets: List[Dict[str, Any]] = NOTHING, attack_mappings: List[Dict[str, Any]] = NOTHING, notebooks: List[Dict[str, str]] = NOTHING, simulation: Dict[str, Any] = NOTHING, references: List[Any] = NOTHING, rel_file_paths: List[Dict[str, Any]] = NOTHING)

Bases: object

Mordor data set metadata.

Method generated by attrs for class MordorEntry.

attack_mappings: List[Dict[str, Any]]
author: Optional[str]
contributors: List[str]
creation_date: datetime.datetime
datasets: List[Dict[str, Any]]
description: Optional[str]
files: List[Dict[str, Any]]
get_attacks() List[msticpy.data.drivers.mordor_driver.MitreAttack]

Return list of Mitre attack classifications.

Returns

List of MitreAttack definitions.

Return type

List[MitreAttack]

get_file_paths() List[Dict[str, str]]

Return list of data file links.

Returns

List of dictionaries describing files. Each entry has the keys: file_type, file_path, relative_path, qry_path.

Return type

List[Dict[str, str]]

get_notebooks() List[Tuple[str, str, str]]

Return the list of notebooks for the dataset.

Returns

Tuples of (name, project, link)

Return type

List[Tuple[str, str, str]]

id: str
modification_date: datetime.datetime
notebooks: List[Dict[str, str]]
platform: Optional[str]
references: List[Any]
simulation: Dict[str, Any]
tags: List[str]
title: str
type: str
msticpy.data.drivers.mordor_driver.download_mdr_file(file_uri: str, use_cached: bool = True, save_folder: str = '.', silent: bool = False) pandas.core.frame.DataFrame

Download data file from Mordor.

Parameters
  • file_uri (str) – The URI of the file to download.

  • use_cached (bool, optional) – Try to use locally saved file first, by default True

  • save_folder (str, optional) – Path to output folder, by default “.”

  • silent (bool) – If True, suppress feedback. By default, False.

Returns

DataFrame of Dataset

Return type

pd.DataFrame

msticpy.data.drivers.mordor_driver.get_mdr_data_paths(item_type='metadata') Generator[str, None, None]

Generate Mordor data sets from GitHub repo.

Parameters

item_type (str, optional) – The type of item required, by default "metadata". Other values are "large" and "small".

Yields

str – Iterable of paths

msticpy.data.drivers.mordor_driver.search_mdr_data(mdr_data: Dict[str, msticpy.data.drivers.mordor_driver.MordorEntry], terms: Optional[str] = None, subset: Optional[Iterable[str]] = None) Set[str]

Return IDs for items matching terms.

Parameters
  • mdr_data (Dict[str, MordorEntry]) – Mordor dataset

  • terms (str, optional) – Search terms, by default None (comma-separated values are treated as OR terms plus-separated values are treated as AND terms)

  • subset (Iterable[str], optional) – A subset of IDs over which to search, by default None

Returns

The set of matching IDs.

Return type

Set[str]
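The comma/plus search grammar is documented twice above, for MordorDriver.search_queries and for search_mdr_data: comma-separated substrings are OR terms, plus-separated substrings are AND terms, and an optional subset of IDs restricts the search. The following is an added example with its own small matcher and constructed sample entries; it is not msticpy's implementation. The Entry fields it searches over (id, title, platform, tags, technique IDs) and the sample IDs and titles are assumptions made for the illustration.

```python
"""Added example (not msticpy code): the OR-of-ANDs term matching that the
search docstrings describe, run over constructed sample entries.  The Entry
stand-in, the fields it indexes and the sample catalog are assumptions."""
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Set


@dataclass
class Entry:
    """Minimal stand-in for a MordorEntry (assumed subset of its fields)."""
    id: str
    title: str
    platform: str
    tags: List[str] = field(default_factory=list)
    techniques: List[str] = field(default_factory=list)

    def search_text(self) -> str:
        """Case-folded blob of every field a term may match against."""
        parts = [self.id, self.title, self.platform, *self.tags, *self.techniques]
        return " ".join(parts).casefold()


# Constructed sample catalog; IDs, titles and tags are made up for this example.
SAMPLE: Dict[str, Entry] = {
    "SDWIN-001": Entry("SDWIN-001", "Empire PowerShell Credential Dump", "Windows",
                       ["credential access", "lsass"], ["T1003.001"]),
    "SDWIN-002": Entry("SDWIN-002", "Kerberoasting with Rubeus", "Windows",
                       ["credential access"], ["T1558.003"]),
    "SDLIN-001": Entry("SDLIN-001", "Linux SSH brute force", "Linux",
                       ["initial access"], ["T1110.001"]),
}


def parse_terms(terms: str) -> List[List[str]]:
    """'a, b + c' -> [['a'], ['b', 'c']]: comma groups are OR, '+' joins AND terms."""
    groups: List[List[str]] = []
    for or_part in terms.split(","):
        and_terms = [t.strip().casefold() for t in or_part.split("+") if t.strip()]
        if and_terms:
            groups.append(and_terms)
    return groups


def search_entries(data: Dict[str, Entry], terms: Optional[str] = None,
                   subset: Optional[Iterable[str]] = None) -> Set[str]:
    """Return the IDs whose searchable text satisfies at least one AND group.

    With no terms every candidate ID is returned; `subset` narrows the
    candidates to IDs that are both requested and present in `data`.
    """
    candidates = set(data) if subset is None else set(subset) & set(data)
    if not terms or not terms.strip():
        return candidates
    groups = parse_terms(terms)
    hits: Set[str] = set()
    for item_id in candidates:
        text = data[item_id].search_text()
        if any(all(term in text for term in group) for group in groups):
            hits.add(item_id)
    return hits


if __name__ == "__main__":
    print(sorted(search_entries(SAMPLE, "credential access")))          # two Windows sets
    print(sorted(search_entries(SAMPLE, "credential access + lsass")))  # narrowed by AND
    print(sorted(search_entries(SAMPLE, "linux, kerberoast")))          # widened by OR
```

Matching is substring-based and case-insensitive, so a technique ID such as "T1003" matches its sub-technique entries as well; tighten `search_text` if exact-field matching is wanted.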

msticpy.data.drivers.odata_driver module

OData Driver class.

class msticpy.data.drivers.odata_driver.OData(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Parent class to retrieve data from an OAuth-based API.

Instantiate OData driver and optionally connect.

Parameters

connect (bool, optional) – Set true if you want to connect to the provider at initialization

CONFIG_NAME = ''
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to oauth data source.

Parameters
  • connection_str (Optional[str], optional) – Connect to a data source

  • instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml

Notes

Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion.
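The Notes list the connection string fields but not how to handle them safely. The block below is an added example, not msticpy code, showing how a caller can parse, validate and redact such a connection string before handing it to connect() on OData or on the subclasses (MDATPDriver, SecurityGraphDriver) that inherit this method. The "key='value'; key='value'" layout is an assumption about the expected format; the GUID check, the https-only rule for apiRoot and the sample values (placeholder GUIDs, a fake secret, an invalid example host) are this example's own.

```python
"""Added example (not msticpy code): parse and validate the OData-style
connection string whose fields the connect() Notes list, then produce a
copy that is safe to log.  The "key='value'; key='value'" layout is an
assumption about the format; the checks applied are this example's policy."""
import re
from typing import Dict, Mapping

REQUIRED = ("tenant_id", "client_id", "client_secret")
OPTIONAL = ("apiRoot", "apiVersion")
SECRET_KEYS = frozenset({"client_secret"})
_GUID = re.compile(r"^[0-9a-fA-F]{8}-(?:[0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$")


def parse_connection_str(conn: str) -> Dict[str, str]:
    """Split "k=v; k2='v2'" into a dict, stripping whitespace and quotes."""
    fields: Dict[str, str] = {}
    for pair in conn.split(";"):
        if not pair.strip():
            continue                      # tolerate a trailing ';'
        if "=" not in pair:
            raise ValueError(f"malformed field (no '='): {pair.strip()!r}")
        key, value = pair.split("=", 1)   # split once: secrets may contain '='
        fields[key.strip()] = value.strip().strip("'\"")
    return fields


def validate_fields(fields: Mapping[str, str]) -> Dict[str, str]:
    """Fail early on missing or unknown keys, non-GUID ids and a non-https apiRoot."""
    missing = [k for k in REQUIRED if not fields.get(k)]
    if missing:
        raise ValueError(f"missing required connection fields: {missing}")
    unknown = sorted(set(fields) - set(REQUIRED) - set(OPTIONAL))
    if unknown:
        raise ValueError(f"unexpected connection fields: {unknown}")
    for key in ("tenant_id", "client_id"):
        if not _GUID.match(fields[key]):
            raise ValueError(f"{key} must be a GUID")
    api_root = fields.get("apiRoot")
    if api_root and not api_root.lower().startswith("https://"):
        raise ValueError("apiRoot must be an https:// URL; the bearer token must not travel in clear")
    return dict(fields)


def redacted(fields: Mapping[str, str]) -> Dict[str, str]:
    """Copy that is safe to print in a notebook or log: secret values masked."""
    return {k: ("***" if k in SECRET_KEYS else v) for k, v in fields.items()}


# Constructed sample: placeholder GUIDs, a fake secret and a non-routable host.
SAMPLE_CONN = (
    "tenant_id='11111111-2222-3333-4444-555555555555'; "
    "client_id='aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee'; "
    "client_secret='not-a-real-secret=='; "
    "apiRoot='https://api.example.invalid/'; apiVersion='v1.0';"
)

if __name__ == "__main__":
    cfg = validate_fields(parse_connection_str(SAMPLE_CONN))
    print(redacted(cfg))   # client_secret shown as ***
```

Keep the validated dict, not the raw string, as the object you pass around: the redacted view is what goes into debug output, and the secret is only ever read by the code that performs the token request.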

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

abstract query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successful) or the underlying provider result if an error occurred.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successful) and the Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.resource_graph_driver module

Azure Resource Graph Driver class.

class msticpy.data.drivers.resource_graph_driver.ResourceGraphDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Driver to connect and query from Azure Resource Graph.

Instantiate Azure Resource Graph Driver.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to Azure Resource Graph via Azure SDK.

Parameters
  • connection_str (Optional[str], optional) – Not used.

  • kwargs – Connection parameters can be supplied as keyword parameters.

Notes

Default configuration is read from the DataProviders/AzureCLI section of msticpyconfig.yaml, if available.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute Resource Graph query and retrieve results.

Parameters
  • query (str) – KQL query to execute

  • query_source (QuerySource) – The query definition object

  • kwargs – count

Returns

Query results in a dataframe. or query response if an error.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – Query to execute against Resource Graph

Returns

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type

Union[pd.DataFrame,Any]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.security_graph_driver module

Security Graph OData Driver class.

class msticpy.data.drivers.security_graph_driver.SecurityGraphDriver(connection_str: Optional[str] = None, **kwargs)

Bases: msticpy.data.drivers.odata_driver.OData

Driver to query security graph.

Instantiate MSGraph driver and optionally connect.

Parameters

connection_str (str, optional) – Connection string

CONFIG_NAME = 'MicrosoftGraph'
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

api_root: Optional[str]
api_ver: Optional[str]
connect(connection_str: Optional[str] = None, **kwargs)

Connect to oauth data source.

Parameters
  • connection_str (Optional[str], optional) – Connect to a data source

  • instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml

Notes

Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

formatters: Dict[str, Callable]
static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

oauth_url: Optional[str]
public_attribs: Dict[str, Any]
query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successful) or the underlying provider result if an error occurred.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successful) and the Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

req_body: Optional[Dict[str, Optional[str]]]
request_uri: Optional[str]
property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.splunk_driver module

Splunk Driver class.

class msticpy.data.drivers.splunk_driver.SplunkDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Driver to connect and query from Splunk.

Instantiate Splunk Driver.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to Splunk via splunk-sdk.

Parameters
  • connection_str (Optional[str], optional) – Connection string with Splunk connection parameters

  • kwargs – Connection parameters can be supplied as keyword parameters.

Notes

Default configuration is read from the DataProviders/Splunk section of msticpyconfig.yaml, if available.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return dynamic queries available on connection to service.

Returns

List of queries with properties: “name”, “query”, “container” and (optionally) “description”

Return type

Iterable[Dict[str, Any]]

Raises

MsticpyNotConnectedError – If called before driver is connected.

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute splunk query and retrieve results via OneShot or async search mode.

Parameters
  • query (str) – Splunk query to execute via OneShot or async search mode

  • query_source (QuerySource) – The query definition object

  • kwargs – Passed to the Splunk search call. count=0 by default. oneshot=False by default (asynchronous search); set oneshot=True for one-shot (blocking) mode.

Returns

Query results in a dataframe. or query response if an error.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – Query to execute against splunk instance.

Returns

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type

Union[pd.DataFrame,Any]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return dynamic queries available on connection to service.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.data_obfus

Data obfuscation functions.

class msticpy.data.data_obfus.ObfuscationAccessor(pandas_obj)

Bases: object

Data obfuscation pandas extension (accessor).

Initialize the extension.

mask(column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True) pandas.core.frame.DataFrame

Obfuscate the data in columns of a pandas dataframe.

Parameters
  • data (pd.DataFrame) – the DataFrame the accessor is invoked on, containing the columns to obfuscate (supplied implicitly by pandas, not passed as an argument)

  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None

  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)

Returns

Obfuscated dataframe

Return type

pd.DataFrame

msticpy.data.data_obfus.check_masking(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) Optional[Tuple[List[str], List[str]]]

Check the obfuscation results for a row.

Parameters
  • data (pd.DataFrame) – Obfuscated DataFrame

  • orig_data (pd.DataFrame) – Original DataFrame

  • index (int, optional) – The row to check, by default 0

  • silent (bool) – If False, print the comparison and return None; if True (the default), print nothing and return the lists of unchanged and changed columns.

Returns

If silent is True returns a tuple of unchanged, changed items. If False, returns None.

Return type

Optional[Tuple[List[str], List[str]]]

msticpy.data.data_obfus.check_obfuscation(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) Optional[Tuple[List[str], List[str]]]

Check the obfuscation results for a row.

Parameters
  • data (pd.DataFrame) – Obfuscated DataFrame

  • orig_data (pd.DataFrame) – Original DataFrame

  • index (int, optional) – The row to check, by default 0

  • silent (bool) – If False, print the comparison and return None; if True (the default), print nothing and return the lists of unchanged and changed columns.

Returns

If silent is True returns a tuple of unchanged, changed items. If False, returns None.

Return type

Optional[Tuple[List[str], List[str]]]

msticpy.data.data_obfus.hash_account(account: str) str

Hash an Account to something recognizable.

Parameters

account (str) – Account name (UPN, NT or simple name)

Returns

Hashed Account

Return type

str

msticpy.data.data_obfus.hash_dict(item_dict: Dict[str, Union[Dict[str, Any], List[Any], str]]) Dict[str, Any]

Hash dictionary values.

Parameters

item_dict (Dict[str, Union[Dict[str, Any], List[Any], str]]) – Input item can be a Dict of strings, lists or other dictionaries.

Returns

Dictionary with hashed values.

Return type

Dict[str, Any]

msticpy.data.data_obfus.hash_ip(input_item: Union[List[str], str]) Union[List[str], str]

Hash IP address or list of IP addresses.

Parameters

input_item (Union[List[str], str]) – List of IP addresses or single IP address.

Returns

List of hashed addresses or single address. (depending on input)

Return type

Union[List[str], str]

msticpy.data.data_obfus.hash_item(input_item: str, delim: str = None) str

Hash a simple string.

Parameters
  • input_item (str) – The input string

  • delim (str, optional) – A string of delimiters to use to split the input string prior to hashing.

Returns

The obfuscated output string

Return type

str

msticpy.data.data_obfus.hash_list(item_list: List[str]) List[str]

Hash list of strings.

Parameters

item_list (List[str]) – Input list

Returns

Hashed list

Return type

List[str]

msticpy.data.data_obfus.hash_sid(sid: str) str

Hash a SID preserving well-known SIDs and the RID.

Parameters

sid (str) – SID string

Returns

Hashed SID

Return type

str

msticpy.data.data_obfus.hash_string(input_str: str) str

Hash a simple string.

Parameters

input_str (str) – The input string

Returns

The obfuscated output string

Return type

str

msticpy.data.data_obfus.mask_df(data: pandas.core.frame.DataFrame, column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True, silent: bool = True) pandas.core.frame.DataFrame

Obfuscate columns of a DataFrame.

Parameters
  • data (pd.DataFrame) – Input dataframe

  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None

  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)

  • silent (bool) – If False the function returns progress output, by default True.

Returns

Obfuscated dataframe.

Return type

pd.DataFrame

msticpy.data.data_obfus.obfuscate_df(data: pandas.core.frame.DataFrame, column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True, silent: bool = True) pandas.core.frame.DataFrame

Obfuscate columns of a DataFrame.

Parameters
  • data (pd.DataFrame) – Input dataframe

  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None

  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)

  • silent (bool) – If False the function returns progress output, by default True.

Returns

Obfuscated dataframe.

Return type

pd.DataFrame

msticpy.data.data_obfus.replace_guid(guid: str) str

Replace GUID/UUID with mapped random UUID.

Parameters

guid (str) – Input UUID.

Returns

Mapped UUID

Return type

str
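The functions above describe a masking scheme by its properties: deterministic hashing so that equal inputs still join after masking, accounts that remain recognisable as accounts, SIDs that keep well-known values and the RID, GUIDs mapped consistently, and a per-row check of which columns actually changed. The block below is an added example that re-implements that scheme over constructed rows; it is not msticpy's code and its choices (sha256 truncated to 12 hex characters, private and loopback addresses left unmasked, only S-1-5-21-* SIDs hashed) are this example's own. Rows are plain dicts standing in for DataFrame rows so it runs without pandas, and the column names and values are made up. The check at the end deliberately shows a private source IP passing through unchanged, which is exactly the kind of surprise check_masking is there to surface.

```python
"""Added example (not msticpy code): the masking pattern behind mask_df,
hash_account, hash_ip, hash_sid, replace_guid and check_masking, run over
constructed rows.  Dict rows stand in for DataFrame rows; the hashing and
preservation choices are this example's own, not msticpy's internals."""
import hashlib
import ipaddress
import re
import uuid
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple

# Only S-1-5-21-* (domain / local machine) SIDs carry identifying sub-authorities;
# everything else (S-1-5-18, BUILTIN groups, ...) is treated as well-known.
DOMAIN_SID = re.compile(r"^S-1-5-21-(\d+)-(\d+)-(\d+)-(\d+)$")
_GUID_MAP: Dict[str, str] = {}   # real GUID -> replacement, kept for the session


def hash_string(value: str) -> str:
    """Deterministic token: equal inputs mask to equal outputs, so joins survive."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:12]


def hash_account(account: str) -> str:
    """Keep the UPN / NT shape so the masked value is still recognisably an account."""
    if "@" in account:
        user, domain = account.split("@", 1)
        return f"{hash_string(user)}@{hash_string(domain)}"
    if "\\" in account:
        domain, user = account.split("\\", 1)
        return f"{hash_string(domain)}\\{hash_string(user)}"
    return hash_string(account)


def hash_ip(ip: str) -> str:
    """Leave private/loopback addresses alone (they identify no customer and keep
    internal-vs-external analysis possible); re-shape public ones as an IPv4 token."""
    addr = ipaddress.ip_address(ip)
    if addr.is_private or addr.is_loopback:
        return ip
    digest = hashlib.sha256(ip.encode("utf-8")).digest()
    return ".".join(str(b) for b in digest[:4])


def hash_sid(sid: str) -> str:
    """Hash the three domain sub-authorities of an S-1-5-21-* SID, keep the RID."""
    match = DOMAIN_SID.match(sid)
    if not match:
        return sid                        # well-known SID: pass through unchanged
    digest = hashlib.sha256("-".join(match.group(1, 2, 3)).encode("utf-8")).digest()
    subs = [int.from_bytes(digest[i:i + 4], "big") for i in (0, 4, 8)]
    return "S-1-5-21-" + "-".join(map(str, subs)) + f"-{match.group(4)}"


def replace_guid(guid: str) -> str:
    """Map each real GUID to one random UUID, reused on every occurrence."""
    key = guid.lower()
    if key not in _GUID_MAP:
        _GUID_MAP[key] = str(uuid.uuid4())
    return _GUID_MAP[key]


# Assumed default column map; the column names are constructed for this example.
DEFAULT_MAP: Dict[str, Callable[[Any], Any]] = {
    "Account": hash_account, "Computer": hash_string, "IPAddress": hash_ip,
    "SubjectUserSid": hash_sid, "TenantId": replace_guid,
}


def mask_rows(rows: List[Dict[str, Any]],
              column_map: Optional[Mapping[str, Callable[[Any], Any]]] = None,
              use_default: bool = True) -> List[Dict[str, Any]]:
    """Apply one masker per mapped column; unmapped columns pass through untouched."""
    mapping: Dict[str, Callable[[Any], Any]] = dict(DEFAULT_MAP) if use_default else {}
    mapping.update(column_map or {})
    return [{col: (mapping[col](val) if col in mapping and val else val)
             for col, val in row.items()} for row in rows]


def check_masking(data: List[Dict[str, Any]], orig_data: List[Dict[str, Any]],
                  index: int = 0) -> Tuple[List[str], List[str]]:
    """Return (unchanged, changed) column names for one row: the leak check to run
    before a masked dataset leaves the tenant."""
    unchanged = [c for c in orig_data[index] if data[index][c] == orig_data[index][c]]
    changed = [c for c in orig_data[index] if c not in unchanged]
    return unchanged, changed


# Constructed sample rows; every value is made up.
ROWS = [
    {"Account": "alice@contoso.example", "Computer": "WS01.contoso.example",
     "IPAddress": "10.1.2.3", "SubjectUserSid": "S-1-5-21-111-222-333-500",
     "TenantId": "11111111-2222-3333-4444-555555555555", "EventID": 4624},
    {"Account": "CONTOSO\\svc-backup", "Computer": "DC01.contoso.example",
     "IPAddress": "1.2.3.4", "SubjectUserSid": "S-1-5-18",
     "TenantId": "11111111-2222-3333-4444-555555555555", "EventID": 4672},
]

if __name__ == "__main__":
    masked = mask_rows(ROWS)
    print(check_masking(masked, ROWS, index=0))   # IPAddress (private) and EventID unchanged
```

If the unchanged list for a row contains a column you expected to be masked, either the column is missing from the map or the masker chose to preserve the value; pass a custom `column_map` entry (for example `{"IPAddress": hash_string}`) to override the default for that column.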

msticpy.data.browsers.query_browser module

QueryProvider Query Browser.

msticpy.data.browsers.query_browser.browse_queries(query_provider: Any, **kwargs) msticpy.nbtools.nbwidgets.select_item.SelectItem

Return QueryProvider query browser.

Parameters
  • query_provider (QueryProvider) – Initialized query provider.

  • kwargs – passed to the SelectItem constructor.

Returns

SelectItem browser for the query provider's queries.

Return type

SelectItem

msticpy.data.browsers.mordor_browser module

Mordor dataset browser.

class msticpy.data.browsers.mordor_browser.MordorBrowser(save_folder: Optional[str] = None, use_cached: bool = True)

Bases: object

Mordor browser widget.

Initialize MordorBrowser control.

Parameters
  • save_folder (str, optional) – Folder to save downloaded files, by default reads the value from settings or defaults to “.”

  • use_cached (bool, optional) – If true, downloaded files are not deleted after download and are used as a local cache, by default True

property fields

Return set of fields widget controls.

property selected_dset

Return the ID of the selected data set.