Data Provider Library

Description:

This package provides functions for defining data sources, the connectors to them, and queries against them, as well as the ability to call these elements to return query results from the defined data sources. The package currently supports connections to Log Analytics/Azure Sentinel/Azure Security Center and the Microsoft Security Graph.

The first step in using this package is to install the msticpy package.

!pip install git+https://github.com/microsoft/msticpy --upgrade --user
Collecting git+https://github.com/microsoft/msticpy
Building wheels for collected packages: msticpy
  Building wheel for msticpy (setup.py): started
  Building wheel for msticpy (setup.py): finished with status 'done'
Successfully built msticpy
Installing collected packages: msticpy
Successfully installed msticpy-0.2.1
# Check we are running Python 3.6 or later
import sys
MIN_REQ_PYTHON = (3,6)
if sys.version_info < MIN_REQ_PYTHON:
    print('Check the Kernel->Change Kernel menu and ensure that Python 3.6')
    print('or later is selected as the active kernel.')
    sys.exit("Python %s.%s or later is required.\n" % MIN_REQ_PYTHON)

#imports
import yaml
import msticpy.nbtools as nbtools

#data library imports
from msticpy.data.data_providers import QueryProvider
import msticpy.data.data_query_reader as QueryReader
from msticpy.data.param_extractor import extract_query_params
import msticpy.nbtools as mas

print('Imports Complete')
Imports Complete

Instantiating a Query Provider

In order to connect to and query a data source we need to define what sort of Data Environment we want to connect to and query (in this Notebook we will use Log Analytics as an example). To view the options available you can call QueryProvider.list_data_environments() which will return a list of all the available options.

After selecting a Data Environment we can initialize our Query Provider by calling QueryProvider(DATA_ENVIRONMENT). This will load the relevant driver for connecting to the data environment we have selected as well as provisioning a query store for us and adding queries from our default query directory.

There are two other optional parameters we can pass when initializing our Query Provider to further customize it:

  • We can choose to initialize our Query Provider with a driver other than the default one:

QueryProvider(
    data_environment=DATA_ENVIRONMENT,
    driver=QUERY_DRIVER
)

  • We can choose to import queries from a custom query directory (see Creating new queries for more details) with:

QueryProvider(
    data_environment=DATA_ENVIRONMENT,
    driver=QUERY_DRIVER,
    query_path=QUERY_DIRECTORY_PATH
)

For now we will simply create a Query Provider with default values.

Query provider interface to queries.

Parameters
----------
data_environment : Union[str, DataEnvironment]
    Name or Enum of environment for the QueryProvider
driver : DriverBase, optional
    Override the built-in driver (query execution class)
    and use your own driver (must inherit from
    `DriverBase`)
# List the data environments available
data_environments = QueryProvider.list_data_environments()
print(data_environments)

# Create a query provider for Azure Sentinel/Log Analytics
qry_prov = QueryProvider(data_environment='LogAnalytics')
['LogAnalytics', 'Kusto', 'AzureSecurityCenter', 'SecurityGraph']
Please wait. Loading Kqlmagic extension...

Kql Query Language, aka kql, is the query language for advanced analytics on Azure Monitor resources. The current supported data sources are Azure Data Explorer (Kusto), Log Analytics and Application Insights. To get more information execute '%kql --help "kql"'

• kql reference: Click on 'Help' tab > and Select 'kql reference' or execute '%kql --help "kql"'
• Kqlmagic configuration: execute '%config Kqlmagic'
• Kqlmagic usage: execute '%kql --usage'

Connecting to a Data Environment

Once we have instantiated the query provider and loaded the relevant driver we can connect to the Data Environment. This is done by calling the connect() function of the Query Provider we just initialized and passing it a connection string to use.

For Log Analytics/Azure Sentinel the connection string is in the format of loganalytics://code().tenant("TENANT_ID").workspace("WORKSPACE_ID"). Other Data Environments will have different connection string formats.

Documentation string

connect(self, connection_str: str, **kwargs):

    Connect to data source.

    Parameters
    ----------
    connection_str : str
        Connection string for the data source

Example

ws_id = input('Workspace ID')
ten_id = input('Tenant ID')
la_connection_string = f'loganalytics://code().tenant("{ten_id}").workspace("{ws_id}")'
qry_prov.connect(connection_str=la_connection_string)
Workspace ID xxxxxxxxxxxxxxxxxxxxxxxxxxx
Tenant ID xxxxxxxxxxxxxxxxxxxxxxxxxxx

Note

The KQL provider now supports authentication via the Azure CLI and Managed System Identities (https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview). To use these authentication methods, pass locals() with either the cli or msi keyword arguments:

la_connection_string = f'loganalytics://code().tenant("{ten_id}").workspace("{ws_id}")'
qry_prov.connect(connection_str=la_connection_string, cli=locals())

List of current built-in queries

See the MSTICPy built-in queries document.

Connecting to an Azure Sentinel Workspace

The previous example showed making a connection to an Azure Sentinel workspace by manually creating a connection string. msticpy has functions to build this connection string for you and some flexible configuration options allowing you to store and manage the settings for multiple workspaces.

Configuration in config.json

When you load a notebook from the Azure Sentinel UI (either in Azure Notebooks or in an Azure Machine Learning Workspace) a configuration file config.json is provisioned for you with the details of the source workspace populated in the file. An example is shown here.

{
    "tenant_id": "335b56ab-67a2-4118-ac14-6eb454f350af",
    "subscription_id": "b8f250f8-1ba5-4b2c-8e74-f7ea4a1df8a6",
    "resource_group": "ExampleWorkspaceRG",
    "workspace_id": "271f17d3-5457-4237-9131-ae98a6f55c37",
    "workspace_name": "ExampleWorkspace"
}

msticpy will automatically look for a config.json file in the current directory. If it is not found there, it will search the parent directory and all of its subdirectories. It will use the first config.json file found.

The class that searches for and loads your config.json is WorkspaceConfig. See WorkspaceConfig API documentation

WorkspaceConfig also works with workspace configuration stored in msticpyconfig.yaml (see next section).

To use WorkspaceConfig, simply create an instance of it. It will automatically build your connection string for use with the query provider library.

>>> ws_config = WorkspaceConfig()
>>> ws_config.code_connect_str

"loganalytics://code().tenant('335b56ab-67a2-4118-ac14-6eb454f350af').workspace('271f17d3-5457-4237-9131-ae98a6f55c37')"

You can use this connection string in the call to QueryProvider.connect()

qry_prov.connect(connection_str=ws_config.code_connect_str)

If you need to use a specific instance of a config.json you can specify a full path to the file you want to use when you create your WorkspaceConfig instance.

ws_config = WorkspaceConfig(config_file="~/myworkspaces/ws123-config.json")
ws_config.code_connect_str

Configuration in msticpyconfig.yaml

You can also store workspace details in your msticpyconfig.yaml file. This has some advantages over using a config.json:

  • you can store multiple workspace definitions
  • you can use an environment variable to specify its location

You likely need to use a msticpyconfig.yaml anyway. If you are using other msticpy features such as Threat Intelligence Providers, GeoIP Lookup, Azure Data, etc., these all have their own configuration settings, so using a single configuration file makes managing your settings easier. The one downside to using msticpyconfig.yaml is that you have to populate the workspace settings manually.

For more information on using and configuring msticpyconfig.yaml see msticpy Package Configuration
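If you keep your msticpyconfig.yaml somewhere other than the notebook directory, you can point msticpy at it with the MSTICPYCONFIG environment variable. A minimal sketch (the path is illustrative; set the variable before importing msticpy, or restart the kernel after changing it):

import os

# Illustrative path - point this at your own msticpyconfig.yaml
os.environ["MSTICPYCONFIG"] = "/home/jovyan/msticpyconfig.yaml"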

The Azure Sentinel connection settings are stored in the AzureSentinel\Workspaces section of the file.

AzureSentinel:
  Workspaces:
    # Workspace used if you don't explicitly name a workspace when creating WorkspaceConfig
    # Specifying values here overrides config.json settings unless you explicitly load
    # WorkspaceConfig with config_file parameter (WorkspaceConfig(config_file="../config.json"))
    Default:
      WorkspaceId: "271f17d3-5457-4237-9131-ae98a6f55c37"
      TenantId: "335b56ab-67a2-4118-ac14-6eb454f350af"
    # To use these launch with an explicit name - WorkspaceConfig(workspace_name="Workspace2")
    Workspace2:
      WorkspaceId: "c88dd3c2-d657-4eb3-b913-58d58d811a41"
      TenantId: "335b56ab-67a2-4118-ac14-6eb454f350af"
    Workspace3:
      WorkspaceId: "17e64332-19c9-472e-afd7-3629f299300c"
      TenantId: "4ea41beb-4546-4fba-890b-55553ce6003a"

If you only use a single workspace, you need only create a Default entry and add the values for your WorkspaceId and TenantId. You can add other entries here (for example, SubscriptionId and ResourceGroup) but these are not currently used by msticpy.

Note

The property names are spelled differently from those in config.json and they are case-sensitive, so be sure to enter them exactly as shown in the example.

If you use multiple workspaces, you can add further entries here. Each workspace entry is normally the name of the Azure Sentinel workspace but you can use any name you prefer.

To see which workspaces are configured in your msticpyconfig.yaml use the list_workspaces() function.

Tip

list_workspaces is a class function, so you do not need to instantiate a WorkspaceConfig to call this function.

>>> WorkspaceConfig.list_workspaces()

{'Default': {'WorkspaceId': '271f17d3-5457-4237-9131-ae98a6f55c37',
  'TenantId': '335b56ab-67a2-4118-ac14-6eb454f350af'},
 'Workspace2': {'WorkspaceId': 'c88dd3c2-d657-4eb3-b913-58d58d811a41',
   'TenantId': '335b56ab-67a2-4118-ac14-6eb454f350af'},
 'Workspace3': {'WorkspaceId': '17e64332-19c9-472e-afd7-3629f299300c',
   'TenantId': '4ea41beb-4546-4fba-890b-55553ce6003a'}}

If you run WorkspaceConfig with no parameters it will try to load values from the “Default” entry in msticpyconfig.yaml. If this fails it will fall back to searching for a config.json as described in the previous section.

Tip

You can duplicate the Default settings in another named entry so that you can also load the same workspace by name.

To load settings for a specific workspace, use the workspace_name parameter to specify the workspace that you want to connect to. workspace_name is the name of the workspace entry that you created under AzureSentinel\Workspaces in msticpyconfig.yaml - not necessarily the actual name of the workspace.

ws_config = WorkspaceConfig(workspace_name="Workspace2")
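You can then pass the generated connection string to the query provider, exactly as with the Default workspace:

qry_prov.connect(connection_str=ws_config.code_connect_str)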

Entries in msticpyconfig always take precedence over settings in your config.json. If you want to force use of the config.json, specify the path to the config.json file in the config_file parameter to WorkspaceConfig.

Warning

Although msticpy allows you to configure multiple entries for workspaces in different tenants, you cannot currently authenticate to workspaces that span multiple tenants in the same notebook. If you need to do this, you should investigate Azure Lighthouse. This allows delegated access to workspaces in multiple tenants from a single tenant.

Connecting to an OData Source

OData driver API documentation

You can also connect to OData-based data sources such as the MDATP API or the Security Graph API. These connections often rely on having a dedicated Azure AD application to handle the authentication process.

MDATP

MDATP driver API documentation

Details on registering an Azure AD application for MDATP can be found here. Once you have registered the application you can use it to connect to the MDATP API via the MDATP Data Environment.

When connecting, the required connection details can be passed in a number of ways. The simplest is to pass them as kwargs. The required elements are:

  • tenant_id – The tenant ID of the MDATP workspace to connect to.
  • client_id – The ID of the application registered for MDATP.
  • client_secret – The secret used by the application.
ten_id = input('Tenant ID')
client_id = input('Client ID')
client_secret = input('Client Secret')
mdatp_prov = QueryProvider('MDATP')
mdatp_prov.connect(tenant_id=ten_id, client_id=client_id, client_secret=client_secret)

Alternatively you can store these details in the msticpyconfig.yaml file. Details should be included in the following format:

MDATPApp:
    Args:
      ClientId: "CLIENT ID"
      ClientSecret: "CLIENT SECRET"
      TenantId: "TENANT ID"

To use the stored values when connecting, pass app_name to the connect function, with the value being the heading used in msticpyconfig.yaml:

mdatp_prov = QueryProvider('MDATP')
mdatp_prov.connect(app_name="MDATPApp")

For examples of using the MDATP connector see the sample MDATPQuery Notebook.

Security Graph API

Security Graph driver API documentation

Connecting to the Security Graph API follows the same format as MDATP connections with connection variables passed to the function in the same way.

Details for registering an application for the Security Graph API can be found here.

graph_prov = QueryProvider('SecurityGraph')
graph_prov.connect(app_name="SecurityGraphApp")

Using Local Data - the LocalData provider

LocalData driver documentation

The LocalData data provider is intended primarily for testing or demonstrations where you may not be able to connect to an online data source reliably.

The data backing this driver can be in the form of a pickled pandas DataFrame or a CSV file. In either case the data is converted to a DataFrame to be returned from the query. Usage of this driver is a little different from most other drivers:

  • You will need to provide a path to your data files when initializing the query provider (by default it will search in the current folder).
  • You will also need to provide a query definition file (see following example) that maps the data file names that you are using to query names. The path to search for this is specified in the query_paths parameter (see code examples below).
  • Parameters to queries are ignored.

To define the queries you need to create a query definition file. This is an example of a LocalData yaml query file. It is similar to the query definition files for other providers but simpler. It only requires a description, the data family that the query should be grouped under and the name of the file containing the data.

metadata:
    version: 1
    description: Local Data Alert Queries
    data_environments: [LocalData]
    data_families: [SecurityAlert, WindowsSecurity, Network]
    tags: ['alert', 'securityalert', 'process', 'account', 'network']
defaults:
sources:
    list_alerts:
        description: Retrieves list of alerts
        metadata:
            data_families: [SecurityAlert]
        args:
            query: alerts_list.pkl
        parameters:
    list_host_logons:
        description: List logons on host
        metadata:
            data_families: [WindowsSecurity]
        args:
            query: host_logons.csv
        parameters:
    list_network_alerts:
        description: List network alerts.
        args:
            query: net_alerts.csv
        parameters:

In this example the value for the "query" is just the file name. If the queries in your file are a mix of data from different data families, you can group them by specifying one or more values for data_families. If this isn't specified for an individual query, it will inherit the setting for data_families in the global metadata section at the top of the file. Specifying more than one value for data_families will add links to the query under each data family grouping. This is to allow for cases where a query may be relevant to multiple categories. The data_families values control only how the queries appear in the query provider and don't affect any other aspects of the query operation.

In the example shown, the list_alerts query has been added to the SecurityAlert attribute of the query provider, while list_host_logons is member of WindowsSecurity. The entry for list_network_alerts had no data_families attribute so inherits the values from the file’s metadata. Since this has multiple values, the query is added to all three families.

# Structure of the query attributes added to the query provider
qry_prov.list_queries()
Network.list_host_logons
Network.list_network_alerts
        ...<other queries>
SecurityAlert.list_alerts
SecurityAlert.list_network_alerts
        ...<other queries>
WindowsSecurity.list_host_logons
WindowsSecurity.list_network_alerts

For more details about the query definition file structure see Creating new queries.

To use the LocalData provider:

  1. Collect your data files into one or more directories or directory trees (the default location to search for data files is the current directory). Subdirectories are searched for ".pkl" and ".csv" files but only file names matching your query definitions will be loaded.
  2. Create one or more query definition yaml files (following the pattern above) and place these in a directory (this can be the same as the data files). The query provider will load and merge definitions from multiple YAML files.

QueryProvider defaults to searching for data files in the current directory and subdirectories. The default paths for query definition files are a) the built-in package queries path (msticpy/data/queries) and b) any custom paths that you have added to msticpyconfig.yaml (see msticpy Package Configuration).

Note

The query definition files must have a .yaml extension.

# Creating a query provider with "LocalData" parameter
qry_prov = QueryProvider("LocalData")

# list the queries loaded
print(qry_prov.list_queries())

# run a query
my_alerts = qry_prov.SecurityAlert.list_alerts()

# Specify path to look for data files
data_path = "./my_data"
qry_prov = QueryProvider("LocalData", data_paths=[data_path])

# Show the schema of the data files read in
print(qry_prov.schema)

# Specify both data and query locations
data_path = "./my_data"
query_path = "./myqueries"
qry_prov = QueryProvider("LocalData", data_paths=[data_path], query_paths=[query_path])

host_logons_df = qry_prov.WindowsSecurity.list_host_logons()

# parameters are accepted but ignored
host_logons_df = qry_prov.WindowsSecurity.list_host_logons(
    start=st_date,
    end=end_date,
    host_name="myhost.com",
)

Listing available queries

Upon connecting to the relevant Data Environment we need to look at what query options are available to us. In order to do this we can call

query_provider.list_queries().

This will return a list of all queries in our store.

Note

An individual query may be listed multiple times if it was added to multiple data families.

The results returned show the data family the query belongs to and the name of the specific query.

list_queries(self):

    Return list of family.query in the store.

    Returns
    -------
    Iterable[str]
        List of queries
qry_prov.list_queries()
LinuxSyslog.all_syslog
LinuxSyslog.cron_activity
LinuxSyslog.squid_activity
LinuxSyslog.sudo_activity
LinuxSyslog.user_group_activity
LinuxSyslog.user_logon
SecurityAlert.get_alert
SecurityAlert.list_alerts
SecurityAlert.list_alerts_counts
SecurityAlert.list_alerts_for_ip
SecurityAlert.list_related_alerts
WindowsSecurity.get_host_logon
WindowsSecurity.get_parent_process
WindowsSecurity.get_process_tree
WindowsSecurity.list_host_logon_failures
WindowsSecurity.list_host_logons
WindowsSecurity.list_host_processes
WindowsSecurity.list_hosts_matching_commandline
WindowsSecurity.list_matching_processes
WindowsSecurity.list_processes_in_session

Each of these items is a callable function that will return results as a pandas DataFrame.

Getting Help for a query

To get further details on a specific query call:

qry_prov.{query_group}.{query_name}('?') or

qry_prov.{query_group}.{query_name}('help')

or you can use the built-in Python help:

help(qry_prov.{query_group}.{query_name})

qry_prov is the name of your query provider object.

This will display:

  • Query Name
  • What Data Environment it is designed for
  • A short description of what the query does
  • What parameters the query can be passed
  • The raw (unparameterized) query that will be run
qry_prov.SecurityAlert.list_alerts('?')
Query:  list_alerts
Data source:  LogAnalytics
Retrieves list of alerts

Parameters
----------
add_query_items: str (optional)
    Additional query clauses
end: datetime
    Query end time
path_separator: str (optional)
    Path separator
    (default value is: \)
query_project: str (optional)
    Column project statement
    (default value is:  | project-rename StartTimeUtc = StartTime, EndTim...)
start: datetime
    Query start time
subscription_filter: str (optional)
    Optional subscription/tenant filter expression
    (default value is: true)
table: str (optional)
    Table name
    (default value is: SecurityAlert)
Query:
 {table} {query_project}
 | where {subscription_filter}
 | where TimeGenerated >= datetime({start})
 | where TimeGenerated <= datetime({end})
 | extend extendedProps = parse_json(ExtendedProperties)
 | extend CompromisedEntity = tostring(extendedProps["Compromised Host"])
 | project-away extendedProps {add_query_items}

Running a pre-defined query

To run a query from our query store we again call qry_prov.{query_group}.{query_name}(**kwargs) but this time we simply pass the required parameters for that query as keyword arguments.

This will return a Pandas DataFrame of the results with the columns determined by the query parameters. Should the query fail for some reason an exception will be raised.

alerts = qry_prov.SecurityAlert.list_alerts(
    start='2019-07-21 23:43:18.274492',
    end='2019-07-27 23:43:18.274492'
)
alerts.head()
TenantId TimeGenerated AlertDisplayName AlertName Severity Description ProviderName VendorName VendorOriginalId SystemAlertId ... ExtendedProperties Entities SourceSystem WorkspaceSubscriptionId WorkspaceResourceGroup ExtendedLinks ProductName ProductComponentName Type CompromisedEntity
0 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 06:35:13 Suspicious authentication activity Suspicious authentication activity Medium Although none of them succeeded, some of them ... Detection Microsoft 8af9954d-f28d-40ff-a079-d9d4cc5a5268 2518385291989119899_8af9954d-f28d-40ff-a079-d9... ... {\r\n "Activity start time (UTC)": "2019/07/2... [\r\n {\r\n "$id": "4",\r\n "HostName":... Detection 3b701f84-d04b-4479-89b1-fa8827eb537e sentineltest [\r\n {\r\n "Href": "https://interflowwebp... Azure Security Center SecurityAlert
1 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 06:35:13 Suspicious authentication activity Suspicious authentication activity Medium Although none of them succeeded, some of them ... Detection Microsoft 8af9954d-f28d-40ff-a079-d9d4cc5a5268 5d60fff6-7dd2-4474-a4d0-4c8e3fa6fad6 ... {\r\n "Activity start time (UTC)": "2019/07/2... [\r\n {\r\n "$id": "4",\r\n "HostName":... Detection 3b701f84-d04b-4479-89b1-fa8827eb537e sentineltest [\r\n {\r\n "Href": "https://interflowwebp... Azure Security Center SecurityAlert
2 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 07:02:42 Traffic from unrecommended IP addresses was de... Traffic from unrecommended IP addresses was de... Low Azure security center has detected incoming tr... AdaptiveNetworkHardenings Microsoft ba07c315-0af5-4568-9ecd-6c788f9267ae b7adb73b-0778-4929-b46a-c0ed642bc61f ... {\r\n "Destination Port": "3389",\r\n "Proto... [\r\n {\r\n "$id": "4",\r\n "ResourceId... Detection [\r\n {\r\n "DetailBladeInputs": "protecte... Azure Security Center SecurityAlert
3 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-26 06:03:16 Traffic from unrecommended IP addresses was de... Traffic from unrecommended IP addresses was de... Low Azure security center has detected incoming tr... AdaptiveNetworkHardenings Microsoft c3144593-9bae-448e-87dd-b2d3c47de571 d89ad3b2-f7a7-4cff-b8a4-3f6fa58b4760 ... {\r\n "Destination Port": "22",\r\n "Protoco... [\r\n {\r\n "$id": "4",\r\n "ResourceId... Detection [\r\n {\r\n "DetailBladeInputs": "protecte... Azure Security Center SecurityAlert
4 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-23 06:42:01 Traffic from unrecommended IP addresses was de... Traffic from unrecommended IP addresses was de... Low Azure security center has detected incoming tr... AdaptiveNetworkHardenings Microsoft 4e4173a6-1a27-451f-8a3c-25d10b306c30 11813ab7-ab7c-4719-b0a1-ccb5d4a32223 ... {\r\n "Destination Port": "3389",\r\n "Proto... [\r\n {\r\n "$id": "4",\r\n "ResourceId... Detection [\r\n {\r\n "DetailBladeInputs": "protecte... Azure Security Center SecurityAlert

5 rows × 30 columns


It is also possible to pass objects as positional arguments before any keyword arguments. For example, if I wanted to define the query times as an object rather than via start and end keyword arguments, I could simply pass a QueryTime object to the pre-defined query.

query_times = mas.nbwidgets.QueryTime(
    units='day', max_before=40, max_after=1, before=5
)
query_times.display()

Running the above cell will display an interactive date range selector. You can use it when running a query to automatically supply the start and end parameters for the query.

qry_prov.SecurityAlert.list_alerts(query_times)
TenantId TimeGenerated AlertDisplayName AlertName Severity Description ProviderName VendorName VendorOriginalId SystemAlertId ... ExtendedProperties Entities SourceSystem WorkspaceSubscriptionId WorkspaceResourceGroup ExtendedLinks ProductName ProductComponentName Type CompromisedEntity
0 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-26 06:03:16 Traffic from unrecommended IP addresses was de... Traffic from unrecommended IP addresses was de... Low Azure security center has detected incoming tr... AdaptiveNetworkHardenings Microsoft c3144593-9bae-448e-87dd-b2d3c47de571 d89ad3b2-f7a7-4cff-b8a4-3f6fa58b4760 ... {\r\n "Destination Port": "22",\r\n "Protoco... [\r\n {\r\n "$id": "4",\r\n "ResourceId... Detection [\r\n {\r\n "DetailBladeInputs": "protecte... Azure Security Center SecurityAlert
1 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-23 06:42:01 Traffic from unrecommended IP addresses was de... Traffic from unrecommended IP addresses was de... Low Azure security center has detected incoming tr... AdaptiveNetworkHardenings Microsoft 4e4173a6-1a27-451f-8a3c-25d10b306c30 11813ab7-ab7c-4719-b0a1-ccb5d4a32223 ... {\r\n "Destination Port": "3389",\r\n "Proto... [\r\n {\r\n "$id": "4",\r\n "ResourceId... Detection [\r\n {\r\n "DetailBladeInputs": "protecte... Azure Security Center SecurityAlert
2 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 06:35:13 Suspicious authentication activity Suspicious authentication activity Medium Although none of them succeeded, some of them ... Detection Microsoft 8af9954d-f28d-40ff-a079-d9d4cc5a5268 2518385291989119899_8af9954d-f28d-40ff-a079-d9... ... {\r\n "Activity start time (UTC)": "2019/07/2... [\r\n {\r\n "$id": "4",\r\n "HostName":... Detection 3b701f84-d04b-4479-89b1-fa8827eb537e sentineltest [\r\n {\r\n "Href": "https://interflowwebp... Azure Security Center SecurityAlert
3 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 06:35:13 Suspicious authentication activity Suspicious authentication activity Medium Although none of them succeeded, some of them ... Detection Microsoft 8af9954d-f28d-40ff-a079-d9d4cc5a5268 5d60fff6-7dd2-4474-a4d0-4c8e3fa6fad6 ... {\r\n "Activity start time (UTC)": "2019/07/2... [\r\n {\r\n "$id": "4",\r\n "HostName":... Detection 3b701f84-d04b-4479-89b1-fa8827eb537e sentineltest [\r\n {\r\n "Href": "https://interflowwebp... Azure Security Center SecurityAlert
4 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 07:02:42 Traffic from unrecommended IP addresses was de... Traffic from unrecommended IP addresses was de... Low Azure security center has detected incoming tr... AdaptiveNetworkHardenings Microsoft ba07c315-0af5-4568-9ecd-6c788f9267ae b7adb73b-0778-4929-b46a-c0ed642bc61f ... {\r\n "Destination Port": "3389",\r\n "Proto... [\r\n {\r\n "$id": "4",\r\n "ResourceId... Detection [\r\n {\r\n "DetailBladeInputs": "protecte... Azure Security Center SecurityAlert

5 rows × 30 columns


Running an ad hoc query

It is also possible to run ad hoc queries via a similar method. Rather than calling a named query from the Query Provider query store, we can pass a query directly to our Query Provider with:

query_provider.exec_query(query=query_string)

This will execute the query string passed in the parameters with the driver contained in the Query Provider and return the data in a pandas DataFrame. As with pre-defined queries, an exception will be raised should the query fail to execute.

exec_query(self, query: str) -> Union[pd.DataFrame, Any]:
    Execute query string and return DataFrame of results.

    Parameters
    ----------
    query : str
        The kql query to execute

    Returns
    -------
    Union[pd.DataFrame, results.ResultSet]
        A DataFrame (if successful) or
        Kql ResultSet if an error.
test_query = '''
    SecurityAlert
    | take 5
    '''

query_test = qry_prov.exec_query(query=test_query)
query_test.head()
TenantId TimeGenerated DisplayName AlertName AlertSeverity Description ProviderName VendorName VendorOriginalId SystemAlertId ... RemediationSteps ExtendedProperties Entities SourceSystem WorkspaceSubscriptionId WorkspaceResourceGroup ExtendedLinks ProductName ProductComponentName Type
0 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 06:35:13 Suspicious authentication activity Suspicious authentication activity Medium Although none of them succeeded, some of them ... Detection Microsoft 8af9954d-f28d-40ff-a079-d9d4cc5a5268 2518385291989119899_8af9954d-f28d-40ff-a079-d9... ... [\r\n "1. Enforce the use of strong passwords... {\r\n "Activity start time (UTC)": "2019/07/2... [\r\n {\r\n "$id": "4",\r\n "HostName":... Detection 3b701f84-d04b-4479-89b1-fa8827eb537e sentineltest [\r\n {\r\n "Href": "https://interflowwebp... Azure Security Center SecurityAlert
1 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 06:35:13 Suspicious authentication activity Suspicious authentication activity Medium Although none of them succeeded, some of them ... Detection Microsoft 8af9954d-f28d-40ff-a079-d9d4cc5a5268 5d60fff6-7dd2-4474-a4d0-4c8e3fa6fad6 ... [\r\n "1. Enforce the use of strong passwords... {\r\n "Activity start time (UTC)": "2019/07/2... [\r\n {\r\n "$id": "4",\r\n "HostName":... Detection 3b701f84-d04b-4479-89b1-fa8827eb537e sentineltest [\r\n {\r\n "Href": "https://interflowwebp... Azure Security Center SecurityAlert
2 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-22 07:02:42 Traffic from unrecommended IP addresses was de... Traffic from unrecommended IP addresses was de... Low Azure security center has detected incoming tr... AdaptiveNetworkHardenings Microsoft ba07c315-0af5-4568-9ecd-6c788f9267ae b7adb73b-0778-4929-b46a-c0ed642bc61f ... [\r\n "1. Review the IP addresses and determi... {\r\n "Destination Port": "3389",\r\n "Proto... [\r\n {\r\n "$id": "4",\r\n "ResourceId... Detection [\r\n {\r\n "DetailBladeInputs": "protecte... Azure Security Center SecurityAlert
3 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-07-26 06:03:16 Traffic from unrecommended IP addresses was de... Traffic from unrecommended IP addresses was de... Low Azure security center has detected incoming tr... AdaptiveNetworkHardenings Microsoft c3144593-9bae-448e-87dd-b2d3c47de571 d89ad3b2-f7a7-4cff-b8a4-3f6fa58b4760 ... [\r\n "1. Review the IP addresses and determi... {\r\n "Destination Port": "22",\r\n "Protoco... [\r\n {\r\n "$id": "4",\r\n "ResourceId... Detection [\r\n {\r\n "DetailBladeInputs": "protecte... Azure Security Center SecurityAlert
4 b1315f05-4a7a-45b4-811f-73e715f7c122 2019-06-27 00:31:35 Security incident with shared process detected Security incident with shared process detected High The incident which started on 2019-06-25 21:24... Detection Microsoft be88b671-2572-4373-af4a-323849b1da1d 2518408029550429999_be88b671-2572-4373-af4a-32... ... [\r\n "1. Escalate the alert to the informati... {\r\n "isincident": "true",\r\n "Detected Ti... [\r\n {\r\n "$id": "4",\r\n "DisplayNam... Detection 3b701f84-d04b-4479-89b1-fa8827eb537e sentineltest Azure Security Center SecurityAlert

5 rows × 29 columns
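Because exec_query performs no parameter substitution of its own, an ordinary Python f-string is a simple way to build ad hoc queries dynamically. A short sketch (the table and filter values are illustrative):

# Illustrative values - substitute your own host name and row limit
host_name = "UbuntuDevEnv"
row_limit = 5
host_query = f'''
    Syslog
    | where Computer == "{host_name}"
    | take {row_limit}
    '''
host_df = qry_prov.exec_query(query=host_query)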

Splitting Query Execution into Chunks

Some queries return too much data or take too long to execute in a single request. The MSTICPy data providers have an option to split a query into time ranges. Each sub-range is run as an independent query and the results are combined before being returned as a DataFrame.

To use this feature you must specify the keyword parameter split_queries_by when executing the query function. The value of this parameter is a string that specifies a time period. The time range given by the start and end parameters to the query is split into sub-ranges, each of which is the length of the split time period. For example, if you specify split_queries_by="1H" the query will be split into one-hour chunks.

Note

The final chunk may cover a time period larger or smaller than the split period that you specified in the split_queries_by parameter. This can happen if start and end are not aligned exactly on time boundaries (e.g. if you used a one-hour split period and end is 10 hours 15 minutes after start). The query split logic will create a larger final slice if end is close to the final time range, or it will insert an extra time range to ensure that the full start to end time range is covered.

The sub-ranges are used to generate a query for each time range. The queries are then executed in sequence and the results concatenated into a single DataFrame before being returned.

The values acceptable for the split_queries_by parameter have the format:

{N}{TimeUnit}

where N is the number of units and TimeUnit is a mnemonic of the unit, e.g. H = hour, D = day, etc. For the full list of these, see the documentation for Timedelta in the pandas documentation.
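For example, the following sketch (reusing the list_alerts query from earlier) splits a week-long query into one-day chunks; each chunk is run as a separate query and the results are returned as a single combined DataFrame:

alerts_df = qry_prov.SecurityAlert.list_alerts(
    start='2019-07-21 00:00:00',
    end='2019-07-28 00:00:00',
    split_queries_by='1D',
)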

Warning

There are some important caveats to this feature.

  1. It currently only works with pre-defined queries (including ones that you may create and add yourself, see Creating new queries below). It does not work with ad hoc queries (see Running an ad hoc query).
  2. If the query contains joins, the joins will only happen within the time ranges of each subquery.
  3. It only supports queries that have start and end parameters.
  4. Very large queries may return results that can exhaust the memory on the Python client machine.
  5. Duplicate records are possible at the time boundaries. The code tries to avoid returning duplicate records occurring exactly on the time boundaries but some data sources may not use granular enough time stamps to avoid this.

Creating new queries

msticpy provides a number of pre-defined queries to use with the data package. You can also add additional queries to be imported and used by your Query Provider. These are defined in YAML-format files; examples can be found on the msticpy GitHub site https://github.com/microsoft/msticpy/tree/master/msticpy/data/queries.

The required structure of these query definition files is as follows.

At the top level the file has the following keys:

  • metadata
  • defaults
  • sources

These are described in the following sections.

The metadata section

  • version: The version number of the definition file
  • description: A description of the purpose of this collection of query definitions
  • data_environments []: A list of the Data Environments that the defined queries can be run against (1 or more)
  • data_families []: A list of Data Families the defined queries relate to; these families are defined as part of msticpy.data.query_defns but you can add custom ones.
  • tags []: A list of tags to help manage definition files (this is not currently used)

The defaults section

A set of defaults that apply to all queries in the file. You can use this section to define parameters that are common to all of the queries in the file. Child keys of the defaults section are inherited by the query definitions in the file.

  • metadata: Metadata regarding a query
    • data_source: The data source to be used for the query
  • parameters: Parameter defaults for the queries (the format of the parameters section is the same as described in the sources section)

The sources section

Each key in the sources section defines a new query. The name of the key is the query name and must be unique and a valid Python identifier. Each query key has the following structure:

  • description: this is used to display help text for the query.
  • metadata: (optional) - if you want to override the global metadata for this query
  • args: The primary item here is the query text.
    • query: usually a multi-line string that will be passed to the data provider. The string is usually parameterized, with parameters denoted by surrounding them with single braces ({}). If you need to include literal braces in the query, type two braces; for example, "this {{literal_string}}" renders as "this {literal_string}". Surround your query string with single quotes.
    • uri: this is currently not used.
  • parameters: The parameters section defines the name, data type and optional default value for each parameter that will be substituted into the query before it is passed to the data provider. Each parameter must have a unique name (unique for each query, not globally). All parameters specified in the query text must have an entry here or in the file defaults section. The parameter subsection has the following sub-keys (a complete minimal example follows this list):
    • description: A description of what the parameter is (used for generating documentation strings).
    • type: The data type of the parameter. Valid types include: "str", "int", "float", "list" and "datetime". The list and datetime types cause additional formatting to be applied (such as converting from a date string).
    • default: (optional) the default value for that parameter. Any parameter that does not have a default value (here or in the file defaults section) must be supplied at query time.
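Putting these sections together, a complete minimal query definition file might look like the following sketch (the query, family and parameter names are illustrative, not taken from the built-in query files):

metadata:
    version: 1
    description: Example host queries
    data_environments: [LogAnalytics]
    data_families: [WindowsSecurity]
    tags: ['example']
defaults:
    parameters:
        table:
            description: Table name
            type: str
            default: SecurityEvent
        start:
            description: Query start time
            type: datetime
        end:
            description: Query end time
            type: datetime
sources:
    list_events_for_host:
        description: Lists events for a given host.
        args:
            query: '
                {table}
                | where TimeGenerated >= datetime({start})
                | where TimeGenerated <= datetime({end})
                | where Computer has "{host_name}"
                {add_query_items}'
        parameters:
            host_name:
                description: Name of the host
                type: str
            add_query_items:
                description: Additional query clauses
                type: str
                default: ''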

Some common parameters used in the queries are:

  • table: making this a substitutable parameter allows you to use the same query with different data sets. More commonly, you can add additional filtering statements here, for example:
table:
    description: The table name
    type: str
    default: SecurityEvent | where EventID == 4624
  • add_query_items: This is a useful way of extending queries by adding ad hoc statements to the end of the query (e.g. additional filtering or summarization), as in the sketch below.
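Because add_query_items is just another string parameter, you can use it at call time to append clauses to a query. A sketch using the list_alerts query shown earlier (the summarize clause is illustrative):

alert_counts = qry_prov.SecurityAlert.list_alerts(
    start='2019-07-21 00:00:00',
    end='2019-07-28 00:00:00',
    add_query_items='| summarize AlertCount=count() by AlertName',
)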

Using yaml aliases and macros in your queries

You can use standard yaml aliasing to define substitutable strings in your query definitions, e.g. a parameter default that is a long string expression. Define an alias in the aliases key of the file metadata section. An alias is defined by prefixing the name with "&". The alias is referenced (and inserted) by using the alias name prefixed with "*".

metadata:
    ...
    aliases:
        - &azure_network_project '| project TenantId, TimeGenerated,
            FlowStartTime = FlowStartTime_t,
            FlowEndTime = FlowEndTime_t,
            FlowIntervalEndTime = FlowIntervalEndTime_t,
            FlowType = FlowType_s,
            ResourceGroup = split(VM_s, "/")[0],
            VMName = split(VM_s, "/")[1],
            VMIPAddress = VMIP_s'
    ...
sources:
    list_azure_network_flows_by_host:
        description: Retrieves Azure network analytics flow events.
        ...
        parameters:
            ...
            query_project:
                description: Column project statement
                type: str
                default: *azure_network_project

You can also use macros, which work like parameters but are substituted into the query before any parameter substitution is carried out. This allows you to, for example, use a single base query but with different filter and summarization clauses defined as macros. The macro text is substituted into the main query.

Macros are added to the query_macros subkey of a query. They have two subkeys: description and value. value defines the text to be inserted. The key name is the name of the macro.

In the query, you denote the substitution point by surrounding the macro name with "$<" and ">$". This is shown in the example below.

- query: '
    {table}
    | where SubType_s == "FlowLog"
    | where FlowStartTime_t >= datetime({start})
    | where FlowEndTime_t <= datetime({end})
    $<query_condition>$
    | where (AllowedOutFlows_d > 0 or AllowedInFlows_d > 0)
    {query_project}
    | extend AllExtIPs = iif(isempty(PublicIPs), pack_array(ExtIP),
                     iif(isempty(ExtIP), PublicIPs, array_concat(PublicIPs, pack_array(ExtIP)))
                     )
    | project-away ExtIP
    | mvexpand AllExtIPs
    {add_query_items}'

Macros are particularly useful when combined with yaml aliases. You can, for example, define a base query (using a yaml alias) with a macro reference in the query body. Then in each query definition you can have different macro values for the macro to be substituted. For example:

metadata:
    ...
    aliases:
        - &azure_network_base_query '
            {table}
            | where SubType_s == "FlowLog"
            | where FlowStartTime_t >= datetime({start})
            | where FlowEndTime_t <= datetime({end})
            $<query_condition>$
            | where (AllowedOutFlows_d > 0 or AllowedInFlows_d > 0)
            {query_project}
            | extend AllExtIPs = iif(isempty(PublicIPs), pack_array(ExtIP),
                            iif(isempty(ExtIP), PublicIPs, array_concat(PublicIPs, pack_array(ExtIP)))
                            )
            | project-away ExtIP
            | mvexpand AllExtIPs
            {add_query_items}'
    ...
sources:
    list_azure_network_flows_by_ip:
        description: Retrieves Azure network analytics flow events.
        args:
            query: *azure_network_base_query
        parameters:
            query_project:
                ...
            end:
                description: Query end time
                type: datetime
        query_macros:
            query_condition:
                description: Query-specific where clause
                value: '| where (VMIP_s in ({ip_address_list})
                    or SrcIP_s in ({ip_address_list})
                    or DestIP_s in ({ip_address_list})
                    )'

This allows you to define a series of related queries that share the same basic logic but have different filter clauses. This is extremely useful where the query is complex, since it allows you to maintain a single copy of it.

Note

Using aliases and macros complicates the logic for anyone trying to read the query file, so use this sparingly.

Guidelines for creating and debugging queries

It is often helpful to start with a working version of a query without using any parameters. Just paste in a query that you know is working. Once you have verified that this works and returns data as expected you can start to parameterize it.

As you add parameters you can expect to find escaping and quoting issues with the parameter values. To see the parameterized version of the query (without submitting it to the data provider), run the query with "print" as the first parameter. This will return the parameterized version of the query as a string:

qry_prov.SecurityEvents.my_new_query(
    "print",
    start=start_dt,
    end=end_dt,
    account="ian",
)

There are also a number of tools within the package to assist in validating new query definition files once created.

data_query_reader.find_yaml_files

    Return iterable of yaml files found in `source_path`.

    Parameters
    ----------
    source_path : str
        The source path to search in.
    recursive : bool, optional
        Whether to recurse through subfolders.
        By default False

    Returns
    -------
    Iterable[str]
        File paths of yaml files found.

 data_query_reader.validate_query_defs

     Validate content of query definition.

    Parameters
    ----------
    query_def_dict : dict
        Dictionary of query definition yaml file contents.

    Returns
    -------
    bool
        True if validation succeeds.

    Raises
    ------
    ValueError
        The validation failure reason is returned in the
        exception message (arg[0])

validate_query_defs() does not perform comprehensive checks on the file, but it does check that the key elements required in the file are present.

for file in QueryReader.find_yaml_files(source_path="C:\\queries"):
    with open(file) as f_handle:
        yaml_file = yaml.safe_load(f_handle)
        if QueryReader.validate_query_defs(query_def_dict=yaml_file):
            print(f'{file} is a valid query definition')
        else:
            print(f'There is an error with {file}')
C:\queries\example.yaml is a valid query definition

Adding a new set of queries and running them

Once you are happy with a query definition file you can import it with:

query_provider.import_query_file(query_file=path_to_query_file)

This will load the query file into the Query Provider’s Query Store from where it can be called.

qry_prov.import_query_file(query_file='C:\\queries\\example.yaml')

Once imported the queries in the files appear in the Query Provider’s Query Store alongside the others and can be called in the same manner as pre-defined queries.

If you have created a large number of query definition files and you want to have them automatically imported into a Query Provider's query store at initialization, you can specify a directory containing these queries in the msticpyconfig.yaml file under QueryDefinitions: Custom:

For example, if I have a folder at C:\queries, I will set the config file to:

QueryDefinitions:
    Custom:
        - C:\queries

Having the Custom field populated means that the Query Provider will automatically enumerate all the YAML files in the directories provided and import the relevant queries into the query store at initialization, alongside the default queries. Custom queries with the same name as default queries will overwrite the default ones.

queries = qry_prov.list_queries()
for query in queries:
    print(query)
LinuxSyslog.all_syslog
LinuxSyslog.cron_activity
LinuxSyslog.squid_activity
LinuxSyslog.sudo_activity
LinuxSyslog.syslog_example
LinuxSyslog.user_group_activity
LinuxSyslog.user_logon
SecurityAlert.get_alert
SecurityAlert.list_alerts
SecurityAlert.list_alerts_counts
SecurityAlert.list_alerts_for_ip
SecurityAlert.list_related_alerts
WindowsSecurity.get_host_logon
WindowsSecurity.get_parent_process
WindowsSecurity.get_process_tree
WindowsSecurity.list_host_logon_failures
WindowsSecurity.list_host_logons
WindowsSecurity.list_host_processes
WindowsSecurity.list_hosts_matching_commandline
WindowsSecurity.list_matching_processes
WindowsSecurity.list_processes_in_session
qry_prov.LinuxSyslog.syslog_example('?')
Query:  syslog_example
Data source:  LogAnalytics
Example query

Parameters
----------
add_query_items: str (optional)
    Additional query clauses
end: datetime
    Query end time
host_name: str
    Hostname to query for
query_project: str (optional)
    Column project statement
    (default value is:  | project TenantId, Computer, Facility, TimeGener...)
start: datetime
    Query start time
subscription_filter: str (optional)
    Optional subscription/tenant filter expression
    (default value is: true)
table: str (optional)
    Table name
    (default value is: Syslog)
Query:
 {table} | where {subscription_filter}
 | where TimeGenerated >= datetime({start})
 | where TimeGenerated <= datetime({end})
 | where Computer == "{host_name}" | take 5
qry_prov.LinuxSyslog.syslog_example(
    start='2019-07-21 23:43:18.274492',
    end='2019-07-27 23:43:18.274492',
    host_name='UbuntuDevEnv'
)
TenantId SourceSystem TimeGenerated Computer EventTime Facility HostName SeverityLevel SyslogMessage ProcessID HostIP ProcessName MG Type _ResourceId
0 b1315f05-4a7a-45b4-811f-73e715f7c122 Linux 2019-07-25 15:15:37.213 UbuntuDevEnv 2019-07-25 15:15:37 authpriv UbuntuDevEnv notice omsagent : TTY=unknown PWD=/opt/microsoft/om... NaN 10.0.1.4 sudo 00000000-0000-0000-0000-000000000002 Syslog /subscriptions/3b701f84-d04b-4479-89b1-fa8827e...
1 b1315f05-4a7a-45b4-811f-73e715f7c122 Linux 2019-07-25 15:15:37.313 UbuntuDevEnv 2019-07-25 15:15:37 authpriv UbuntuDevEnv info pam_unix(sudo:session): session opened for use... NaN 10.0.1.4 sudo 00000000-0000-0000-0000-000000000002 Syslog /subscriptions/3b701f84-d04b-4479-89b1-fa8827e...
2 b1315f05-4a7a-45b4-811f-73e715f7c122 Linux 2019-07-25 15:15:37.917 UbuntuDevEnv 2019-07-25 15:15:37 authpriv UbuntuDevEnv info pam_unix(sudo:session): session closed for use... NaN 10.0.1.4 sudo 00000000-0000-0000-0000-000000000002 Syslog /subscriptions/3b701f84-d04b-4479-89b1-fa8827e...
3 b1315f05-4a7a-45b4-811f-73e715f7c122 Linux 2019-07-25 15:15:50.793 UbuntuDevEnv 2019-07-25 15:15:50 authpriv UbuntuDevEnv info pam_unix(cron:session): session closed for use... 29486.0 10.0.1.4 CRON 00000000-0000-0000-0000-000000000002 Syslog /subscriptions/3b701f84-d04b-4479-89b1-fa8827e...
4 b1315f05-4a7a-45b4-811f-73e715f7c122 Linux 2019-07-25 15:16:01.800 UbuntuDevEnv 2019-07-25 15:16:01 authpriv UbuntuDevEnv info pam_unix(cron:session): session opened for use... 29844.0 10.0.1.4 CRON 00000000-0000-0000-0000-000000000002 Syslog /subscriptions/3b701f84-d04b-4479-89b1-fa8827e...

If you are having difficulties with a defined query and it is not producing the expected results, it can be useful to see the raw query exactly as it is passed to the Data Environment. If you call a query with 'print' and the parameters required by that query, it will construct and print out the query string that would be run.

qry_prov.LinuxSyslog.syslog_example(
    'print',
    start='2019-07-21 23:43:18.274492',
    end='2019-07-27 23:43:18.274492',
    host_name='UbuntuDevEnv'
)
'Syslog
    | where true
    | where TimeGenerated >= datetime(2019-07-21 23:43:18.274492)
    | where TimeGenerated <= datetime(2019-07-27 23:43:18.274492)
    | where Computer == "UbuntuDevEnv"
    | take 5'