msticpy.data.drivers package

Submodules

msticpy.data.drivers.cybereason_driver module

Cybereason Driver class.

class msticpy.data.drivers.cybereason_driver.CybereasonDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Class to interact with Cybereason.

Instantiate Cybereason driver.

CONFIG_NAME = 'Cybereason'
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters
  • connection_str (Optional[str], optional) – Connect to a data source

  • instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml

Notes

Connection string fields:

instance client_id client_secret

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successfull) and Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.driver_base module

Data driver base class.

class msticpy.data.drivers.driver_base.DriverBase(**kwargs)

Bases: abc.ABC

Base class for data providers.

Initialize new instance.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

abstract connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters

connection_str (Optional[str]) – Connect to a data source

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

abstract query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

  • kwargs – Are passed to the underlying provider query method, if supported.

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

abstract query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame plus native results.

Parameters

query (str) – The query to execute

Returns

A DataFrame and native results.

Return type

Tuple[pd.DataFrame,Any]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.kql_driver module

KQL Driver class.

class msticpy.data.drivers.kql_driver.KqlDriver(connection_str: Optional[str] = None, **kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

KqlDriver class to execute kql queries.

Instantiate KqlDriver and optionally connect.

Parameters
  • connection_str (str, optional) – Connection string

  • debug (bool) – print out additional diagnostic information.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters
  • connection_str (str) – Connect to a data source

  • kqlmagic_args (str, optional) – Additional string of parameters to be passed to KqlMagic

  • mp_az_auth (Union[bool, str, list, None], optional) – Optional parameter directing KqlMagic to use MSTICPy Azure authentication. Values can be: True or “default”: use the settings in msticpyconfig.yaml ‘Azure’ section str: single auth method name (‘msi’, ‘cli’, ‘env’, ‘vscode’, ‘powershell’, ‘cache’ or ‘interactive’) List[str]: list of acceptable auth methods from (‘msi’, ‘cli’, ‘env’, ‘vscode’, ‘powershell’, ‘cache’ or ‘interactive’)

  • mp_az_tenant_id (str, optional) – Optional parameter specifying a Tenant ID for use by MSTICPy Azure authentication.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Kqlmagic.kql_proxy.KqlResponse]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successfull) and Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.kusto_driver module

Kusto Driver subclass.

class msticpy.data.drivers.kusto_driver.KustoDriver(connection_str: Optional[str] = None, **kwargs)

Bases: msticpy.data.drivers.kql_driver.KqlDriver

Kusto Driver class to execute kql queries for Azure Data Explorer.

Instantiate KustoDriver.

Parameters
  • connection_str (str, optional) – Connection string

  • debug (bool) – print out additional diagnostic information.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters
  • connection_str (str) – Connect to a data source

  • cluster (str, optional) – Short name or URI of cluster to connect to.

  • database (str, optional) – Name of database to connect to.

  • kqlmagic_args (str, optional) – Additional string of parameters to be passed to KqlMagic

  • mp_az_auth (Union[bool, str, list, None], optional) – Optional parameter directing KqlMagic to use MSTICPy Azure authentication. Values can be: True or “default”: use the settings in msticpyconfig.yaml ‘Azure’ section str: single auth method name (‘msi’, ‘cli’, ‘env’, ‘vscode’, ‘powershell’, ‘cache’ or ‘interactive’) List[str]: list of acceptable auth methods from (‘msi’, ‘cli’, ‘env’, ‘vscode’, ‘powershell’, ‘cache’ or ‘interactive’)

  • mp_az_tenant_id (str, optional) – Optional parameter specifying a Tenant ID for use by MSTICPy Azure authentication.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

  • cluster (str, Optional) – Supply or override the Kusto cluster name

  • database (str, Optional) – Supply or override the Kusto database name

  • data_source (str, Optional) – alias for db

  • connection_str (str, Optional) –

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Kqlmagic.kql_proxy.KqlResponse]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successfull) and Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.local_data_driver module

Local Data Driver class - for testing and demos.

class msticpy.data.drivers.local_data_driver.LocalDataDriver(connection_str: Optional[str] = None, **kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

LocalDataDriver class to execute kql queries.

Instantiate LocalDataDriver and optionally connect.

Parameters
  • connection_str (str, optional) – Connection string (not used)

  • data_paths (List[str], optional) – Paths from which to load data files

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters

connection_str (str) – Connect to a data source

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query, **kwargs)

