msticpy.data package¶
msticpy.nbtools.entityschema module¶
msticpy.nbtools.security_alert module¶
Module for SecurityAlert class.
-
class
msticpy.nbtools.security_alert.
SecurityAlert
(src_row: pandas.core.series.Series = None)¶ Bases:
msticpy.nbtools.security_base.SecurityBase
Security Alert Class.
Instantiates a security alert from a pandas Series and provides convenience access methods to retrieve properties.
Instantiate a security alert from a pandas Series.
-
computer
¶ Return the Computer name of the host associated with the alert.
(host FQDN, if available)
-
data_environment
¶ Return the data environment of the alert for subsequent queries.
-
data_family
¶ Return the data family of the alert for subsequent queries.
-
entities
¶ Return a list of the Security Alert entities.
-
get_all_entities
() → pandas.core.frame.DataFrame¶ Return a DataFrame of the Alert or Event entities.
Returns: Pandas DataFrame of the Alert or Event entities. Return type: DataFrame
-
get_entities_of_type
(entity_type: str) → List[msticpy.datamodel.entities.entity.Entity]¶ Return entity collection for a given entity type.
Parameters: entity_type (str, optional) – The entity type. Returns: The entities matching entity_type. Return type: List[Entity]
-
get_logon_id
(account: msticpy.datamodel.entities.account.Account = None) → Union[str, int, None]¶ Get the logon Id for the alert or the account, if supplied.
If account is not supplied, return the logon id of the first host-logon-session or account entity.
Parameters: account (Account, optional) – Account object to use (the default is None) Returns: The logon Id for primary account Return type: Optional[Union[str, int]]
-
host_filter
(operator='==')¶ Return a KQL host filter clause derived from the alert properties.
Parameters: operator (str, optional) – The operator to use in the filter clause, typically ‘==’ or ‘!=’ (the default is ‘==’).
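A minimal sketch of how a filter clause of this kind could be assembled from a host name and an operator. The helper name `build_host_filter`, the `Computer` column name, and the exact clause layout are assumptions made for illustration; the clause that `host_filter` actually emits may differ.

```python
def build_host_filter(hostname: str, operator: str = "==") -> str:
    """Return a KQL-style clause comparing a host column to hostname.

    Hypothetical helper: the 'Computer' column name is an assumption.
    """
    # This sketch accepts only the two operators named in the docs.
    if operator not in ("==", "!="):
        raise ValueError(f"Unsupported operator: {operator}")
    # Escape embedded double quotes so the string literal stays valid.
    safe_name = hostname.replace('"', '\\"')
    return f'Computer {operator} "{safe_name}"'


print(build_host_filter("web01"))        # clause that selects this host
print(build_host_filter("web01", "!="))  # clause that excludes this host
```

Passing ‘!=’ turns the same alert into an exclusion filter, which can be handy when looking for activity on every host except the one that raised the alert.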
-
hostname
¶ Return the Hostname (not FQDN) of the host associated with the alert.
-
ids
¶ Return a collection of Identity properties for the alert.
-
is_in_azure_sub
¶ Return True if the alert originates from an Azure Security Center host.
-
is_in_log_analytics
¶ Return True if the alert originates from a Log Analytics Workspace host.
-
is_in_workspace
¶ Return True if the alert has a Log Analytics WorkspaceID.
-
origin_time
¶ Return the datetime of event.
-
primary_account
¶ Return the primary account entity (if any) associated with this object.
Returns: primary account entity (if any) Return type: Optional[Account]
-
primary_host
¶ Return the primary host entity (if any) associated with this object.
Returns: primary host entity (if any) Return type: Optional[Host]
-
primary_process
¶ Return the primary process entity (if any) associated with this object.
Returns: primary process entity (if any) Return type: Optional[Process]
-
properties
¶ Return a dictionary of the Alert or Event properties.
Returns: dictionary of the Alert or Event properties. Return type: Dict[str, Any]
-
query_params
¶ Query parameters derived from alert.
Returns: Dictionary of parameter names/values Return type: Dict[str, Any]
-
subscription_filter
(operator='==')¶ Return a KQL subscription filter clause derived from the alert properties.
-
to_html
(show_entities=False) → str¶ Return the item as HTML string.
-
msticpy.nbtools.security_alert_graph module¶
security_alert_graph.
Creates an entity graph for the alert.
Add related alerts to the graph.
Link to the entity that is common to both alerts.
-
msticpy.nbtools.security_alert_graph.
create_alert_graph
(alert: msticpy.nbtools.security_alert.SecurityAlert)¶ Create a networkx graph from the alert and contained entities.
msticpy.nbtools.security_base module¶
Module for SecurityBase class.
-
class
msticpy.nbtools.security_base.
SecurityBase
(src_row: pandas.core.series.Series = None)¶ Bases:
msticpy.data.query_defns.QueryParamProvider
Security Base Class for alerts and events.
Instantiates a security event or alert from a pandas Series and provides convenience access methods to retrieve properties.
Instantiate a security alert from a pandas Series.
-
computer
¶ Return the Computer name of the host associated with the alert.
(host FQDN, if available)
-
data_environment
¶ Return the data environment of the alert for subsequent queries.
-
data_family
¶ Return the data family of the alert for subsequent queries.
-
entities
¶ Return a list of the Alert or Event entities.
Returns: List of the Alert or Event entities. Return type: List[Entity]
-
get_all_entities
() → pandas.core.frame.DataFrame¶ Return a DataFrame of the Alert or Event entities.
Returns: Pandas DataFrame of the Alert or Event entities. Return type: DataFrame
-
get_entities_of_type
(entity_type: str) → List[msticpy.datamodel.entities.entity.Entity]¶ Return entity collection for a given entity type.
Parameters: entity_type (str, optional) – The entity type. Returns: The entities matching entity_type. Return type: List[Entity]
-
get_logon_id
(account: msticpy.datamodel.entities.account.Account = None) → Union[str, int, None]¶ Get the logon Id for the alert or the account, if supplied.
If account is not supplied, return the logon id of the first host-logon-session or account entity.
Parameters: account (Account, optional) – Account object to use (the default is None) Returns: The logon Id for primary account Return type: Optional[Union[str, int]]
-
host_filter
(operator='==')¶ Return a KQL host filter clause derived from the alert properties.
Parameters: operator (str, optional) – The operator to use in the filter clause, typically ‘==’ or ‘!=’ (the default is ‘==’).
-
hostname
¶ Return the Hostname (not FQDN) of the host associated with the alert.
-
ids
¶ Return a collection of Identity properties for the alert.
-
is_in_azure_sub
¶ Return True if the alert originates from an Azure Security Center host.
-
is_in_log_analytics
¶ Return True if the alert originates from a Log Analytics Workspace host.
-
is_in_workspace
¶ Return True if the alert has a Log Analytics WorkspaceID.
-
origin_time
¶ Return the datetime of event.
-
primary_account
¶ Return the primary account entity (if any) associated with this object.
Returns: primary account entity (if any) Return type: Optional[Account]
-
primary_host
¶ Return the primary host entity (if any) associated with this object.
Returns: primary host entity (if any) Return type: Optional[Host]
-
primary_process
¶ Return the primary process entity (if any) associated with this object.
Returns: primary process entity (if any) Return type: Optional[Process]
-
properties
¶ Return a dictionary of the Alert or Event properties.
Returns: dictionary of the Alert or Event properties. Return type: Dict[str, Any]
-
query_params
¶ Query parameters derived from alert.
Returns: Dictionary of parameter names/values Return type: Dict[str, Any]
-
subscription_filter
(operator='==')¶ Return a KQL subscription filter clause derived from the alert properties.
-
to_html
(show_entities: bool = False) → str¶ Return the item as HTML string.
-
msticpy.nbtools.security_event module¶
Module for SecurityEvent class.
-
class
msticpy.nbtools.security_event.
SecurityEvent
(src_row: pandas.core.series.Series = None)¶ Bases:
msticpy.nbtools.security_base.SecurityBase
SecurityEvent class.
Instantiate new instance of SecurityEvent.
Parameters: src_row (pandas.Series) – pandas Series containing a single security event
-
computer
¶ Return the Computer name of the host associated with the alert.
(host FQDN, if available)
-
data_environment
¶ Return the data environment of the alert for subsequent queries.
-
data_family
¶ Return the data family of the alert for subsequent queries.
-
entities
¶ Return the list of entities extracted from the event.
Returns: The list of entities extracted from the event. Return type: List[Entity]
-
get_all_entities
() → pandas.core.frame.DataFrame¶ Return a DataFrame of the Alert or Event entities.
Returns: Pandas DataFrame of the Alert or Event entities. Return type: DataFrame
-
get_entities_of_type
(entity_type: str) → List[msticpy.datamodel.entities.entity.Entity]¶ Return entity collection for a given entity type.
Parameters: entity_type (str, optional) – The entity type. Returns: The entities matching entity_type. Return type: List[Entity]
-
get_logon_id
(account: msticpy.datamodel.entities.account.Account = None) → Union[str, int, None]¶ Get the logon Id for the alert or the account, if supplied.
If account is not supplied, return the logon id of the first host-logon-session or account entity.
Parameters: account (Account, optional) – Account object to use (the default is None) Returns: The logon Id for primary account Return type: Optional[Union[str, int]]
-
host_filter
(operator='==')¶ Return a KQL host filter clause derived from the alert properties.
Parameters: operator (str, optional) – The operator to use in the filter clause, typically ‘==’ or ‘!=’ (the default is ‘==’).
-
hostname
¶ Return the Hostname (not FQDN) of the host associated with the alert.
-
ids
¶ Return a collection of Identity properties for the alert.
-
is_in_azure_sub
¶ Return True if the alert originates from an Azure Security Center host.
-
is_in_log_analytics
¶ Return True if the alert originates from a Log Analytics Workspace host.
-
is_in_workspace
¶ Return True if the alert has a Log Analytics WorkspaceID.
-
origin_time
¶ Return the datetime of event.
-
primary_account
¶ Return the primary account entity (if any) associated with this object.
Returns: primary account entity (if any) Return type: Optional[Account]
-
primary_host
¶ Return the primary host entity (if any) associated with this object.
Returns: primary host entity (if any) Return type: Optional[Host]
-
primary_process
¶ Return the primary process entity (if any) associated with this object.
Returns: primary process entity (if any) Return type: Optional[Process]
-
properties
¶ Return a dictionary of the Alert or Event properties.
Returns: dictionary of the Alert or Event properties. Return type: Dict[str, Any]
-
query_params
¶ Query parameters derived from alert.
Returns: Dictionary of parameter names/values Return type: Dict[str, Any]
-
subscription_filter
(operator='==')¶ Return a KQL subscription filter clause derived from the alert properties.
-
to_html
(show_entities: bool = False) → str¶ Return the item as HTML string.
-
msticpy.data.data_providers module¶
Data provider loader.
-
class
msticpy.data.data_providers.
QueryProvider
(data_environment: Union[str, msticpy.data.query_defns.DataEnvironment], driver: msticpy.data.drivers.driver_base.DriverBase = None, query_paths: List[str] = None, **kwargs)¶ Bases:
object
Container for query store and query execution provider.
Instances of this class hold the query set and execution methods for a specific data environment.
Query provider interface to queries.
Parameters: - data_environment (Union[str, DataEnvironment]) – Name or Enum of environment for the QueryProvider
- driver (DriverBase, optional) – Override the builtin driver (query execution class) and use your own driver (must inherit from DriverBase)
- query_paths (List[str]) – Additional paths to look for query definitions.
- kwargs – Other arguments are passed to the data provider driver.
See also
DataProviderBase
- base class for data query providers.
-
browse
(**kwargs)¶ Return QueryProvider query browser.
Other Parameters: kwargs – passed to SelectItem constructor. Returns: SelectItem browser for TI Data. Return type: SelectItem
-
browse_queries
(**kwargs)¶ Return QueryProvider query browser.
Other Parameters: kwargs – passed to SelectItem constructor. Returns: SelectItem browser for TI Data. Return type: SelectItem
-
connect
(connection_str: str = None, **kwargs)¶ Connect to data source.
Parameters: connection_str (str) – Connection string for the data source
-
connected
¶ Return True if the provider is connected.
Returns: True if the provider is connected. Return type: bool
-
connection_string
¶ Return provider connection string.
Returns: Provider connection string. Return type: str
-
exec_query
(query: str, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute simple query string.
Parameters: query (str) – The query string to execute. Returns: Query results - a DataFrame if successful or a KqlResult if unsuccessful. Return type: Union[pd.DataFrame, Any]
-
get_query
(query_name) → str¶ Return the raw query text.
-
import_query_file
(query_file: str)¶ Import a yaml data source definition.
Parameters: query_file (str) – Path to the file to import
-
classmethod
list_data_environments
() → List[str]¶ Return list of current data environments.
Returns: List of current data environments Return type: List[str]
-
list_queries
() → List[str]¶ Return list of family.query in the store.
Returns: List of queries Return type: Iterable[str]
-
query_help
(query_name)¶ Print help for query.
-
query_time
¶ Return the default QueryTime control for queries.
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
schema_tables
¶ Return list of tables in the data schema of the connection.
Returns: Tables in the schema of the current connection. Return type: List[str]
msticpy.data.data_query_reader module¶
Data query definition reader.
-
msticpy.data.data_query_reader.
find_yaml_files
(source_path: str, recursive: bool = False) → Iterable[pathlib.Path]¶ Return iterable of yaml files found in source_path.
Parameters: - source_path (str) – The source path to search in.
- recursive (bool, optional) – Whether to recurse through subfolders. By default False
Returns: File paths of yaml files found.
Return type: Iterable[str]
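The difference between the recursive and non-recursive search can be illustrated with `pathlib`. This is a sketch, not the module's implementation: the name `find_yaml_files_sketch` is hypothetical, and matching both `.yaml` and `.yml` extensions is an assumption.

```python
import tempfile
from pathlib import Path
from typing import Iterable


def find_yaml_files_sketch(source_path: str, recursive: bool = False) -> Iterable[Path]:
    """Yield .yaml and .yml files under source_path (illustrative only)."""
    root = Path(source_path)
    # rglob descends into subfolders; glob stays at the top level.
    finder = root.rglob if recursive else root.glob
    for pattern in ("*.yaml", "*.yml"):
        yield from finder(pattern)


# Build a throwaway folder tree to show the effect of the recursive flag.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "top.yaml").write_text("metadata: {}\n")
    (Path(tmp) / "sub").mkdir()
    (Path(tmp) / "sub" / "nested.yml").write_text("metadata: {}\n")
    top_only = sorted(p.name for p in find_yaml_files_sketch(tmp))
    everything = sorted(p.name for p in find_yaml_files_sketch(tmp, recursive=True))

print(top_only)    # files in the top-level folder only
print(everything)  # files in the folder and its subfolders
```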
-
msticpy.data.data_query_reader.
read_query_def_file
(query_file: str) → Tuple[Dict[KT, VT], Dict[KT, VT], Dict[KT, VT]]¶ Read a yaml data query definition file.
Parameters: query_file (str) – Path to yaml query definition file Returns: Tuple of dictionaries: sources (dictionary of query definitions), defaults (the default parameters from the file), metadata (the global metadata from the file). Return type: Tuple[Dict, Dict, Dict]
-
msticpy.data.data_query_reader.
validate_query_defs
(query_def_dict: Dict[str, Any]) → bool¶ Validate content of query definition.
Parameters: query_def_dict (dict) – Dictionary of query definition yaml file contents. Returns: True if validation succeeds. Return type: bool Raises: ValueError
– The validation failure reason is returned in the exception message (arg[0])
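The pattern of returning True on success and carrying the failure reason in the exception message might look like the following. The function name and the specific checks (a `metadata` and a non-empty `sources` section) are assumptions chosen for illustration; the real validation rules are not listed here.

```python
from typing import Any, Dict


def validate_query_defs_sketch(query_def_dict: Dict[str, Any]) -> bool:
    """Return True if the definition looks valid, else raise ValueError."""
    # Assumed required sections; the real function may check other things.
    for section in ("metadata", "sources"):
        if section not in query_def_dict:
            raise ValueError(f"Missing required section: {section}")
    if not query_def_dict["sources"]:
        raise ValueError("No query definitions found in 'sources'")
    return True


try:
    validate_query_defs_sketch({"metadata": {"version": 1}})
except ValueError as err:
    # The failure reason is the first exception argument.
    reason = err.args[0]
    print(reason)
```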
msticpy.data.param_extractor module¶
Parameter extractor helper functions for use with IPython/Jupyter queries.
-
msticpy.data.param_extractor.
extract_query_params
(query_source: msticpy.data.query_source.QuerySource, *args, **kwargs) → Tuple[Dict[str, Any], List[str]]¶ Get the parameters needed for the query.
Parameters: - query_source (QuerySource) – Query source
- args (Tuple[QueryParamProvider]) – objects that implement QueryParamProvider (from which query parameters can be extracted).
- kwargs (Dict[str, Any]) – custom parameter list to populate queries (override default values and values extracted from QueryParamProviders).
Returns: Dictionary of parameter names and values to be used in the query. List of any missing parameters
Return type: Tuple[Dict[str, Any], List[str]]
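The precedence described above (defaults first, then values taken from provider objects, then explicit keyword arguments) can be sketched with plain dictionaries. Everything here is a stand-in: `extract_params_sketch` is a hypothetical name, and each provider is represented by the dictionary its `query_params` would return.

```python
from typing import Any, Dict, List, Tuple


def extract_params_sketch(
    required: List[str],
    defaults: Dict[str, Any],
    *providers: Dict[str, Any],
    **kwargs: Any,
) -> Tuple[Dict[str, Any], List[str]]:
    """Merge parameter sources; later sources override earlier ones."""
    params: Dict[str, Any] = dict(defaults)
    for provider_params in providers:
        # Only take values for parameters the query actually uses.
        params.update({k: v for k, v in provider_params.items() if k in required})
    # Explicit keyword arguments override defaults and provider values.
    params.update(kwargs)
    missing = [name for name in required if name not in params]
    return params, missing


alert_params = {"host_name": "web01", "unrelated": 1}
params, missing = extract_params_sketch(
    ["host_name", "start", "end"],
    {"start": "2021-01-01"},
    alert_params,
    start="2021-06-01",
)
print(params)
print(missing)
```

Returning the missing names alongside the resolved values lets the caller decide whether to prompt for them or raise an error.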
msticpy.data.query_container module¶
Query hierarchy attribute class.
-
class
msticpy.data.query_container.
QueryContainer
¶ Bases:
object
Empty class used to create hierarchical attributes.
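An empty class is enough to build dotted attribute hierarchies at run time. The sketch below shows the general technique with a stand-in class; `Container`, `attach`, and the query name `list_logons` are hypothetical and not part of the documented API.

```python
class Container:
    """Empty class; attributes are attached dynamically."""


def attach(root: Container, dotted_path: str, value) -> None:
    """Attach value at dotted_path, creating intermediate containers."""
    *parents, leaf = dotted_path.split(".")
    node = root
    for name in parents:
        # Create the intermediate level only if it does not exist yet.
        if not hasattr(node, name):
            setattr(node, name, Container())
        node = getattr(node, name)
    setattr(node, leaf, value)


root = Container()
attach(root, "WindowsSecurity.list_logons", lambda: "query text")
print(root.WindowsSecurity.list_logons())
```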
msticpy.data.query_defns module¶
Query helper definitions.
-
class
msticpy.data.query_defns.
DataEnvironment
¶ Bases:
enum.Enum
Enumeration of data environments.
Used to identify which queries are relevant for which data sources.
-
AzureSecurityCenter
= 3¶
-
AzureSentinel
= 1¶
-
Kusto
= 2¶
-
LocalData
= 6¶
-
LogAnalytics
= 1¶
-
M365D
= 11¶
-
MDATP
= 5¶
-
MDE
= 5¶
-
MSSentinel
= 1¶
-
Mordor
= 8¶
-
ResourceGraph
= 9¶
-
SecurityGraph
= 4¶
-
Splunk
= 7¶
-
Sumologic
= 10¶
-
Unknown
= 0¶
-
parse
= <bound method DataEnvironment.parse of <enum 'DataEnvironment'>>¶
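Several names in this enumeration share a value (for example AzureSentinel, LogAnalytics and MSSentinel are all 1), which in a Python `Enum` makes them aliases of a single member. The cut-down stand-in below illustrates that behaviour. Which name is the canonical one depends on definition order, which the alphabetical listing does not reveal, so the order here is an assumption, and the `parse` body is a hypothetical implementation.

```python
from enum import Enum


class Env(Enum):
    """Cut-down stand-in for the enumeration above."""

    Unknown = 0
    AzureSentinel = 1
    LogAnalytics = 1  # alias: same value as AzureSentinel
    MSSentinel = 1    # alias: same value as AzureSentinel
    Kusto = 2

    @classmethod
    def parse(cls, value):
        """Hypothetical parser: accept a member, a name, or an int value."""
        if isinstance(value, cls):
            return value
        if isinstance(value, str):
            # __members__ includes alias names as well as canonical ones.
            return cls.__members__.get(value, cls.Unknown)
        try:
            return cls(value)
        except ValueError:
            return cls.Unknown


print(Env.LogAnalytics is Env.AzureSentinel)  # aliases are the same object
print([member.name for member in Env])        # iteration skips aliases
print(Env.parse("MSSentinel").name)
```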
-
-
class
msticpy.data.query_defns.
DataFamily
¶ Bases:
enum.Enum
Enumeration of data families.
Used to identify which queries are relevant for which data sources.
-
AzureNetwork
= 6¶
-
LinuxSecurity
= 2¶
-
LinuxSyslog
= 5¶
-
MDATP
= 7¶
-
ResourceGraph
= 9¶
-
SecurityAlert
= 3¶
-
SecurityGraphAlert
= 4¶
-
Splunk
= 8¶
-
Sumologic
= 10¶
-
Unknown
= 0¶
-
WindowsSecurity
= 1¶
-
parse
= <bound method DataFamily.parse of <enum 'DataFamily'>>¶
-
-
class
msticpy.data.query_defns.
QueryParamProvider
¶ Bases:
abc.ABC
Abstract type for QueryParamProvider.
Method query_params must be overridden by derived classes.
-
query_params
¶ Return dict of query parameters.
These parameters are sourced in the object implementing this method.
Returns: Dictionary of query parameter values. Return type: dict
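The abstract-property pattern described here can be sketched with `abc`. The class names `ParamProvider` and `HostRecord` are stand-ins invented for the example and are not msticpy classes.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class ParamProvider(ABC):
    """Stand-in for an abstract query-parameter provider."""

    @property
    @abstractmethod
    def query_params(self) -> Dict[str, Any]:
        """Return dict of query parameters."""


class HostRecord(ParamProvider):
    """Hypothetical concrete provider exposing a host name."""

    def __init__(self, host_name: str):
        self._host_name = host_name

    @property
    def query_params(self) -> Dict[str, Any]:
        # Parameters are sourced from the object's own state.
        return {"host_name": self._host_name}


print(HostRecord("web01").query_params)
```

A class that derives from the abstract base without overriding `query_params` cannot be instantiated, which is what forces derived classes to supply the property.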
-
-
msticpy.data.query_defns.
ensure_df_datetimes
(data: pandas.core.frame.DataFrame, columns: Union[str, List[str], None] = None, add_utc_tz: bool = True) → pandas.core.frame.DataFrame¶ Return dataframe with converted TZ-aware timestamps.
Parameters: - data (pd.DataFrame) – Input dataframe
- columns (Union[str, List[str], None], optional) – column (str) or list of columns to convert, by default None. If this parameter is not supplied then any column containing the substring “time” is used as a candidate for conversion.
- add_utc_tz (bool, optional) – If True any datetime columns in the columns parameter (or default ‘.*time.*’ columns) that are timezone-naive will be converted to timezone-aware timestamps marked as UTC.
Returns: Converted DataFrame.
Return type: pd.DataFrame
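The column-selection and UTC-marking rules can be shown on a single row using only the standard library. This is a simplified analogue that works on a dictionary rather than a DataFrame; the helper name `mark_naive_as_utc` is hypothetical and the case-insensitive match on "time" is an assumption.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def mark_naive_as_utc(
    row: Dict[str, Any], columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Return a copy of row with naive datetimes in columns tagged as UTC."""
    if columns is None:
        # Default candidates: any key containing "time".
        columns = [key for key in row if "time" in key.lower()]
    result = dict(row)
    for key in columns:
        value = result.get(key)
        # Leave non-datetime values and already-aware datetimes untouched.
        if isinstance(value, datetime) and value.tzinfo is None:
            result[key] = value.replace(tzinfo=timezone.utc)
    return result


row = {"TimeGenerated": datetime(2021, 1, 1, 12, 0), "Computer": "web01"}
converted = mark_naive_as_utc(row)
print(converted["TimeGenerated"].isoformat())
```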
msticpy.data.query_source module¶
Intake kql driver.
-
class
msticpy.data.query_source.
QuerySource
(name: str, source: Dict[str, Any], defaults: Dict[str, Any], metadata: Dict[str, Any])¶ Bases:
object
Query definition class for templated queries.
-
name
¶ The query name
Type: str
-
metadata
¶ The consolidated metadata for the query
Type: Dict[str, Any]
-
params
¶ The dictionary of parameter definitions for the query.
Type: dict[str, Any]
-
query_store
¶ The query store object that the query belongs to
Type: QueryStore
Initialize query source definition.
Parameters: - name (str) – The query name
- source (dict) – The data source definition settings
- defaults (dict) – The default settings (if source-specific setting not supplied)
- metadata (dict) – The global metadata from the source file.
Notes
A data source can belong to multiple families (e.g. a query that joins data from several sources)
-
create_doc_string
() → str¶ Return a doc string for the query.
Returns: New-line delimited docstring dynamically created from query definition properties. Return type: str
-
create_query
(formatters: Dict[str, Callable] = None, **kwargs) → str¶ Return query with values from kwargs and defaults substituted.
Parameters: - formatters (Dict[str, Callable]) – Dictionary of custom parameter formatters indexed by data type
- kwargs (Mapping[str, Any]) – Set of parameter name, value pairs used to populate the template query.
Returns: The populated query
Return type: str
Raises: ValueError
– If one or more parameters with no default values are not supplied.
Notes
Parameters supplied as arguments will override any parameter defaults (see default_params property).
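A minimal sketch of the substitution behaviour: defaults are applied first, supplied arguments override them, and a ValueError is raised when a parameter has neither. The `{name}` placeholder syntax, the function name, and the example query text are assumptions for illustration; custom formatters are left out.

```python
from string import Formatter
from typing import Any, Dict


def create_query_sketch(template: str, defaults: Dict[str, Any], **kwargs: Any) -> str:
    """Fill template placeholders from defaults, overridden by kwargs."""
    params = {**defaults, **kwargs}  # supplied arguments win over defaults
    # Collect the placeholder names that appear in the template.
    needed = {field for _, field, _, _ in Formatter().parse(template) if field}
    missing = sorted(needed - set(params))
    if missing:
        raise ValueError(f"Missing parameters: {', '.join(missing)}")
    return template.format(**params)


template = "SecurityEvent | where Computer == '{host_name}' | take {limit}"
query = create_query_sketch(template, {"limit": 10}, host_name="web01")
print(query)
```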
-
data_families
¶ Return the list of data families used by the query.
Returns: The list of data families. A data family is usually equivalent to a table or entity set. Return type: List[str]
-
default_params
¶ Return the set of parameters with default values.
Returns: List of parameters Return type: Iterable[dict]
-
description
¶ Return description of the query.
Returns: Query description. Return type: str
-
help
()¶ Print help for query.
-
query
¶ Return the query template.
Returns: The template query. Return type: str
-
required_params
¶ Return the set of parameters with no default values.
Returns: List of parameters Return type: Iterable[dict]
-
resolve_param_aliases
(param_dict: Dict[str, Any]) → Dict[str, Any]¶ Try to resolve any parameters in param_dict that are aliases.
-
validate
() → Tuple[bool, List[str]]¶ Validate the source to ensure that all required properties are present.
Returns: Tuple of a bool (True if validation is successful) and a list of strings. Return type: Tuple[bool, List[str]]
-
msticpy.data.query_store module¶
QueryStore class - holds a collection of QuerySources.
-
class
msticpy.data.query_store.
QueryStore
(environment: str)¶ Bases:
object
Repository for query definitions for a data environment.
-
environment
¶ The data environment for the queries.
Type: str
-
data_families
¶ The set of data families and associated queries for each.
Type: Dict[str, Dict[str, QuerySource]]
Initialize a QueryStore for a new environment.
Parameters: environment (str) – The data environment
-
add_data_source
(source: msticpy.data.query_source.QuerySource)¶ Add a datasource/query to the store.
Parameters: source (QuerySource) – The source to add. An existing item with the same name will be overwritten
-
add_query
(name: str, query: str, query_paths: Union[str, List[str]], description: str = None)¶ Add a query from name/query text.
Parameters: - name (str) – name of the query
- query (str) – The query string
- query_paths (Union[str, List[str]]) – The path/data_family to categorize. Multiple paths can be specified. If the path is dotted, this will cause the query to be displayed in the corresponding hierarchy.
- description (str, optional) – Query description
-
find_query
(query_name: str) → Set[Optional[msticpy.data.query_source.QuerySource]]¶ Return set of queries with name query_name.
Parameters: query_name (str) – Name of the query Returns: Set (distinct) queries matching name. Return type: Set[QuerySource]
-
get_query
(query_name: str, query_path: Union[str, msticpy.data.query_defns.DataFamily] = None) → msticpy.data.query_source.QuerySource¶ Return query with name data_family and query_name.
Parameters: - query_name (str) – Name of the query
- query_path (Union[str, DataFamily]) – The data family for the query
Returns: Query matching name and family.
Return type: QuerySource
-
import_file
(query_file: str)¶ Import a yaml data source definition.
Parameters: query_file (str) – Path to the file to import Raises: ImportError
– File read error or Syntax or semantic error found in the source file.
-
classmethod
import_files
(source_path: list, recursive: bool = False, driver_query_filter: Optional[Dict[str, Set[str]]] = None) → Dict[str, msticpy.data.query_store.QueryStore]¶ Import multiple query definition files from directory path.
Parameters: - source_path (str) – The folder containing the yaml definition files.
- recursive (bool, optional) – True to recurse sub-directories (the default is False, which only reads from the top level)
- driver_query_filter (Dict[str, Set[str]]) – A dictionary of query metadata keys and values. This is used to test each read query to see if it is relevant to the driver and should be returned in the created QueryStore dictionary.
Returns: Dictionary of one or more environments and the QueryStore containing the queries for each environment.
Return type: Dict[str, ‘QueryStore’]
Raises: FileNotFoundError
– File read error or Syntax or semantic error found in a source file.
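One way a filter of type Dict[str, Set[str]] could be tested against query metadata is sketched below. The matching rule (every filter key must have at least one accepted value in the metadata), the function name, and the `data_environments` key are all assumptions; the documentation above does not spell out the exact semantics.

```python
from typing import Any, Dict, Set


def query_matches_filter(metadata: Dict[str, Any], query_filter: Dict[str, Set[str]]) -> bool:
    """Return True if every filter key has an accepted value in metadata."""
    for key, accepted in query_filter.items():
        value = metadata.get(key)
        # Treat a scalar metadata value as a one-item collection.
        values = set(value) if isinstance(value, (list, set, tuple)) else {value}
        if not values & accepted:
            return False
    return True


queries = {
    "list_hosts": {"data_environments": ["Splunk"]},
    "list_alerts": {"data_environments": ["MDE", "M365D"]},
}
driver_filter = {"data_environments": {"MDE"}}
kept = [name for name, meta in queries.items() if query_matches_filter(meta, driver_filter)]
print(kept)
```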
-
query_names
¶ Return list of family.query in the store.
Returns: List of queries Return type: Iterable[str]
-
msticpy.data.azure_data module¶
Uses the Azure Python SDK to collect and return details related to Azure.
-
class
msticpy.data.azure_data.
AzureData
(connect: bool = False, cloud: str = None)¶ Bases:
object
Class for returning data on an Azure tenant.
Initialize connector for Azure Python SDK.
-
connect
(auth_methods: List[T] = None, silent: bool = False)¶ Authenticate to the Azure SDK.
Parameters: - auth_methods (List, optional) – list of preferred authentication methods to use, by default None
- silent (bool, optional) – Set true to prevent output during auth process, by default False
Raises: CloudError
– If no valid credentials are found or if subscription client can’t be created
-
get_metrics
(metrics: str, resource_id: str, sub_id: str, sample_time: str = 'hour', start_time: int = 30) → Dict[str, pandas.core.frame.DataFrame]¶ Return specified metrics on Azure Resource.
Parameters: - metrics (str) – A string list of metrics you wish to collect (https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported)
- resource_id (str) – The resource ID of the resource to collect the metrics from
- sub_id (str) – The subscription ID that the resource is part of
- sample_time (str (Optional)) – You can select to collect the metrics every hour or minute. Default is hour. Accepted inputs = ‘hour’ or ‘minute’
- start_time (int (Optional)) – The number of days prior to today to collect metrics for, default is 30
Returns: results – A Dictionary of DataFrames containing the metrics details
Return type: dict
-
get_network_details
(network_id: str, sub_id: str) → Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]¶ Return details related to an Azure network interface and associated NSG.
Parameters: - network_id (str) – The ID of the network interface to return details on
- sub_id (str) – The subscription ID that the network interface is part of
Returns: details – A dictionary of items related to the network interface
Return type: dict
-
get_resource_details
(sub_id: str, resource_id: str = None, resource_details: dict = None) → dict¶ Return the details of a specific Azure resource.
Parameters: - resource_id (str, optional) – The ID of the resource to get details on
- resource_details (dict, optional) – If ID is unknown provide the following details: resource_group_name, resource_provider_namespace, resource_type, resource_name, parent_resource_path
- sub_id (str) – The ID of the subscription to get resources from
Returns: resource_details – The details of the requested resource
Return type: dict
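The five `resource_details` keys correspond to the segments of a standard Azure Resource Manager resource ID. The sketch below assembles such an ID from them to show how the pieces relate. Whether the method builds an ID this way or passes the parts to the SDK separately is not stated here, and `build_resource_id` is a hypothetical helper.

```python
from typing import Dict


def build_resource_id(sub_id: str, resource_details: Dict[str, str]) -> str:
    """Assemble an ARM-style resource ID from its component parts."""
    parts = [
        "subscriptions", sub_id,
        "resourceGroups", resource_details["resource_group_name"],
        "providers", resource_details["resource_provider_namespace"],
    ]
    # The parent path is optional and empty for top-level resources.
    parent = resource_details.get("parent_resource_path", "").strip("/")
    if parent:
        parts.append(parent)
    parts += [resource_details["resource_type"], resource_details["resource_name"]]
    return "/" + "/".join(parts)


details = {
    "resource_group_name": "rg1",
    "resource_provider_namespace": "Microsoft.Compute",
    "resource_type": "virtualMachines",
    "resource_name": "vm1",
    "parent_resource_path": "",
}
resource_id = build_resource_id("00000000-0000-0000-0000-000000000000", details)
print(resource_id)
```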
-
get_resources
(sub_id: str, rgroup: str = None, get_props: bool = False) → pandas.core.frame.DataFrame¶ Return details on all resources in a subscription or Resource Group.
Parameters: - sub_id (str) – The subscription ID to get resources for
- rgroup (str (Optional)) – The name of a Resource Group to get resources for
- get_props (bool (Optional)) – Set to True if you want to get the full properties of every resource. Warning: this may be a slow process depending on the number of resources.
Returns: A dataframe of resource details
Return type: pd.DataFrame
-
get_subscription_info
(sub_id: str) → dict¶ Get information on a specific subscription.
Parameters: sub_id (str) – The ID of the subscription to return details on. Returns: Details on the selected subscription. Return type: dict Raises: MsticpyNotConnectedError
– If .connect() has not been called.
-
get_subscriptions
() → pandas.core.frame.DataFrame¶ Get details of all subscriptions within the tenant.
Returns: Details of the subscriptions present in the user’s tenant. Return type: pd.DataFrame Raises: MsticpyNotConnectedError
– If .connect() has not been called
-
-
class
msticpy.data.azure_data.
InterfaceItems
(interface_id, private_ip, private_ip_allocation, public_ip, public_ip_allocation, app_sec_group, subnet, subnet_nsg, subnet_route_table)¶ Bases:
object
attr class to build network interface details dictionary.
Method generated by attrs for class InterfaceItems.
-
class
msticpy.data.azure_data.
Items
(resource_id, name, resource_type, location, tags, plan, properties, kind, managed_by, sku, identity, state)¶ Bases:
object
attr class to build resource details dictionary.
Method generated by attrs for class Items.
-
class
msticpy.data.azure_data.
NsgItems
(rule_name, description, protocol, direction, src_ports, dst_ports, src_addrs, dst_addrs, action)¶ Bases:
object
attr class to build NSG rule dictionary.
Method generated by attrs for class NsgItems.
msticpy.data.azure_sentinel module¶
Uses the Azure Python SDK to collect and return details related to Azure.
-
class
msticpy.data.azure_sentinel.
AzureSentinel
(connect: bool = False, cloud: Optional[str] = None)¶ Bases:
msticpy.data.azure_data.AzureData
Class for returning key Microsoft Sentinel elements.
Initialize connector for Azure APIs.
Parameters: - connect (bool, optional) – Set true if you want to connect to API on initialization, by default False
- cloud (str, optional) – Specify cloud to use, overriding any configuration value. Default is to use configuration setting or public cloud if no configuration setting is available.
-
connect
(auth_methods: List[T] = None, silent: bool = False, **kwargs)¶ Authenticate with the SDK & API.
Parameters: - auth_methods (List, optional) – list of preferred authentication methods to use, by default None
- silent (bool, optional) – Set true to prevent output during auth process, by default False
-
get_alert_rules
(res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None) → pandas.core.frame.DataFrame¶ Return all Microsoft Sentinel alert rules for a workspace.
Parameters: - res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
- sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
- res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
- ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Returns: A table of the workspace’s alert rules.
Return type: pd.DataFrame
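The workspace can be identified either by a full resource ID or by the subscription, resource group and workspace name together, with configuration as the fallback. A sketch of that resolution order follows. The helper name, the `config_default` parameter, and the precedence between the three sources are assumptions; the ID layout follows the usual Azure Resource Manager format for Log Analytics workspaces.

```python
from typing import Optional


def resolve_workspace_id(
    res_id: Optional[str] = None,
    sub_id: Optional[str] = None,
    res_grp: Optional[str] = None,
    ws_name: Optional[str] = None,
    config_default: Optional[str] = None,
) -> str:
    """Pick the workspace resource ID from the available inputs."""
    if res_id:
        return res_id
    if sub_id and res_grp and ws_name:
        # Build the ID from its three component parts.
        return (
            f"/subscriptions/{sub_id}/resourceGroups/{res_grp}"
            f"/providers/Microsoft.OperationalInsights/workspaces/{ws_name}"
        )
    if config_default:
        # Stand-in for a value read from a configuration file.
        return config_default
    raise ValueError("Supply res_id, or sub_id, res_grp and ws_name together.")


workspace_id = resolve_workspace_id(sub_id="sub1", res_grp="rg1", ws_name="ws1")
print(workspace_id)
```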
-
get_bookmarks
(res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None) → pandas.core.frame.DataFrame¶ Return a list of Bookmarks from a Sentinel workspace.
Parameters: - res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
- sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
- res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
- ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Returns: A set of bookmarks.
Return type: pd.DataFrame
Raises: CloudError
– If bookmark collection fails.
-
get_hunting_queries
(res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None) → pandas.core.frame.DataFrame¶ Return all hunting queries in a Microsoft Sentinel workspace.
Parameters: - res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
- sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
- res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
- ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Returns: A table of the hunting queries.
Return type: pd.DataFrame
-
get_incident
(incident_id: str, res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None, entities: bool = False, alerts: bool = False) → pandas.core.frame.DataFrame¶ Get details on a specific incident.
Parameters: - incident_id (str) – Incident ID GUID.
- res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
- sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
- res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
- ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
- entities (bool, optional) – If True, include all entities in the response. Default is False.
- alerts (bool, optional) – If True, include all alerts in the response. Default is False.
Returns: Table containing incident details.
Return type: pd.DataFrame
Raises: CloudError
– If incident could not be retrieved.
-
get_incidents
(res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None) → pandas.core.frame.DataFrame¶ Get a list of incidents for a Sentinel workspace.
Parameters: - res_id (str, optional) – Resource ID of the workspace, if not provided details from config file will be used.
- sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
- res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
- ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Returns: A table of incidents.
Return type: pd.DataFrame
Raises: CloudError
– If incidents could not be retrieved.
-
get_metrics
(metrics: str, resource_id: str, sub_id: str, sample_time: str = 'hour', start_time: int = 30) → Dict[str, pandas.core.frame.DataFrame]¶ Return specified metrics on Azure Resource.
Parameters: - metrics (str) – A string list of metrics you wish to collect (https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported)
- resource_id (str) – The resource ID of the resource to collect the metrics from
- sub_id (str) – The subscription ID that the resource is part of
- sample_time (str (Optional)) – You can select to collect the metrics every hour or minute. Default is hour. Accepted inputs = ‘hour’ or ‘minute’
- start_time (int (Optional)) – The number of days prior to today to collect metrics for, default is 30
Returns: results – A Dictionary of DataFrames containing the metrics details
Return type: dict
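The `sample_time` and `start_time` parameters together describe a sampling interval and a look-back window. A minimal sketch of how they could be turned into an ISO 8601 interval and timespan, as used by metrics APIs. The mapping to `PT1H`/`PT1M` and the helper name `metrics_window` are assumptions for illustration, not msticpy's confirmed implementation:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Assumed mapping from the documented inputs to ISO 8601 durations.
_INTERVALS = {"hour": "PT1H", "minute": "PT1M"}


def metrics_window(
    sample_time: str = "hour",
    start_time: int = 30,
    now: Optional[datetime] = None,
) -> Tuple[str, str]:
    """Return (interval, timespan) for a metrics request (hypothetical helper)."""
    if sample_time not in _INTERVALS:
        raise ValueError("sample_time must be 'hour' or 'minute'")
    end = now or datetime.now(timezone.utc)
    # start_time is the number of days prior to today.
    start = end - timedelta(days=start_time)
    return _INTERVALS[sample_time], f"{start.isoformat()}/{end.isoformat()}"
```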
-
get_network_details
(network_id: str, sub_id: str) → Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]¶ Return details related to an Azure network interface and associated NSG.
Parameters: - network_id (str) – The ID of the network interface to return details on
- sub_id (str) – The subscription ID that the network interface is part of
Returns: details – Details of the network interface and of its associated NSG
Return type: Tuple[pd.DataFrame, pd.DataFrame]
-
get_resource_details
(sub_id: str, resource_id: str = None, resource_details: dict = None) → dict¶ Return the details of a specific Azure resource.
Parameters: - resource_id (str, optional) – The ID of the resource to get details on
- resource_details (dict, optional) – If ID is unknown, provide the following details: resource_group_name, resource_provider_namespace, resource_type, resource_name, parent_resource_path
- sub_id (str) – The ID of the subscription to get resources from
Returns: resource_details – The details of the requested resource
Return type: dict
-
get_resources
(sub_id: str, rgroup: str = None, get_props: bool = False) → pandas.core.frame.DataFrame¶ Return details on all resources in a subscription or Resource Group.
Parameters: - sub_id (str) – The subscription ID to get resources for
- rgroup (str (Optional)) – The name of a Resource Group to get resources for
- get_props (bool (Optional)) – Set to True if you want to get the full properties of every resource. Warning: this may be a slow process depending on the number of resources.
Returns: A dataframe of resource details
Return type: pd.DataFrame
-
get_sentinel_workspaces
(sub_id: str = None) → Dict[str, str]¶ Return a list of Microsoft Sentinel workspaces in a Subscription.
Parameters: sub_id (str) – The subscription ID to get a list of workspaces from. If not provided it will attempt to get sub_id from config files. Returns: A dictionary of workspace names and ids Return type: Dict
-
get_subscription_info
(sub_id: str) → dict¶ Get information on a specific subscription.
Parameters: sub_id (str) – The ID of the subscription to return details on. Returns: Details on the selected subscription. Return type: dict Raises: MsticpyNotConnectedError
– If .connect() has not been called.
-
get_subscriptions
() → pandas.core.frame.DataFrame¶ Get details of all subscriptions within the tenant.
Returns: Details of the subscriptions present in the user's tenant. Return type: pd.DataFrame Raises: MsticpyNotConnectedError
– If .connect() has not been called
-
post_comment
(incident_id: str, comment: str, res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None)¶ Write a comment for an incident.
Parameters: - incident_id (str) – Incident ID GUID.
- comment (str) – Comment message to post.
- res_id (str, optional) – Resource ID of the workspace. If not provided, details from the config file will be used.
- sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
- res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
- ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Raises: CloudError
– If message could not be posted.
-
set_default_subscription
(subscription_id: str)¶ Set the default subscription to use to subscription_id.
-
set_default_workspace
(sub_id: Optional[str], workspace: Optional[str] = None)¶ Set the default workspace.
Parameters: - sub_id (Optional[str], optional) – Subscription ID containing the workspace. If not specified, the subscription will be taken from the default_subscription or from configuration.
- workspace (Optional[str], optional) – Name of the workspace, by default None. If not specified and there is only one workspace in the subscription, this will be set as the default.
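The fallback rule for `workspace` can be sketched as follows, assuming a name-to-ID dictionary like the one returned by get_sentinel_workspaces. `pick_default_workspace` is a hypothetical helper, and returning None when the choice is ambiguous is an assumption of this sketch:

```python
from typing import Dict, Optional


def pick_default_workspace(
    workspaces: Dict[str, str], workspace: Optional[str] = None
) -> Optional[str]:
    """Return the workspace name to use as the default (hypothetical helper)."""
    # An explicitly named workspace always wins.
    if workspace is not None:
        return workspace
    # With exactly one workspace in the subscription, it becomes the default.
    if len(workspaces) == 1:
        return next(iter(workspaces))
    # Assumption: with zero or several workspaces, no default is chosen.
    return None
```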
-
update_incident
(incident_id: str, update_items: dict, res_id: str = None, sub_id: str = None, res_grp: str = None, ws_name: str = None)¶ Update properties of an incident.
Parameters: - incident_id (str) – Incident ID GUID.
- update_items (dict) – Dictionary of properties to update and their values. Ref: https://docs.microsoft.com/en-us/rest/api/securityinsights/incidents/createorupdate
- res_id (str, optional) – Resource ID of the workspace. If not provided, details from the config file will be used.
- sub_id (str, optional) – Sub ID of the workspace, to be used if not providing Resource ID.
- res_grp (str, optional) – Resource Group name of the workspace, to be used if not providing Resource ID.
- ws_name (str, optional) – Workspace name of the workspace, to be used if not providing Resource ID.
Raises: CloudError
– If incident could not be updated.
msticpy.data.azure_blob_storage module¶
msticpy.data.sql_to_kql module¶
Module for SQL to KQL Conversion.
This is an experimental conversion utility built to support a limited subset of ANSI SQL. It relies on moz_sql_parser (https://github.com/mozilla/moz-sql-parser) to parse the SQL syntax tree. Some hacky additions have been made to allow table renaming and support for a few SparkSQL operators such as RLIKE.
For more complete help with translating SQL to KQL, see https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sqlcheatsheet
Known limitations¶
- Does not support aggregate functions in SELECT with no GROUP BY clause
- Does not support IN, EXISTS, HAVING operators
- Only partial support for AS naming (should work in SELECT expressions)
-
msticpy.data.sql_to_kql.
sql_to_kql
(sql: str, target_tables: Dict[str, str] = None) → str¶ Parse SQL and return KQL equivalent.
msticpy.data.drivers.driver_base module¶
Data driver base class.
-
class
msticpy.data.drivers.driver_base.
DriverBase
(**kwargs)¶ Bases:
abc.ABC
Base class for data providers.
Initialize new instance.
-
add_query_filter
(name, query_filter)¶ Add an expression to the query attach filter.
-
connect
(connection_str: Optional[str] = None, **kwargs)¶ Connect to data source.
Parameters: connection_str (Optional[str]) – Connect to a data source
-
connected
¶ Return true if at least one connection has been made.
Returns: True if a successful connection has been made. Return type: bool Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
-
driver_queries
¶ Return queries retrieved from the service after connecting.
Returns: List of Dictionary of query_name, query_text. Name of container to add queries to. Return type: List[Dict[str, str]]
-
loaded
¶ Return true if the provider is loaded.
Returns: True if the provider is loaded. Return type: bool Notes
This is not relevant for some providers.
-
query
(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: - query (str) – The query to execute
- query_source (QuerySource) – The query definition object
Other Parameters: kwargs – Are passed to the underlying provider query method, if supported.
Returns: A DataFrame (if successful) or the underlying provider result if an error occurs.
Return type: Union[pd.DataFrame, Any]
-
query_attach_spec
¶ Parameters that determine whether a query is relevant for the driver.
-
query_with_results
(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame plus native results.
Parameters: query (str) – The query to execute Returns: A DataFrame and native results. Return type: Tuple[pd.DataFrame,Any]
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
service_queries
¶ Return queries retrieved from the service after connecting.
Returns: Dictionary of query_name, query_text. Name of container to add queries to. Return type: Tuple[Dict[str, str], str]
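The driver interface above can be illustrated with a small stand-alone sketch. `SketchDriverBase` and `InMemoryDriver` are hypothetical classes written for this example, not msticpy's own; they use lists of dictionaries in place of DataFrames so the example needs only the standard library:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Set, Tuple


class SketchDriverBase(ABC):
    """Hypothetical stand-in that mirrors the documented driver interface."""

    def __init__(self) -> None:
        self._connected = False
        self._query_filter: Dict[str, Set[str]] = {}

    @property
    def connected(self) -> bool:
        # True once at least one connection has succeeded.
        return self._connected

    @property
    def query_attach_spec(self) -> Dict[str, Set[str]]:
        return self._query_filter

    def add_query_filter(self, name: str, query_filter: str) -> None:
        self._query_filter.setdefault(name, set()).add(query_filter)

    @abstractmethod
    def connect(self, connection_str: Optional[str] = None, **kwargs) -> None:
        """Connect to the data source."""

    @abstractmethod
    def query_with_results(self, query: str, **kwargs) -> Tuple[List[dict], Any]:
        """Return the rows plus the provider's native result."""

    def query(self, query: str, **kwargs) -> List[dict]:
        rows, _ = self.query_with_results(query, **kwargs)
        return rows


class InMemoryDriver(SketchDriverBase):
    """Toy driver: the 'query' is simply a table name."""

    def __init__(self, tables: Dict[str, List[dict]]) -> None:
        super().__init__()
        self._tables = tables

    def connect(self, connection_str: Optional[str] = None, **kwargs) -> None:
        self._connected = True

    def query_with_results(self, query: str, **kwargs) -> Tuple[List[dict], Any]:
        rows = self._tables.get(query, [])
        return rows, {"status": "ok", "row_count": len(rows)}
```

Concrete drivers differ mainly in how `connect` and `query_with_results` are implemented; the remaining members are shared through the base class.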
-
msticpy.data.drivers.kql_driver module¶
KQL Driver class.
-
class
msticpy.data.drivers.kql_driver.
KqlDriver
(connection_str: str = None, **kwargs)¶ Bases:
msticpy.data.drivers.driver_base.DriverBase
KqlDriver class to execute kql queries.
Instantiate KqlDriver and optionally connect.
Parameters: connection_str (str, optional) – Connection string Other Parameters: debug (bool) – print out additional diagnostic information. -
add_query_filter
(name, query_filter)¶ Add an expression to the query attach filter.
-
connect
(connection_str: Optional[str] = None, **kwargs)¶ Connect to data source.
Parameters: connection_str (str) – Connect to a data source
Other Parameters: - kqlmagic_args (str, optional) – Additional string of parameters to be passed to KqlMagic
- mp_az_auth (Union[bool, str, list, None], optional) – Optional parameter directing KqlMagic to use MSTICPy Azure authentication. Values can be: True or “default” (use the settings in the msticpyconfig.yaml ‘Azure’ section); str (a single auth method name: ‘msi’, ‘cli’, ‘env’ or ‘interactive’); List[str] (a list of acceptable auth methods from ‘msi’, ‘cli’, ‘env’ or ‘interactive’).
-
connected
¶ Return true if at least one connection has been made.
Returns: True if a successful connection has been made. Return type: bool Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
-
driver_queries
¶ Return queries retrieved from the service after connecting.
Returns: List of Dictionary of query_name, query_text. Name of container to add queries to. Return type: List[Dict[str, str]]
-
loaded
¶ Return true if the provider is loaded.
Returns: True if the provider is loaded. Return type: bool Notes
This is not relevant for some providers.
-
query
(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: - query (str) – The query to execute
- query_source (QuerySource) – The query definition object
Returns: A DataFrame (if successful) or the underlying provider result if an error occurs.
Return type: Union[pd.DataFrame, results.ResultSet]
-
query_attach_spec
¶ Parameters that determine whether a query is relevant for the driver.
-
query_with_results
(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: query (str) – The KQL query to execute Returns: A DataFrame (if successful) and Kql ResultSet. Return type: Tuple[pd.DataFrame, results.ResultSet]
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
service_queries
¶ Return queries retrieved from the service after connecting.
Returns: Dictionary of query_name, query_text. Name of container to add queries to. Return type: Tuple[Dict[str, str], str]
-
msticpy.data.drivers.local_data_driver module¶
Local Data Driver class - for testing and demos.
-
class
msticpy.data.drivers.local_data_driver.
LocalDataDriver
(connection_str: str = None, **kwargs)¶ Bases:
msticpy.data.drivers.driver_base.DriverBase
LocalDataDriver class to execute kql queries.
Instantiate LocalDataDriver and optionally connect.
Parameters: - connection_str (str, optional) – Connection string (not used)
- data_paths (List[str], optional) – Paths from which to load data files
-
add_query_filter
(name, query_filter)¶ Add an expression to the query attach filter.
-
connect
(connection_str: Optional[str] = None, **kwargs)¶ Connect to data source.
Parameters: connection_str (str) – Connect to a data source
-
connected
¶ Return true if at least one connection has been made.
Returns: True if a successful connection has been made. Return type: bool Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
-
driver_queries
¶ Return queries retrieved from the service after connecting.
Returns: List of Dictionary of query_name, query_text. Name of container to add queries to. Return type: List[Dict[str, str]]
-
loaded
¶ Return true if the provider is loaded.
Returns: True if the provider is loaded. Return type: bool Notes
This is not relevant for some providers.
-
query
(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: - query (str) – The query to execute
- query_source (QuerySource) – The query definition object
Returns: A DataFrame (if successful) or the underlying provider result if an error occurs.
Return type: Union[pd.DataFrame, results.ResultSet]
-
query_attach_spec
¶ Parameters that determine whether a query is relevant for the driver.
-
query_with_results
(query, **kwargs)¶ Return query with fake results.
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
service_queries
¶ Return queries retrieved from the service after connecting.
Returns: Dictionary of query_name, query_text. Name of container to add queries to. Return type: Tuple[Dict[str, str], str]
msticpy.data.drivers.mdatp_driver module¶
MDATP OData Driver class.
-
class
msticpy.data.drivers.mdatp_driver.
MDATPDriver
(connection_str: str = None, **kwargs)¶ Bases:
msticpy.data.drivers.odata_driver.OData
KqlDriver class to retrieve data from MS Defender APIs.
Instantiate MSDefenderDriver and optionally connect.
Parameters: connection_str (str, optional) – Connection string -
CONFIG_NAME
= 'MicrosoftDefender'¶
-
add_query_filter
(name, query_filter)¶ Add an expression to the query attach filter.
-
connect
(connection_str: str = None, **kwargs)¶ Connect to oauth data source.
Parameters: connection_str (str, optional) – Connect to a data source Notes
Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion
-
connected
¶ Return true if at least one connection has been made.
Returns: True if a successful connection has been made. Return type: bool Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
-
driver_queries
¶ Return queries retrieved from the service after connecting.
Returns: List of Dictionary of query_name, query_text. Name of container to add queries to. Return type: List[Dict[str, str]]
-
loaded
¶ Return true if the provider is loaded.
Returns: True if the provider is loaded. Return type: bool Notes
This is not relevant for some providers.
-
query
(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: - query (str) – The query to execute
- query_source (QuerySource) – The query definition object
Returns: A DataFrame (if successful) or the underlying provider result if an error occurs.
Return type: Union[pd.DataFrame, results.ResultSet]
-
query_attach_spec
¶ Parameters that determine whether a query is relevant for the driver.
-
query_with_results
(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: query (str) – The KQL query to execute Returns: A DataFrame (if successful) and Kql ResultSet. Return type: Tuple[pd.DataFrame, results.ResultSet]
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
service_queries
¶ Return queries retrieved from the service after connecting.
Returns: Dictionary of query_name, query_text. Name of container to add queries to. Return type: Tuple[Dict[str, str], str]
-
msticpy.data.drivers.mordor_driver module¶
.
-
class
msticpy.data.drivers.mordor_driver.
MitreAttack
(attack: Dict[str, Any] = None, technique: str = None, sub_technique: str = None, tactics: List[str] = None)¶ Bases:
object
MitreAttack container for techniques and tactics.
Create instance of MitreAttack.
Parameters: - attack (Dict[str, Any], optional) – attack data as dictionary, by default None
- technique (str, optional) – technique ID, by default None
- sub_technique (str, optional) – sub-technique ID, by default None
- tactics (List[str], optional) – List of associated tactics, by default None
-
MTR_TAC_URI
= 'https://attack.mitre.org/tactics/{tactic_id}/'¶
-
MTR_TECH_URI
= 'https://attack.mitre.org/techniques/{technique_id}/'¶
-
tactics_full
¶ Return full listing of Mitre tactics.
Returns: List of tuples of: (ID, Name, Description, URI) Return type: List[Tuple[str, str, str, str]]
-
technique_desc
¶ Return Mitre technique description.
Returns: Technique description Return type: Optional[str]
-
technique_name
¶ Return Mitre Technique full name.
Returns: Name of the Mitre technique Return type: Optional[str]
-
technique_uri
¶ Return Mitre Technique URI.
Returns: URI of the Mitre technique Return type: Optional[str]
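The two URI class attributes are format templates. A short sketch of how a tactic or technique ID fills them in; the template strings are copied from the attributes above, while the helper names and the sample IDs are chosen for illustration only:

```python
# Templates as documented on the MitreAttack class.
MTR_TAC_URI = "https://attack.mitre.org/tactics/{tactic_id}/"
MTR_TECH_URI = "https://attack.mitre.org/techniques/{technique_id}/"


def tactic_uri(tactic_id: str) -> str:
    """Build the URI for a tactic ID (hypothetical helper)."""
    return MTR_TAC_URI.format(tactic_id=tactic_id)


def technique_uri(technique_id: str) -> str:
    """Build the URI for a technique ID (hypothetical helper)."""
    return MTR_TECH_URI.format(technique_id=technique_id)
```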
-
class
msticpy.data.drivers.mordor_driver.
MordorDriver
(**kwargs)¶ Bases:
msticpy.data.drivers.driver_base.DriverBase
Mordor data driver.
Initialize the Mordor driver.
-
add_query_filter
(name, query_filter)¶ Add an expression to the query attach filter.
-
connect
(connection_str: Optional[str] = None, **kwargs)¶ Connect to data source.
Parameters: connection_str (Optional[str]) – Connect to a data source
-
connected
¶ Return true if at least one connection has been made.
Returns: True if a successful connection has been made. Return type: bool Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
-
driver_queries
¶ Return generator of Mordor query definitions.
Yields: Iterable[Dict[str, Any]] – Iterable of Dictionaries containing query definitions.
-
loaded
¶ Return true if the provider is loaded.
Returns: True if the provider is loaded. Return type: bool Notes
This is not relevant for some providers.
-
query
(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: - query (str) – The query to execute
- query_source (QuerySource) – The query definition object
Other Parameters: kwargs – Are passed to the underlying provider query method, if supported.
Returns: A DataFrame (if successful) or the underlying provider result if an error occurs.
Return type: Union[pd.DataFrame, Any]
-
query_attach_spec
¶ Parameters that determine whether a query is relevant for the driver.
-
query_with_results
(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame plus native results.
Parameters: query (str) – The query to execute Returns: A DataFrame and native results. Return type: Tuple[pd.DataFrame,Any]
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
search_queries
(search: str) → Iterable[str]¶ Search queries for matching attributes.
Parameters: search (str) – Search string. Substrings separated by commas will be treated as OR terms - e.g. “a, b” == “a” or “b”. Substrings separated by “+” will be treated as AND terms - e.g. “a + b” == “a” and “b” Returns: Iterable of matching query names. Return type: Iterable[str]
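The OR/AND search syntax can be sketched as below. This is an illustration of the documented rule, not the driver's code: it assumes commas are split first (so "+" binds more tightly), that matching is a case-insensitive substring test, and that each query is represented by a name and a free-text description. `matches` and `search_names` are hypothetical helpers:

```python
from typing import Dict, List


def matches(search: str, text: str) -> bool:
    """Return True if text satisfies the comma (OR) / plus (AND) search string."""
    text = text.casefold()
    # Each comma-separated group is an OR alternative.
    for or_group in search.split(","):
        # Within a group, every "+"-separated term must be present.
        and_terms = [term.strip().casefold() for term in or_group.split("+")]
        and_terms = [term for term in and_terms if term]
        if and_terms and all(term in text for term in and_terms):
            return True
    return False


def search_names(search: str, queries: Dict[str, str]) -> List[str]:
    """Return the names of queries whose name or description matches."""
    return [
        name for name, desc in queries.items() if matches(search, f"{name} {desc}")
    ]
```

For example, the search string "credential + linux, lateral" selects entries that mention both "credential" and "linux", or that mention "lateral".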
-
service_queries
¶ Return queries retrieved from the service after connecting.
Returns: Dictionary of query_name, query_text. Name of container to add queries to. Return type: Tuple[Dict[str, str], str]
-
-
class
msticpy.data.drivers.mordor_driver.
MordorEntry
(title: str, id: str, type: str, creation_date, modification_date, contributors: List[str] = NOTHING, author: Optional[str] = None, platform: Optional[str] = None, description: Optional[str] = None, tags: List[str] = NOTHING, files: List[Dict[str, Any]] = NOTHING, datasets: List[Dict[str, Any]] = NOTHING, attack_mappings: List[Dict[str, Any]] = NOTHING, notebooks: List[Dict[str, str]] = NOTHING, simulation: Dict[str, Any] = NOTHING, references: List[Any] = NOTHING, rel_file_paths: List[Dict[str, Any]] = NOTHING)¶ Bases:
object
Mordor data set metadata.
Method generated by attrs for class MordorEntry.
-
get_attacks
() → List[msticpy.data.drivers.mordor_driver.MitreAttack]¶ Return list of Mitre attack classifications.
Returns: List of MitreAttack definitions. Return type: List[MitreAttack]
-
get_file_paths
() → List[Dict[str, str]]¶ Return list of data file links.
Returns: list of dictionaries describing files. Each entry has key/values for: file_type, file_path, relative_path, qry_path. Return type: List[Dict[str, str]]
-
get_notebooks
() → List[Tuple[str, str, str]]¶ Return the list of notebooks for the dataset.
Returns: Tuples of (name, project, link) Return type: List[Tuple[str, str, str]]
-
-
msticpy.data.drivers.mordor_driver.
download_mdr_file
(file_uri: str, use_cached: bool = True, save_folder: str = '.', silent: bool = False) → pandas.core.frame.DataFrame¶ Download data file from Mordor.
Parameters: - file_uri (str) – The URI of the file to download.
- use_cached (bool, optional) – Try to use locally saved file first, by default True
- save_folder (str, optional) – Path to output folder, by default “.”
- silent (bool) – If True, suppress feedback. By default, False.
Returns: DataFrame of Dataset
Return type: pd.DataFrame
-
msticpy.data.drivers.mordor_driver.
get_mdr_data_paths
(item_type='metadata') → Generator[str, None, None]¶ Generate Mordor data sets from GitHub repo.
Parameters: item_type (str, optional) – The type of item required, by default “metadata”. Other values are “large” and “small”. Yields: str – Iterable of paths
-
msticpy.data.drivers.mordor_driver.
search_mdr_data
(mdr_data: Dict[str, msticpy.data.drivers.mordor_driver.MordorEntry], terms: str = None, subset: Iterable[str] = None) → Set[str]¶ Return IDs for items matching terms.
Parameters: - mdr_data (Dict[str, MordorEntry]) – Mordor dataset
- terms (str, optional) – Search terms, by default None (comma-separated values are treated as OR terms; plus-separated values are treated as AND terms)
- subset (Iterable[str], optional) – A subset of IDs over which to search, by default None
Returns: The set of matching IDs.
Return type: Set[str]
msticpy.data.drivers.odata_driver module¶
OData Driver class.
-
class
msticpy.data.drivers.odata_driver.
OData
(**kwargs)¶ Bases:
msticpy.data.drivers.driver_base.DriverBase
Parent class to retrieve data from an OAuth-based API.
Instantiate OData driver and optionally connect.
Parameters: connect (bool, optional) – Set true if you want to connect to the provider at initialization -
CONFIG_NAME
= ''¶
-
add_query_filter
(name, query_filter)¶ Add an expression to the query attach filter.
-
connect
(connection_str: str = None, **kwargs)¶ Connect to oauth data source.
Parameters: connection_str (str, optional) – Connect to a data source Notes
Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion
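A connection string carrying these fields could be parsed as in the sketch below. The semicolon-delimited key=value layout is an assumption of this example (the text above lists only the field names), and `parse_connection_str` is a hypothetical helper rather than the driver's own parser:

```python
from typing import Dict

# Field names as listed in the Notes above.
REQUIRED_FIELDS = ("tenant_id", "client_id", "client_secret", "apiRoot", "apiVersion")


def parse_connection_str(connection_str: str) -> Dict[str, str]:
    """Split an assumed 'key=value;key=value' string into a dictionary."""
    fields: Dict[str, str] = {}
    for item in connection_str.split(";"):
        item = item.strip()
        if not item:
            continue  # tolerate a trailing semicolon
        key, sep, value = item.partition("=")
        if not sep:
            raise ValueError(f"Expected key=value, got {item!r}")
        fields[key.strip()] = value.strip()
    missing = [name for name in REQUIRED_FIELDS if name not in fields]
    if missing:
        raise ValueError(f"Missing connection string fields: {missing}")
    return fields
```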
-
connected
¶ Return true if at least one connection has been made.
Returns: True if a successful connection has been made. Return type: bool Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
-
driver_queries
¶ Return queries retrieved from the service after connecting.
Returns: List of Dictionary of query_name, query_text. Name of container to add queries to. Return type: List[Dict[str, str]]
-
loaded
¶ Return true if the provider is loaded.
Returns: True if the provider is loaded. Return type: bool Notes
This is not relevant for some providers.
-
query
(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: - query (str) – The query to execute
- query_source (QuerySource) – The query definition object
Returns: A DataFrame (if successful) or the underlying provider result if an error occurs.
Return type: Union[pd.DataFrame, Any]
-
query_attach_spec
¶ Parameters that determine whether a query is relevant for the driver.
-
query_with_results
(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: query (str) – The KQL query to execute Returns: A DataFrame (if successful) and Kql ResultSet. Return type: Tuple[pd.DataFrame, results.ResultSet]
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
service_queries
¶ Return queries retrieved from the service after connecting.
Returns: Dictionary of query_name, query_text. Name of container to add queries to. Return type: Tuple[Dict[str, str], str]
-
msticpy.data.drivers.security_graph_driver module¶
Security Graph OData Driver class.
-
class
msticpy.data.drivers.security_graph_driver.
SecurityGraphDriver
(connection_str: str = None, **kwargs)¶ Bases:
msticpy.data.drivers.odata_driver.OData
Driver to query security graph.
Instantiate MSGraph driver and optionally connect.
Parameters: connection_str (str, optional) – Connection string -
CONFIG_NAME
= 'MicrosoftGraph'¶
-
add_query_filter
(name, query_filter)¶ Add an expression to the query attach filter.
-
connect
(connection_str: str = None, **kwargs)¶ Connect to oauth data source.
Parameters: connection_str (str, optional) – Connect to a data source Notes
Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion
-
connected
¶ Return true if at least one connection has been made.
Returns: True if a successful connection has been made. Return type: bool Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
-
driver_queries
¶ Return queries retrieved from the service after connecting.
Returns: List of Dictionary of query_name, query_text. Name of container to add queries to. Return type: List[Dict[str, str]]
-
loaded
¶ Return true if the provider is loaded.
Returns: True if the provider is loaded. Return type: bool Notes
This is not relevant for some providers.
-
query
(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: - query (str) – The query to execute
- query_source (QuerySource) – The query definition object
Returns: A DataFrame (if successful) or the underlying provider result if an error occurs.
Return type: Union[pd.DataFrame, results.ResultSet]
-
query_attach_spec
¶ Parameters that determine whether a query is relevant for the driver.
-
query_with_results
(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: query (str) – The KQL query to execute Returns: A DataFrame (if successful) and Kql ResultSet. Return type: Tuple[pd.DataFrame, results.ResultSet]
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
service_queries
¶ Return queries retrieved from the service after connecting.
Returns: Dictionary of query_name, query_text. Name of container to add queries to. Return type: Tuple[Dict[str, str], str]
-
msticpy.data.drivers.splunk_driver module¶
Splunk Driver class.
-
class
msticpy.data.drivers.splunk_driver.
SplunkDriver
(**kwargs)¶ Bases:
msticpy.data.drivers.driver_base.DriverBase
Driver to connect and query from Splunk.
Instantiate Splunk Driver.
-
add_query_filter
(name, query_filter)¶ Add an expression to the query attach filter.
-
connect
(connection_str: str = None, **kwargs)¶ Connect to Splunk via splunk-sdk.
Parameters: connection_str (Optional[str], optional) – Connection string with Splunk connection parameters Other Parameters: kwargs – Connection parameters can be supplied as keyword parameters. Notes
Default configuration is read from the DataProviders/Splunk section of msticpyconfig.yaml, if available.
-
connected
¶ Return true if at least one connection has been made.
Returns: True if a successful connection has been made. Return type: bool Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
-
driver_queries
¶ Return dynamic queries available on connection to service.
Returns: List of queries with properties: “name”, “query”, “container” and (optionally) “description” Return type: Iterable[Dict[str, Any]] Raises: MsticpyNotConnectedError
– If called before driver is connected.
-
loaded
¶ Return true if the provider is loaded.
Returns: True if the provider is loaded. Return type: bool Notes
This is not relevant for some providers.
-
query
(query: str, query_source: msticpy.data.query_source.QuerySource = None, **kwargs) → Union[pandas.core.frame.DataFrame, Any]¶ Execute splunk query and retrieve results via OneShot search mode.
Parameters: - query (str) – Splunk query to execute via OneShot search mode
- query_source (QuerySource) – The query definition object
Other Parameters: kwargs – Are passed to the Splunk oneshot method; count=0 by default.
Returns: Query results in a DataFrame, or the query response if an error occurs.
Return type: Union[pd.DataFrame, Any]
-
query_attach_spec
¶ Parameters that determine whether a query is relevant for the driver.
-
query_with_results
(query: str, **kwargs) → Tuple[pandas.core.frame.DataFrame, Any]¶ Execute query string and return DataFrame of results.
Parameters: query (str) – Query to execute against splunk instance. Returns: A DataFrame (if successful) or the underlying provider result if an error occurs. Return type: Union[pd.DataFrame,Any]
-
schema
¶ Return current data schema of connection.
Returns: Data schema of current connection. Return type: Dict[str, Dict]
-
service_queries
¶ Return dynamic queries available on connection to service.
Returns: Dictionary of query_name, query_text. Name of container to add queries to. Return type: Tuple[Dict[str, str], str]
-
msticpy.data.data_obfus¶
Data obfuscation functions.
-
class
msticpy.data.data_obfus.
ObfuscationAccessor
(pandas_obj)¶ Bases:
object
Base64 Unpack pandas extension.
Initialize the extension.
-
mask
(column_map: Mapping[str, Any] = None, use_default: bool = True) → pandas.core.frame.DataFrame¶ Obfuscate the data in columns of a pandas dataframe.
Parameters: - data (pd.DataFrame) – dataframe containing column to obfuscate
- column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
- use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
Returns: Obfuscated dataframe
Return type: pd.DataFrame
-
-
msticpy.data.data_obfus.
check_masking
(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) → Optional[Tuple[List[str], List[str]]]¶ Check the obfuscation results for a row.
Parameters: - data (pd.DataFrame) – Obfuscated DataFrame
- orig_data (pd.DataFrame) – Original DataFrame
- index (int, optional) – The row to check, by default 0
- silent (bool) – If True the function produces no printed output and returns lists of unchanged and changed columns. By default, True
Returns: If silent is True returns a tuple of unchanged, changed items. If False, returns None.
Return type: Optional[Tuple[List[str], List[str]]]
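The row comparison described above can be sketched without pandas. This is a minimal illustration, not msticpy's implementation: `compare_row_sketch` is a hypothetical name, and each row is represented as a plain dict of column name to value rather than a DataFrame row.

```python
from typing import Any, Dict, List, Tuple


def compare_row_sketch(
    masked_row: Dict[str, Any], orig_row: Dict[str, Any]
) -> Tuple[List[str], List[str]]:
    """Return (unchanged, changed) column names for one row.

    Hypothetical helper: rows are plain dicts standing in for
    DataFrame rows; this is not the library's own code.
    """
    unchanged: List[str] = []
    changed: List[str] = []
    for column, orig_value in orig_row.items():
        # A column counts as unchanged if the masked value equals the original.
        if masked_row.get(column) == orig_value:
            unchanged.append(column)
        else:
            changed.append(column)
    return unchanged, changed


original = {"TimeGenerated": "2020-01-01", "Account": "CONTOSO\\alice"}
masked = {"TimeGenerated": "2020-01-01", "Account": "a1b2c3\\d4e5f6"}
unchanged_cols, changed_cols = compare_row_sketch(masked, original)
```

Returning the unchanged columns first mirrors the tuple order given in the Returns description above.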
-
msticpy.data.data_obfus.
check_obfuscation
(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) → Optional[Tuple[List[str], List[str]]]¶ Check the obfuscation results for a row.
Parameters: - data (pd.DataFrame) – Obfuscated DataFrame
- orig_data (pd.DataFrame) – Original DataFrame
- index (int, optional) – The row to check, by default 0
- silent (bool) – If True the function produces no printed output and returns lists of unchanged and changed columns. By default, True
Returns: If silent is True returns a tuple of unchanged, changed items. If False, returns None.
Return type: Optional[Tuple[List[str], List[str]]]
-
msticpy.data.data_obfus.
hash_account
¶ Hash an Account to something recognizable.
Parameters: account (str) – Account name (UPN, NT or simple name) Returns: Hashed Account Return type: str
-
msticpy.data.data_obfus.
hash_dict
(item_dict: Dict[str, Union[Dict[str, Any], List[Any], str]]) → Dict[str, Any]¶ Hash dictionary values.
Parameters: item_dict (Dict[str, Union[Dict[str, Any], List[Any], str]]) – Input item can be a Dict of strings, lists or other dictionaries. Returns: Dictionary with hashed values. Return type: Dict[str, Any]
-
msticpy.data.data_obfus.
hash_ip
(input_item: Union[List[str], str]) → Union[List[str], str]¶ Hash IP address or list of IP addresses.
Parameters: input_item (Union[List[str], str]) – List of IP addresses or single IP address. Returns: List of hashed addresses or single address. (depending on input) Return type: Union[List[str], str]
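The "single value or list in, same shape out" contract can be sketched as follows. This is illustrative only: `hash_ip_sketch` and `_hash_one_ip` are hypothetical names, and deriving the replacement address from a SHA-256 digest is an assumption, not msticpy's mapping scheme.

```python
import hashlib
import ipaddress
from typing import List, Union


def _hash_one_ip(ip_str: str) -> str:
    """Map one address to a deterministic pseudo-address of the same IP version."""
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        # Assumption for this sketch: leave non-IP strings unchanged.
        return ip_str
    digest = hashlib.sha256(ip_str.encode("utf-8")).digest()
    if addr.version == 4:
        return str(ipaddress.IPv4Address(digest[:4]))
    return str(ipaddress.IPv6Address(digest[:16]))


def hash_ip_sketch(input_item: Union[List[str], str]) -> Union[List[str], str]:
    """Hash a single IP address or each address in a list."""
    if isinstance(input_item, list):
        return [_hash_one_ip(ip) for ip in input_item]
    return _hash_one_ip(input_item)


single = hash_ip_sketch("10.1.2.3")
several = hash_ip_sketch(["10.1.2.3", "2001:db8::1"])
```

Because the mapping is deterministic, the same input address always yields the same output, so joins across obfuscated tables still line up.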
-
msticpy.data.data_obfus.
hash_item
¶ Hash a simple string.
Parameters: - input_item (str) – The input string
- delim (str, optional) – A string of delimiters to use to split the input string prior to hashing.
Returns: The obfuscated output string
Return type: str
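The effect of the delim parameter can be shown with a short sketch: the input is split on any of the delimiter characters, each token is hashed, and the delimiters are kept in place. The names `hash_item_sketch` and `hash_string_sketch` are hypothetical, and the truncated SHA-256 digest is an assumed stand-in for the library's hash.

```python
import hashlib
import re
from typing import Optional


def hash_string_sketch(input_str: str) -> str:
    """Hypothetical stand-in hash: first 12 hex chars of SHA-256."""
    return hashlib.sha256(input_str.encode("utf-8")).hexdigest()[:12]


def hash_item_sketch(input_item: str, delim: Optional[str] = None) -> str:
    """Hash each delimiter-separated token, keeping the delimiters."""
    if not delim:
        return hash_string_sketch(input_item)
    # The capture group makes re.split return the delimiters as well,
    # so tokens sit at even indices and delimiters at odd indices.
    pattern = "([" + re.escape(delim) + "])"
    parts = re.split(pattern, input_item)
    return "".join(
        part if (index % 2 == 1 or not part) else hash_string_sketch(part)
        for index, part in enumerate(parts)
    )


masked_upn = hash_item_sketch("user@contoso.com", "@.")
```

Keeping the delimiters means the output still looks like a UPN or path, which helps when the obfuscated data is read by a person.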
-
msticpy.data.data_obfus.
hash_list
(item_list: List[str]) → List[str]¶ Hash list of strings.
Parameters: item_list (List[str]) – Input list Returns: Hashed list Return type: List[str]
-
msticpy.data.data_obfus.
hash_sid
¶ Hash a SID preserving well-known SIDs and the RID.
Parameters: sid (str) – SID string Returns: Hashed SID Return type: str
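A rough sketch of what "preserving well-known SIDs and the RID" can mean in practice: only the domain-specific sub-authorities of an `S-1-5-21-...` SID are replaced, while other SIDs and the trailing RID pass through. The function name `hash_sid_sketch`, the prefix test, and the digest-based replacement are all assumptions for illustration, not msticpy's implementation.

```python
import hashlib

# Assumption: SIDs issued by a domain or machine start with this prefix;
# anything else is treated as well-known and left untouched.
DOMAIN_SID_PREFIX = "S-1-5-21-"


def hash_sid_sketch(sid: str) -> str:
    """Hash the domain portion of a SID, keeping the prefix and the RID."""
    if not sid.upper().startswith(DOMAIN_SID_PREFIX):
        return sid
    parts = sid.split("-")
    # Expected layout: S, 1, 5, 21, <domain sub-authorities...>, <RID>
    if len(parts) < 6:
        return sid
    hashed_domain = []
    for sub_authority in parts[4:-1]:
        digest = hashlib.sha256(sub_authority.encode("utf-8")).digest()
        # Use 4 digest bytes so the result is still a 32-bit decimal number.
        hashed_domain.append(str(int.from_bytes(digest[:4], "big")))
    return "-".join(parts[:4] + hashed_domain + [parts[-1]])


system_sid = hash_sid_sketch("S-1-5-18")
admin_sid = hash_sid_sketch("S-1-5-21-1004336348-1177238915-682003330-500")
```

Keeping the RID (here 500) lets an analyst still recognize built-in accounts such as the local Administrator in obfuscated data.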
-
msticpy.data.data_obfus.
hash_string
(input_str: str) → str¶ Hash a simple string.
Parameters: input_str (str) – The input string Returns: The obfuscated output string Return type: str
-
msticpy.data.data_obfus.
mask_df
(data: pandas.core.frame.DataFrame, column_map: Mapping[str, Any] = None, use_default: bool = True, silent: bool = True) → pandas.core.frame.DataFrame¶ Obfuscate columns of a DataFrame.
Parameters: - data (pd.DataFrame) – Input dataframe
- column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
- use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
- silent (bool) – If False the function prints progress output, by default True.
Returns: Obfuscated dataframe.
Return type: pd.DataFrame
-
msticpy.data.data_obfus.
obfuscate_df
(data: pandas.core.frame.DataFrame, column_map: Mapping[str, Any] = None, use_default: bool = True, silent: bool = True) → pandas.core.frame.DataFrame¶ Obfuscate columns of a DataFrame.
Parameters: - data (pd.DataFrame) – Input dataframe
- column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
- use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
- silent (bool) – If False the function prints progress output, by default True.
Returns: Obfuscated dataframe.
Return type: pd.DataFrame
-
msticpy.data.data_obfus.
replace_guid
(guid: str) → str¶ Replace GUID/UUID with mapped random UUID.
Parameters: guid (str) – Input UUID. Returns: Mapped UUID Return type: str
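A "mapped random UUID" can be read as: the first time a GUID is seen it is assigned a random replacement, and later lookups return the same replacement. The sketch below illustrates that idea under stated assumptions; `replace_guid_sketch` and the module-level `_GUID_MAP` cache are hypothetical and not taken from msticpy.

```python
import uuid
from typing import Dict

# Hypothetical in-memory mapping of original GUID -> random replacement.
_GUID_MAP: Dict[str, str] = {}


def replace_guid_sketch(guid: str) -> str:
    """Return a random UUID that is stable for a given input GUID."""
    key = guid.strip().lower()
    if key not in _GUID_MAP:
        _GUID_MAP[key] = str(uuid.uuid4())
    return _GUID_MAP[key]


first = replace_guid_sketch("8d0bd3ca-c161-4890-9379-543a8c6dc22a")
again = replace_guid_sketch("8D0BD3CA-C161-4890-9379-543A8C6DC22A")
other = replace_guid_sketch("7bd1a0a4-6f4e-4c8a-9a51-0c7c1d2e3f40")
```

In this sketch the mapping lives only for the lifetime of the process, so replacements are consistent within a session but differ between sessions.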
msticpy.data.browsers.query_browser module¶
QueryProvider Query Browser.
-
msticpy.data.browsers.query_browser.
browse_queries
(query_provider: Any, **kwargs) → msticpy.nbtools.nbwidgets.SelectItem¶ Return QueryProvider query browser.
Parameters: query_provider (QueryProvider) – Initialized query provider. Other Parameters: kwargs – passed to SelectItem constructor. Returns: SelectItem browser for the provider's queries. Return type: SelectItem
msticpy.data.browsers.mordor_browser module¶
Mordor dataset browser.
-
class
msticpy.data.browsers.mordor_browser.
MordorBrowser
(save_folder: str = '.', use_cached: bool = True)¶ Bases:
object
Mordor browser widget.
Initialize MordorBrowser control.
Parameters: - save_folder (str, optional) – Folder to save downloaded files, by default “.”
- use_cached (bool, optional) – If true, downloaded files are not deleted after download and are used as a local cache, by default True
-
fields
¶ Return set of fields widget controls.
-
selected_dset
¶ Return the ID of the selected data set.