msticpy.data package

msticpy.nbtools.entityschema module

msticpy.nbtools.security_alert module

Module for SecurityAlert class.

class msticpy.nbtools.security_alert.SecurityAlert(src_row: pandas.core.series.Series = None)

Bases: msticpy.nbtools.security_base.SecurityBase

Security Alert Class.

Instantiates a security alert from a pandas Series and provides convenience access methods to retrieve properties.

Instantiate a security alert from a pandas Series.

computer

Return the Computer name of the host associated with the alert.

(host FQDN, if available)

data_environment

Return the data environment of the alert for subsequent queries.

data_family

Return the data family of the alert for subsequent queries.

entities

Return a list of the Security Alert entities.

get_all_entities() → pandas.core.frame.DataFrame

Return a DataFrame of the Alert or Event entities.

Returns:Pandas DataFrame of the Alert or Event entities.
Return type:DataFrame
get_entities_of_type(entity_type: str) → List[msticpy.datamodel.entities.entity.Entity]

Return entity collection for a given entity type.

Parameters:entity_type (str, optional) – The entity type.
Returns:The entities matching entity_type.
Return type:List[Entity]
get_logon_id(account: msticpy.datamodel.entities.account.Account = None) → Union[str, int, None]

Get the logon Id for the alert or the account, if supplied.

If account is not supplied, return the logon id of the first host-logon-session or account entity.

Parameters:account (Account, optional) – Account object to use (the default is None)
Returns:The logon Id for primary account
Return type:Optional[Union[str, int]]
host_filter(operator='==')

Return a KQL host filter clause derived from the alert properties.

Parameters:operator (str, optional) – The operator to use in the filter clause, typically ‘==’ or ‘!=’ (the default is ‘==’).
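To make the shape of such a filter clause concrete, here is a minimal sketch. The function name `build_host_filter` and the `Computer` column are assumptions for illustration only; this is not msticpy's implementation, and the real clause may differ by data source.

```python
def build_host_filter(hostname: str, operator: str = "==") -> str:
    """Return a KQL clause comparing an assumed Computer column to hostname."""
    # Hypothetical whitelist of KQL string comparison operators.
    if operator not in ("==", "!=", "=~", "!~"):
        raise ValueError(f"Unsupported operator: {operator}")
    # Escape backslashes and single quotes so the literal stays well-formed.
    safe_name = hostname.replace("\\", "\\\\").replace("'", "\\'")
    return f"Computer {operator} '{safe_name}'"


clause = build_host_filter("web01.contoso.com", operator="!=")
print(clause)
```

A clause built this way could be appended to a `where` statement in a follow-up query.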
hostname

Return the Hostname (not FQDN) of the host associated with the alert.

ids

Return a collection of Identity properties for the alert.

is_in_azure_sub

Return True if the alert originates from an Azure Security Center host.

is_in_log_analytics

Return True if the alert originates from a Log Analytics Workspace host.

is_in_workspace

Return True if the alert has a Log Analytics WorkspaceID.

origin_time

Return the datetime of event.

primary_account

Return the primary account entity (if any) associated with this object.

Returns:primary account entity (if any)
Return type:Optional[Account]
primary_host

Return the primary host entity (if any) associated with this object.

Returns:primary host entity (if any)
Return type:Optional[Host]
primary_process

Return the primary process entity (if any) associated with this object.

Returns:primary process entity (if any)
Return type:Optional[Process]
properties

Return a dictionary of the Alert or Event properties.

Returns:dictionary of the Alert or Event properties.
Return type:Dict[str, Any]
query_params

Query parameters derived from alert.

Returns:Dictionary of parameter names/values
Return type:Dict[str, Any]
subscription_filter(operator='==')

Return a KQL subscription filter clause derived from the alert properties.

to_html(show_entities=False) → str

Return the item as HTML string.

msticpy.nbtools.security_alert_graph module

security_alert_graph.

Creates an entity graph for the alert.

Add related alerts to the graph.

Link to the entity that is common to both alerts.

msticpy.nbtools.security_alert_graph.create_alert_graph(alert: msticpy.nbtools.security_alert.SecurityAlert)

Create a networkx graph from the alert and contained entities.

msticpy.nbtools.security_base module

Module for SecurityBase class.

class msticpy.nbtools.security_base.SecurityBase(src_row: pandas.core.series.Series = None)

Bases: msticpy.data.query_defns.QueryParamProvider

Security Base Class for alerts and events.

Instantiates a security event or alert from a pandas Series and provides convenience access methods to retrieve properties.

Instantiate a security alert from a pandas Series.

computer

Return the Computer name of the host associated with the alert.

(host FQDN, if available)

data_environment

Return the data environment of the alert for subsequent queries.

data_family

Return the data family of the alert for subsequent queries.

entities

Return a list of the Alert or Event entities.

Returns:List of the Alert or Event entities.
Return type:List[Entity]
get_all_entities() → pandas.core.frame.DataFrame

Return a DataFrame of the Alert or Event entities.

Returns:Pandas DataFrame of the Alert or Event entities.
Return type:DataFrame
get_entities_of_type(entity_type: str) → List[msticpy.datamodel.entities.entity.Entity]

Return entity collection for a given entity type.

Parameters:entity_type (str, optional) – The entity type.
Returns:The entities matching entity_type.
Return type:List[Entity]
get_logon_id(account: msticpy.datamodel.entities.account.Account = None) → Union[str, int, None]

Get the logon Id for the alert or the account, if supplied.

If account is not supplied, return the logon id of the first host-logon-session or account entity.

Parameters:account (Account, optional) – Account object to use (the default is None)
Returns:The logon Id for primary account
Return type:Optional[Union[str, int]]
host_filter(operator='==')

Return a KQL host filter clause derived from the alert properties.

Parameters:operator (str, optional) – The operator to use in the filter clause, typically ‘==’ or ‘!=’ (the default is ‘==’).
hostname

Return the Hostname (not FQDN) of the host associated with the alert.

ids

Return a collection of Identity properties for the alert.

is_in_azure_sub

Return True if the alert originates from an Azure Security Center host.

is_in_log_analytics

Return True if the alert originates from a Log Analytics Workspace host.

is_in_workspace

Return True if the alert has a Log Analytics WorkspaceID.

origin_time

Return the datetime of event.

primary_account

Return the primary account entity (if any) associated with this object.

Returns:primary account entity (if any)
Return type:Optional[Account]
primary_host

Return the primary host entity (if any) associated with this object.

Returns:primary host entity (if any)
Return type:Optional[Host]
primary_process

Return the primary process entity (if any) associated with this object.

Returns:primary process entity (if any)
Return type:Optional[Process]
properties

Return a dictionary of the Alert or Event properties.

Returns:dictionary of the Alert or Event properties.
Return type:Dict[str, Any]
query_params

Query parameters derived from alert.

Returns:Dictionary of parameter names/values
Return type:Dict[str, Any]
subscription_filter(operator='==')

Return a KQL subscription filter clause derived from the alert properties.

to_html(show_entities: bool = False) → str

Return the item as HTML string.

msticpy.nbtools.security_event module

Module for SecurityEvent class.

class msticpy.nbtools.security_event.SecurityEvent(src_row: pandas.core.series.Series = None)

Bases: msticpy.nbtools.security_base.SecurityBase

SecurityEvent class.

Instantiate new instance of SecurityEvent.

Parameters:src_row (pd.Series) – Pandas Series containing a single security event
computer

Return the Computer name of the host associated with the alert.

(host FQDN, if available)

data_environment

Return the data environment of the alert for subsequent queries.

data_family

Return the data family of the alert for subsequent queries.

entities

Return the list of entities extracted from the event.

Returns:The list of entities extracted from the event.
Return type:List[Entity]
get_all_entities() → pandas.core.frame.DataFrame

Return a DataFrame of the Alert or Event entities.

Returns:Pandas DataFrame of the Alert or Event entities.
Return type:DataFrame
get_entities_of_type(entity_type: str) → List[msticpy.datamodel.entities.entity.Entity]

Return entity collection for a given entity type.

Parameters:entity_type (str, optional) – The entity type.
Returns:The entities matching entity_type.
Return type:List[Entity]
get_logon_id(account: msticpy.datamodel.entities.account.Account = None) → Union[str, int, None]

Get the logon Id for the alert or the account, if supplied.

If account is not supplied, return the logon id of the first host-logon-session or account entity.

Parameters:account (Account, optional) – Account object to use (the default is None)
Returns:The logon Id for primary account
Return type:Optional[Union[str, int]]
host_filter(operator='==')

Return a KQL host filter clause derived from the alert properties.

Parameters:operator (str, optional) – The operator to use in the filter clause, typically ‘==’ or ‘!=’ (the default is ‘==’).
hostname

Return the Hostname (not FQDN) of the host associated with the alert.

ids

Return a collection of Identity properties for the alert.

is_in_azure_sub

Return True if the alert originates from an Azure Security Center host.

is_in_log_analytics

Return True if the alert originates from a Log Analytics Workspace host.

is_in_workspace

Return True if the alert has a Log Analytics WorkspaceID.

origin_time

Return the datetime of event.

primary_account

Return the primary account entity (if any) associated with this object.

Returns:primary account entity (if any)
Return type:Optional[Account]
primary_host

Return the primary host entity (if any) associated with this object.

Returns:primary host entity (if any)
Return type:Optional[Host]
primary_process

Return the primary process entity (if any) associated with this object.

Returns:primary process entity (if any)
Return type:Optional[Process]
properties

Return a dictionary of the Alert or Event properties.

Returns:dictionary of the Alert or Event properties.
Return type:Dict[str, Any]
query_params

Query parameters derived from alert.

Returns:Dictionary of parameter names/values
Return type:Dict[str, Any]
subscription_filter(operator='==')

Return a KQL subscription filter clause derived from the alert properties.

to_html(show_entities: bool = False) → str

Return the item as HTML string.

msticpy.data.data_providers module

Data provider loader.

class msticpy.data.data_providers.QueryProvider(data_environment: Union[str, msticpy.data.query_defns.DataEnvironment], driver: msticpy.data.drivers.driver_base.DriverBase = None, query_paths: List[str] = None, **kwargs)

Bases: object

Container for query store and query execution provider.

Instances of this class hold the query set and execution methods for a specific data environment.

Query provider interface to queries.

Parameters:
  • data_environment (Union[str, DataEnvironment]) – Name or Enum of environment for the QueryProvider
  • driver (DriverBase, optional) – Override the builtin driver (query execution class) and use your own driver (must inherit from DriverBase)
  • query_paths (List[str]) – Additional paths to look for query definitions.
  • kwargs – Other arguments are passed to the data provider driver.

See also

DataProviderBase
base class for data query providers.
browse(**kwargs)

Return QueryProvider query browser.

Other Parameters:
 kwargs – passed to SelectItem constructor.
Returns:SelectItem browser for queries.
Return type:SelectItem
browse_queries(**kwargs)

Return QueryProvider query browser.

Other Parameters:
 kwargs – passed to SelectItem constructor.
Returns:SelectItem browser for queries.
Return type:SelectItem
connect(connection_str: str = None, **kwargs)

Connect to data source.

Parameters:connection_str (str) – Connection string for the data source
connected

Return True if the provider is connected.

Returns:True if the provider is connected.
Return type:bool
connection_string

Return provider connection string.

Returns:Provider connection string.
Return type:str
exec_query(query: str, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute simple query string.

Parameters:query (str) – The query string to execute.
Returns:Query results - a DataFrame if successful or a KqlResult if unsuccessful.
Return type:Union[pd.DataFrame, Any]
get_query(query_name) → str

Return the raw query text.

import_query_file(query_file: str)

Import a yaml data source definition.

Parameters:query_file (str) – Path to the file to import
classmethod list_data_environments() → List[str]

Return list of current data environments.

Returns:List of current data environments
Return type:List[str]
list_queries() → List[str]

Return list of family.query in the store.

Returns:List of queries
Return type:Iterable[str]
query_help(query_name)

Print help for query.

query_time

Return the default QueryTime control for queries.

schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
schema_tables

Return list of tables in the data schema of the connection.

Returns:Tables in the schema of the current connection.
Return type:List[str]

msticpy.data.data_query_reader module

Data query definition reader.

msticpy.data.data_query_reader.find_yaml_files(source_path: str, recursive: bool = False) → Iterable[pathlib.Path]

Return iterable of yaml files found in source_path.

Parameters:
  • source_path (str) – The source path to search in.
  • recursive (bool, optional) – Whether to recurse through subfolders. By default False
Returns:

File paths of yaml files found.

Return type:

Iterable[Path]
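The search described above can be sketched with `pathlib` alone. The helper name `find_yaml` and the choice to match both `.yaml` and `.yml` suffixes are assumptions for illustration, not a copy of the library code.

```python
import tempfile
from pathlib import Path
from typing import Iterable


def find_yaml(source_path: str, recursive: bool = False) -> Iterable[Path]:
    """Yield yaml files under source_path, optionally descending into subfolders."""
    root = Path(source_path)
    # rglob walks subfolders, glob only looks at the top level.
    search = root.rglob if recursive else root.glob
    for pattern in ("*.yaml", "*.yml"):
        yield from search(pattern)


# Small demonstration using a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "a.yaml").write_text("x: 1")
    sub_dir = Path(tmp) / "sub"
    sub_dir.mkdir()
    (sub_dir / "b.yml").write_text("y: 2")
    top_level = sorted(p.name for p in find_yaml(tmp))
    all_files = sorted(p.name for p in find_yaml(tmp, recursive=True))

print(top_level, all_files)
```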

msticpy.data.data_query_reader.read_query_def_file(query_file: str) → Tuple[Dict[KT, VT], Dict[KT, VT], Dict[KT, VT]]

Read a yaml data query definition file.

Parameters:query_file (str) – Path to yaml query definition file
Returns:Tuple of dictionaries:
  • sources – dictionary of query definitions
  • defaults – the default parameters from the file
  • metadata – the global metadata from the file
Return type:Tuple[Dict, Dict, Dict]
msticpy.data.data_query_reader.validate_query_defs(query_def_dict: Dict[str, Any]) → bool

Validate content of query definition.

Parameters:query_def_dict (dict) – Dictionary of query definition yaml file contents.
Returns:True if validation succeeds.
Return type:bool
Raises:ValueError – The validation failure reason is returned in the exception message (arg[0])
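The pattern of returning True on success and carrying the failure reason in the exception message can be sketched as follows. The function name `check_query_defs` and the required section names are hypothetical; the checks the library actually performs are not listed in this reference.

```python
from typing import Any, Dict

# Assumed section names, for illustration only.
REQUIRED_SECTIONS = ("metadata", "sources")


def check_query_defs(query_def_dict: Dict[str, Any]) -> bool:
    """Return True if all assumed sections exist, else raise ValueError."""
    for section in REQUIRED_SECTIONS:
        if section not in query_def_dict:
            # The reason travels in the first exception argument.
            raise ValueError(f"Missing required section: {section}")
    return True


is_valid = check_query_defs({"metadata": {}, "sources": {}})

try:
    check_query_defs({"metadata": {}})
    failure_reason = None
except ValueError as err:
    failure_reason = err.args[0]

print(is_valid, failure_reason)
```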

msticpy.data.param_extractor module

Parameter extractor helper functions for use with IPython/Jupyter queries.

msticpy.data.param_extractor.extract_query_params(query_source: msticpy.data.query_source.QuerySource, *args, **kwargs) → Tuple[Dict[str, Any], List[str]]

Get the parameters needed for the query.

Parameters:
  • query_source (QuerySource) – Query source
  • args (Tuple[QueryParamProvider]) – objects that implement QueryParamProvider (from which query parameters can be extracted).
  • kwargs (Dict[str, Any]) – custom parameter list to populate queries (override default values and values extracted from QueryParamProviders).
Returns:

Dictionary of parameter names and values to be used in the query. List of any missing parameters

Return type:

Tuple[Dict[str, Any], List[str]]
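The precedence described above (defaults first, then values from parameter providers, then explicit keyword arguments) can be illustrated with a small sketch. Here plain dictionaries stand in for QueryParamProvider objects, and `collect_params` is a hypothetical helper, not the library function.

```python
from typing import Any, Dict, List, Tuple


def collect_params(
    required: List[str],
    defaults: Dict[str, Any],
    *providers: Dict[str, Any],
    **overrides: Any,
) -> Tuple[Dict[str, Any], List[str]]:
    """Merge parameter sources and report required names still missing."""
    known_names = set(required) | set(defaults)
    params: Dict[str, Any] = dict(defaults)
    for provider in providers:
        # Take only the values the query actually declares.
        params.update({k: v for k, v in provider.items() if k in known_names})
    # Explicit keyword arguments win over everything else.
    params.update(overrides)
    missing = [name for name in required if name not in params]
    return params, missing


params, missing = collect_params(
    ["host_name", "start"],
    {"table": "SecurityEvent"},
    {"host_name": "web01", "unrelated": 1},
    table="Syslog",
)
print(params, missing)
```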

msticpy.data.query_container module

Query hierarchy attribute class.

class msticpy.data.query_container.QueryContainer

Bases: object

Empty class used to create hierarchical attributes.

msticpy.data.query_defns module

Query helper definitions.

class msticpy.data.query_defns.DataEnvironment

Bases: enum.Enum

Enumeration of data environments.

Used to identify which queries are relevant for which data sources.

AzureSecurityCenter = 3
AzureSentinel = 1
Kusto = 2
LocalData = 6
LogAnalytics = 1
M365D = 11
MDATP = 5
MDE = 5
MSSentinel = 1
Mordor = 8
ResourceGraph = 9
SecurityGraph = 4
Splunk = 7
Sumologic = 10
Unknown = 0
parse = <bound method DataEnvironment.parse of <enum 'DataEnvironment'>>
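Several members above share a value (for example AzureSentinel, LogAnalytics and MSSentinel are all 1), which makes them aliases in Python's `enum` module. The stand-in enum below shows how aliases behave; the member order, and therefore which name is canonical, is an assumption and may not match the library.

```python
from enum import Enum


class DataEnvironmentSketch(Enum):
    """Stand-in enum; member order here is assumed."""

    Unknown = 0
    MSSentinel = 1
    AzureSentinel = 1  # alias: same value as MSSentinel
    LogAnalytics = 1   # alias: same value as MSSentinel
    Kusto = 2
    MDATP = 5
    MDE = 5            # alias: same value as MDATP


# Aliases resolve to the same member object.
same_member = DataEnvironmentSketch.LogAnalytics is DataEnvironmentSketch.AzureSentinel
# Iteration yields only the first name defined for each value.
canonical_names = [member.name for member in DataEnvironmentSketch]
# Lookup by name works for aliases too.
mde_value = DataEnvironmentSketch["MDE"].value

print(same_member, canonical_names, mde_value)
```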
class msticpy.data.query_defns.DataFamily

Bases: enum.Enum

Enumeration of data families.

Used to identify which queries are relevant for which data sources.

AzureNetwork = 6
LinuxSecurity = 2
LinuxSyslog = 5
MDATP = 7
ResourceGraph = 9
SecurityAlert = 3
SecurityGraphAlert = 4
Splunk = 8
Sumologic = 10
Unknown = 0
WindowsSecurity = 1
parse = <bound method DataFamily.parse of <enum 'DataFamily'>>
class msticpy.data.query_defns.QueryParamProvider

Bases: abc.ABC

Abstract type for QueryParamProvider.

Method query_params must be overridden by derived classes.

query_params

Return dict of query parameters.

These parameters are sourced in the object implementing this method.

Returns:Dictionary of query parameter values.
Return type:dict
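A derived class only needs to supply the `query_params` property. The sketch below uses stand-in classes (`ParamProviderSketch`, `HostEvent`) rather than the library types, so all names in it are hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class ParamProviderSketch(ABC):
    """Stand-in for an abstract query-parameter provider."""

    @property
    @abstractmethod
    def query_params(self) -> Dict[str, Any]:
        """Return dict of query parameters."""


class HostEvent(ParamProviderSketch):
    """Hypothetical provider that exposes a host name."""

    def __init__(self, host_name: str):
        self._host_name = host_name

    @property
    def query_params(self) -> Dict[str, Any]:
        # Values are sourced from the object itself.
        return {"host_name": self._host_name}


event = HostEvent("web01")
print(event.query_params)
```

Instantiating the abstract stand-in directly raises `TypeError`, which is how the override requirement is enforced.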
msticpy.data.query_defns.ensure_df_datetimes(data: pandas.core.frame.DataFrame, columns: Union[str, List[str], None] = None, add_utc_tz: bool = True) → pandas.core.frame.DataFrame

Return dataframe with converted TZ-aware timestamps.

Parameters:
  • data (pd.DataFrame) – Input dataframe
  • columns (Union[str, List[str], None], optional) – column (str) or list of columns to convert, by default None. If this parameter is not supplied then any column containing the substring “time” is used as a candidate for conversion.
  • add_utc_tz (bool, optional) – If True, any datetime columns in the columns parameter (or the default ‘.*time.*’ columns) that are timezone-naive will be converted to timezone-aware timestamps marked as UTC.
Returns:

Converted DataFrame.

Return type:

pd.DataFrame
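The core idea (pick columns whose name contains "time" and mark naive timestamps as UTC) can be shown without pandas. This sketch works on a list of dictionaries rather than a DataFrame, and `mark_naive_as_utc` is a hypothetical helper, so treat it as an analogy rather than the library's behavior.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def mark_naive_as_utc(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return copies of rows with naive datetimes in *time* columns marked UTC."""
    converted = []
    for row in rows:
        new_row = dict(row)  # leave the input untouched
        for column, value in row.items():
            is_candidate = "time" in column.lower()
            if is_candidate and isinstance(value, datetime) and value.tzinfo is None:
                # Attach UTC without shifting the clock time.
                new_row[column] = value.replace(tzinfo=timezone.utc)
        converted.append(new_row)
    return converted


rows = [{"TimeGenerated": datetime(2021, 1, 1, 12, 0), "Computer": "web01"}]
result = mark_naive_as_utc(rows)
print(result[0]["TimeGenerated"].isoformat())
```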

msticpy.data.query_source module

Intake kql driver.

class msticpy.data.query_source.QuerySource(name: str, source: Dict[str, Any], defaults: Dict[str, Any], metadata: Dict[str, Any])

Bases: object

Query definition class for templated queries.

name

The query name

Type:str
metadata

The consolidated metadata for the query

Type:Dict[str, Any]
params

The dictionary of parameter definitions for the query.

Type:dict[str, Any]
query_store

The query store object that the query belongs to

Type:QueryStore

Initialize query source definition.

Parameters:
  • name (str) – The query name
  • source (dict) – The data source definition settings
  • defaults (dict) – The default settings (if source-specific setting not supplied)
  • metadata (dict) – The global metadata from the source file.

Notes

A data source can belong to multiple families (e.g. a query that joins data from several sources)

create_doc_string() → str

Return a doc string for the query.

Returns:New-line delimited docstring dynamically created from query definition properties.
Return type:str
create_query(formatters: Dict[str, Callable] = None, **kwargs) → str

Return query with values from kwargs and defaults substituted.

Parameters:
  • formatters (Dict[str, Callable]) – Dictionary of custom parameter formatters indexed by data type
  • kwargs (Mapping[str, Any]) – Set of parameter name, value pairs used to populate the template query.
Returns:

The populated query

Return type:

str

Raises:

ValueError – If one or more parameters with no default values are not supplied.

Notes

Parameters supplied as arguments will override any parameter defaults (see default_params property).
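The substitution rules above (arguments override defaults, formatters are chosen by data type, missing parameters raise ValueError) can be sketched with `str.format`. The function `fill_template`, the `param_types` mapping and the `quote_list` formatter are hypothetical names introduced for this example.

```python
from typing import Any, Callable, Dict, Optional


def fill_template(
    template: str,
    param_types: Dict[str, str],
    defaults: Dict[str, Any],
    formatters: Optional[Dict[str, Callable[[Any], str]]] = None,
    **kwargs: Any,
) -> str:
    """Substitute defaults and kwargs into template, formatting by type."""
    values = {**defaults, **kwargs}  # supplied arguments override defaults
    missing = [name for name in param_types if name not in values]
    if missing:
        raise ValueError(f"Missing parameters: {', '.join(missing)}")
    formatters = formatters or {}
    formatted = {}
    for name, value in values.items():
        # Fall back to str() when no formatter is registered for the type.
        formatter = formatters.get(param_types.get(name, ""), str)
        formatted[name] = formatter(value)
    return template.format(**formatted)


def quote_list(items):
    """Render a Python list as a comma-separated list of quoted strings."""
    return ", ".join(f"'{item}'" for item in items)


query = fill_template(
    "{table} | where Computer in ({hosts})",
    param_types={"table": "str", "hosts": "list"},
    defaults={"table": "SecurityEvent"},
    formatters={"list": quote_list},
    hosts=["web01", "web02"],
)
print(query)
```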

data_families

Return the list of data families used by the query.

Returns:The list of data families. A data family is usually equivalent to a table or entity set.
Return type:List[str]
default_params

Return the set of parameters with default values.

Returns:List of parameters
Return type:Iterable[dict]
description

Return description of the query.

Returns:Query description.
Return type:str
help()

Print help for query.

query

Return the query template.

Returns:The template query.
Return type:str
required_params

Return the set of parameters with no default values.

Returns:List of parameters
Return type:Iterable[dict]
resolve_param_aliases(param_dict: Dict[str, Any]) → Dict[str, Any]

Try to resolve any parameters in param_dict that are aliases.

validate() → Tuple[bool, List[str]]

Validate the source to ensure that all required properties are present.

Returns:True if validation is successful, together with a list of any validation failure messages.
Return type:Tuple[bool, List[str]]

msticpy.data.query_store module

QueryStore class - holds a collection of QuerySources.

class msticpy.data.query_store.QueryStore(environment: str)

Bases: object

Repository for query definitions for a data environment.

environment

The data environment for the queries.

Type:str
data_families

The set of data families and associated queries for each.

Type:Dict[str, Dict[str, QuerySource]]

Initialize a QueryStore for a new environment.

Parameters:environment (str) – The data environment
add_data_source(source: msticpy.data.query_source.QuerySource)

Add a datasource/query to the store.

Parameters:source (QuerySource) – The source to add. An existing item with the same name will be overwritten
add_query(name: str, query: str, query_paths: Union[str, List[str]], description: str = None)

Add a query from name/query text.

Parameters:
  • name (str) – name of the query
  • query (str) – The query string
  • query_paths (Union[str, List[str]]) – The path/data_family to categorize. Multiple paths can be specified. If the path is dotted, this will cause the query to be displayed in the corresponding hierarchy.
  • description (str, optional) – Query description
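How a dotted path can turn into an attribute hierarchy is sketched below, using `types.SimpleNamespace` as a stand-in for an empty container class. The helper `attach_query` and the example path and query are hypothetical.

```python
from types import SimpleNamespace


def attach_query(root: SimpleNamespace, query_path: str, name: str, query: str) -> None:
    """Attach query text under nested attributes named by the dotted path."""
    node = root
    for part in query_path.split("."):
        # Create intermediate containers on demand.
        if not hasattr(node, part):
            setattr(node, part, SimpleNamespace())
        node = getattr(node, part)
    setattr(node, name, query)


queries = SimpleNamespace()
attach_query(queries, "Azure.Network", "list_flows", "AzureNetworkAnalytics_CL | take 10")
print(queries.Azure.Network.list_flows)
```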
find_query(query_name: str) → Set[Optional[msticpy.data.query_source.QuerySource]]

Return set of queries with name query_name.

Parameters:query_name (str) – Name of the query
Returns:Set (distinct) queries matching name.
Return type:Set[QuerySource]
get_query(query_name: str, query_path: Union[str, msticpy.data.query_defns.DataFamily] = None) → msticpy.data.query_source.QuerySource

Return the query matching query_name and data family.

Parameters:
  • query_name (str) – Name of the query
  • query_path (Union[str, DataFamily]) – The data family for the query
Returns:

Query matching name and family.

Return type:

QuerySource

import_file(query_file: str)

Import a yaml data source definition.

Parameters:query_file (str) – Path to the file to import
Raises:ImportError – File read error or Syntax or semantic error found in the source file.
classmethod import_files(source_path: list, recursive: bool = False, driver_query_filter: Optional[Dict[str, Set[str]]] = None) → Dict[str, msticpy.data.query_store.QueryStore]

Import multiple query definition files from directory path.

Parameters:
  • source_path (list) – The folder(s) containing the yaml definition files.
  • recursive (bool, optional) – True to recurse sub-directories (the default is False, which only reads from the top level)
  • driver_query_filter (Dict[str, Set[str]]) – A dictionary of query metadata keys and values. This is used to test each read query to see if it is relevant to the driver and should be returned in the created QueryStore dictionary.
Returns:

Dictionary of one or more environments and the QueryStore containing the queries for each environment.

Return type:

Dict[str, ‘QueryStore’]

Raises:

FileNotFoundError – File read error or Syntax or semantic error found in a source file.

query_names

Return list of family.query in the store.

Returns:List of queries
Return type:Iterable[str]

msticpy.data.azure_data module

Uses the Azure Python SDK to collect and return details related to Azure.

class msticpy.data.azure_data.AzureData(connect: bool = False, cloud: str = None)

Bases: object

Class for returning data on an Azure tenant.

Initialize connector for Azure Python SDK.

connect(auth_methods: List[T] = None, silent: bool = False)

Authenticate to the Azure SDK.

Parameters:
  • auth_methods (List, optional) – list of preferred authentication methods to use, by default None
  • silent (bool, optional) – Set true to prevent output during auth process, by default False
Raises:

CloudError – If no valid credentials are found or if subscription client can’t be created

get_metrics(metrics: str, resource_id: str, sub_id: str, sample_time: str = 'hour', start_time: int = 30) → Dict[str, pandas.core.frame.DataFrame]

Return specified metrics on Azure Resource.

Parameters:
  • metrics (str) – A string list of metrics you wish to collect (https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported)
  • resource_id (str) – The resource ID of the resource to collect the metrics from
  • sub_id (str) – The subscription ID that the resource is part of
  • sample_time (str (Optional)) – Whether to collect the metrics every hour or every minute. Accepted inputs are ‘hour’ or ‘minute’; the default is ‘hour’.
  • start_time (int (Optional)) – The number of days prior to today to collect metrics for, default is 30
Returns:

results – A Dictionary of DataFrames containing the metrics details

Return type:

dict
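One plausible reading of the `sample_time` and `start_time` arguments is a sampling interval plus a look-back window. The sketch below computes both; the helper `metrics_window` and the mapping to ISO 8601 durations are assumptions, not taken from the library.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Assumed mapping from sample_time to ISO 8601 durations.
INTERVALS = {"hour": "PT1H", "minute": "PT1M"}


def metrics_window(
    sample_time: str = "hour",
    start_time: int = 30,
    now: Optional[datetime] = None,
) -> Tuple[datetime, datetime, str]:
    """Return (start, end, interval) for a look-back of start_time days."""
    if sample_time not in INTERVALS:
        raise ValueError("sample_time must be 'hour' or 'minute'")
    end = now or datetime.now(timezone.utc)
    start = end - timedelta(days=start_time)
    return start, end, INTERVALS[sample_time]


fixed_now = datetime(2021, 3, 31, tzinfo=timezone.utc)
start, end, interval = metrics_window("minute", 30, now=fixed_now)
print(start.isoformat(), end.isoformat(), interval)
```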

get_network_details(network_id: str, sub_id: str) → Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]

Return details related to an Azure network interface and associated NSG.

Parameters:
  • network_id (str) – The ID of the network interface to return details on
  • sub_id (str) – The subscription ID that the network interface is part of
Returns:

details – A tuple of DataFrames with details of the network interface and its associated NSG

Return type:

Tuple[pd.DataFrame, pd.DataFrame]

get_resource_details(sub_id: str, resource_id: str = None, resource_details: dict = None) → dict

Return the details of a specific Azure resource.

Parameters:
  • resource_id (str, optional) – The ID of the resource to get details on
  • resource_details (dict, optional) –
    If ID is unknown provide the following details:
    - resource_group_name
    - resource_provider_namespace
    - resource_type
    - resource_name
    - parent_resource_path
  • sub_id (str) – The ID of the subscription to get resources from
Returns:

resource_details – The details of the requested resource

Return type:

dict

get_resources(sub_id: str, rgroup: str = None, get_props: bool = False) → pandas.core.frame.DataFrame

Return details on all resources in a subscription or Resource Group.

Parameters:
  • sub_id (str) – The subscription ID to get resources for
  • rgroup (str (Optional)) – The name of a Resource Group to get resources for
  • get_props (bool (Optional)) – Set to True if you want to get the full properties of every resource. Warning: this may be a slow process, depending on the number of resources.
Returns:

A dataframe of resource details

Return type:

pd.DataFrame

get_subscription_info(sub_id: str) → dict

Get information on a specific subscription.

Parameters:sub_id (str) – The ID of the subscription to return details on.
Returns:Details on the selected subscription.
Return type:dict
Raises:MsticpyNotConnectedError – If .connect() has not been called.
get_subscriptions() → pandas.core.frame.DataFrame

Get details of all subscriptions within the tenant.

Returns:Details of the subscriptions present in the user’s tenant.
Return type:pd.DataFrame
Raises:MsticpyNotConnectedError – If .connect() has not been called
class msticpy.data.azure_data.InterfaceItems(interface_id, private_ip, private_ip_allocation, public_ip, public_ip_allocation, app_sec_group, subnet, subnet_nsg, subnet_route_table)

Bases: object

attr class to build network interface details dictionary.

Method generated by attrs for class InterfaceItems.

class msticpy.data.azure_data.Items(resource_id, name, resource_type, location, tags, plan, properties, kind, managed_by, sku, identity, state)

Bases: object

attr class to build resource details dictionary.

Method generated by attrs for class Items.

class msticpy.data.azure_data.NsgItems(rule_name, description, protocol, direction, src_ports, dst_ports, src_addrs, dst_addrs, action)

Bases: object

attr class to build NSG rule dictionary.

Method generated by attrs for class NsgItems.

msticpy.data.azure_sentinel module

Uses the Azure Python SDK to collect and return details related to Azure.

class msticpy.data.azure_sentinel.AzureSentinel(connect: bool = False, cloud: Optional[str] = None)

Bases: msticpy.data.azure_data.AzureData

Class for returning key Microsoft Sentinel elements.

Initialize connector for Azure APIs.

Parameters:
  • connect (bool, optional) – Set true if you want to connect to API on initialization, by default False
  • cloud (str, optional) – Specify cloud to use, overriding any configuration value. Default is to use configuration setting or public cloud if no configuration setting is available.
connect(auth_methods: List[T] = None, silent: bool = False, **kwargs)

Authenticate with the SDK & API.

Parameters:
  • auth_methods (List, optional) – list of preferred authentication methods to use, by default None
  • silent (bool, optional) – Set true to prevent output during auth process, by default False
get_alert_rules(res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None) → pandas.core.frame.DataFrame

Return all Microsoft Sentinel alert rules for a workspace.

Parameters:
  • res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
  • sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
  • res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
  • ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Returns:

A table of the workspace’s alert rules.

Return type:

pd.DataFrame
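When `res_id` is not supplied, the subscription ID, resource group and workspace name together identify the workspace. The sketch below assembles them into the standard Azure Resource Manager ID for a Log Analytics workspace; the helper name `workspace_resource_id` is hypothetical and the library may build this value differently.

```python
def workspace_resource_id(sub_id: str, res_grp: str, ws_name: str) -> str:
    """Build an Azure Resource Manager ID for a Log Analytics workspace."""
    return (
        f"/subscriptions/{sub_id}"
        f"/resourceGroups/{res_grp}"
        f"/providers/Microsoft.OperationalInsights/workspaces/{ws_name}"
    )


# Placeholder values, not real identifiers.
res_id = workspace_resource_id(
    "00000000-0000-0000-0000-000000000000", "my-rg", "my-workspace"
)
print(res_id)
```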

get_bookmarks(res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None) → pandas.core.frame.DataFrame

Return a list of Bookmarks from a Sentinel workspace.

Parameters:
  • res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
  • sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
  • res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
  • ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Returns:

A set of bookmarks.

Return type:

pd.DataFrame

Raises:

CloudError – If bookmark collection fails.

get_hunting_queries(res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None) → pandas.core.frame.DataFrame

Return all hunting queries in a Microsoft Sentinel workspace.

Parameters:
  • res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
  • sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
  • res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
  • ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Returns:

A table of the hunting queries.

Return type:

pd.DataFrame

get_incident(incident_id: str, res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None, entities: bool = False, alerts: bool = False) → pandas.core.frame.DataFrame

Get details on a specific incident.

Parameters:
  • incident_id (str) – Incident ID GUID.
  • res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
  • sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
  • res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
  • ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
  • entities (bool, optional) – If True, include all entities in the response. Default is False.
  • alerts (bool, optional) – If True, include all alerts in the response. Default is False.
Returns:

Table containing incident details.

Return type:

pd.DataFrame

Raises:

CloudError – If incident could not be retrieved.

get_incidents(res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None) → pandas.core.frame.DataFrame

Get a list of incidents for a Sentinel workspace.

Parameters:
  • res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
  • sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
  • res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
  • ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Returns:

A table of incidents.

Return type:

pd.DataFrame

Raises:

CloudError – If incidents could not be retrieved.

get_metrics(metrics: str, resource_id: str, sub_id: str, sample_time: str = 'hour', start_time: int = 30) → Dict[str, pandas.core.frame.DataFrame]

Return specified metrics on Azure Resource.

Parameters:
  • metrics (str) – A string list of metrics you wish to collect (https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported)
  • resource_id (str) – The resource ID of the resource to collect the metrics from
  • sub_id (str) – The subscription ID that the resource is part of
  • sample_time (str (Optional)) – Whether to collect the metrics every hour or every minute. Accepted inputs are ‘hour’ or ‘minute’; the default is ‘hour’
  • start_time (int (Optional)) – The number of days prior to today to collect metrics for, default is 30
Returns:

results – A Dictionary of DataFrames containing the metrics details

Return type:

dict
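
The sample_time and start_time parameters together describe a time window and a sampling granularity. A rough sketch of that relationship, assuming start_time counts whole days back from the current time and that the granularity maps to an ISO 8601 duration. metrics_window and INTERVALS are hypothetical names used only for illustration and may not match how get_metrics is implemented:

```python
from datetime import datetime, timedelta, timezone

# Assumed mapping from sample_time to an ISO 8601 duration string.
INTERVALS = {"hour": "PT1H", "minute": "PT1M"}


def metrics_window(sample_time="hour", start_time=30, now=None):
    """Return (start, end, interval) for the requested metrics window."""
    if sample_time not in INTERVALS:
        raise ValueError("sample_time must be 'hour' or 'minute'")
    end = now or datetime.now(timezone.utc)
    start = end - timedelta(days=start_time)
    return start, end, INTERVALS[sample_time]


# Fixed 'now' so the example is reproducible.
start, end, interval = metrics_window(
    now=datetime(2021, 3, 31, tzinfo=timezone.utc)
)
```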

get_network_details(network_id: str, sub_id: str) → Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]

Return details related to an Azure network interface and associated NSG.

Parameters:
  • network_id (str) – The ID of the network interface to return details on
  • sub_id (str) – The subscription ID that the network interface is part of
Returns:

details – Details of the network interface and of the associated NSG

Return type:

Tuple[pd.DataFrame, pd.DataFrame]

get_resource_details(sub_id: str, resource_id: str = None, resource_details: dict = None) → dict

Return the details of a specific Azure resource.

Parameters:
  • resource_id (str, optional) – The ID of the resource to get details on
  • resource_details (dict, optional) –
    If ID is unknown provide the following details:
    resource_group_name, resource_provider_namespace, resource_type, resource_name, parent_resource_path
  • sub_id (str) – The ID of the subscription to get resources from
Returns:

resource_details – The details of the requested resource

Return type:

dict

get_resources(sub_id: str, rgroup: str = None, get_props: bool = False) → pandas.core.frame.DataFrame

Return details on all resources in a subscription or Resource Group.

Parameters:
  • sub_id (str) – The subscription ID to get resources for
  • rgroup (str (Optional)) – The name of a Resource Group to get resources for
  • get_props (bool (Optional)) – Set to True to get the full properties of every resource. Warning: this may be slow, depending on the number of resources
Returns:

A dataframe of resource details

Return type:

pd.DataFrame

get_sentinel_workspaces(sub_id: str = None) → Dict[str, str]

Return a list of Microsoft Sentinel workspaces in a Subscription.

Parameters:sub_id (str) – The subscription ID to get a list of workspaces from. If not provided it will attempt to get sub_id from config files.
Returns:A dictionary of workspace names and ids
Return type:Dict
get_subscription_info(sub_id: str) → dict

Get information on a specific subscription.

Parameters:sub_id (str) – The ID of the subscription to return details on.
Returns:Details on the selected subscription.
Return type:dict
Raises:MsticpyNotConnectedError – If .connect() has not been called.
get_subscriptions() → pandas.core.frame.DataFrame

Get details of all subscriptions within the tenant.

Returns:Details of the subscriptions present in the user's tenant.
Return type:pd.DataFrame
Raises:MsticpyNotConnectedError – If .connect() has not been called
post_comment(incident_id: str, comment: str, res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None)

Write a comment for an incident.

Parameters:
  • incident_id (str) – Incident ID GUID.
  • comment (str) – Comment message to post.
  • res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
  • sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
  • res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
  • ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Raises:

CloudError – If message could not be posted.

set_default_subscription(subscription_id: str)

Set the default subscription to use to subscription_id.

set_default_workspace(sub_id: Optional[str], workspace: Optional[str] = None)

Set the default workspace.

Parameters:
  • sub_id (Optional[str], optional) – Subscription ID containing the workspace. If not specified, the subscription will be taken from the default_subscription or from configuration.
  • workspace (Optional[str], optional) – Name of the workspace, by default None. If not specified and there is only one workspace in the subscription, this will be set as the default.
update_incident(incident_id: str, update_items: dict, res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None)

Update properties of an incident.

Parameters:
  • incident_id (str) – Incident ID GUID.
  • update_items (dict) – Dictionary of properties to update and their values. Ref: https://docs.microsoft.com/en-us/rest/api/securityinsights/incidents/createorupdate
  • res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
  • sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
  • res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
  • ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Raises:

CloudError – If incident could not be updated.
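
As an illustration of the update_items dictionary, the sketch below builds one and checks it before sending. The property names and allowed values are assumptions based on the Sentinel incident REST schema referenced above and should be verified there. check_update_items and the azure_sentinel variable are hypothetical:

```python
# Assumed allowed values for two commonly updated incident properties.
ALLOWED_VALUES = {
    "severity": {"High", "Medium", "Low", "Informational"},
    "status": {"New", "Active", "Closed"},
}


def check_update_items(items: dict) -> list:
    """Return 'key=value' strings for values outside the assumed sets."""
    return [
        f"{key}={value}"
        for key, value in items.items()
        if key in ALLOWED_VALUES and value not in ALLOWED_VALUES[key]
    ]


update_items = {"severity": "High", "status": "Active"}
problems = check_update_items(update_items)

# With a connected client (hypothetical variable name):
# azure_sentinel.update_incident(
#     incident_id="<incident GUID>", update_items=update_items
# )
```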

msticpy.data.azure_blob_storage module

msticpy.data.sql_to_kql module

Module for SQL to KQL Conversion.

This is an experimental conversion utility built to support a limited subset of ANSI SQL. It relies on moz_sql_parser (https://github.com/mozilla/moz-sql-parser) to parse the SQL syntax tree. Some hacky additions have been made to allow table renaming and to support a few SparkSQL operators such as RLIKE.

For more complete help with translating SQL to KQL, see https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sqlcheatsheet

Known limitations

  • Does not support aggregate functions in SELECT with no GROUP BY clause
  • Does not support IN, EXISTS, HAVING operators
  • Only partial support for AS naming (should work in SELECT expressions)
msticpy.data.sql_to_kql.sql_to_kql(sql: str, target_tables: Dict[str, str] = None) → str

Parse SQL and return KQL equivalent.
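
Given the known limitations listed above, it can help to screen a statement before attempting conversion. The following is a naive sketch, not part of msticpy: unsupported_keywords is a hypothetical helper that looks for the IN, EXISTS and HAVING keywords as whole words. It does not parse SQL, so it will also flag those words inside string literals.

```python
import re

# Operators listed as unsupported in the known limitations.
_UNSUPPORTED = ("IN", "EXISTS", "HAVING")


def unsupported_keywords(sql: str) -> list:
    """Return the unsupported keywords that appear in sql (whole-word match)."""
    return [
        keyword
        for keyword in _UNSUPPORTED
        if re.search(rf"\b{keyword}\b", sql, flags=re.IGNORECASE)
    ]


sql = "SELECT Computer FROM SecurityEvent WHERE EventID IN (4624, 4625)"
flagged = unsupported_keywords(sql)

# If nothing is flagged, the statement could then be passed on, e.g.:
# kql = sql_to_kql(sql)
```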

msticpy.data.drivers.driver_base module

Data driver base class.

class msticpy.data.drivers.driver_base.DriverBase(**kwargs)

Bases: abc.ABC

Base class for data providers.

Initialize new instance.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters:connection_str (Optional[str]) – Connect to a data source
connected

Return true if at least one connection has been made.

Returns:True if a successful connection has been made.
Return type:bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

driver_queries

Return queries retrieved from the service after connecting.

Returns:List of Dictionary of query_name, query_text. Name of container to add queries to.
Return type:List[Dict[str, str]]
loaded

Return true if the provider is loaded.

Returns:True if the provider is loaded.
Return type:bool

Notes

This is not relevant for some providers.

query(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:
  • query (str) – The query to execute
  • query_source (QuerySource) – The query definition object
Other Parameters:
 

kwargs – Are passed to the underlying provider query method, if supported.

Returns:

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type:

Union[pd.DataFrame, Any]

query_attach_spec

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame plus native results.

Parameters:query (str) – The query to execute
Returns:A DataFrame and native results.
Return type:Tuple[pd.DataFrame,Any]
schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
service_queries

Return queries retrieved from the service after connecting.

Returns:Dictionary of query_name, query_text. Name of container to add queries to.
Return type:Tuple[Dict[str, str], str]
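
The members above describe the contract that each driver fulfils. The sketch below is a simplified stand-in written without msticpy or pandas: ToyDriverBase and InMemoryDriver are hypothetical classes, lists of dictionaries stand in for DataFrames, and having query delegate to query_with_results is an assumption rather than a statement about the real DriverBase.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple

Rows = List[Dict[str, Any]]  # stand-in for a pandas DataFrame


class ToyDriverBase(ABC):
    """Simplified stand-in for a data driver base class."""

    def __init__(self) -> None:
        self._connected = False

    @property
    def connected(self) -> bool:
        """True once at least one connection has been made."""
        return self._connected

    @abstractmethod
    def connect(self, connection_str: Optional[str] = None, **kwargs) -> None:
        """Connect to the data source."""

    @abstractmethod
    def query_with_results(self, query: str, **kwargs) -> Tuple[Rows, Any]:
        """Return the data plus the provider's native result."""

    def query(self, query: str, **kwargs) -> Rows:
        """Return only the data part of query_with_results."""
        data, _ = self.query_with_results(query, **kwargs)
        return data


class InMemoryDriver(ToyDriverBase):
    """Toy driver that treats the query string as a table name."""

    def __init__(self, tables: Dict[str, Rows]) -> None:
        super().__init__()
        self._tables = tables

    def connect(self, connection_str: Optional[str] = None, **kwargs) -> None:
        self._connected = True

    def query_with_results(self, query: str, **kwargs) -> Tuple[Rows, Any]:
        rows = self._tables.get(query, [])
        return rows, {"status": "ok", "row_count": len(rows)}


driver = InMemoryDriver({"events": [{"id": 1}, {"id": 2}]})
connected_before = driver.connected
driver.connect()
events = driver.query("events")
```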

msticpy.data.drivers.kql_driver module

KQL Driver class.

class msticpy.data.drivers.kql_driver.KqlDriver(connection_str: str = None, **kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

KqlDriver class to execute kql queries.

Instantiate KqlDriver and optionally connect.

Parameters:connection_str (str, optional) – Connection string
Other Parameters:
 debug (bool) – print out additional diagnostic information.
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters:

connection_str (str) – Connect to a data source

Other Parameters:
 
  • kqlmagic_args (str, optional) – Additional string of parameters to be passed to KqlMagic
  • mp_az_auth (Union[bool, str, list, None], optional) – Optional parameter directing KqlMagic to use MSTICPy Azure authentication. Values can be: True or “default” (use the settings in the msticpyconfig.yaml ‘Azure’ section); str (a single auth method name: ‘msi’, ‘cli’, ‘env’ or ‘interactive’); List[str] (a list of acceptable auth methods from ‘msi’, ‘cli’, ‘env’ or ‘interactive’).
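
The accepted shapes of mp_az_auth can be summarised with a small normalisation routine. This is only a sketch of the documented value forms: normalize_mp_az_auth is a hypothetical helper, and treating None or False as "do not use MSTICPy authentication" is an assumption.

```python
from typing import List, Union

KNOWN_METHODS = ("msi", "cli", "env", "interactive")


def normalize_mp_az_auth(value) -> Union[None, str, List[str]]:
    """Map the documented value forms to None, 'default' or a list of methods."""
    if value is None or value is False:
        return None  # assumed: MSTICPy authentication not requested
    if value is True or value == "default":
        return "default"  # use the settings from msticpyconfig.yaml
    methods = [value] if isinstance(value, str) else list(value)
    unknown = [method for method in methods if method not in KNOWN_METHODS]
    if unknown:
        raise ValueError(f"unknown auth methods: {unknown}")
    return methods


single = normalize_mp_az_auth("cli")
several = normalize_mp_az_auth(["msi", "cli"])
```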
connected

Return true if at least one connection has been made.

Returns:True if a successful connection has been made.
Return type:bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

driver_queries

Return queries retrieved from the service after connecting.

Returns:List of Dictionary of query_name, query_text. Name of container to add queries to.
Return type:List[Dict[str, str]]
loaded

Return true if the provider is loaded.

Returns:True if the provider is loaded.
Return type:bool

Notes

This is not relevant for some providers.

query(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:
  • query (str) – The query to execute
  • query_source (QuerySource) – The query definition object
Returns:

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type:

Union[pd.DataFrame, results.ResultSet]

query_attach_spec

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:query (str) – The kql query to execute
Returns:A DataFrame (if successful) and Kql ResultSet.
Return type:Tuple[pd.DataFrame, results.ResultSet]
schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
service_queries

Return queries retrieved from the service after connecting.

Returns:Dictionary of query_name, query_text. Name of container to add queries to.
Return type:Tuple[Dict[str, str], str]

msticpy.data.drivers.local_data_driver module

Local Data Driver class - for testing and demos.

class msticpy.data.drivers.local_data_driver.LocalDataDriver(connection_str: str = None, **kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

LocalDataDriver class to execute kql queries.

Instantiate LocalDataDriver and optionally connect.

Parameters:
  • connection_str (str, optional) – Connection string (not used)
  • data_paths (List[str], optional) – Paths from which to load data files
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters:connection_str (str) – Connect to a data source
connected

Return true if at least one connection has been made.

Returns:True if a successful connection has been made.
Return type:bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

driver_queries

Return queries retrieved from the service after connecting.

Returns:List of Dictionary of query_name, query_text. Name of container to add queries to.
Return type:List[Dict[str, str]]
loaded

Return true if the provider is loaded.

Returns:True if the provider is loaded.
Return type:bool

Notes

This is not relevant for some providers.

query(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:
  • query (str) – The query to execute
  • query_source (QuerySource) – The query definition object
Returns:

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type:

Union[pd.DataFrame, results.ResultSet]

query_attach_spec

Parameters that determine whether a query is relevant for the driver.

query_with_results(query, **kwargs)

Return query with fake results.

schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
service_queries

Return queries retrieved from the service after connecting.

Returns:Dictionary of query_name, query_text. Name of container to add queries to.
Return type:Tuple[Dict[str, str], str]

msticpy.data.drivers.mdatp_driver module

MDATP OData Driver class.

class msticpy.data.drivers.mdatp_driver.MDATPDriver(connection_str: str = None, **kwargs)

Bases: msticpy.data.drivers.odata_driver.OData

Driver class to retrieve data from MS Defender APIs.

Instantiate MSDefenderDriver and optionally connect.

Parameters:connection_str (str, optional) – Connection string
CONFIG_NAME = 'MicrosoftDefender'
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: str = None, **kwargs)

Connect to oauth data source.

Parameters:connection_str (str, optional) – Connect to a data source

Notes

Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion

connected

Return true if at least one connection has been made.

Returns:True if a successful connection has been made.
Return type:bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

driver_queries

Return queries retrieved from the service after connecting.

Returns:List of Dictionary of query_name, query_text. Name of container to add queries to.
Return type:List[Dict[str, str]]
loaded

Return true if the provider is loaded.

Returns:True if the provider is loaded.
Return type:bool

Notes

This is not relevant for some providers.

query(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:
  • query (str) – The query to execute
  • query_source (QuerySource) – The query definition object
Returns:

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type:

Union[pd.DataFrame, results.ResultSet]

query_attach_spec

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:query (str) – The kql query to execute
Returns:A DataFrame (if successful) and Kql ResultSet.
Return type:Tuple[pd.DataFrame, results.ResultSet]
schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
service_queries

Return queries retrieved from the service after connecting.

Returns:Dictionary of query_name, query_text. Name of container to add queries to.
Return type:Tuple[Dict[str, str], str]

msticpy.data.drivers.mordor_driver module

class msticpy.data.drivers.mordor_driver.MitreAttack(attack: Dict[str, Any] = None, technique: str = None, sub_technique: str = None, tactics: List[str] = None)

Bases: object

MitreAttack container for techniques and tactics.

Create instance of MitreAttack.

Parameters:
  • attack (Dict[str, Any], optional) – attack data as dictionary, by default None
  • technique (str, optional) – technique ID, by default None
  • sub_technique (str, optional) – sub-technique ID, by default None
  • tactics (List[str], optional) – List of associated tactics, by default None
MTR_TAC_URI = 'https://attack.mitre.org/tactics/{tactic_id}/'
MTR_TECH_URI = 'https://attack.mitre.org/techniques/{technique_id}/'
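
The two class constants are ordinary format-string templates. A short sketch of filling them in, using the template values shown above copied into local variables. The tactic and technique IDs are example values chosen for illustration:

```python
# Templates copied from the class attributes documented above.
MTR_TAC_URI = "https://attack.mitre.org/tactics/{tactic_id}/"
MTR_TECH_URI = "https://attack.mitre.org/techniques/{technique_id}/"

# Example IDs (illustrative values).
tactic_uri = MTR_TAC_URI.format(tactic_id="TA0006")
technique_uri = MTR_TECH_URI.format(technique_id="T1003")
```
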
tactics_full

Return full listing of Mitre tactics.

Returns:List of tuples of: (ID, Name, Description, URI)
Return type:List[Tuple[str, str, str, str]]
technique_desc

Return Mitre technique description.

Returns:Technique description
Return type:Optional[str]
technique_name

Return Mitre Technique full name.

Returns:Name of the Mitre technique
Return type:Optional[str]
technique_uri

Return Mitre Technique URI.

Returns:URI of the Mitre technique
Return type:Optional[str]
class msticpy.data.drivers.mordor_driver.MordorDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Mordor data driver.

Initialize the Mordor driver.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters:connection_str (Optional[str]) – Connect to a data source
connected

Return true if at least one connection has been made.

Returns:True if a successful connection has been made.
Return type:bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

driver_queries

Return generator of Mordor query definitions.

Yields:Iterable[Dict[str, Any]] – Iterable of Dictionaries containing query definitions.
loaded

Return true if the provider is loaded.

Returns:True if the provider is loaded.
Return type:bool

Notes

This is not relevant for some providers.

query(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:
  • query (str) – The query to execute
  • query_source (QuerySource) – The query definition object
Other Parameters:
 

kwargs – Are passed to the underlying provider query method, if supported.

Returns:

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type:

Union[pd.DataFrame, Any]

query_attach_spec

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame plus native results.

Parameters:query (str) – The query to execute
Returns:A DataFrame and native results.
Return type:Tuple[pd.DataFrame,Any]
schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
search_queries(search: str) → Iterable[str]

Search queries for matching attributes.

Parameters:search (str) – Search string. Substrings separated by commas will be treated as OR terms - e.g. “a, b” == “a” or “b”. Substrings separated by “+” will be treated as AND terms - e.g. “a + b” == “a” and “b”
Returns:Iterable of matching query names.
Return type:Iterable[str]
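
The OR and AND semantics of the search string can be sketched as follows. This is an illustrative reimplementation, not the driver's code: matches and search_names are hypothetical helpers, the query catalogue is made-up sample data, and both the case-insensitive substring matching and the evaluation order (split on commas first, then on “+”) are assumptions.

```python
def matches(search: str, text: str) -> bool:
    """True if any comma-separated group has all of its '+' terms in text."""
    text = text.lower()
    for or_group in search.split(","):
        and_terms = [term.strip().lower() for term in or_group.split("+")]
        and_terms = [term for term in and_terms if term]
        if and_terms and all(term in text for term in and_terms):
            return True
    return False


def search_names(search: str, queries: dict) -> list:
    """Return the names of queries whose name or description matches."""
    return [
        name
        for name, description in queries.items()
        if matches(search, f"{name} {description}")
    ]


# Made-up sample catalogue: query name -> description.
queries = {
    "empire_mimikatz": "credential dumping on windows",
    "covenant_dcsync": "domain replication on windows",
    "linux_cron": "persistence on linux",
}

or_hits = search_names("mimikatz, dcsync", queries)
and_hits = search_names("windows + credential", queries)
```
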
service_queries

Return queries retrieved from the service after connecting.

Returns:Dictionary of query_name, query_text. Name of container to add queries to.
Return type:Tuple[Dict[str, str], str]
class msticpy.data.drivers.mordor_driver.MordorEntry(title: str, id: str, type: str, creation_date, modification_date, contributors: List[str] = NOTHING, author: Optional[str] = None, platform: Optional[str] = None, description: Optional[str] = None, tags: List[str] = NOTHING, files: List[Dict[str, Any]] = NOTHING, datasets: List[Dict[str, Any]] = NOTHING, attack_mappings: List[Dict[str, Any]] = NOTHING, notebooks: List[Dict[str, str]] = NOTHING, simulation: Dict[str, Any] = NOTHING, references: List[Any] = NOTHING, rel_file_paths: List[Dict[str, Any]] = NOTHING)

Bases: object

Mordor data set metadata.

Method generated by attrs for class MordorEntry.

get_attacks() → List[msticpy.data.drivers.mordor_driver.MitreAttack]

Return list of Mitre attack classifications.

Returns:List of MitreAttack definitions.
Return type:List[MitreAttack]
get_file_paths() → List[Dict[str, str]]

Return list of data file links.

Returns:list of dictionaries describing files. Each entry has key/values for: - file_type - file_path - relative_path - qry_path
Return type:List[Dict[str, str]]
get_notebooks() → List[Tuple[str, str, str]]

Return the list of notebooks for the dataset.

Returns:Tuples of (name, project, link)
Return type:List[Tuple[str, str, str]]
msticpy.data.drivers.mordor_driver.download_mdr_file(file_uri: str, use_cached: bool = True, save_folder: str = '.', silent: bool = False) → pandas.core.frame.DataFrame

Download data file from Mordor.

Parameters:
  • file_uri (str) – The URI of the file to download.
  • use_cached (bool, optional) – Try to use locally saved file first, by default True
  • save_folder (str, optional) – Path to output folder, by default “.”
  • silent (bool) – If True, suppress feedback. By default, False.
Returns:

DataFrame of Dataset

Return type:

pd.DataFrame

msticpy.data.drivers.mordor_driver.get_mdr_data_paths(item_type='metadata') → Generator[str, None, None]

Generate Mordor data sets from GitHub repo.

Parameters:item_type (str, optional) – The type of item required, by default “metadata”. Other values are “large” and “small”.
Yields:str – Iterable of paths
msticpy.data.drivers.mordor_driver.search_mdr_data(mdr_data: Dict[str, msticpy.data.drivers.mordor_driver.MordorEntry], terms: str = None, subset: Iterable[str] = None) → Set[str]

Return IDs for items matching terms.

Parameters:
  • mdr_data (Dict[str, MordorEntry]) – Mordor dataset
  • terms (str, optional) – Search terms, by default None (comma-separated values are treated as OR terms; plus-separated values are treated as AND terms)
  • subset (Iterable[str], optional) – A subset of IDs over which to search, by default None
Returns:

The set of matching IDs.

Return type:

Set[str]

msticpy.data.drivers.odata_driver module

OData Driver class.

class msticpy.data.drivers.odata_driver.OData(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Parent class to retrieve data from an OAuth-based API.

Instantiate OData driver and optionally connect.

Parameters:connect (bool, optional) – Set true if you want to connect to the provider at initialization
CONFIG_NAME = ''
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: str = None, **kwargs)

Connect to oauth data source.

Parameters:connection_str (str, optional) – Connect to a data source

Notes

Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion

connected

Return true if at least one connection has been made.

Returns:True if a successful connection has been made.
Return type:bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

driver_queries

Return queries retrieved from the service after connecting.

Returns:List of Dictionary of query_name, query_text. Name of container to add queries to.
Return type:List[Dict[str, str]]
loaded

Return true if the provider is loaded.

Returns:True if the provider is loaded.
Return type:bool

Notes

This is not relevant for some providers.

query(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:
  • query (str) – The query to execute
  • query_source (QuerySource) – The query definition object
Returns:

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type:

Union[pd.DataFrame, Any]

query_attach_spec

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:query (str) – The kql query to execute
Returns:A DataFrame (if successful) and Kql ResultSet.
Return type:Tuple[pd.DataFrame, results.ResultSet]
schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
service_queries

Return queries retrieved from the service after connecting.

Returns:Dictionary of query_name, query_text. Name of container to add queries to.
Return type:Tuple[Dict[str, str], str]

msticpy.data.drivers.security_graph_driver module

Security Graph OData Driver class.

class msticpy.data.drivers.security_graph_driver.SecurityGraphDriver(connection_str: str = None, **kwargs)

Bases: msticpy.data.drivers.odata_driver.OData

Driver to query security graph.

Instantiate MSGraph driver and optionally connect.

Parameters:connection_str (str, optional) – Connection string
CONFIG_NAME = 'MicrosoftGraph'
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: str = None, **kwargs)

Connect to oauth data source.

Parameters:connection_str (str, optional) – Connect to a data source

Notes

Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion

connected

Return true if at least one connection has been made.

Returns:True if a successful connection has been made.
Return type:bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

driver_queries

Return queries retrieved from the service after connecting.

Returns:List of Dictionary of query_name, query_text. Name of container to add queries to.
Return type:List[Dict[str, str]]
loaded

Return true if the provider is loaded.

Returns:True if the provider is loaded.
Return type:bool

Notes

This is not relevant for some providers.

query(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:
  • query (str) – The query to execute
  • query_source (QuerySource) – The query definition object
Returns:

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type:

Union[pd.DataFrame, results.ResultSet]

query_attach_spec

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:query (str) – The kql query to execute
Returns:A DataFrame (if successful) and Kql ResultSet.
Return type:Tuple[pd.DataFrame, results.ResultSet]
schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
service_queries

Return queries retrieved from the service after connecting.

Returns:Dictionary of query_name, query_text. Name of container to add queries to.
Return type:Tuple[Dict[str, str], str]

msticpy.data.drivers.splunk_driver module

Splunk Driver class.

class msticpy.data.drivers.splunk_driver.SplunkDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Driver to connect and query from Splunk.

Instantiate Splunk Driver.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: str = None, **kwargs)

Connect to Splunk via splunk-sdk.

Parameters:connection_str (Optional[str], optional) – Connection string with Splunk connection parameters
Other Parameters:
 kwargs – Connection parameters can be supplied as keyword parameters.

Notes

Default configuration is read from the DataProviders/Splunk section of msticpyconfig.yaml, if available.

connected

Return true if at least one connection has been made.

Returns:True if a successful connection has been made.
Return type:bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

driver_queries

Return dynamic queries available on connection to service.

Returns:List of queries with properties: “name”, “query”, “container” and (optionally) “description”
Return type:Iterable[Dict[str, Any]]
Raises:MsticpyNotConnectedError – If called before driver is connected.
loaded

Return true if the provider is loaded.

Returns:True if the provider is loaded.
Return type:bool

Notes

This is not relevant for some providers.

query(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]

Execute splunk query and retrieve results via OneShot search mode.

Parameters:
  • query (str) – Splunk query to execute via OneShot search mode
  • query_source (QuerySource) – The query definition object
Other Parameters:
 

kwargs – Are passed to the Splunk oneshot method; count=0 by default.

Returns:

Query results in a DataFrame, or the query response if an error occurs.

Return type:

Union[pd.DataFrame, Any]

query_attach_spec

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters:query (str) – Query to execute against splunk instance.
Returns:A DataFrame (if successful) or the underlying provider result if an error occurs.
Return type:Union[pd.DataFrame,Any]
schema

Return current data schema of connection.

Returns:Data schema of current connection.
Return type:Dict[str, Dict]
service_queries

Return dynamic queries available on connection to service.

Returns:Dictionary of query_name, query_text. Name of container to add queries to.
Return type:Tuple[Dict[str, str], str]

msticpy.data.data_obfus

Data obfuscation functions.

class msticpy.data.data_obfus.ObfuscationAccessor(pandas_obj)

Bases: object

Data obfuscation pandas extension.

Initialize the extension.

mask(column_map: Mapping[str, Any] = None, use_default: bool = True) → pandas.core.frame.DataFrame

Obfuscate the data in columns of a pandas dataframe.

Parameters:
  • data (pd.DataFrame) – dataframe containing column to obfuscate
  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
Returns:

Obfuscated dataframe

Return type:

pd.DataFrame

msticpy.data.data_obfus.check_masking(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) → Optional[Tuple[List[str], List[str]]]

Check the obfuscation results for a row.

Parameters:
  • data (pd.DataFrame) – Obfuscated DataFrame
  • orig_data (pd.DataFrame) – Original DataFrame
  • index (int, optional) – The row to check, by default 0
  • silent (bool) – If True, the function produces no printed output and returns lists of unchanged and changed columns. If False, it returns None. By default, True
Returns:

If silent is True returns a tuple of unchanged, changed items. If False, returns None.

Return type:

Optional[Tuple[List[str], List[str]]]

msticpy.data.data_obfus.check_obfuscation(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) → Optional[Tuple[List[str], List[str]]]

Check the obfuscation results for a row.

Parameters:
  • data (pd.DataFrame) – Obfuscated DataFrame
  • orig_data (pd.DataFrame) – Original DataFrame
  • index (int, optional) – The row to check, by default 0
  • silent (bool) – If True, the function produces no printed output and returns lists of unchanged and changed columns. If False, it returns None. By default, True
Returns:

If silent is True returns a tuple of unchanged, changed items. If False, returns None.

Return type:

Optional[Tuple[List[str], List[str]]]
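
The row check described above can be sketched without pandas. This is an illustrative sketch only, not the msticpy implementation: the function name check_row and the list-of-dicts "tables" are assumptions made so the example is self-contained. It follows the documented contract: with silent=True it returns a tuple of (unchanged, changed) column names, otherwise it prints a report and returns None.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def check_row(
    data: List[Row], orig_data: List[Row], index: int = 0, silent: bool = True
) -> Optional[Tuple[List[str], List[str]]]:
    """Compare one obfuscated row with the same row of the original data.

    Hypothetical stand-in for the documented check: rows are plain dicts
    keyed by column name instead of DataFrame rows.
    """
    unchanged: List[str] = []
    changed: List[str] = []
    for column, orig_value in orig_data[index].items():
        if data[index].get(column) == orig_value:
            unchanged.append(column)
        else:
            changed.append(column)
    if not silent:
        # Verbose mode: report to the console and return nothing.
        print("Unchanged columns:", ", ".join(unchanged))
        print("Changed columns:", ", ".join(changed))
        return None
    return unchanged, changed


original = [{"Account": "alice@contoso.com", "EventID": 4624}]
masked = [{"Account": "f3a1b2@d9e8c7.com", "EventID": 4624}]
result = check_row(masked, original)  # (['EventID'], ['Account'])
```

A column that shows up in the unchanged list but holds sensitive data is a sign that the column map needs an entry for it.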

msticpy.data.data_obfus.hash_account

Hash an Account to something recognizable.

Parameters:account (str) – Account name (UPN, NT or simple name)
Returns:Hashed Account
Return type:str
msticpy.data.data_obfus.hash_dict(item_dict: Dict[str, Union[Dict[str, Any], List[Any], str]]) → Dict[str, Any]

Hash dictionary values.

Parameters:item_dict (Dict[str, Union[Dict[str, Any], List[Any], str]]) – Input item can be a Dict of strings, lists or other dictionaries.
Returns:Dictionary with hashed values.
Return type:Dict[str, Any]
msticpy.data.data_obfus.hash_ip(input_item: Union[List[str], str]) → Union[List[str], str]

Hash IP address or list of IP addresses.

Parameters:input_item (Union[List[str], str]) – List of IP addresses or single IP address.
Returns:List of hashed addresses or a single hashed address, depending on the input.
Return type:Union[List[str], str]
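
The "same shape in, same shape out" behavior of hash_ip can be illustrated as follows. This is a minimal sketch under stated assumptions: sketch_hash_ip is a hypothetical name, and the SHA-256 based mapping to a dotted-quad string is chosen only for illustration. It is not the hashing scheme msticpy uses.

```python
import hashlib
from typing import List, Union


def _hash_one(ip_addr: str) -> str:
    """Map one address string to a deterministic dotted-quad look-alike."""
    digest = hashlib.sha256(ip_addr.encode("utf-8")).digest()
    # Use the first four digest bytes as the four octets of the output.
    return ".".join(str(byte) for byte in digest[:4])


def sketch_hash_ip(input_item: Union[List[str], str]) -> Union[List[str], str]:
    """Hash a single address or each address in a list (hypothetical sketch)."""
    if isinstance(input_item, list):
        return [_hash_one(ip_addr) for ip_addr in input_item]
    return _hash_one(input_item)


single = sketch_hash_ip("10.0.0.1")                # a str
many = sketch_hash_ip(["10.0.0.1", "10.0.0.2"])    # a list of two str
```

Because the mapping is deterministic, the same input address maps to the same output wherever it appears, which keeps joins and groupings on the obfuscated data meaningful.
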
msticpy.data.data_obfus.hash_item

Hash a simple string.

Parameters:
  • input_item (str) – The input string
  • delim (str, optional) – A string of delimiters to use to split the input string prior to hashing.
Returns:

The obfuscated output string

Return type:

str
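
One way to read the delim parameter is that the input is split on the delimiter characters, each piece is hashed separately, and the delimiters are kept so the result stays recognizable. The sketch below works on that assumption; sketch_hash_item and the truncated SHA-256 digest are hypothetical and are not taken from the msticpy source.

```python
import hashlib
import re


def _short_hash(text: str) -> str:
    """Return the first 8 hex characters of the SHA-256 digest."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:8]


def sketch_hash_item(input_item: str, delim: str = "") -> str:
    """Hash a string, optionally piece by piece (hypothetical sketch)."""
    if not delim:
        return _short_hash(input_item)
    # The capture group makes re.split keep the delimiter characters.
    pattern = "([" + re.escape(delim) + "])"
    parts = re.split(pattern, input_item)
    # Delimiters and empty pieces pass through; everything else is hashed.
    return "".join(
        part if (not part or part in delim) else _short_hash(part)
        for part in parts
    )


hashed_upn = sketch_hash_item("alice@contoso.com", delim="@.")
```

With delim="@." the output still has the user@domain.tld layout, so a reader can tell it was an account name without seeing the original value.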

msticpy.data.data_obfus.hash_list(item_list: List[str]) → List[str]

Hash list of strings.

Parameters:item_list (List[str]) – Input list
Returns:Hashed list
Return type:List[str]
msticpy.data.data_obfus.hash_sid

Hash a SID preserving well-known SIDs and the RID.

Parameters:sid (str) – SID string
Returns:Hashed SID
Return type:str
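
"Preserving well-known SIDs and the RID" can be sketched as below. This is an assumption-laden illustration, not the library's algorithm: sketch_hash_sid, the small WELL_KNOWN_SIDS subset, and the choice to leave non-domain SIDs untouched are all hypothetical.

```python
import hashlib

# Small illustrative subset: Everyone, Local System, Local Service,
# Network Service.
WELL_KNOWN_SIDS = {"S-1-1-0", "S-1-5-18", "S-1-5-19", "S-1-5-20"}
DOMAIN_PREFIX = "S-1-5-21-"


def sketch_hash_sid(sid: str) -> str:
    """Hash the domain part of a SID, keeping prefix and RID (sketch)."""
    if sid in WELL_KNOWN_SIDS or not sid.startswith(DOMAIN_PREFIX):
        # Assumption: anything that is not a domain SID is left as-is.
        return sid
    domain_part, _, rid = sid[len(DOMAIN_PREFIX):].rpartition("-")
    if not domain_part:
        return sid
    hashed = []
    for sub_authority in domain_part.split("-"):
        digest = hashlib.sha256(sub_authority.encode("utf-8")).digest()
        # Turn four digest bytes into a decimal sub-authority value.
        hashed.append(str(int.from_bytes(digest[:4], "big")))
    return DOMAIN_PREFIX + "-".join(hashed) + "-" + rid


system_sid = sketch_hash_sid("S-1-5-18")
user_sid = sketch_hash_sid("S-1-5-21-1111111111-2222222222-3333333333-1001")
```

Keeping the RID means that built-in accounts such as the domain Administrator (RID 500) remain identifiable by role even though the domain identifier is hidden.
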
msticpy.data.data_obfus.hash_string(input_str: str) → str

Hash a simple string.

Parameters:input_str (str) – The input string
Returns:The obfuscated output string
Return type:str
msticpy.data.data_obfus.mask_df(data: pandas.core.frame.DataFrame, column_map: Mapping[str, Any] = None, use_default: bool = True, silent: bool = True) → pandas.core.frame.DataFrame

Obfuscate columns of a DataFrame.

Parameters:
  • data (pd.DataFrame) – Input dataframe
  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
  • silent (bool) – If False, the function displays progress output; by default True.
Returns:

Obfuscated dataframe.

Return type:

pd.DataFrame

msticpy.data.data_obfus.obfuscate_df(data: pandas.core.frame.DataFrame, column_map: Mapping[str, Any] = None, use_default: bool = True, silent: bool = True) → pandas.core.frame.DataFrame

Obfuscate columns of a DataFrame.

Parameters:
  • data (pd.DataFrame) – Input dataframe
  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
  • silent (bool) – If False, the function displays progress output; by default True.
Returns:

Obfuscated dataframe.

Return type:

pd.DataFrame
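
The interaction between column_map and use_default can be pictured as a dictionary merge. The sketch below is hypothetical throughout: DEFAULT_MAP, its column names, the string labels used as values, and build_column_map are placeholders for illustration, and the real built-in map and its value types are not shown in this reference.

```python
from typing import Any, Dict, Mapping, Optional

# Hypothetical stand-in for the built-in map; names and values are made up.
DEFAULT_MAP: Dict[str, Any] = {"Account": "account-rule", "IpAddress": "ip-rule"}


def build_column_map(
    column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True
) -> Dict[str, Any]:
    """Combine the default map with custom mappings (hypothetical sketch)."""
    # Copy so the shared default dictionary is never mutated.
    effective: Dict[str, Any] = dict(DEFAULT_MAP) if use_default else {}
    if column_map:
        # Custom entries are added and win over defaults on a name clash.
        effective.update(column_map)
    return effective


merged = build_column_map({"Computer": "host-rule"})
custom_only = build_column_map({"Computer": "host-rule"}, use_default=False)
```

In this reading, use_default=False restricts obfuscation to exactly the columns named in column_map.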

msticpy.data.data_obfus.replace_guid(guid: str) → str

Replace GUID/UUID with mapped random UUID.

Parameters:guid (str) – Input UUID.
Returns:Mapped UUID
Return type:str

msticpy.data.browsers.query_browser module

QueryProvider Query Browser.

msticpy.data.browsers.query_browser.browse_queries(query_provider: Any, **kwargs) → msticpy.nbtools.nbwidgets.SelectItem

Return QueryProvider query browser.

Parameters:query_provider (QueryProvider) – Initialized query provider.
Other Parameters:
 kwargs – passed to SelectItem constructor.
Returns:SelectItem browser for the provider's queries.
Return type:SelectItem

msticpy.data.browsers.mordor_browser module

Mordor dataset browser.

class msticpy.data.browsers.mordor_browser.MordorBrowser(save_folder: str = '.', use_cached: bool = True)

Bases: object

Mordor browser widget.

Initialize MordorBrowser control.

Parameters:
  • save_folder (str, optional) – Folder to save downloaded files, by default “.”
  • use_cached (bool, optional) – If true, downloaded files are not deleted after download and are used as a local cache, by default True
fields

Return set of fields widget controls.

selected_dset

Return the ID of the selected data set.