Return query with fake results.

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.mdatp_driver module

MDATP OData Driver class.

class msticpy.data.drivers.mdatp_driver.MDATPDriver(connection_str: Optional[str] = None, **kwargs)

Bases: msticpy.data.drivers.odata_driver.OData

KqlDriver class to retreive date from MS Defender APIs.

Instantiate MSDefenderDriver and optionally connect.

Parameters

connection_str (str, optional) – Connection string

CONFIG_NAME = 'MicrosoftDefender'
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to oauth data source.

Parameters
  • connection_str (Optional[str], optional) – Connect to a data source

  • instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml

Notes

Connection string fields: tenant_id client_id client_secret apiRoot apiVersion

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successfull) and Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.mordor_driver module

Mordor/OTRF Security datasets driver.

class msticpy.data.drivers.mordor_driver.MitreAttack(attack: Optional[Dict[str, Any]] = None, technique: Optional[str] = None, sub_technique: Optional[str] = None, tactics: Optional[List[str]] = None)

Bases: object

MitreAttack container for techniques and tactics.

Create instance of MitreAttack.

Parameters
  • attack (Dict[str, Any], optional) – attack data as dictionary, by default None

  • technique (str, optional) – technique ID, by default None

  • sub_technique (str, optional) – sub-technique ID, by default None

  • tactics (List[str], optional) – List of associated tactics, by default None

MTR_TAC_URI = 'https://attack.mitre.org/tactics/{tactic_id}/'
MTR_TECH_URI = 'https://attack.mitre.org/techniques/{technique_id}/'
property tactics_full: List[Tuple[str, str, str, str]]

Return full listing of Mitre tactics.

Returns

List of tuples of: (ID, Name, Description, URI)

Return type

List[Tuple[str, str, str, str]]

property technique_desc: Optional[str]

Return Mitre technique description.

Returns

Technique description

Return type

Optional[str]

property technique_name: Optional[str]

Return Mitre Technique full name.

Returns

Name of the Mitre technique

Return type

Optional[str]

property technique_uri: str

Return Mitre Technique URI.

Returns

URI of the Mitre technique

Return type

Optional[str]

class msticpy.data.drivers.mordor_driver.MordorDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Mordor data driver.

Initialize the Mordor driver.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to data source.

Parameters

connection_str (Optional[str]) – Connect to a data source

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return generator of Mordor query definitions.

Yields

Iterable[Dict[str, Any]] – Iterable of Dictionaries containing query definitions.

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

  • kwargs – Are passed to the underlying provider query method, if supported.

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame plus native results.

Parameters

query (str) – The query to execute

Returns

A DataFrame and native results.

Return type

Tuple[pd.DataFrame,Any]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

search_queries(search: str) Iterable[str]

Search queries for matching attributes.

Parameters

search (str) – Search string. Substrings separated by commas will be treated as OR terms - e.g. “a, b” == “a” or “b”. Substrings separated by “+” will be treated as AND terms - e.g. “a + b” == “a” and “b”

Returns

Iterable of matching query names.

Return type

Iterable[str]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

class msticpy.data.drivers.mordor_driver.MordorEntry(title: str, id: str, type: str, creation_date, modification_date, contributors: List[str] = NOTHING, author: Optional[str] = None, platform: Optional[str] = None, description: Optional[str] = None, tags: List[str] = NOTHING, files: List[Dict[str, Any]] = NOTHING, datasets: List[Dict[str, Any]] = NOTHING, attack_mappings: List[Dict[str, Any]] = NOTHING, notebooks: List[Dict[str, str]] = NOTHING, simulation: Dict[str, Any] = NOTHING, references: List[Any] = NOTHING, rel_file_paths: List[Dict[str, Any]] = NOTHING)

Bases: object

Mordor data set metadata.

Method generated by attrs for class MordorEntry.

attack_mappings: List[Dict[str, Any]]
author: Optional[str]
contributors: List[str]
creation_date: datetime.datetime
datasets: List[Dict[str, Any]]
description: Optional[str]
files: List[Dict[str, Any]]
get_attacks() List[msticpy.data.drivers.mordor_driver.MitreAttack]

