msticpy.data package

Subpackages

Submodules

msticpy.data.azure_blob_storage module

Deprecated path for data.azure.

msticpy.data.azure_data module

Deprecated path for data.azure.

msticpy.data.azure_sentinel module

Deprecated path for data.azure.

msticpy.data.data_obfus module

Data obfuscation functions.

class msticpy.data.data_obfus.ObfuscationAccessor(pandas_obj)

Bases: object

Data obfuscation pandas extension.

Initialize the extension.

mask(column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True) pandas.core.frame.DataFrame

Obfuscate the data in columns of a pandas dataframe.

Parameters
  • data (pd.DataFrame) – dataframe containing column to obfuscate

  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None

  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)

Returns

Obfuscated dataframe

Return type

pd.DataFrame

msticpy.data.data_obfus.check_masking(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) Optional[Tuple[List[str], List[str]]]

Check the obfuscation results for a row.

Parameters
  • data (pd.DataFrame) – Obfuscated DataFrame

  • orig_data (pd.DataFrame) – Original DataFrame

  • index (int, optional) – The row to check, by default 0

  • silent (bool) – If True the function prints no output and returns lists of unchanged and changed columns. By default, True

Returns

If silent is True returns a tuple of unchanged, changed items. If False, returns None.

Return type

Optional[Tuple[List[str], List[str]]]
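Example (illustrative only): a minimal sketch of the row comparison described here, using plain dictionaries in place of DataFrame rows. The helper name compare_row and the sample values are hypothetical and are not part of msticpy.

```python
from typing import Any, Dict, List, Tuple


def compare_row(
    masked: Dict[str, Any], original: Dict[str, Any]
) -> Tuple[List[str], List[str]]:
    """Return (unchanged, changed) column names for a single row."""
    unchanged = [col for col in original if masked.get(col) == original[col]]
    changed = [col for col in original if masked.get(col) != original[col]]
    return unchanged, changed


# Hypothetical sample row before and after obfuscation.
original_row = {"Account": "alice@contoso.com", "EventID": 4624}
masked_row = {"Account": "user_1a2b@domain_3c4d", "EventID": 4624}

unchanged_cols, changed_cols = compare_row(masked_row, original_row)
print("unchanged:", unchanged_cols, "changed:", changed_cols)
```

Columns that should have been obfuscated but appear in the unchanged list are the ones worth inspecting.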

msticpy.data.data_obfus.check_obfuscation(data: pandas.core.frame.DataFrame, orig_data: pandas.core.frame.DataFrame, index: int = 0, silent=True) Optional[Tuple[List[str], List[str]]]

Check the obfuscation results for a row.

Parameters
  • data (pd.DataFrame) – Obfuscated DataFrame

  • orig_data (pd.DataFrame) – Original DataFrame

  • index (int, optional) – The row to check, by default 0

  • silent (bool) – If True the function prints no output and returns lists of unchanged and changed columns. By default, True

Returns

If silent is True returns a tuple of unchanged, changed items. If False, returns None.

Return type

Optional[Tuple[List[str], List[str]]]

msticpy.data.data_obfus.hash_account(account: str) str

Hash an Account to something recognizable.

Parameters

account (str) – Account name (UPN, NT or simple name)

Returns

Hashed Account

Return type

str

msticpy.data.data_obfus.hash_dict(item_dict: Dict[str, Union[Dict[str, Any], List[Any], str]]) Dict[str, Any]

Hash dictionary values.

Parameters

item_dict (Dict[str, Union[Dict[str, Any], List[Any], str]]) – Input item can be a Dict of strings, lists or other dictionaries.

Returns

Dictionary with hashed values.

Return type

Dict[str, Any]

msticpy.data.data_obfus.hash_ip(input_item: Union[List[str], str]) Union[List[str], str]

Hash IP address or list of IP addresses.

Parameters

input_item (Union[List[str], str]) – List of IP addresses or single IP address.

Returns

List of hashed addresses or a single hashed address, depending on the input.

Return type

Union[List[str], str]
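Example (illustrative only): a sketch of the "list in, list out; string in, string out" contract described here. The function pseudonymize_ip and its SHA-256 based token are assumptions made for illustration; they do not reproduce how msticpy transforms addresses.

```python
import hashlib
from typing import List, Union


def pseudonymize_ip(input_item: Union[List[str], str]) -> Union[List[str], str]:
    """Return a stand-in token with the same shape (list or str) as the input."""
    if isinstance(input_item, list):
        # Process each element and keep the list shape.
        return [pseudonymize_ip(item) for item in input_item]
    digest = hashlib.sha256(input_item.encode("utf-8")).hexdigest()
    return f"ip-{digest[:8]}"


single = pseudonymize_ip("10.0.0.1")
many = pseudonymize_ip(["10.0.0.1", "10.0.0.2"])
print(single, many)
```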

msticpy.data.data_obfus.hash_item(input_item: str, delim: str = None) str

Hash a simple string.

Parameters
  • input_item (str) – The input string

  • delim (str, optional) – A string of delimiters to use to split the input string prior to hashing.

Returns

The obfuscated output string

Return type

str
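Example (illustrative only): one way the delim parameter could work, sketched with the standard library. The string is split on any character in delim, each part is hashed, and the delimiters are kept in place. The helper hash_parts and the truncated SHA-256 digest are assumptions, not the library's implementation.

```python
import hashlib
import re
from typing import Optional


def hash_parts(input_item: str, delim: Optional[str] = None) -> str:
    """Hash each delimited part of a string, keeping the delimiters in place."""

    def short_hash(text: str) -> str:
        return hashlib.sha256(text.encode("utf-8")).hexdigest()[:8]

    if not delim:
        return short_hash(input_item)
    # The capturing group makes re.split keep the delimiters in the result,
    # so text parts sit at even indexes and delimiters at odd indexes.
    pieces = re.split("([" + re.escape(delim) + "])", input_item)
    return "".join(
        piece if idx % 2 or not piece else short_hash(piece)
        for idx, piece in enumerate(pieces)
    )


masked = hash_parts("alice@contoso.com", "@.")
print(masked)
```

Keeping the delimiters means the output still looks like an e-mail address or dotted name, which helps when the obfuscated data is read by people.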

msticpy.data.data_obfus.hash_list(item_list: List[str]) List[str]

Hash list of strings.

Parameters

item_list (List[str]) – Input list

Returns

Hashed list

Return type

List[str]

msticpy.data.data_obfus.hash_sid(sid: str) str

Hash a SID preserving well-known SIDs and the RID.

Parameters

sid (str) – SID string

Returns

Hashed SID

Return type

str
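Example (illustrative only): a sketch of what "preserving well-known SIDs and the RID" can mean. The function mask_sid, the short list of well-known SIDs, and the way replacement sub-authorities are derived are all assumptions made for this example.

```python
import hashlib

# Hypothetical subset of well-known SIDs that are passed through unchanged
# (Local System, Local Service, Network Service).
WELL_KNOWN_SIDS = {"S-1-5-18", "S-1-5-19", "S-1-5-20"}


def mask_sid(sid: str) -> str:
    """Replace the domain identifier of a SID, keeping the prefix and the RID."""
    if sid in WELL_KNOWN_SIDS:
        return sid
    parts = sid.split("-")
    # Expect S-1-5-21-<sub1>-<sub2>-<sub3>-<RID>; leave anything else untouched.
    if len(parts) != 8 or parts[:4] != ["S", "1", "5", "21"]:
        return sid
    digest = hashlib.sha256("-".join(parts[4:7]).encode("utf-8")).digest()
    # Derive three 32-bit sub-authorities from the digest.
    masked = [str(int.from_bytes(digest[i:i + 4], "big")) for i in (0, 4, 8)]
    return "-".join(parts[:4] + masked + parts[7:])


system_sid = mask_sid("S-1-5-18")
admin_sid = mask_sid("S-1-5-21-1111111111-2222222222-3333333333-500")
print(system_sid, admin_sid)
```

Because the RID survives, an analyst can still tell that an obfuscated SID ending in 500 refers to a built-in administrator account.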

msticpy.data.data_obfus.hash_string(input_str: str) str

Hash a simple string.

Parameters

input_str (str) – The input string

Returns

The obfuscated output string

Return type

str

msticpy.data.data_obfus.mask_df(data: pandas.core.frame.DataFrame, column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True, silent: bool = True) pandas.core.frame.DataFrame

Obfuscate columns of a DataFrame.

Parameters
  • data (pd.DataFrame) – Input dataframe

  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None

  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)

  • silent (bool) – If False the function displays progress output, by default True.

Returns

Obfuscated dataframe.

Return type

pd.DataFrame
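Example (illustrative only): a sketch of how use_default and column_map might combine. The contents of DEFAULT_MAP, the helper build_column_map, and the rule that custom entries override built-in ones are assumptions for illustration.

```python
from typing import Any, Dict, Mapping, Optional

# Hypothetical built-in map: column name -> obfuscation operation.
DEFAULT_MAP: Dict[str, Any] = {"Account": "account", "IpAddress": "ip"}


def build_column_map(
    column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True
) -> Dict[str, Any]:
    """Combine the built-in map with custom entries; custom entries win."""
    # Copy so the built-in map is never modified.
    merged: Dict[str, Any] = dict(DEFAULT_MAP) if use_default else {}
    merged.update(column_map or {})
    return merged


with_default = build_column_map({"Host": "str"})
custom_only = build_column_map({"Host": "str"}, use_default=False)
print(with_default, custom_only)
```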

msticpy.data.data_obfus.obfuscate_df(data: pandas.core.frame.DataFrame, column_map: Optional[Mapping[str, Any]] = None, use_default: bool = True, silent: bool = True) pandas.core.frame.DataFrame

Obfuscate columns of a DataFrame.

Parameters
  • data (pd.DataFrame) – Input dataframe

  • column_map (Mapping[str, Any], optional) – Custom column mapping, by default None

  • use_default (bool) – If True use the built-in map (adding any custom mappings to this dictionary)

  • silent (bool) – If False the function displays progress output, by default True.

Returns

Obfuscated dataframe.

Return type

pd.DataFrame

msticpy.data.data_obfus.replace_guid(guid: str) str

Replace GUID/UUID with mapped random UUID.

Parameters

guid (str) – Input UUID.

Returns

Mapped UUID

Return type

str
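Example (illustrative only): a sketch of a "mapped random UUID", where the same input always yields the same replacement within one process. The names map_guid and _guid_map are hypothetical, and treating input as case-insensitive is an assumption.

```python
import uuid
from typing import Dict

# Module-level map so repeated inputs yield the same replacement.
_guid_map: Dict[str, str] = {}


def map_guid(guid: str) -> str:
    """Return a random UUID that is stable for a given input within this process."""
    key = guid.lower()  # assumption: GUIDs are compared case-insensitively
    if key not in _guid_map:
        _guid_map[key] = str(uuid.uuid4())
    return _guid_map[key]


first = map_guid("8F2C1E9A-0B4D-4C7E-9A3F-1D2E3F4A5B6C")
second = map_guid("8f2c1e9a-0b4d-4c7e-9a3f-1d2e3f4a5b6c")
print(first, second)
```

A stable mapping keeps joins and correlations across tables intact after obfuscation.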

msticpy.data.data_providers module

Data provider loader.

class msticpy.data.data_providers.QueryProvider(data_environment: Union[str, msticpy.data.query_defns.DataEnvironment], driver: Optional[msticpy.data.drivers.driver_base.DriverBase] = None, query_paths: Optional[List[str]] = None, **kwargs)

Bases: object

Container for query store and query execution provider.

Instances of this class hold the query set and execution methods for a specific data environment.

Query provider interface to queries.

Parameters
  • data_environment (Union[str, DataEnvironment]) – Name or Enum of environment for the QueryProvider

  • driver (DriverBase, optional) – Override the builtin driver (query execution class) and use your own driver (must inherit from DriverBase)

  • query_paths (List[str]) – Additional paths to look for query definitions.

  • kwargs – Other arguments are passed to the data provider driver.

See also

DataProviderBase

base class for data query providers.

add_connection(connection_str: Optional[str] = None, alias: Optional[str] = None, **kwargs)

Add an additional connection for the query provider.

Parameters
  • connection_str (Optional[str], optional) – Connection string for the provider, by default None

  • alias (Optional[str], optional) – Alias to use for the connection, by default None

  • kwargs (Dict[str, Any]) – Other parameters passed to the driver constructor.

Notes

Some drivers may accept types other than strings for the connection_str parameter.

browse(**kwargs)

Return QueryProvider query browser.

Parameters

kwargs – passed to SelectItem constructor.

Returns

SelectItem browser for the provider's queries.

Return type

SelectItem

browse_queries(**kwargs)

Return QueryProvider query browser.

Parameters

kwargs – passed to SelectItem constructor.

Returns

SelectItem browser for the provider's queries.

Return type

SelectItem

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters

connection_str (str) – Connection string for the data source

property connected: bool

Return True if the provider is connected.

Returns

True if the provider is connected.

Return type

bool

property connection_string: str

Return provider connection string.

Returns

Provider connection string.

Return type

str

exec_query(query: str, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute simple query string.

Parameters
  • query (str) – The query string to execute.

  • use_connections (Union[str, List[str]]) –

  • query_options (Dict[str, Any]) – Additional options passed to query driver.

  • kwargs (Dict[str, Any]) – Additional options passed to query driver.

Returns

Query results - a DataFrame if successful or a KqlResult if unsuccessful.

Return type

Union[pd.DataFrame, Any]

get_query(query_name: str) str

Return the raw query text for query_name.

Parameters

query_name (str) – The name of the query.

import_query_file(query_file: str)

Import a yaml data source definition.

Parameters

query_file (str) – Path to the file to import

list_connections() List[str]

Return a list of current connections or the default connection.

Returns

The alias and connection string for each connection.

Return type

List[str]

classmethod list_data_environments() List[str]

Return list of current data environments.

Returns

List of current data environments

Return type

List[str]

list_queries(substring: Optional[str] = None) List[str]

Return list of family.query in the store.

Parameters

substring (Optional[str]) – Optional pattern - will return only queries matching the pattern, default None.

Returns

List of queries

Return type

List[str]

query_help(query_name: str)

Print help for query_name.

Parameters

query_name (str) – The name of the query.

property query_time

Return the default QueryTime control for queries.

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property schema_tables: List[str]

Return list of tables in the data schema of the connection.

Returns

Tables in the schema of the current connection.

Return type

List[str]

msticpy.data.data_query_reader module

Data query definition reader.

msticpy.data.data_query_reader.find_yaml_files(source_path: str, recursive: bool = False) Iterable[pathlib.Path]

Return iterable of yaml files found in source_path.

Parameters
  • source_path (str) – The source path to search in.

  • recursive (bool, optional) – Whether to recurse through subfolders. By default False

Returns

File paths of yaml files found.

Return type

Iterable[pathlib.Path]
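Example (illustrative only): a sketch of a recursive or top-level yaml file search using pathlib. The helper iter_yaml_files is hypothetical and matches only the .yaml extension, which is an assumption.

```python
import tempfile
from pathlib import Path
from typing import Iterable


def iter_yaml_files(source_path: str, recursive: bool = False) -> Iterable[Path]:
    """Yield paths of *.yaml files under source_path."""
    root = Path(source_path)
    # "**" also matches zero directories, so top-level files are included.
    pattern = "**/*.yaml" if recursive else "*.yaml"
    yield from sorted(root.glob(pattern))


# Demonstration with a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp_dir:
    (Path(tmp_dir) / "a.yaml").write_text("metadata: {}\n")
    (Path(tmp_dir) / "sub").mkdir()
    (Path(tmp_dir) / "sub" / "b.yaml").write_text("metadata: {}\n")
    top_level = [p.name for p in iter_yaml_files(tmp_dir)]
    all_files = [p.name for p in iter_yaml_files(tmp_dir, recursive=True)]

print(top_level, all_files)
```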

msticpy.data.data_query_reader.read_query_def_file(query_file: str) Tuple[Dict, Dict, Dict]

Read a yaml data query definition file.

Parameters

query_file (str) – Path to yaml query definition file

Returns

Tuple of dictionaries:
  • sources – dictionary of query definitions

  • defaults – the default parameters from the file

  • metadata – the global metadata from the file

Return type

Tuple[Dict, Dict, Dict]

msticpy.data.data_query_reader.validate_query_defs(query_def_dict: Dict[str, Any]) bool

Validate content of query definition.

Parameters

query_def_dict (dict) – Dictionary of query definition yaml file contents.

Returns

True if validation succeeds.

Return type

bool

Raises

ValueError – The validation failure reason is returned in the exception message (arg[0])
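Example (illustrative only): a sketch of the "return True or raise ValueError with the reason in the message" pattern. The helper check_query_defs and the choice of required sections are assumptions based on the section names returned by read_query_def_file.

```python
from typing import Any, Dict

# Assumed top-level sections that a definition file must contain.
REQUIRED_SECTIONS = ("metadata", "sources")


def check_query_defs(query_def_dict: Dict[str, Any]) -> bool:
    """Return True if every required section is present, else raise ValueError."""
    for section in REQUIRED_SECTIONS:
        if section not in query_def_dict:
            raise ValueError(f"Required section '{section}' is missing")
    return True


try:
    check_query_defs({"metadata": {}})
except ValueError as err:
    # The failure reason is the first exception argument.
    failure_reason = err.args[0]

print(failure_reason)
```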

msticpy.data.param_extractor module

Parameter extractor helper functions for use with IPython/Jupyter queries.

msticpy.data.param_extractor.extract_query_params(query_source: msticpy.data.query_source.QuerySource, *args, **kwargs) Tuple[Dict[str, Any], List[str]]

Get the parameters needed for the query.

Parameters
  • query_source (QuerySource) – Query source

  • args (Tuple[QueryParamProvider]) – objects that implement QueryParamProvider (from which query parameters can be extracted).

  • kwargs (Dict[str, Any]) – custom parameter list to populate queries (override default values and values extracted from QueryParamProviders).

Returns

Dictionary of parameter names and values to be used in the query, and a list of any missing parameters.

Return type

Tuple[Dict[str, Any], List[str]]
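Example (illustrative only): a sketch of the precedence described here, where keyword arguments override both default values and values taken from parameter providers. The helper resolve_params and its arguments are hypothetical; providers are modeled as plain dictionaries.

```python
from typing import Any, Dict, Iterable, List, Tuple


def resolve_params(
    required: Iterable[str],
    defaults: Dict[str, Any],
    provider_params: Iterable[Dict[str, Any]],
    **kwargs: Any,
) -> Tuple[Dict[str, Any], List[str]]:
    """Merge parameter sources in priority order and report what is missing."""
    params: Dict[str, Any] = dict(defaults)
    for provided in provider_params:
        params.update(provided)
    params.update(kwargs)  # explicit keyword arguments take precedence
    missing = [name for name in required if name not in params]
    return params, missing


params, missing = resolve_params(
    ["start", "end", "host_name"],
    {"table": "SecurityEvent"},
    [{"start": "2024-01-01", "end": "2024-01-02"}],
    end="2024-01-03",
)
print(params, missing)
```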

msticpy.data.query_container module

Query hierarchy attribute class.

class msticpy.data.query_container.QueryContainer

Bases: object

Empty class used to create hierarchical attributes.
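Example (illustrative only): a sketch of how an empty class can carry hierarchical attributes built from a dotted path. The names Container and attach are hypothetical stand-ins.

```python
class Container:
    """Empty class whose instances act as attribute namespaces."""


def attach(root: Container, dotted_path: str, value: object) -> None:
    """Attach value at root.a.b.c, creating intermediate containers as needed."""
    *parents, leaf = dotted_path.split(".")
    node = root
    for name in parents:
        if not hasattr(node, name):
            setattr(node, name, Container())
        node = getattr(node, name)
    setattr(node, leaf, value)


queries = Container()
attach(queries, "WindowsSecurity.list_host_logons", lambda: "query text")
print(queries.WindowsSecurity.list_host_logons())
```

Attribute hierarchies like this are what make tab completion over query families possible in a notebook.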

msticpy.data.query_defns module

Query helper definitions.

class msticpy.data.query_defns.DataEnvironment(value)

Bases: enum.Enum

Enumeration of data environments.

Used to identify which queries are relevant for which data sources.

AzureSecurityCenter = 3
AzureSentinel = 1
Cybereason = 12
Kusto = 2
LocalData = 6
LogAnalytics = 1
M365D = 11
MDATP = 5
MDE = 5
MSSentinel = 1
Mordor = 8
ResourceGraph = 9
SecurityGraph = 4
Splunk = 7
Sumologic = 10
Unknown = 0

classmethod parse(value: Union[str, int]) msticpy.data.query_defns.DataEnvironment

Convert string or int to enum.

Parameters

value (Union[str, int]) – value to parse
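Example (illustrative only): several members listed above share the value 1, which in a Python Enum makes them aliases of one member. The sketch below uses a hypothetical subset named Env; the definition order (and so which name is canonical) and the fallback to Unknown are assumptions.

```python
from enum import Enum
from typing import Union


class Env(Enum):
    """Hypothetical subset of the members listed above."""

    Unknown = 0
    MSSentinel = 1
    AzureSentinel = 1  # alias: same value as MSSentinel
    LogAnalytics = 1  # alias: same value as MSSentinel
    Kusto = 2

    @classmethod
    def parse(cls, value: Union[str, int]) -> "Env":
        """Look up by member name (str) or by value (int)."""
        try:
            if isinstance(value, str):
                return cls[value]
            return cls(value)
        except (KeyError, ValueError):
            return cls.Unknown  # assumed fallback for unrecognized input


print(Env.parse("AzureSentinel"), Env.parse(2), Env.parse("nope"))
```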

class msticpy.data.query_defns.DataFamily(value)

Bases: enum.Enum

Enumeration of data families.

Used to identify which queries are relevant for which data sources.

AzureNetwork = 6
Cybereason = 11
LinuxSecurity = 2
LinuxSyslog = 5
MDATP = 7
ResourceGraph = 9
SecurityAlert = 3
SecurityGraphAlert = 4
Splunk = 8
Sumologic = 10
Unknown = 0
WindowsSecurity = 1

classmethod parse(value: Union[str, int]) msticpy.data.query_defns.DataFamily

Convert string or int to enum.

Parameters

value (Union[str, int]) – value to parse

class msticpy.data.query_defns.QueryParamProvider

Bases: abc.ABC

Abstract type for QueryParamProvider.

Method query_params must be overridden by derived classes.

abstract property query_params

Return dict of query parameters.

These parameters are sourced in the object implementing this method.

Return type

dict – dictionary of query parameter values.

msticpy.data.query_defns.ensure_df_datetimes(data: pandas.core.frame.DataFrame, columns: Optional[Union[str, List[str]]] = None, add_utc_tz: bool = True) pandas.core.frame.DataFrame

Return dataframe with converted TZ-aware timestamps.

Parameters
  • data (pd.DataFrame) – Input dataframe

  • columns (Union[str, List[str], None], optional) – column (str) or list of columns to convert, by default None. If this parameter is not supplied then any column containing the substring “time” is used as a candidate for conversion.

  • add_utc_tz (bool, optional) – If True any datetime columns in the columns parameter (or default ‘.*time.*’ columns) that are timezone-naive will be converted to timezone-aware timestamps marked as UTC.

Returns

Converted DataFrame.

Return type

pd.DataFrame
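Example (illustrative only): a standard-library sketch of the two ideas described here, choosing candidate columns by the substring "time" and marking timezone-naive values as UTC. The helpers candidate_columns and ensure_utc are hypothetical, and the case-insensitive match is an assumption.

```python
from datetime import datetime, timezone
from typing import List


def candidate_columns(columns: List[str]) -> List[str]:
    """Pick column names containing 'time' (case-insensitive, an assumption)."""
    return [name for name in columns if "time" in name.lower()]


def ensure_utc(value: datetime) -> datetime:
    """Mark a timezone-naive datetime as UTC; leave aware values unchanged."""
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


time_cols = candidate_columns(["TimeGenerated", "Computer", "EndTime"])
naive = datetime(2024, 1, 1, 12, 0)
aware = ensure_utc(naive)
print(time_cols, aware.isoformat())
```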

msticpy.data.query_source module

Intake kql driver.

class msticpy.data.query_source.QuerySource(name: str, source: Dict[str, Any], defaults: Dict[str, Any], metadata: Dict[str, Any])

Bases: object

Query definition class for templated queries.

name

The query name

Type

str

metadata

The consolidated metadata for the query

Type

Dict[str, Any]

params

The dictionary of parameter definitions for the query.

Type

dict[str, Any]

query_store

The query store object that the query belongs to

Type

QueryStore

Initialize query source definition.

Parameters
  • name (str) – The query name

  • source (dict) – The data source definition settings

  • defaults (dict) – The default settings (if source-specific setting not supplied)

  • metadata (dict) – The global metadata from the source file.

Notes

A data source can belong to multiple families (e.g. a query that joins data from several sources)

create_doc_string() str

Return a doc string for the query.

Returns

New-line delimited docstring dynamically created from query definition properties.

Return type

str

create_query(formatters: Optional[Dict[str, Callable]] = None, **kwargs) str

Return query with values from kwargs and defaults substituted.

Parameters
  • formatters (Dict[str, Callable]) – Dictionary of custom parameter formatters indexed by data type

  • kwargs (Mapping[str, Any]) – Set of parameter name, value pairs used to populate the template query.

Returns

The populated query

Return type

str

Raises

ValueError – If one or more parameters with no default values are not supplied.

Notes

Parameters supplied as arguments will override any parameter defaults (see default_params property).
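Example (illustrative only): a sketch of template substitution in which supplied arguments override defaults and a missing parameter raises ValueError. The helper fill_template, the brace-style placeholders, and the sample query are assumptions for illustration.

```python
from string import Formatter
from typing import Any, Dict


def fill_template(template: str, defaults: Dict[str, Any], **kwargs: Any) -> str:
    """Substitute {name} placeholders, preferring kwargs over defaults."""
    values = {**defaults, **kwargs}
    # Collect the placeholder names that appear in the template.
    needed = {field for _, field, _, _ in Formatter().parse(template) if field}
    missing = sorted(needed - set(values))
    if missing:
        raise ValueError(f"Missing parameters: {missing}")
    return template.format(**values)


QUERY = "{table} | where Computer == '{host_name}' | take {limit}"
text = fill_template(QUERY, {"table": "SecurityEvent", "limit": 10}, host_name="srv01")
print(text)
```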

property data_families: List[str]

Return the list of data families used by the query.

Returns

The list of data families. A data family is usually equivalent to a table or entity set.

Return type

List[str]

property default_params: Dict[str, dict]

Return the set of parameters with default values.

Returns

List of parameters

Return type

Iterable[dict]

property description: str

Return description of the query.

Returns

Query description.

Return type

str

help()

Print help for query.

property query: str

Return the query template.

Returns

The template query.

Return type

str

property required_params: Dict[str, dict]

Return the set of parameters with no default values.

Returns

List of parameters

Return type

Iterable[dict]

resolve_param_aliases(param_dict: Dict[str, Any]) Dict[str, Any]

Try to resolve any parameters in param_dict that are aliases.

validate() Tuple[bool, List[str]]

Validate the source to ensure that all required properties are present.

Returns

Tuple of a bool (True if validation is successful) and a list of strings.

Return type

Tuple[bool, List[str]]

msticpy.data.query_store module

QueryStore class - holds a collection of QuerySources.

class msticpy.data.query_store.QueryStore(environment: str)

Bases: object

Repository for query definitions for a data environment.

environment

The data environment for the queries.

Type

str

data_families

The set of data families and associated queries for each.

Type

Dict[str, Dict[str, QuerySource]]

Initialize a QueryStore for a new environment.

Parameters

environment (str) – The data environment

add_data_source(source: msticpy.data.query_source.QuerySource)

Add a datasource/query to the store.

Parameters

source (QuerySource) – The source to add. An existing item with the same name will be overwritten

add_query(name: str, query: str, query_paths: Union[str, List[str]], description: Optional[str] = None)

Add a query from name/query text.

Parameters
  • name (str) – name of the query

  • query (str) – The query string

  • query_paths (Union[str, List[str]]) – The path/data_family to categorize. Multiple paths can be specified. If the path is dotted, this will cause the query to be displayed in the corresponding hierarchy.

  • description (str, optional) – Query description

find_query(query_name: str) Set[Optional[msticpy.data.query_source.QuerySource]]

Return set of queries with name query_name.

Parameters

query_name (str) – Name of the query

Returns

Set of distinct queries matching the name.

Return type

Set[QuerySource]

get_query(query_name: str, query_path: Optional[Union[str, msticpy.data.query_defns.DataFamily]] = None) msticpy.data.query_source.QuerySource

Return the query matching query_path (data family) and query_name.

Parameters
  • query_name (str) – Name of the query

  • query_path (Union[str, DataFamily]) – The data family for the query

Returns

Query matching name and family.

Return type

QuerySource

import_file(query_file: str)

Import a yaml data source definition.

Parameters

query_file (str) – Path to the file to import

Raises

ImportError – File read error or Syntax or semantic error found in the source file.

classmethod import_files(source_path: list, recursive: bool = False, driver_query_filter: Optional[Dict[str, Set[str]]] = None) Dict[str, msticpy.data.query_store.QueryStore]

Import multiple query definition files from directory path.

Parameters
  • source_path (str) – The folder containing the yaml definition files.

  • recursive (bool, optional) – True to recurse sub-directories (the default is False, which only reads from the top level)

  • driver_query_filter (Dict[str, Set[str]]) – A dictionary of query metadata keys and values. This is used to test each read query to see if it is relevant to the driver and should be returned in the created QueryStore dictionary.

Returns

Dictionary of one or more environments and the QueryStore containing the queries for each environment.

Return type

Dict[str, ‘QueryStore’]

Raises

FileNotFoundError – File read error or Syntax or semantic error found in a source file.

property query_names: Iterable[str]

Return list of family.query in the store.

Returns

List of queries

Return type

Iterable[str]

msticpy.data.sql_to_kql module

Module for SQL to KQL Conversion.

This is an experimental conversion utility built to support a limited subset of ANSI SQL. It relies on moz_sql_parser (https://github.com/mozilla/moz-sql-parser) to parse the SQL syntax tree. Some hacky additions have been made to allow table renaming and support for a few SparkSQL operators such as RLIKE.

For a more complete translation help with SQL to KQL see https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sqlcheatsheet

Known limitations

  • Does not support aggregate functions in SELECT with no GROUP BY clause

  • Does not support IN, EXISTS, HAVING operators

  • Only partial support for AS naming (should work in SELECT expressions)

msticpy.data.sql_to_kql.sql_to_kql(sql: str, target_tables: Optional[Dict[str, str]] = None) str

Parse SQL and return KQL equivalent.

Module contents

Data sub-package.