msticpy.data package
msticpy.nbtools.security_alert module
Module for SecurityAlert class.
- class msticpy.nbtools.security_alert.SecurityAlert(*args, **kwargs)
Bases:
msticpy.nbtools.security_base.SecurityBase
Security Alert Class.
Instantiates a security alert from a pandas Series and provides convenience access methods to retrieve properties.
Deprecated since version 1.7.0: Replaced by Alert entity in datamodel
Instantiate a security alert from a pandas Series.
- property computer: Optional[str]
Return the Computer name of the host associated with the alert.
(host FQDN, if available)
- property data_environment: msticpy.data.query_defns.DataEnvironment
Return the data environment of the alert for subsequent queries.
- property data_family: msticpy.datamodel.entities.entity_enums.OSFamily
Return the data family of the alert for subsequent queries.
- property entities: List[msticpy.datamodel.entities.entity.Entity]
Return a list of the Security Alert entities.
- get_all_entities() pandas.core.frame.DataFrame
Return a DataFrame of the Alert or Event entities.
- Returns
Pandas DataFrame of the Alert or Event entities.
- Return type
DataFrame
- get_entities_of_type(entity_type: str) List[msticpy.datamodel.entities.entity.Entity]
Return entity collection for a given entity type.
- Parameters
entity_type (str, optional) – The entity type.
- Returns
The entities matching entity_type.
- Return type
List[Entity]
- get_logon_id(account: Optional[msticpy.datamodel.entities.account.Account] = None) Optional[Union[str, int]]
Get the logon Id for the alert or the account, if supplied.
If account is not supplied, return the logon id of the first host-logon-session or account entity.
- Parameters
account (Account, optional) – Account object to use (the default is None)
- Returns
The logon Id for primary account
- Return type
Optional[Union[str, int]]
- host_filter(operator='==')
Return a KQL host filter clause derived from the alert properties.
- param operator='=='
the operator to use in the filter clause, typically '==' or '!='.
- property hostname: str
Return the Hostname (not FQDN) of the host associated with the alert.
- property ids: Dict[str, str]
Return a collection of Identity properties for the alert.
- property is_in_azure_sub: bool
Return True if the alert originates from an Azure Security Center host.
- property is_in_log_analytics: bool
Return True if the alert originates from a Log Analytics Workspace host.
- property is_in_workspace: bool
Return True if the alert has a Log Analytics WorkspaceID.
- property origin_time: datetime.datetime
Return the datetime of event.
- property primary_account: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]
Return the primary account entity (if any) associated with this object.
- Returns
primary account entity (if any)
- Return type
Optional[Process]
- property primary_host: Optional[Union[msticpy.datamodel.entities.host.Host, msticpy.datamodel.entities.entity.Entity]]
Return the primary host entity (if any) associated with this object.
- Returns
primary host entity (if any)
- Return type
Optional[Host]
- property primary_process: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]
Return the primary process entity (if any) associated with this object.
- Returns
primary process entity (if any)
- Return type
Optional[Process]
- property properties: Dict[str, Any]
Return a dictionary of the Alert or Event properties.
- Returns
dictionary of the Alert or Event properties.
- Return type
Dict[str, Any]
- property query_params: Dict[str, Any]
Query parameters derived from alert.
- Returns
Dictionary of parameter names/values
- Return type
Dict[str, Any]
- subscription_filter(operator='==')
Return a KQL subscription filter clause derived from the alert properties.
- to_html(show_entities=False) str
Return the item as HTML string.
msticpy.nbtools.security_alert_graph module
security_alert_graph.
Creates an entity graph for the alert.
Add related alerts to the graph.
Link to the entity that is common to both alerts.
- msticpy.nbtools.security_alert_graph.create_alert_graph(alert: msticpy.nbtools.security_alert.SecurityAlert)
Create a networkx graph from the alert and contained entities.
msticpy.nbtools.security_base module
Module for SecurityAlert class.
- class msticpy.nbtools.security_base.SecurityBase(*args, **kwargs)
Bases:
msticpy.data.query_defns.QueryParamProvider
Security Base Class for alerts and events.
Instantiates a security event or alert from a pandas Series and provides convenience access methods to retrieve properties.
Deprecated since version 1.7.0: Replaced by Alert entity in datamodel
Instantiate a security alert from a pandas Series.
- property computer: Optional[str]
Return the Computer name of the host associated with the alert.
(host FQDN, if available)
- property data_environment: msticpy.data.query_defns.DataEnvironment
Return the data environment of the alert for subsequent queries.
- property data_family: msticpy.datamodel.entities.entity_enums.OSFamily
Return the data family of the alert for subsequent queries.
- property entities: List[msticpy.datamodel.entities.entity.Entity]
Return a list of the Alert or Event entities.
- Returns
List of the Alert or Event entities.
- Return type
List[Entity]
- get_all_entities() pandas.core.frame.DataFrame
Return a DataFrame of the Alert or Event entities.
- Returns
Pandas DataFrame of the Alert or Event entities.
- Return type
DataFrame
- get_entities_of_type(entity_type: str) List[msticpy.datamodel.entities.entity.Entity]
Return entity collection for a given entity type.
- Parameters
entity_type (str, optional) – The entity type.
- Returns
The entities matching entity_type.
- Return type
List[Entity]
- get_logon_id(account: Optional[msticpy.datamodel.entities.account.Account] = None) Optional[Union[str, int]]
Get the logon Id for the alert or the account, if supplied.
If account is not supplied, return the logon id of the first host-logon-session or account entity.
- Parameters
account (Account, optional) – Account object to use (the default is None)
- Returns
The logon Id for primary account
- Return type
Optional[Union[str, int]]
- host_filter(operator='==')
Return a KQL host filter clause derived from the alert properties.
- param operator='=='
the operator to use in the filter clause, typically '==' or '!='.
- property hostname: str
Return the Hostname (not FQDN) of the host associated with the alert.
- property ids: Dict[str, str]
Return a collection of Identity properties for the alert.
- property is_in_azure_sub: bool
Return True if the alert originates from an Azure Security Center host.
- property is_in_log_analytics: bool
Return True if the alert originates from a Log Analytics Workspace host.
- property is_in_workspace: bool
Return True if the alert has a Log Analytics WorkspaceID.
- property origin_time: datetime.datetime
Return the datetime of event.
- property primary_account: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]
Return the primary account entity (if any) associated with this object.
- Returns
primary account entity (if any)
- Return type
Optional[Process]
- property primary_host: Optional[Union[msticpy.datamodel.entities.host.Host, msticpy.datamodel.entities.entity.Entity]]
Return the primary host entity (if any) associated with this object.
- Returns
primary host entity (if any)
- Return type
Optional[Host]
- property primary_process: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]
Return the primary process entity (if any) associated with this object.
- Returns
primary process entity (if any)
- Return type
Optional[Process]
- property properties: Dict[str, Any]
Return a dictionary of the Alert or Event properties.
- Returns
dictionary of the Alert or Event properties.
- Return type
Dict[str, Any]
- property query_params: Dict[str, Any]
Query parameters derived from alert.
- Returns
Dictionary of parameter names/values
- Return type
Dict[str, Any]
- subscription_filter(operator='==')
Return a KQL subscription filter clause derived from the alert properties.
- to_html(show_entities: bool = False) str
Return the item as HTML string.
msticpy.nbtools.security_event module
Module for SecurityEvent class.
- class msticpy.nbtools.security_event.SecurityEvent(*args, **kwargs)
Bases:
msticpy.nbtools.security_base.SecurityBase
SecurityEvent class.
Deprecated since version 1.7.0: Replaced by datamodel entities
Instantiate new instance of SecurityEvent.
- param src_row
Pandas series containing single security event
- property computer: Optional[str]
Return the Computer name of the host associated with the alert.
(host FQDN, if available)
- property data_environment: msticpy.data.query_defns.DataEnvironment
Return the data environment of the alert for subsequent queries.
- property data_family: msticpy.datamodel.entities.entity_enums.OSFamily
Return the data family of the alert for subsequent queries.
- property entities: List[msticpy.datamodel.entities.entity.Entity]
Return the list of entities extracted from the event.
- Returns
The list of entities extracted from the event.
- Return type
List[Entity]
- get_all_entities() pandas.core.frame.DataFrame
Return a DataFrame of the Alert or Event entities.
- Returns
Pandas DataFrame of the Alert or Event entities.
- Return type
DataFrame
- get_entities_of_type(entity_type: str) List[msticpy.datamodel.entities.entity.Entity]
Return entity collection for a given entity type.
- Parameters
entity_type (str, optional) – The entity type.
- Returns
The entities matching entity_type.
- Return type
List[Entity]
- get_logon_id(account: Optional[msticpy.datamodel.entities.account.Account] = None) Optional[Union[str, int]]
Get the logon Id for the alert or the account, if supplied.
If account is not supplied, return the logon id of the first host-logon-session or account entity.
- Parameters
account (Account, optional) – Account object to use (the default is None)
- Returns
The logon Id for primary account
- Return type
Optional[Union[str, int]]
- host_filter(operator='==')
Return a KQL host filter clause derived from the alert properties.
- param operator='=='
the operator to use in the filter clause, typically '==' or '!='.
- property hostname: str
Return the Hostname (not FQDN) of the host associated with the alert.
- property ids: Dict[str, str]
Return a collection of Identity properties for the alert.
- property is_in_azure_sub: bool
Return True if the alert originates from an Azure Security Center host.
- property is_in_log_analytics: bool
Return True if the alert originates from a Log Analytics Workspace host.
- property is_in_workspace: bool
Return True if the alert has a Log Analytics WorkspaceID.
- property origin_time: datetime.datetime
Return the datetime of event.
- property primary_account: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]
Return the primary account entity (if any) associated with this object.
- Returns
primary account entity (if any)
- Return type
Optional[Process]
- property primary_host: Optional[Union[msticpy.datamodel.entities.host.Host, msticpy.datamodel.entities.entity.Entity]]
Return the primary host entity (if any) associated with this object.
- Returns
primary host entity (if any)
- Return type
Optional[Host]
- property primary_process: Optional[Union[msticpy.datamodel.entities.process.Process, msticpy.datamodel.entities.entity.Entity]]
Return the primary process entity (if any) associated with this object.
- Returns
primary process entity (if any)
- Return type
Optional[Process]
- property properties: Dict[str, Any]
Return a dictionary of the Alert or Event properties.
- Returns
dictionary of the Alert or Event properties.
- Return type
Dict[str, Any]
- property query_params: Dict[str, Any]
Query parameters derived from alert.
- Returns
Dictionary of parameter names/values
- Return type
Dict[str, Any]
- subscription_filter(operator='==')
Return a KQL subscription filter clause derived from the alert properties.
- to_html(show_entities: bool = False) str
Return the item as HTML string.
msticpy.data.data_providers module
Data provider loader.
- class msticpy.data.data_providers.QueryProvider(data_environment: Union[str, msticpy.data.query_defns.DataEnvironment], driver: Optional[msticpy.data.drivers.driver_base.DriverBase] = None, query_paths: Optional[List[str]] = None, **kwargs)
Bases:
object
Container for query store and query execution provider.
Instances of this class hold the query set and execution methods for a specific data environment.
Query provider interface to queries.
- Parameters
data_environment (Union[str, DataEnvironment]) – Name or Enum of environment for the QueryProvider
driver (DriverBase, optional) – Override the builtin driver (query execution class) and use your own driver (must inherit from DriverBase)
query_paths (List[str]) – Additional paths to look for query definitions.
kwargs – Other arguments are passed to the data provider driver.
See also
DataProviderBase
base class for data query providers.
- add_connection(connection_str: Optional[str] = None, alias: Optional[str] = None, **kwargs)
Add an additional connection for the query provider.
- Parameters
connection_str (Optional[str], optional) – Connection string for the provider, by default None
alias (Optional[str], optional) – Alias to use for the connection, by default None
kwargs (Dict[str, Any]) – Other parameters passed to the driver constructor.
Notes
Some drivers may accept types other than strings for the connection_str parameter.
- browse(**kwargs)
Return QueryProvider query browser.
- Parameters
kwargs – passed to SelectItem constructor.
- Returns
SelectItem browser for TI Data.
- Return type
SelectItem
- browse_queries(**kwargs)
Return QueryProvider query browser.
- Parameters
kwargs – passed to SelectItem constructor.
- Returns
SelectItem browser for TI Data.
- Return type
SelectItem
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to data source.
- Parameters
connection_str (str) – Connection string for the data source
- property connected: bool
Return True if the provider is connected.
- Returns
True if the provider is connected.
- Return type
bool
- property connection_string: str
Return provider connection string.
- Returns
Provider connection string.
- Return type
str
- exec_query(query: str, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute simple query string.
- Parameters
query (str) – The query string to execute
use_connections (Union[str, List[str]]) –
query_options (Dict[str, Any]) – Additional options passed to query driver.
kwargs (Dict[str, Any]) – Additional options passed to query driver.
- Returns
Query results - a DataFrame if successful or a KqlResult if unsuccessful.
- Return type
Union[pd.DataFrame, Any]
- get_query(query_name: str) str
Return the raw query text for query_name.
- Parameters
query_name (str) – The name of the query.
- import_query_file(query_file: str)
Import a yaml data source definition.
- Parameters
query_file (str) – Path to the file to import
- list_connections() List[str]
Return a list of current connections or the default connection.
- Returns
The alias and connection string for each connection.
- Return type
List[str]
- classmethod list_data_environments() List[str]
Return list of current data environments.
- Returns
List of current data environments
- Return type
List[str]
- list_queries(substring: Optional[str] = None) List[str]
Return list of family.query in the store.
- Parameters
substring (Optional[str]) – Optional pattern - will return only queries matching the pattern, default None.
- Returns
List of queries
- Return type
List[str]
- query_help(query_name: str)
Print help for query_name.
- Parameters
query_name (str) – The name of the query.
- property query_time
Return the default QueryTime control for queries.
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property schema_tables: List[str]
Return list of tables in the data schema of the connection.
- Returns
Tables in the data schema of the current connection.
- Return type
List[str]
msticpy.data.data_query_reader module
Data query definition reader.
- msticpy.data.data_query_reader.find_yaml_files(source_path: str, recursive: bool = False) Iterable[pathlib.Path]
Return iterable of yaml files found in source_path.
- Parameters
source_path (str) – The source path to search in.
recursive (bool, optional) – Whether to recurse through subfolders. By default False
- Returns
File paths of yaml files found.
- Return type
Iterable[str]
- msticpy.data.data_query_reader.read_query_def_file(query_file: str) Tuple[Dict, Dict, Dict]
Read a yaml data query definition file.
- Parameters
query_file (str) – Path to yaml query definition file
- Returns
Tuple of dictionaries: sources (dictionary of query definitions), defaults (the default parameters from the file) and metadata (the global metadata from the file).
- Return type
Tuple[Dict, Dict, Dict]
- msticpy.data.data_query_reader.validate_query_defs(query_def_dict: Dict[str, Any]) bool
Validate content of query definition.
- Parameters
query_def_dict (dict) – Dictionary of query definition yaml file contents.
- Returns
True if validation succeeds.
- Return type
bool
- Raises
ValueError – The validation failure reason is returned in the exception message (arg[0])
msticpy.data.param_extractor module
Parameter extractor helper functions for use with IPython/Jupyter queries.
- msticpy.data.param_extractor.extract_query_params(query_source: msticpy.data.query_source.QuerySource, *args, **kwargs) Tuple[Dict[str, Any], List[str]]
Get the parameters needed for the query.
- Parameters
query_source (QuerySource) – Query source
args (Tuple[QueryParamProvider]) – objects that implement QueryParamProvider (from which query parameters can be extracted).
kwargs (Dict[str, Any]) – custom parameter list to populate queries (override default values and values extracted from QueryParamProviders).
- Returns
Dictionary of parameter names and values to be used in the query. List of any missing parameters
- Return type
Tuple[Dict[str, Any], List[str]]
msticpy.data.query_container module
Query hierarchy attribute class.
- class msticpy.data.query_container.QueryContainer
Bases:
object
Empty class used to create hierarchical attributes.
msticpy.data.query_defns module
Query helper definitions.
- class msticpy.data.query_defns.DataEnvironment(value)
Bases:
enum.Enum
Enumeration of data environments.
Used to identify which queries are relevant for which data sources.
- AzureSecurityCenter = 3
- AzureSentinel = 1
- Cybereason = 12
- Kusto = 2
- LocalData = 6
- LogAnalytics = 1
- M365D = 11
- MDATP = 5
- MDE = 5
- MSSentinel = 1
- Mordor = 8
- ResourceGraph = 9
- SecurityGraph = 4
- Splunk = 7
- Sumologic = 10
- Unknown = 0
- classmethod parse(value: Union[str, int]) msticpy.data.query_defns.DataEnvironment
Convert string or int to enum.
- Parameters
value (Union[str, int]) – value to parse
- class msticpy.data.query_defns.DataFamily(value)
Bases:
enum.Enum
Enumeration of data families.
Used to identify which queries are relevant for which data sources.
- AzureNetwork = 6
- Cybereason = 11
- LinuxSecurity = 2
- LinuxSyslog = 5
- MDATP = 7
- ResourceGraph = 9
- SecurityAlert = 3
- SecurityGraphAlert = 4
- Splunk = 8
- Sumologic = 10
- Unknown = 0
- WindowsSecurity = 1
- classmethod parse(value: Union[str, int]) msticpy.data.query_defns.DataFamily
Convert string or int to enum.
- Parameters
value (Union[str, int]) – value to parse
- class msticpy.data.query_defns.QueryParamProvider
Bases:
abc.ABC
Abstract type for QueryParamProvider.
Method query_params must be overridden by derived classes.
- abstract property query_params
Return dict of query parameters.
These parameters are sourced in the object implementing this method.
- Returns
- Return type
dict – dictionary of query parameter values.
- msticpy.data.query_defns.ensure_df_datetimes(data: pandas.core.frame.DataFrame, columns: Optional[Union[str, List[str]]] = None, add_utc_tz: bool = True) pandas.core.frame.DataFrame
Return dataframe with converted TZ-aware timestamps.
- Parameters
data (pd.DataFrame) – Input dataframe
columns (Union[str, List[str], None], optional) – column (str) or list of columns to convert, by default None. If this parameter is not supplied then any column containing the substring “time” is used as a candidate for conversion.
add_utc_tz (bool, optional) – If True, any datetime columns in the columns parameter (or the default '.*time.*' columns) that are timezone-naive will be converted to timezone-aware timestamps marked as UTC.
- Returns
Converted DataFrame.
- Return type
pd.DataFrame
msticpy.data.query_source module
Intake kql driver.
- class msticpy.data.query_source.QuerySource(name: str, source: Dict[str, Any], defaults: Dict[str, Any], metadata: Dict[str, Any])
Bases:
object
Query definition class for templated queries.
- name
The query name
- Type
str
- metadata
The consolidated metadata for the query
- Type
Dict[str, Any]
- params
The dictionary of parameter definitions for the query.
- Type
dict[str, Any]
- query_store
The query store object that the query belongs to
- Type
Initialize query source definition.
- Parameters
name (str) – The query name
source (dict) – The data source definition settings
defaults (dict) – The default settings (if source-specific setting not supplied)
metadata (dict) – The global metadata from the source file.
Notes
A data source can belong to multiple families (e.g. a query that joins data from several sources)
- create_doc_string() str
Return a doc string for the query.
- Returns
New-line delimited docstring dynamically created from query definition properties.
- Return type
str
- create_query(formatters: Optional[Dict[str, Callable]] = None, **kwargs) str
Return query with values from kwargs and defaults substituted.
- Parameters
formatters (Dict[str, Callable]) – Dictionary of custom parameter formatters indexed by data type
kwargs (Mapping[str, Any]) – Set of parameter name, value pairs used to populate the template query.
- Returns
The populated query
- Return type
str
- Raises
ValueError – If one or more parameters with no default values are not supplied.
Notes
Parameters supplied as arguments will override any parameter defaults (see default_params property).
- property data_families: List[str]
Return the list of data families used by the query.
- Returns
The list of data families. A data family is usually equivalent to a table or entity set.
- Return type
List[str]
- property default_params: Dict[str, dict]
Return the set of parameters with default values.
- Returns
List of parameters
- Return type
Iterable[dict]
- property description: str
Return description of the query.
- Returns
Query description.
- Return type
str
- help()
Print help for query.
- property query: str
Return the query template.
- Returns
The template query.
- Return type
str
- property required_params: Dict[str, dict]
Return the set of parameters with no default values.
- Returns
List of parameters
- Return type
Iterable[dict]
- resolve_param_aliases(param_dict: Dict[str, Any]) Dict[str, Any]
Try to resolve any parameters in param_dict that are aliases.
- validate() Tuple[bool, List[str]]
Validate the source to ensure that all required properties are present.
- Returns
True if validation is successful.
- Return type
bool
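The templating flow documented in the QuerySource section above, together with the QueryParamProvider abstraction, extract_query_params and the host_filter clause of SecurityAlert, as an added, self-contained Python illustration: parameters with defaults versus required parameters, values gathered from provider objects, explicit keyword arguments overriding both, and a ValueError when a required parameter is still missing. This is not msticpy's own code. The class and function names mirror the documented API, but the type-based quoting rule, the AlertParams class and the sample query definition are my constructions, and the real library's template syntax and formatting differ.

```python
import re
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Tuple

_PLACEHOLDER = re.compile(r"\{(\w+)\}")  # {param} markers in a query template


def kql_escape(value: str) -> str:
    """Escape a value for use inside a single-quoted KQL string literal."""
    return value.replace("\\", "\\\\").replace("'", "\\'")


# Formatters indexed by declared parameter type (my convention, not msticpy's):
# 'str' values are always quoted and escaped; only 'raw' is inserted verbatim,
# so 'raw' is reserved for values from the trusted definition file, never callers.
DEFAULT_FORMATTERS: Dict[str, Callable[[Any], str]] = {
    "str": lambda v: f"'{kql_escape(str(v))}'",
    "datetime": lambda v: f"datetime({v})",
    "raw": str,
}


class QueryParamProvider(ABC):
    """Abstract parameter source; SecurityAlert plays this role in msticpy."""

    @property
    @abstractmethod
    def query_params(self) -> Dict[str, Any]: ...


class AlertParams(QueryParamProvider):
    """Hypothetical alert-derived provider standing in for SecurityAlert."""

    def __init__(self, hostname: str, start: str, end: str) -> None:
        self._params = {"host_name": hostname, "start": start, "end": end}

    @property
    def query_params(self) -> Dict[str, Any]:
        return dict(self._params)

    def host_filter(self, operator: str = "==") -> str:
        """KQL host filter clause in the spirit of SecurityAlert.host_filter."""
        return f"Computer {operator} {DEFAULT_FORMATTERS['str'](self._params['host_name'])}"


class QuerySource:
    """Templated query definition: template text plus parameter specs."""

    def __init__(self, name: str, source: dict, defaults: dict, metadata: dict) -> None:
        self.name, self.metadata = name, metadata
        self.query: str = source["args"]["query"]
        # source-specific parameter specs override the file-level defaults
        self.params: Dict[str, dict] = {**defaults.get("parameters", {}), **source.get("parameters", {})}

    @property
    def required_params(self) -> Dict[str, dict]:
        return {n: p for n, p in self.params.items() if "default" not in p}

    @property
    def default_params(self) -> Dict[str, dict]:
        return {n: p for n, p in self.params.items() if "default" in p}

    def create_query(self, formatters: Optional[Dict[str, Callable]] = None, **kwargs: Any) -> str:
        """Render the template: kwargs override defaults; a missing required param raises."""
        fmt = {**DEFAULT_FORMATTERS, **(formatters or {})}
        values = {n: p["default"] for n, p in self.default_params.items()}
        values.update(kwargs)
        missing = sorted(n for n in self.required_params if n not in values)
        if missing:
            raise ValueError(f"Missing required parameters: {', '.join(missing)}")

        def substitute(match: "re.Match") -> str:
            spec = self.params[match.group(1)]  # KeyError for an undeclared {name}
            return fmt[spec.get("type", "str")](values[match.group(1)])

        return _PLACEHOLDER.sub(substitute, self.query)


def extract_query_params(source: QuerySource, *providers: QueryParamProvider,
                         **kwargs: Any) -> Tuple[Dict[str, Any], List[str]]:
    """Merge defaults, then provider params (later providers win), then explicit kwargs."""
    params = {n: p["default"] for n, p in source.default_params.items()}
    for provider in providers:
        params.update({k: v for k, v in provider.query_params.items() if k in source.params})
    params.update(kwargs)
    missing = sorted(n for n in source.required_params if n not in params)
    return params, missing


# Constructed sample definition in the shape of a msticpy-style query file
SAMPLE_DEFAULTS = {"parameters": {"table": {"type": "raw", "default": "SecurityEvent"},
                                  "add_query_items": {"type": "raw", "default": ""}}}
SAMPLE_SOURCE = {
    "args": {"query": "{table} | where TimeGenerated between ({start} .. {end})"
                      " | where EventID == 4624 | where Computer =~ {host_name}{add_query_items}"},
    "parameters": {"host_name": {"type": "str"}, "start": {"type": "datetime"}, "end": {"type": "datetime"}},
}
LIST_HOST_LOGONS = QuerySource("list_host_logons", SAMPLE_SOURCE, SAMPLE_DEFAULTS, {"data_families": ["WindowsSecurity"]})


def build_query(source: QuerySource, *providers: QueryParamProvider, **kwargs: Any) -> str:
    params, missing = extract_query_params(source, *providers, **kwargs)
    if missing:
        raise ValueError(f"Missing required parameters: {', '.join(missing)}")
    return source.create_query(**params)
```

The point of formatting by declared type is that a caller-supplied string (a hostname taken from an alert, for example) is always quoted and escaped before it reaches the KQL text, while the unquoted 'raw' type is only ever used for values that come from the query definition file itself, such as the table name or an optional query fragment. Providers are applied in order, so a later provider overrides an earlier one, and explicit keyword arguments override every provider; a required parameter that no source supplies is reported by name rather than rendered as an empty placeholder.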
msticpy.data.query_store module
QueryStore class - holds a collection of QuerySources.
- class msticpy.data.query_store.QueryStore(environment: str)
Bases:
object
Repository for query definitions for a data environment.
- environment
The data environment for the queries.
- Type
str
- data_families
The set of data families and associated queries for each.
- Type
Dict[str, Dict[str, QuerySource]]
Initialize a QueryStore for a new environment.
- Parameters
environment (str) – The data environment
- add_data_source(source: msticpy.data.query_source.QuerySource)
Add a datasource/query to the store.
- Parameters
source (QuerySource) – The source to add. An existing item with the same name will be overwritten
- add_query(name: str, query: str, query_paths: Union[str, List[str]], description: Optional[str] = None)
Add a query from name/query text.
- Parameters
name (str) – name of the query
query (str) – The query string
query_paths (Union[str, List[str]]) – The path/data_family to categorize. Multiple paths can be specified. If the path is dotted, this will cause the query to be displayed in the corresponding hierarchy.
description (str, optional) – Query description
- find_query(query_name: str) Set[Optional[msticpy.data.query_source.QuerySource]]
Return set of queries with name query_name.
- Parameters
query_name (str) – Name of the query
- Returns
Set (distinct) queries matching name.
- Return type
Set[QuerySource]
- get_query(query_name: str, query_path: Optional[Union[str, msticpy.data.query_defns.DataFamily]] = None) msticpy.data.query_source.QuerySource
Return query with name data_family and query_name.
- Parameters
query_name (str) – Name of the query
query_path (Union[str, DataFamily]) – The data family for the query
- Returns
Query matching name and family.
- Return type
- import_file(query_file: str)
Import a yaml data source definition.
- Parameters
query_file (str) – Path to the file to import
- Raises
ImportError – File read error or Syntax or semantic error found in the source file.
- classmethod import_files(source_path: list, recursive: bool = False, driver_query_filter: Optional[Dict[str, Set[str]]] = None) Dict[str, msticpy.data.query_store.QueryStore]
Import multiple query definition files from directory path.
- Parameters
source_path (str) – The folder containing the yaml definition files.
recursive (bool, optional) – True to recurse sub-directories (the default is False, which only reads from the top level)
driver_query_filter (Dict[str, Set[str]]) – A dictionary of query metadata keys and values. This is used to test each read query to see if it is relevant to the driver and should be returned in the created QueryStore dictionary.
- Returns
Dictionary of one or more environments and the QueryStore containing the queries for each environment.
- Return type
Dict[str, 'QueryStore']
- Raises
FileNotFoundError – File read error or Syntax or semantic error found in a source file.
- property query_names: Iterable[str]
Return list of family.query in the store.
- Returns
List of queries
- Return type
Iterable[str]
msticpy.data.azure_data module
Deprecated path for data.azure.
msticpy.data.azure_sentinel module
Deprecated path for data.azure.
msticpy.data.azure_blob_storage module
Deprecated path for data.azure.
msticpy.data.sql_to_kql module
Module for SQL to KQL Conversion.
This is an experimental conversion utility built to support a limited subset of ANSI SQL. It relies on moz_sql_parser (https://github.com/mozilla/moz-sql-parser) to parse the SQL syntax tree. Some hacky additions have been made to allow table renaming and support for a few SparkSQL operators such as RLIKE.
For more complete help with SQL to KQL translation see https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sqlcheatsheet
Known limitations
Does not support aggregate functions in SELECT with no GROUP BY clause
Does not support IN, EXISTS, HAVING operators
Only partial support for AS naming (should work in SELECT expressions)
- msticpy.data.sql_to_kql.sql_to_kql(sql: str, target_tables: Optional[Dict[str, str]] = None) str
Parse SQL and return KQL equivalent.
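A toy SQL-to-KQL translator for the kind of subset the module describes (a single SELECT with optional WHERE, ORDER BY and LIMIT, table renaming through target_tables, LIKE and the SparkSQL RLIKE operator, and partial AS support in the SELECT list), added here as an illustration. It is not msticpy's implementation: the library parses with moz_sql_parser, whereas this version uses regular expressions and accepts far less, and the particular mappings chosen (LIKE '%x%' to contains, ORDER BY to sort by with an explicit direction, doubled SQL quotes to backslash escapes) are my own.

```python
import re
from typing import Dict, Optional

# Grammar accepted by this toy: SELECT cols FROM table [WHERE ...] [ORDER BY ...] [LIMIT n]
_SELECT = re.compile(
    r"^\s*SELECT\s+(?P<cols>.+?)\s+FROM\s+(?P<table>\w+)"
    r"(?:\s+WHERE\s+(?P<where>.+?))?"
    r"(?:\s+ORDER\s+BY\s+(?P<order>.+?))?"
    r"(?:\s+LIMIT\s+(?P<limit>\d+))?\s*;?\s*$",
    re.IGNORECASE | re.DOTALL,
)
# One comparison (column op literal) or a boolean keyword. Because a comparison is
# tried first, an AND/OR inside a string literal is consumed as part of the literal.
_WHERE_TOKEN = re.compile(
    r"(?P<col>\w+)\s*(?P<op>>=|<=|<>|!=|=|>|<|\bRLIKE\b|\bLIKE\b)\s*"
    r"(?P<val>'(?:[^']|'')*'|-?\d+(?:\.\d+)?)"
    r"|\b(?P<kw>AND|OR)\b",
    re.IGNORECASE,
)


def _literal(token: str) -> str:
    """Convert a SQL literal to KQL: '' doubling becomes backslash escaping."""
    if not token.startswith("'"):
        return token  # numeric literal passes through unchanged
    inner = token[1:-1].replace("''", "'")
    return "'" + inner.replace("\\", "\\\\").replace("'", "\\'") + "'"


def _comparison(col: str, op: str, val: str) -> str:
    op = op.upper()
    if op == "LIKE":
        if not val.startswith("'"):
            raise ValueError("LIKE needs a string pattern")
        pattern = val[1:-1]
        core = pattern.strip("%")
        if "%" in core or "_" in core:
            raise ValueError(f"Unsupported LIKE pattern {pattern!r}")
        kql_op = {(True, True): "contains", (True, False): "endswith",
                  (False, True): "startswith"}.get((pattern.startswith("%"), pattern.endswith("%")), "==")
        lit = _literal("'" + core + "'")
        return f"{col} {kql_op} {lit}"
    if op == "RLIKE":
        return f"{col} matches regex {_literal(val)}"
    kql_op = {"=": "==", "<>": "!="}.get(op, op)
    return f"{col} {kql_op} {_literal(val)}"


def _translate_where(where: str) -> str:
    def token(m: "re.Match") -> str:
        if m.group("kw"):
            return m.group("kw").lower()
        return _comparison(m.group("col"), m.group("op"), m.group("val"))
    return _WHERE_TOKEN.sub(token, where)


def _translate_order(order: str) -> str:
    # SQL defaults to ascending while KQL 'sort by' defaults to descending: be explicit
    clauses = []
    for item in order.split(","):
        parts = item.split()
        clauses.append(f"{parts[0]} {parts[1].lower() if len(parts) > 1 else 'asc'}")
    return ", ".join(clauses)


def _translate_select(cols: str) -> Optional[str]:
    if cols.strip() == "*":
        return None
    out = []
    for col in cols.split(","):
        alias = re.fullmatch(r"\s*(\w+)\s+AS\s+(\w+)\s*", col, re.IGNORECASE)
        out.append(f"{alias.group(2)} = {alias.group(1)}" if alias else col.strip())
    return "project " + ", ".join(out)


def sql_to_kql(sql: str, target_tables: Optional[Dict[str, str]] = None) -> str:
    """Parse the SQL subset above and return the equivalent KQL pipeline."""
    match = _SELECT.match(sql)
    if not match:
        raise ValueError("Unsupported SQL statement (only a simple SELECT is handled)")
    table = (target_tables or {}).get(match["table"], match["table"])
    stages = [table]
    if match["where"]:
        stages.append("where " + _translate_where(match["where"]))
    if match["order"]:
        stages.append("sort by " + _translate_order(match["order"]))
    project = _translate_select(match["cols"])
    if project:
        stages.append(project)
    if match["limit"]:
        stages.append("take " + match["limit"])
    return " | ".join(stages)
```

Two caveats carry over to any such translator, including the module's own. Only comparisons of the form column, operator, literal are understood here, so anything else in the WHERE clause (a function call, a subquery) is passed through untouched and will fail at the KQL side rather than being silently mistranslated. And semantics shift under translation: KQL contains and startswith are case-insensitive, whereas the behaviour of SQL LIKE depends on the database collation, so a translated detection query should be checked against known data before it is trusted.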
msticpy.data.drivers.driver_base module
Data driver base class.
- class msticpy.data.drivers.driver_base.DriverBase(**kwargs)
Bases:
abc.ABC
Base class for data providers.
Initialize new instance.
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- abstract connect(connection_str: Optional[str] = None, **kwargs)
Connect to data source.
- Parameters
connection_str (Optional[str]) – Connect to a data source
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return queries retrieved from the service after connecting.
- Returns
List of Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
List[Dict[str, str]]
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- abstract query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The query to execute
query_source (QuerySource) – The query definition object
kwargs – Are passed to the underlying provider query method, if supported.
- Returns
A DataFrame (if successful) or the underlying provider result if an error.
- Return type
Union[pd.DataFrame, Any]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- abstract query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame plus native results.
- Parameters
query (str) – The query to execute
- Returns
A DataFrame and native results.
- Return type
Tuple[pd.DataFrame, Any]
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
msticpy.data.drivers.kql_driver module
KQL Driver class.
- class msticpy.data.drivers.kql_driver.KqlDriver(connection_str: Optional[str] = None, **kwargs)
Bases:
msticpy.data.drivers.driver_base.DriverBase
KqlDriver class to execute kql queries.
Instantiate KqlDriver and optionally connect.
- Parameters
connection_str (str, optional) – Connection string
debug (bool) – print out additional diagnostic information.
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to data source.
- Parameters
connection_str (str) – Connect to a data source
kqlmagic_args (str, optional) – Additional string of parameters to be passed to KqlMagic
mp_az_auth (Union[bool, str, list, None], optional) – Optional parameter directing KqlMagic to use MSTICPy Azure authentication. Values can be:
- True or "default": use the settings in the msticpyconfig.yaml 'Azure' section
- str: single auth method name ('msi', 'cli', 'env', 'vscode', 'powershell', 'cache' or 'interactive')
- List[str]: list of acceptable auth methods from ('msi', 'cli', 'env', 'vscode', 'powershell', 'cache' or 'interactive')
mp_az_tenant_id (str, optional) – Optional parameter specifying a Tenant ID for use by MSTICPy Azure authentication.
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return queries retrieved from the service after connecting.
- Returns
List of Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
List[Dict[str, str]]
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The query to execute
query_source (QuerySource) – The query definition object
- Returns
A DataFrame (if successful) or the underlying provider result if an error occurred.
- Return type
Union[pd.DataFrame, results.ResultSet]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Kqlmagic.kql_proxy.KqlResponse]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The kql query to execute
- Returns
A DataFrame (if successful) and the KQL ResultSet.
- Return type
Tuple[pd.DataFrame, results.ResultSet]
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
msticpy.data.drivers.local_data_driver module
Local Data Driver class - for testing and demos.
- class msticpy.data.drivers.local_data_driver.LocalDataDriver(connection_str: Optional[str] = None, **kwargs)
Bases:
msticpy.data.drivers.driver_base.DriverBase
LocalDataDriver class to execute kql queries.
Instantiate LocalDataDriver and optionally connect.
- Parameters
connection_str (str, optional) – Connection string (not used)
data_paths (List[str], optional) – Paths from which to load data files
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to data source.
- Parameters
connection_str (str) – Connect to a data source
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return queries retrieved from the service after connecting.
- Returns
List of Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
List[Dict[str, str]]
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The query to execute
query_source (QuerySource) – The query definition object
- Returns
A DataFrame (if successful) or the underlying provider result if an error occurred.
- Return type
Union[pd.DataFrame, results.ResultSet]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_with_results(query, **kwargs)
Return query with fake results.
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
msticpy.data.drivers.mdatp_driver module
MDATP OData Driver class.
- class msticpy.data.drivers.mdatp_driver.MDATPDriver(connection_str: Optional[str] = None, **kwargs)
Bases:
msticpy.data.drivers.odata_driver.OData
KqlDriver class to retrieve data from MS Defender APIs.
Instantiate MSDefenderDriver and optionally connect.
- Parameters
connection_str (str, optional) – Connection string
- CONFIG_NAME = 'MicrosoftDefender'
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to oauth data source.
- Parameters
connection_str (Optional[str], optional) – Connect to a data source
instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml
Notes
Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return queries retrieved from the service after connecting.
- Returns
List of Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
List[Dict[str, str]]
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The query to execute
query_source (QuerySource) – The query definition object
- Returns
A DataFrame (if successful) or the underlying provider result if an error occurred.
- Return type
Union[pd.DataFrame, results.ResultSet]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The kql query to execute
- Returns
A DataFrame (if successful) and the KQL ResultSet.
- Return type
Tuple[pd.DataFrame, results.ResultSet]
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
msticpy.data.drivers.mordor_driver module
Mordor/OTRF Security datasets driver.
- class msticpy.data.drivers.mordor_driver.MitreAttack(attack: Optional[Dict[str, Any]] = None, technique: Optional[str] = None, sub_technique: Optional[str] = None, tactics: Optional[List[str]] = None)
Bases:
object
MitreAttack container for techniques and tactics.
Create instance of MitreAttack.
- Parameters
attack (Dict[str, Any], optional) – attack data as dictionary, by default None
technique (str, optional) – technique ID, by default None
sub_technique (str, optional) – sub-technique ID, by default None
tactics (List[str], optional) – List of associated tactics, by default None
- MTR_TAC_URI = 'https://attack.mitre.org/tactics/{tactic_id}/'
- MTR_TECH_URI = 'https://attack.mitre.org/techniques/{technique_id}/'
- property tactics_full: List[Tuple[str, str, str, str]]
Return full listing of Mitre tactics.
- Returns
List of tuples of: (ID, Name, Description, URI)
- Return type
List[Tuple[str, str, str, str]]
- property technique_desc: Optional[str]
Return Mitre technique description.
- Returns
Technique description
- Return type
Optional[str]
- property technique_name: Optional[str]
Return Mitre Technique full name.
- Returns
Name of the Mitre technique
- Return type
Optional[str]
- property technique_uri: str
Return Mitre Technique URI.
- Returns
URI of the Mitre technique
- Return type
Optional[str]
- class msticpy.data.drivers.mordor_driver.MordorDriver(**kwargs)
Bases:
msticpy.data.drivers.driver_base.DriverBase
Mordor data driver.
Initialize the Mordor driver.
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to data source.
- Parameters
connection_str (Optional[str]) – Connect to a data source
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return generator of Mordor query definitions.
- Yields
Iterable[Dict[str, Any]] – Iterable of Dictionaries containing query definitions.
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The query to execute
query_source (QuerySource) – The query definition object
kwargs – Are passed to the underlying provider query method, if supported.
- Returns
A DataFrame (if successful) or the underlying provider result if an error occurred.
- Return type
Union[pd.DataFrame, Any]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame plus native results.
- Parameters
query (str) – The query to execute
- Returns
A DataFrame and native results.
- Return type
Tuple[pd.DataFrame, Any]
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- search_queries(search: str) Iterable[str]
Search queries for matching attributes.
- Parameters
search (str) – Search string. Substrings separated by commas will be treated as OR terms - e.g. “a, b” == “a” or “b”. Substrings separated by “+” will be treated as AND terms - e.g. “a + b” == “a” and “b”
- Returns
Iterable of matching query names.
- Return type
Iterable[str]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
- class msticpy.data.drivers.mordor_driver.MordorEntry(title: str, id: str, type: str, creation_date, modification_date, contributors: List[str] = NOTHING, author: Optional[str] = None, platform: Optional[str] = None, description: Optional[str] = None, tags: List[str] = NOTHING, files: List[Dict[str, Any]] = NOTHING, datasets: List[Dict[str, Any]] = NOTHING, attack_mappings: List[Dict[str, Any]] = NOTHING, notebooks: List[Dict[str, str]] = NOTHING, simulation: Dict[str, Any] = NOTHING, references: List[Any] = NOTHING, rel_file_paths: List[Dict[str, Any]] = NOTHING)
Bases:
object
Mordor data set metadata.
Method generated by attrs for class MordorEntry.
- attack_mappings: List[Dict[str, Any]]
- author: Optional[str]
- contributors: List[str]
- creation_date: datetime.datetime
- datasets: List[Dict[str, Any]]
- description: Optional[str]
- files: List[Dict[str, Any]]
- get_attacks() List[msticpy.data.drivers.mordor_driver.MitreAttack]
Return list of Mitre attack classifications.
- Returns
List of MitreAttack definitions.
- Return type
List[MitreAttack]
- get_file_paths() List[Dict[str, str]]
Return list of data file links.
- Returns
List of dictionaries describing files. Each entry has key/values for: file_type, file_path, relative_path, qry_path.
- Return type
List[Dict[str, str]]
- get_notebooks() List[Tuple[str, str, str]]
Return the list of notebooks for the dataset.
- Returns
Tuples of (name, project, link)
- Return type
List[Tuple[str, str, str]]
- id: str
- modification_date: datetime.datetime
- notebooks: List[Dict[str, str]]
- platform: Optional[str]
- references: List[Any]
- simulation: Dict[str, Any]
- tags: List[str]
- title: str
- type: str
- msticpy.data.drivers.mordor_driver.download_mdr_file(file_uri: str, use_cached: bool = True, save_folder: str = '.', silent: bool = False) pandas.core.frame.DataFrame
Download data file from Mordor.
- Parameters
file_uri (str) – The URI of the file to download.
use_cached (bool, optional) – Try to use locally saved file first, by default True
save_folder (str, optional) – Path to output folder, by default “.”
silent (bool) – If True, suppress feedback. By default, False.
- Returns
DataFrame of Dataset
- Return type
pd.DataFrame
- msticpy.data.drivers.mordor_driver.get_mdr_data_paths(item_type='metadata') Generator[str, None, None]
Generate Mordor data sets from GitHub repo.
- Parameters
item_type (str, optional) – The type of item required, by default “metadata”. Other values are “large” and “small”.
- Yields
str – Iterable of paths
- msticpy.data.drivers.mordor_driver.search_mdr_data(mdr_data: Dict[str, msticpy.data.drivers.mordor_driver.MordorEntry], terms: Optional[str] = None, subset: Optional[Iterable[str]] = None) Set[str]
Return IDs for items matching terms.
- Parameters
mdr_data (Dict[str, MordorEntry]) – Mordor dataset
terms (str, optional) – Search terms, by default None (comma-separated values are treated as OR terms; plus-separated values are treated as AND terms).
subset (Iterable[str], optional) – A subset of IDs over which to search, by default None
- Returns
The set of matching IDs.
- Return type
Set[str]
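As an added example (not msticpy's own code), the comma = OR / plus = AND search semantics described for search_queries and search_mdr_data above, run over a small constructed set of entries. The Entry stand-in class, the case-insensitive substring match over title, description and tags, and the rule that comma groups are split before plus terms are assumptions not stated on the page.

```python
"""Added example, not msticpy's implementation: search-term matching with
comma = OR and plus = AND, as described for search_queries / search_mdr_data.
Assumptions (not on the page): OR groups are split first, every '+' term in a
group must match, matching is a case-insensitive substring test over the
entry's title, description and tags."""
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Set


@dataclass
class Entry:
    """Minimal stand-in for MordorEntry, constructed for this example only."""

    title: str
    description: str = ""
    tags: List[str] = field(default_factory=list)

    def search_text(self) -> str:
        """Flatten the searchable fields into one lower-cased string."""
        return " ".join([self.title, self.description, *self.tags]).lower()


def parse_terms(terms: str) -> List[List[str]]:
    """Turn 'a, b + c' into [['a'], ['b', 'c']]: outer list = OR, inner = AND."""
    groups: List[List[str]] = []
    for or_part in terms.split(","):
        and_terms = [t.strip().lower() for t in or_part.split("+") if t.strip()]
        if and_terms:
            groups.append(and_terms)
    return groups


def entry_matches(entry: Entry, groups: List[List[str]]) -> bool:
    """True if any OR group has all of its AND terms present in the entry text."""
    text = entry.search_text()
    return any(all(term in text for term in group) for group in groups)


def search_entries(
    data: Dict[str, Entry],
    terms: Optional[str] = None,
    subset: Optional[Iterable[str]] = None,
) -> Set[str]:
    """Return the IDs whose entries match terms, restricted to subset if given.

    An empty or None terms value matches every candidate ID (assumption).
    """
    ids = set(data) if subset is None else set(subset) & set(data)
    if not terms or not terms.strip():
        return ids
    groups = parse_terms(terms)
    return {item_id for item_id in ids if entry_matches(data[item_id], groups)}


# Constructed sample metadata; IDs and titles are placeholders, not real datasets.
SAMPLE: Dict[str, Entry] = {
    "SDWIN-001": Entry(
        "Credential access simulation", "LSASS memory read detection data",
        ["windows", "mimikatz"],
    ),
    "SDWIN-002": Entry(
        "PowerShell remoting lateral movement", "WinRM session telemetry",
        ["windows", "powershell"],
    ),
    "SDAWS-001": Entry(
        "CloudTrail IAM key misuse", "aws access key abuse telemetry",
        ["aws", "iam"],
    ),
}

if __name__ == "__main__":
    print(search_entries(SAMPLE, "mimikatz, aws"))         # OR: two IDs
    print(search_entries(SAMPLE, "windows + powershell"))  # AND: one ID
```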
msticpy.data.drivers.odata_driver module
OData Driver class.
- class msticpy.data.drivers.odata_driver.OData(**kwargs)
Bases:
msticpy.data.drivers.driver_base.DriverBase
Parent class to retrieve data from an OAuth-based API.
Instantiate OData driver and optionally connect.
- Parameters
connect (bool, optional) – Set true if you want to connect to the provider at initialization
- CONFIG_NAME = ''
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to oauth data source.
- Parameters
connection_str (Optional[str], optional) – Connect to a data source
instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml
Notes
Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return queries retrieved from the service after connecting.
- Returns
List of Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
List[Dict[str, str]]
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- abstract query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The query to execute
query_source (QuerySource) – The query definition object
- Returns
A DataFrame (if successful) or the underlying provider result if an error occurred.
- Return type
Union[pd.DataFrame, Any]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The kql query to execute
- Returns
A DataFrame (if successful) and the KQL ResultSet.
- Return type
Tuple[pd.DataFrame, results.ResultSet]
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
msticpy.data.drivers.resource_graph_driver module
Azure Resource Graph Driver class.
- class msticpy.data.drivers.resource_graph_driver.ResourceGraphDriver(**kwargs)
Bases:
msticpy.data.drivers.driver_base.DriverBase
Driver to connect and query from Azure Resource Graph.
Instantiate Azure Resource Graph Driver.
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to Azure Resource Graph via Azure SDK.
- Parameters
connection_str (Optional[str], optional) – Not used.
kwargs – Connection parameters can be supplied as keyword parameters.
Notes
Default configuration is read from the DataProviders/AzureCLI section of msticpyconfig.yaml, if available.
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return queries retrieved from the service after connecting.
- Returns
List of Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
List[Dict[str, str]]
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute Resource Graph query and retrieve results.
- Parameters
query (str) – KQL query to execute
query_source (QuerySource) – The query definition object
kwargs – count
- Returns
Query results in a DataFrame, or the query response if an error occurred.
- Return type
Union[pd.DataFrame, Any]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – Query to execute against Resource Graph
- Returns
A DataFrame (if successful) or the underlying provider result if an error occurs.
- Return type
Union[pd.DataFrame, Any]
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
msticpy.data.drivers.security_graph_driver module
Security Graph OData Driver class.
- class msticpy.data.drivers.security_graph_driver.SecurityGraphDriver(connection_str: Optional[str] = None, **kwargs)
Bases:
msticpy.data.drivers.odata_driver.OData
Driver to query security graph.
Instantiate MSGraph driver and optionally connect.
- Parameters
connection_str (str, optional) – Connection string
- CONFIG_NAME = 'MicrosoftGraph'
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- api_root: Optional[str]
- api_ver: Optional[str]
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to oauth data source.
- Parameters
connection_str (Optional[str], optional) – Connect to a data source
instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml
Notes
Connection string fields: tenant_id, client_id, client_secret, apiRoot, apiVersion
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return queries retrieved from the service after connecting.
- Returns
List of Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
List[Dict[str, str]]
- formatters: Dict[str, Callable]
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- oauth_url: Optional[str]
- public_attribs: Dict[str, Any]
- query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The query to execute
query_source (QuerySource) – The query definition object
- Returns
A DataFrame (if successful) or the underlying provider result if an error occurred.
- Return type
Union[pd.DataFrame, results.ResultSet]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – The kql query to execute
- Returns
A DataFrame (if successful) and the KQL ResultSet.
- Return type
Tuple[pd.DataFrame, results.ResultSet]
- req_body: Optional[Dict[str, Optional[str]]]
- request_uri: Optional[str]
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
msticpy.data.drivers.splunk_driver module
Splunk Driver class.
- class msticpy.data.drivers.splunk_driver.SplunkDriver(**kwargs)
Bases:
msticpy.data.drivers.driver_base.DriverBase
Driver to connect and query from Splunk.
Instantiate Splunk Driver.
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- connect(connection_str: Optional[str] = None, **kwargs)
Connect to Splunk via splunk-sdk.
- Parameters
connection_str (Optional[str], optional) – Connection string with Splunk connection parameters
kwargs – Connection parameters can be supplied as keyword parameters.
Notes
Default configuration is read from the DataProviders/Splunk section of msticpyconfig.yaml, if available.
- property connected: bool
Return true if at least one connection has been made.
- Returns
True if a successful connection has been made.
- Return type
bool
Notes
This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.
- property driver_queries: Iterable[Dict[str, Any]]
Return dynamic queries available on connection to service.
- Returns
List of queries with properties: “name”, “query”, “container” and (optionally) “description”
- Return type
Iterable[Dict[str, Any]]
- Raises
MsticpyNotConnectedError – If called before driver is connected.
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- property loaded: bool
Return true if the provider is loaded.
- Returns
True if the provider is loaded.
- Return type
bool
Notes
This is not relevant for some providers.
- query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]
Execute splunk query and retrieve results via OneShot or async search mode.
- Parameters
query (str) – Splunk query to execute via OneShot or async search mode
query_source (QuerySource) – The query definition object
kwargs – Passed to the Splunk oneshot method. count=0 by default; oneshot=False by default (async query), set to True for oneshot (blocking) mode.
- Returns
Query results in a DataFrame, or the query response if an error occurred.
- Return type
Union[pd.DataFrame, Any]
- property query_attach_spec: Dict[str, Set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]
Execute query string and return DataFrame of results.
- Parameters
query (str) – Query to execute against splunk instance.
- Returns
A DataFrame (if successful) or the underlying provider result if an error occurs.
- Return type
Union[pd.DataFrame, Any]
- property schema: Dict[str, Dict]
Return current data schema of connection.
- Returns
Data schema of current connection.
- Return type
Dict[str, Dict]
- property service_queries: Tuple[Dict[str, str], str]
Return dynamic queries available on connection to service.
- Returns
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type
Tuple[Dict[str, str], str]
msticpy.data.data_obfus module
Data obfuscation functions.
- class msticpy.data.data_obfus.ObfuscationAccessor(pandas_obj)
Bases:
object
Base64 Unpack pandas extension.
Initialize the extension.
- mask(column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True) pandas.core.frame.DataFrame
Obfuscate the data in columns of a pandas dataframe.
- Parameters
data (pd.DataFrame) – dataframe containing column to obfuscate
column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
- Returns
Obfuscated dataframe
- Return type
pd.DataFrame
- msticpy.data.data_obfus.check_masking(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) Optional[Tuple[List[str], List[str]]]
Check the obfuscation results for a row.
- Parameters
data (pd.DataFrame) – Obfuscated DataFrame
orig_data (pd.DataFrame) – Original DataFrame
index (int, optional) – The row to check, by default 0
silent (bool) – If True, the function prints nothing and returns the lists of unchanged and changed columns; if False, it prints the comparison instead. By default, True.
- Returns
If silent is True returns a tuple of unchanged, changed items. If False, returns None.
- Return type
Optional[Tuple[List[str], List[str]]]
- msticpy.data.data_obfus.check_obfuscation(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) Optional[Tuple[List[str], List[str]]]
Check the obfuscation results for a row.
- Parameters
data (pd.DataFrame) – Obfuscated DataFrame
orig_data (pd.DataFrame) – Original DataFrame
index (int, optional) – The row to check, by default 0
silent (bool) – If True, the function prints nothing and returns the lists of unchanged and changed columns; if False, it prints the comparison instead. By default, True.
- Returns
If silent is True returns a tuple of unchanged, changed items. If False, returns None.
- Return type
Optional[Tuple[List[str], List[str]]]
- msticpy.data.data_obfus.hash_account(account: str) str
Hash an Account to something recognizable.
- Parameters
account (str) – Account name (UPN, NT or simple name)
- Returns
Hashed Account
- Return type
str
- msticpy.data.data_obfus.hash_dict(item_dict: Dict[str, Union[Dict[str, Any], List[Any], str]]) Dict[str, Any]
Hash dictionary values.
- Parameters
item_dict (Dict[str, Union[Dict[str, Any], List[Any], str]]) – Input item can be a Dict of strings, lists or other dictionaries.
- Returns
Dictionary with hashed values.
- Return type
Dict[str, Any]
- msticpy.data.data_obfus.hash_ip(input_item: Union[List[str], str]) Union[List[str], str]
Hash IP address or list of IP addresses.
- Parameters
input_item (Union[List[str], str]) – List of IP addresses or single IP address.
- Returns
List of hashed addresses or a single hashed address (depending on the input).
- Return type
Union[List[str], str]
- msticpy.data.data_obfus.hash_item(input_item: str, delim: str = None) str
Hash a simple string.
- Parameters
input_item (str) – The input string
delim (str, optional) – A string of delimiters to use to split the input string prior to hashing.
- Returns
The obfuscated output string
- Return type
str
- msticpy.data.data_obfus.hash_list(item_list: List[str]) List[str]
Hash list of strings.
- Parameters
item_list (List[str]) – Input list
- Returns
Hashed list
- Return type
List[str]
- msticpy.data.data_obfus.hash_sid(sid: str) str
Hash a SID preserving well-known SIDs and the RID.
- Parameters
sid (str) – SID string
- Returns
Hashed SID
- Return type
str
- msticpy.data.data_obfus.hash_string(input_str: str) str
Hash a simple string.
- Parameters
input_str (str) – The input string
- Returns
The obfuscated output string
- Return type
str
- msticpy.data.data_obfus.mask_df(data: pandas.core.frame.DataFrame, column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True, silent: bool = True) pandas.core.frame.DataFrame
Obfuscate columns of a DataFrame.
- Parameters
data (pd.DataFrame) – Input dataframe
column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
silent (bool) – If False the function returns progress output, by default True.
- Returns
Obfuscated dataframe.
- Return type
pd.DataFrame
- msticpy.data.data_obfus.obfuscate_df(data: pandas.core.frame.DataFrame, column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True, silent: bool = True) pandas.core.frame.DataFrame
Obfuscate columns of a DataFrame.
- Parameters
data (pd.DataFrame) – Input dataframe
column_map (Mapping[str, Any], optional) – Custom column mapping, by default None
use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)
silent (bool) – If False the function returns progress output, by default True.
- Returns
Obfuscated dataframe.
- Return type
pd.DataFrame
- msticpy.data.data_obfus.replace_guid(guid: str) str
Replace GUID/UUID with mapped random UUID.
- Parameters
guid (str) – Input UUID.
- Returns
Mapped UUID
- Return type
str
msticpy.data.browsers.query_browser module
QueryProvider Query Browser.
- msticpy.data.browsers.query_browser.browse_queries(query_provider: Any, **kwargs) msticpy.nbtools.nbwidgets.select_item.SelectItem
Return QueryProvider query browser.
- Parameters
query_provider (QueryProvider) – Initialized query provider.
kwargs – passed to the SelectItem constructor.
- Returns
SelectItem browser for TI Data.
- Return type
SelectItem
msticpy.data.browsers.mordor_browser module
Mordor dataset browser.
- class msticpy.data.browsers.mordor_browser.MordorBrowser(save_folder: Optional[str] = None, use_cached: bool = True)
Bases:
object
Mordor browser widget.
Initialize MordorBrowser control.
- Parameters
save_folder (str, optional) – Folder to save downloaded files, by default reads the value from settings or defaults to “.”
use_cached (bool, optional) – If true, downloaded files are not deleted after download and are used as a local cache, by default True
- property fields
Return set of fields widget controls.
- property selected_dset
Return the ID of the selected data set.