Return list of Mitre attack classifications.

Returns

List of MitreAttack definitions.

Return type

List[MitreAttack]

get_file_paths() List[Dict[str, str]]

Return list of data file links.

Returns

list of dictionaries describing files. Each entry has key/values for: - file_type - file_path - relative_path - qry_path

Return type

List[Dict[str, str]]

get_notebooks() List[Tuple[str, str, str]]

Return the list of notebooks for the dataset.

Returns

Tuples of (name, project, link)

Return type

List[Tuple[str, str, str]]

id: str
modification_date: datetime.datetime
notebooks: List[Dict[str, str]]
platform: Optional[str]
references: List[Any]
simulation: Dict[str, Any]
tags: List[str]
title: str
type: str
msticpy.data.drivers.mordor_driver.download_mdr_file(file_uri: str, use_cached: bool = True, save_folder: str = '.', silent: bool = False) pandas.core.frame.DataFrame

Download data file from Mordor.

Parameters
  • file_uri (str) – The URI of the file to download.

  • use_cached (bool, optional) – Try to use locally saved file first, by default True

  • save_folder (str, optional) – Path to output folder, by default “.”

  • silent (bool) – If True, suppress feedback. By default, False.

Returns

DataFrame of Dataset

Return type

pd.DataFrame

msticpy.data.drivers.mordor_driver.get_mdr_data_paths(item_type='metadata') Generator[str, None, None]

Generate Mordor data sets from GitHub repo.

Parameters

item_type (str, optional) – The type of item required, by default “metadata” Other values are “large”, “small.

Yields

str – Iterable of paths

msticpy.data.drivers.mordor_driver.search_mdr_data(mdr_data: Dict[str, msticpy.data.drivers.mordor_driver.MordorEntry], terms: Optional[str] = None, subset: Optional[Iterable[str]] = None) Set[str]

Return IDs for items matching terms.

Parameters
  • mdr_data (Dict[str, MordorEntry]) – Mordor dataset

  • terms (str, optional) – Search terms, by default None (comma-separated values are treated as OR terms plus-separated values are treated as AND terms)

  • subset (Iterable[str], optional) – A subset of IDs over which to search, by default None

Returns

The set of matching IDs.

Return type

Set[str]

msticpy.data.drivers.odata_driver module

OData Driver class.

class msticpy.data.drivers.odata_driver.OData(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Parent class to retreive date from an oauth based API.

Instantiate OData driver and optionally connect.

Parameters

connect (bool, optional) – Set true if you want to connect to the provider at initialization

CONFIG_NAME = ''
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to oauth data source.

Parameters
  • connection_str (Optional[str], optional) – Connect to a data source

  • instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml

Notes

Connection string fields: tenant_id client_id client_secret apiRoot apiVersion

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

abstract query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successfull) and Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.resource_graph_driver module

Azure Resource Graph Driver class.

class msticpy.data.drivers.resource_graph_driver.ResourceGraphDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Driver to connect and query from Azure Resource Graph.

Instantiate Azure Resource Graph Driver.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to Azure Resource Graph via Azure SDK.

Parameters
  • connection_str (Optional[str], optional) – Not used.

  • kwargs – Connection parameters can be supplied as keyword parameters.

Notes

Default configuration is read from the DataProviders/AzureCLI section of msticpyconfig.yaml, if available.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute Resource Graph query and retrieve results.

Parameters
  • query (str) – KQL query to execute

  • query_source (QuerySource) – The query definition object

  • kwargs – count

Returns

Query results in a dataframe. or query response if an error.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – Query to execute against Resource Graph

Returns

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type

Union[pd.DataFrame,Any]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.security_graph_driver module

Security Graph OData Driver class.

class msticpy.data.drivers.security_graph_driver.SecurityGraphDriver(connection_str: Optional[str] = None, delegated_auth: bool = False, **kwargs)

Bases: msticpy.data.drivers.odata_driver.OData

Driver to query security graph.

Instantiate MSGraph driver and optionally connect.

Parameters
  • connection_str (str, optional) – Connection string

  • delegated_auth (bool, optional) – Set True if using App delegated

CONFIG_NAME = 'MicrosoftGraph'
add_query_filter(name, query_filter)

Add an expression to the query attach filter.

api_root: Optional[str]
api_ver: Optional[str]
connect(connection_str: Optional[str] = None, **kwargs)

Connect to oauth data source.

Parameters
  • connection_str (Optional[str], optional) – Connect to a data source

  • instance (Optional[str], optional) – Optional name of configuration instance - this is added as a prefix to the driver configuration key name when searching for configuration in the msticpyconfig.yaml

Notes

Connection string fields: tenant_id client_id client_secret apiRoot apiVersion

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return queries retrieved from the service after connecting.

Returns

List of Dictionary of query_name, query_text. Name of container to add queries to.

Return type

List[Dict[str, str]]

formatters: Dict[str, Callable]
static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

oauth_url: Optional[str]
public_attribs: Dict[str, Any]
query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters
  • query (str) – The query to execute

  • query_source (QuerySource) – The query definition object

Returns

A DataFrame (if successfull) or the underlying provider result if an error.

Return type

Union[pd.DataFrame, results.ResultSet]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – The kql query to execute

Returns

A DataFrame (if successfull) and Kql ResultSet.

Return type

Tuple[pd.DataFrame, results.ResultSet]

req_body: Optional[Dict[str, Optional[str]]]
request_uri: Optional[str]
property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return queries retrieved from the service after connecting.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.splunk_driver module

Splunk Driver class.

class msticpy.data.drivers.splunk_driver.SplunkDriver(**kwargs)

Bases: msticpy.data.drivers.driver_base.DriverBase

Driver to connect and query from Splunk.

Instantiate Splunk Driver.

add_query_filter(name, query_filter)

Add an expression to the query attach filter.

connect(connection_str: Optional[str] = None, **kwargs)

Connect to Splunk via splunk-sdk.

Parameters
  • connection_str (Optional[str], optional) – Connection string with Splunk connection parameters

  • kwargs – Connection parameters can be supplied as keyword parameters.

Notes

Default configuration is read from the DataProviders/Splunk section of msticpyconfig.yaml, if available.

property connected: bool

Return true if at least one connection has been made.

Returns

True if a successful connection has been made.

Return type

bool

Notes

This does not guarantee that the last data source connection was successful. It is a best effort to track whether the provider has made at least one successful authentication.

property driver_queries: Iterable[Dict[str, Any]]

Return dynamic queries available on connection to service.

Returns

List of queries with properties: “name”, “query”, “container” and (optionally) “description”

Return type

Iterable[Dict[str, Any]]

Raises

MsticpyNotConnectedError – If called before driver is connected.

static get_http_timeout(**kwargs)

Get http timeout from settings or kwargs.

property loaded: bool

Return true if the provider is loaded.

Returns

True if the provider is loaded.

Return type

bool

Notes

This is not relevant for some providers.

query(query: str, query_source: Optional[msticpy.data.query_source.QuerySource] = None, **kwargs) Union[pandas.core.frame.DataFrame, Any]

Execute splunk query and retrieve results via OneShot or async search mode.

Parameters
  • query (str) – Splunk query to execute via OneShot or async search mode

  • query_source (QuerySource) – The query definition object

  • kwargs

    Are passed to Splunk oneshot method count=0 by default oneshot=False by default for async query,

    set to True for oneshot (blocking) mode

Returns

Query results in a dataframe. or query response if an error.

Return type

Union[pd.DataFrame, Any]

property query_attach_spec: Dict[str, Set[str]]

Parameters that determine whether a query is relevant for the driver.

query_with_results(query: str, **kwargs) Tuple[pandas.core.frame.DataFrame, Any]

Execute query string and return DataFrame of results.

Parameters

query (str) – Query to execute against splunk instance.

Returns

A DataFrame (if successful) or the underlying provider result if an error occurs.

Return type

Union[pd.DataFrame,Any]

property schema: Dict[str, Dict]

Return current data schema of connection.

Returns

Data schema of current connection.

Return type

Dict[str, Dict]

property service_queries: Tuple[Dict[str, str], str]

Return dynamic queries available on connection to service.

Returns

Dictionary of query_name, query_text. Name of container to add queries to.

Return type

Tuple[Dict[str, str], str]

msticpy.data.drivers.sumologic_driver module

Module contents

Data provider sub-package.

msticpy.data.drivers.import_driver(data_environment: msticpy.data.query_defns.DataEnvironment) type

Import driver class for a data environment.