msticpy.data.drivers.prismacloud_driver module
PrismaCloud Driver class.
- class msticpy.data.drivers.prismacloud_driver.DriverConfig
Bases: TypedDict
Represent configuration options for the Prisma Cloud Driver.
- timeout
Request timeout in seconds. Default is 300.
- Type:
int, optional
- base_url
Base API URL for Prisma Cloud.
- Type:
str, optional
- debug
Enable or disable debug mode. Default is False.
- Type:
bool, optional
- max_retries
Maximum number of retries for API requests.
- Type:
int, optional
- headers
Custom headers for HTTP requests.
- Type:
dict[str, str], optional
- base_url: str
- clear() → None. Remove all items from D.
- copy() → a shallow copy of D
- debug: bool
- classmethod fromkeys(iterable, value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- headers: dict[str, str]
- items() → a set-like object providing a view on D's items
- keys() → a set-like object providing a view on D's keys
- max_retries: int
- pop(k[, d]) → v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- timeout: int
- update([E, ]**F) → None. Update D from mapping/iterable E and F.
If E is present and has a .keys() method, then does: for k in E.keys(): D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
- values() → an object providing a view on D's values
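The configuration keys documented for DriverConfig can be illustrated with a small sketch. It does not import msticpy: `DriverConfigSketch`, `DEFAULTS`, and `merge_config` are hypothetical names defined here only to show how optional keys and the two documented defaults (timeout 300, debug False) might combine with caller overrides.

```python
from typing import Dict, TypedDict


class DriverConfigSketch(TypedDict, total=False):
    """Hypothetical stand-in mirroring the documented DriverConfig keys."""

    timeout: int
    base_url: str
    debug: bool
    max_retries: int
    headers: Dict[str, str]


# Only the defaults stated in the documentation; other keys stay unset.
DEFAULTS: DriverConfigSketch = {"timeout": 300, "debug": False}


def merge_config(overrides: DriverConfigSketch) -> DriverConfigSketch:
    """Return the defaults with caller-supplied overrides applied on top."""
    merged: DriverConfigSketch = {**DEFAULTS, **overrides}
    return merged


config = merge_config({"base_url": "https://api.prismacloud.io", "timeout": 200})
```

Because the sketch class is declared with `total=False`, every key is optional, which matches the "optional" marking on each documented option.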
- exception msticpy.data.drivers.prismacloud_driver.PrismaCloudAuthError(*args, help_uri=None, **kwargs)
Bases: MsticpyConnectionError
Raised for authentication failures.
Create an instance of the MsticpyUserError class.
- Parameters:
args (Iterable of strings) – Args will be printed as text of the exception.
help_uri (Union[tuple[str, str], str, None], optional) – Primary URL, by default “https://msticpy.readthedocs.org”
title (str, optional) – If a title keyword argument is supplied it will be used to create the title line.
*_uri (str, optional) – Additional keyword arguments whose names end in “_uri” will be used to create a list of references in addition to the primary help_uri
display (bool, optional) – Display the exception when created. By default, False
Notes
The exception text is displayed when the exception is created and not when it is raised. We recommend creating the exception within the raise statement. E.g.
raise MsticpyUserException(arg1, arg2…)
Developer note: Any classes derived from MsticpyUserError should be named with an “Error” suffix to distinguish these from standard exception types.
- DEF_HELP_URI: ClassVar[tuple[str, str]] = ('DataProviders', 'https://msticpy.readthedocs.io/en/latest/data_acquisition/DataProviders.html')
- add_note()
Exception.add_note(note) – add a note to the exception
- args
- display_exception()
Output the exception HTML or text friendly exception.
- Return type:
None
- property help_uri: tuple[str, str] | str
Get the default help URI.
- classmethod no_display_exceptions()
Context manager to block exception display to IPython/stdout.
- Return type:
Generator[None, Any, None]
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
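Because PrismaCloudAuthError derives from MsticpyConnectionError, a handler for the base class also catches authentication failures, so the more specific handler has to come first. The sketch below uses locally defined stand-in classes with the same names (an assumption made so that it runs without msticpy); `fake_connect` and `classify` are hypothetical helpers.

```python
class MsticpyConnectionError(Exception):
    """Stand-in for the msticpy base class, defined locally for this sketch."""


class PrismaCloudAuthError(MsticpyConnectionError):
    """Stand-in mirroring the documented inheritance."""


def fake_connect(password: str) -> str:
    """Hypothetical login that fails when the password is empty."""
    if not password:
        # Create the exception inside the raise statement, as the Notes recommend.
        raise PrismaCloudAuthError("Missing or invalid credentials")
    return "connected"


def classify(password: str) -> str:
    """Map the outcome of fake_connect to a short status string."""
    try:
        return fake_connect(password)
    except PrismaCloudAuthError:
        # The subclass handler must precede the base-class handler.
        return "auth-failure"
    except MsticpyConnectionError:
        return "connection-failure"
```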
- class msticpy.data.drivers.prismacloud_driver.PrismaCloudDriver(**kwargs)
Bases: DriverBase
Provide interface to connect and execute queries on Prisma Cloud.
This driver handles authentication, query execution, and data retrieval from Prisma Cloud APIs. It supports multiple query types, including asset searches, configuration checks, event monitoring, and network analysis. The driver also manages authentication tokens and handles API request retries.
- Features:
Authenticate with Prisma Cloud using username and password.
Authenticate using msticpyconfig configuration file.
Execute predefined queries on different Prisma Cloud data sources.
Retrieve assets, config_resource, events, and network-related data.
Support multi-cloud hunting.
Automatically refresh authentication tokens.
Handle API request errors and retries gracefully.
Use pagination for large datasets.
- ENDPOINT_MAP
Maps query sources to their respective API endpoints.
- Type:
ClassVar[dict[str, str]]
- timeout
Timeout for API requests in seconds (default: 300).
- Type:
int
- base_url
Base URL for Prisma Cloud API.
- Type:
str
- debug
Enables or disables debug logging (default: False).
- Type:
bool
- connected
Tracks authentication status.
- Type:
bool
- max_retries
Number of retry attempts for failed API requests.
- Type:
int
- headers
HTTP headers used in API requests.
- Type:
dict[str, str]
- Example Usage direct query:
>>> driver = PrismaCloudDriver(base_url="https://api.prismacloud.io", timeout=200)
>>> driver.connect(username="user@example.com", password="xxxxx")
>>> # Execute a network search query
>>> query_result = driver.direct_execute_query("config where cloud.type = 'aws'", "network")
>>> print(query_result)
- Example Usage queryprovider:
>>> driver = QueryProvider("Prismacloud")
>>> driver.connect(debug=True)
>>> driver.list_queries()
- Execute loaded queries:
>>> driver.Prismacloud.event_sensitive_operation_in_aws_kms(amount_value=2400)
>>> driver.Prismacloud.search_asset_relative_with_prismafindings(
...     selectasset="asset where cloud.service IN ('azure_sql_database')",
...     finding_types=['INTERNET_EXPOSURE'],
... )
>>> driver.Prismacloud.config_resource_firewall_disabled_azurekeyvault(
...     unit_type="hour", amount_value=24
... )
- Raises:
PrismaCloudAuthError – If authentication fails.
PrismaCloudQueryError – If there is an error in executing a query.
MsticpyUserError – If an invalid query source is provided.
- Parameters:
kwargs (DriverConfig)
Initialize the Prisma Cloud Driver and set up the HTTP client.
This method configures the HTTP client (self.client) for communicating with the Prisma Cloud API. It establishes connection settings, including timeout, retry logic, and custom headers. The HTTP client is used to execute authenticated API requests and handle error responses.
The transport layer (self.transport) is initialized with retry support to ensure resilience against temporary failures.
- Attributes Initialized:
- self.client (httpx.Client):
Handles API requests and maintains authentication headers.
Uses a persistent session for improved efficiency.
Configured with a timeout, base URL, and custom headers.
- self.transport (httpx.HTTPTransport):
Provides automatic retry logic for API requests.
Ensures stability during transient network issues.
Controls connection reuse for performance optimization.
- CONFIG_NAME: ClassVar[str] = 'Prismacloud'
- DEFAULT_HEADER: ClassVar[dict[str, str]] = {'Accept': 'application/json', 'User-Agent': 'PrismaCloudDriver/1.0'}
- ENDPOINT_MAP: ClassVar[dict[str, str]] = {'assets': '/search/api/v1/asset', 'config_resource': '/search/api/v2/config', 'events': '/search/event', 'network': '/search'}
- add_query_filter(name, query_filter)
Add an expression to the query attach filter.
- Parameters:
name (str)
query_filter (str | Iterable)
- connect(username=None, password=None, connection_str=None, **kwargs)
Authenticate with Prisma Cloud and establish an authenticated session.
This method sends authentication credentials to the Prisma Cloud API and retrieves an access token if the login is successful. The token is stored in self.client.headers to enable subsequent authenticated requests. Credentials can be provided directly via the username and password parameters, or as a single connection_str (formatted as “username:password”). If neither is provided, the method attempts to retrieve the credentials from the driver configuration.
- Parameters:
username (str or None, optional) – The Prisma Cloud account username for authentication.
password (str or None, optional) – The corresponding password for the Prisma Cloud account.
connection_str (str or None, optional) – An alternative connection string in the format “username:password”. This parameter is used to extract credentials if username and password are not provided.
**kwargs (Any) – Additional keyword arguments to be passed to the connection logic.
- Returns:
PrismaCloudDriver – The instance of the driver with an authenticated session.
Behavior
If connection_str is provided, the method extracts the username and password from it.
If either the username or password remains missing, it attempts to retrieve them from the driver configuration.
Sends a POST request to the /login endpoint with the provided credentials.
On a successful login (indicated by a “login_successful” message), the authentication token is stored in the HTTP client’s headers, and internal flags (_connected and _loaded) are set accordingly.
If authentication fails, the method logs the error and raises a PrismaCloudAuthError.
Network or API request failures are handled by raising a MsticpyConnectionError.
- Raises:
PrismaCloudAuthError – If authentication fails due to missing or invalid credentials.
MsticpyConnectionError – If a network or API request failure occurs.
- Return type:
PrismaCloudDriver
Example
>>> driver = PrismaCloudDriver()
>>> # Authenticate using separate username and password parameters
>>> driver.connect(username="user@example.com", password="xxxxx")
>>> # Alternatively, using a connection string:
>>> driver.connect(connection_str="user@example.com:xxxxx")
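The “username:password” connection string format described above could be split as in the following sketch. This is not the driver's own parsing code: `split_connection_str` is a hypothetical helper, and splitting on the first colon (so that a password may itself contain colons) is an assumption.

```python
from typing import Optional, Tuple


def split_connection_str(connection_str: str) -> Tuple[Optional[str], Optional[str]]:
    """Split "username:password" on the first colon (assumed behaviour)."""
    username, separator, password = connection_str.partition(":")
    if not separator or not username or not password:
        # Treat a malformed string as "no credentials supplied".
        return None, None
    return username, password
```

Returning a pair of None values for a malformed string would let a caller fall back to the driver configuration, as the Behavior notes describe for missing credentials.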
- property connected: bool
Return the connection status.
- Returns:
True if the driver is connected and authenticated, False otherwise.
- Return type:
bool
Notes
This checks both the connection flag and the presence of the authentication token in the client headers.
- debug: bool
- direct_execute_query(query, query_endpoint=None, **kwargs)
Execute a query on Prisma Cloud and return results as a DataFrame.
This method serves as a unified interface for executing different types of queries on Prisma Cloud, routing the request to the appropriate search function based on the query_source.
The function validates authentication, maps the query_source to the correct API endpoint, and dynamically calls the relevant search method.
- Parameters:
query (str) – The Prisma Cloud query string to execute.
query_source (str | None, optional) – The source of the query, determining which search function to use. Options: “config_resource”, “assets”, “events”, “network”.
**kwargs (QueryArgs) – Additional optional query parameters (see Other Parameters).
timeout (int, optional) – API request timeout in seconds.
max_retries (int, optional) – Maximum retry attempts for failed requests.
limit (int, optional) – Maximum number of results to retrieve.
unit (str, optional) – Time unit for the query (e.g., "minute", "hour", "day").
amount (int, optional) – Time range quantity in the specified unit.
query_endpoint (str | None)
**kwargs
- Returns:
A DataFrame containing the retrieved query results. Returns an empty DataFrame if no results are found.
- Return type:
pd.DataFrame
- Raises:
PrismaCloudQueryError – If the driver is not authenticated.
MsticpyUserError – If the query_source is invalid or not supported.
Example
>>> query_result = driver.direct_execute_query(
...     query="config where cloud.type = 'aws'",
...     query_source="config_resource",
...     limit=1000,
...     unit="day",
...     amount=7,
... )
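The routing step described for direct_execute_query can be sketched as a lookup in the documented ENDPOINT_MAP followed by a choice of search method. `route_query` is a hypothetical helper and is not part of the driver. It raises ValueError for brevity, whereas the documentation states that the driver raises MsticpyUserError. The method names follow the documented `prisma_search_<source>` pattern.

```python
from typing import Dict, Tuple

# Values copied from the documented ENDPOINT_MAP class attribute.
ENDPOINT_MAP: Dict[str, str] = {
    "assets": "/search/api/v1/asset",
    "config_resource": "/search/api/v2/config",
    "events": "/search/event",
    "network": "/search",
}


def route_query(query_source: str) -> Tuple[str, str]:
    """Return (search method name, endpoint) for a query source."""
    if query_source not in ENDPOINT_MAP:
        raise ValueError(f"Unsupported query source: {query_source!r}")
    # Each documented source has a matching prisma_search_<source> method.
    return f"prisma_search_{query_source}", ENDPOINT_MAP[query_source]
```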
- property driver_queries: Iterable[dict[str, Any]]
Return queries retrieved from the service after connecting.
- Returns:
List of dictionaries of query_name, query_text. Name of container to add queries to.
- Return type:
List[Dict[str, str]]
- get_driver_property(name)
Return value or KeyError from driver properties.
- Parameters:
name (str)
- Return type:
Any
- static get_http_timeout(**kwargs)
Get http timeout from settings or kwargs.
- headers: dict[str, str]
- property instance: str | None
Return instance name, if one is set.
- Returns:
The name of driver instance or None if the driver does not support multiple instances
- Return type:
Optional[str]
- property loaded: bool
Return true if the provider is loaded.
- Returns:
True if the provider is loaded.
- Return type:
bool
Notes
This is not relevant for some providers.
- max_retries: int
- prisma_search_assets(query, endpoint, **kwargs)
Execute an asset search on Prisma Cloud with pagination.
This method retrieves asset-related data from Prisma Cloud by executing the given query against the specified API endpoint. It supports pagination to efficiently handle large datasets.
The function allows customization of search parameters, such as time range and request limits, by accepting additional keyword arguments.
- Parameters:
query (str) – The Prisma Cloud query string to execute.
endpoint (str) – The API endpoint to send the query to.
**kwargs (QueryArgs) – Additional optional query parameters (see Other Parameters).
timeout (int, optional) – API request timeout in seconds.
limit (int, optional) – Maximum number of results to retrieve.
unit (str, optional) – Time unit for the query (e.g., "minute", "hour").
amount (int, optional) – Time range quantity in the specified unit.
- Returns:
A DataFrame containing the retrieved asset data. Returns an empty DataFrame if no results are found.
- Return type:
pd.DataFrame
- Raises:
MsticpyConnectionError – If the API request fails or returns an unexpected response.
Example
>>> asset_data = driver.prisma_search_assets(
...     query="config where cloud.type = 'aws'",
...     endpoint="/search/api/v1/asset",
...     limit=5000,
...     unit="day",
...     amount=7,
... )
- prisma_search_config_resource(query, endpoint, **kwargs)
Execute a configuration search on Prisma Cloud.
This method retrieves configuration-related data from Prisma Cloud by executing the given query against the specified API endpoint. It allows customization of result limits and request parameters.
The function constructs a request payload, sends the API request, and processes the response into a Pandas DataFrame.
- Parameters:
query (str) – The Prisma Cloud query string to execute.
endpoint (str) – The API endpoint to send the query to.
**kwargs (QueryArgs) – Additional optional query parameters (see Other Parameters).
timeout (int, optional) – API request timeout in seconds.
max_retries (int, optional) – Maximum retry attempts for failed requests.
limit (int, optional) – Maximum number of results to retrieve (default: 1000).
withResourceJson (bool, optional) – Whether to include resource JSON details.
- Returns:
A DataFrame containing the retrieved configuration data. Returns an empty DataFrame if no results are found.
- Return type:
pd.DataFrame
- Raises:
MsticpyConnectionError – If the API request fails or returns an unexpected response.
Example
>>> config_data = driver.prisma_search_config_resource(
...     query="config where cloud.type = 'aws'",
...     endpoint="/search/api/v2/config",
...     limit=5000,
...     withResourceJson=True,
... )
- prisma_search_events(query, endpoint, **kwargs)
Execute an event search on Prisma Cloud.
This method retrieves event-related data from Prisma Cloud by executing the given query against the specified API endpoint. It allows customization of time range, result limits, and other search parameters.
The function constructs a request payload, sends the API request, and processes the response into a Pandas DataFrame.
- Parameters:
query (str) – The Prisma Cloud query string to execute.
endpoint (str) – The API endpoint to send the query to.
**kwargs (QueryArgs) – Additional optional query parameters (see Other Parameters).
timeout (int, optional) – API request timeout in seconds.
max_retries (int, optional) – Maximum retry attempts for failed requests.
unit (str, optional) – Time unit for the query (e.g., "minute", "hour", "day").
amount (int, optional) – Time range quantity in the specified unit.
limit (int, optional) – Maximum number of results to retrieve.
- Returns:
A DataFrame containing the retrieved event data. Returns an empty DataFrame if no results are found.
- Return type:
pd.DataFrame
- Raises:
MsticpyConnectionError – If the API request fails or returns an unexpected response.
Example
>>> event_data = driver.prisma_search_events(
...     query="event where cloud.type = 'aws'",
...     endpoint="/search/event",
...     limit=5000,
...     unit="hour",
...     amount=24,
... )
- prisma_search_network(query, endpoint, limitresult=10000, limitpage=100, **kwargs)
Execute a network data search on Prisma Cloud with pagination.
This method retrieves network-related data from Prisma Cloud by executing the given query against the specified API endpoint. It supports pagination and processes large datasets efficiently.
The function allows customization of search parameters, such as time range, cloud provider type, and request limits, by accepting additional keyword arguments.
- Parameters:
query (str) – The Prisma Cloud query string to execute.
endpoint (str) – The API endpoint to send the query to.
limitresult (int, optional) – The maximum number of results to retrieve. Defaults to 10,000.
limitpage (int, optional) – The number of results per page. Defaults to 100.
**kwargs (QueryArgs) – Additional optional query parameters (see Other Parameters).
timeout (int, optional) – API request timeout in seconds.
unit (str, optional) – Time unit for the query (e.g., "minute", "hour").
amount (int, optional) – Time range quantity in the specified unit.
cloudtype (str, optional) – Cloud provider filter ("aws", "gcp", "azure").
- Returns:
A DataFrame containing the retrieved network data. Returns an empty DataFrame if no results are found.
- Return type:
pd.DataFrame
- Raises:
MsticpyConnectionError – If the API request fails or returns an unexpected response.
Example
>>> network_data = driver.prisma_search_network(
...     query="network where cloud.type = 'aws'",
...     endpoint="/search",
...     limitresult=5000,
...     unit="day",
...     amount=7,
... )
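The interplay of limitresult (overall cap) and limitpage (page size) can be illustrated with a generic pagination loop. This is a sketch only: offset-based paging is an assumption (the real API may use a different mechanism such as page tokens), and `fetch_all`, `fake_fetch_page`, and `DATA` are hypothetical names standing in for the API call.

```python
from typing import Callable, Dict, List


def fetch_all(
    fetch_page: Callable[[int, int], List[Dict]],
    limitresult: int = 10000,
    limitpage: int = 100,
) -> List[Dict]:
    """Collect rows page by page until limitresult is hit or data runs out."""
    rows: List[Dict] = []
    offset = 0
    while len(rows) < limitresult:
        # Never request more rows than are still allowed by limitresult.
        page_size = min(limitpage, limitresult - len(rows))
        page = fetch_page(offset, page_size)
        rows.extend(page)
        if len(page) < page_size:
            break  # A short (or empty) page means there is no more data.
        offset += len(page)
    return rows


# Hypothetical in-memory data source standing in for the API.
DATA = [{"id": i} for i in range(250)]


def fake_fetch_page(offset: int, size: int) -> List[Dict]:
    """Return one slice of DATA, as a paged endpoint might."""
    return DATA[offset:offset + size]
```

With the 250-row sample source, the default settings stop after a short third page, while a lower limitresult stops as soon as the cap is reached.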
- queries_loaded: bool
- query(query, query_source=None, **kwargs)
Execute a Prisma Cloud query and return the results as a Pandas DataFrame.
This method is the primary interface for executing queries via the Prisma Cloud driver. The query must be provided as a JSON-formatted string that contains a “querymetadata” section. This metadata should include keys such as “unit”, “amount”, “start_time”, “end_time”, “query_by_user”, and “endpoint” to specify the query details.
The method parses the JSON query string and extracts the necessary parameters. If an “endpoint” is provided in the query metadata, it updates the keyword arguments with the extracted time range values and calls the direct_execute_query() method to perform the query. If the query string cannot be parsed as valid JSON or the “endpoint” is missing, an empty DataFrame is returned.
- Parameters:
query (str) – A JSON-formatted string containing the query details and metadata.
query_source (QuerySource or None, optional) – This parameter is ignored in this implementation and is maintained only for compatibility with QueryProvider usage.
**kwargs (Any) – Additional keyword arguments to be passed to the underlying query execution method.
- Returns:
A DataFrame containing the results of the executed query. If the query string is invalid or lacks a required endpoint, an empty DataFrame is returned.
- Return type:
pd.DataFrame
Notes
The method expects the query string to include a “querymetadata” with the following keys:
“unit”: The time unit for the query (e.g., “minute”, “hour”).
“amount”: The quantity of the time unit.
“start_time”: The start of the time range in ISO 8601 format.
“end_time”: The end of the time range in ISO 8601 format.
“query_by_user”: The actual query to execute.
“endpoint”: The target API endpoint for the query.
If an error occurs during JSON parsing (a broad exception is caught), the error is logged and the method returns an empty DataFrame.
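The metadata handling described in these notes can be sketched as follows. The sketch assumes that “querymetadata” is a top-level key holding a JSON object. `parse_query_metadata` is a hypothetical helper, and it returns None where the documented method would return an empty DataFrame.

```python
import json
from typing import Any, Dict, Optional

METADATA_KEYS = ("unit", "amount", "start_time", "end_time", "query_by_user", "endpoint")


def parse_query_metadata(query: str) -> Optional[Dict[str, Any]]:
    """Extract the documented metadata keys, or None if unusable."""
    try:
        payload = json.loads(query)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError.
        return None
    metadata = payload.get("querymetadata") if isinstance(payload, dict) else None
    if not isinstance(metadata, dict) or not metadata.get("endpoint"):
        # A missing endpoint means the query cannot be routed.
        return None
    return {key: metadata.get(key) for key in METADATA_KEYS}
```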
- property query_attach_spec: dict[str, set[str]]
Parameters that determine whether a query is relevant for the driver.
- query_store: QueryStore
- query_usable(query_source)
Return True if query should be exposed for this driver.
- Parameters:
query_source (QuerySource)
- Return type:
bool
- query_with_results(query, query_source=None, **kwargs)
Placeholder. Not used.
- Parameters:
query (str)
query_source (str | None)
- Return type:
tuple[DataFrame, Any]
- refresh_token()
Refresh the authentication token.
- Return type:
None
- property schema: dict[str, dict]
Return current data schema of connection.
- Returns:
Data schema of current connection.
- Return type:
Dict[str, Dict]
- property service_queries: tuple[dict[str, str], str]
Return queries retrieved from the service after connecting.
- Returns:
Dictionary of query_name, query_text. Name of container to add queries to.
- Return type:
Tuple[Dict[str, str], str]
- set_driver_property(name, value)
Set an item in driver properties.
- Parameters:
name (str)
value (Any)
- timeout: int
- exception msticpy.data.drivers.prismacloud_driver.PrismaCloudQueryError(*args, help_uri=None, **kwargs)
Bases: MsticpyConnectionError
Raised for query execution errors.
Create an instance of the MsticpyUserError class.
- Parameters:
args (Iterable of strings) – Args will be printed as text of the exception.
help_uri (Union[tuple[str, str], str, None], optional) – Primary URL, by default “https://msticpy.readthedocs.org”
title (str, optional) – If a title keyword argument is supplied it will be used to create the title line.
*_uri (str, optional) – Additional keyword arguments whose names end in “_uri” will be used to create a list of references in addition to the primary help_uri
display (bool, optional) – Display the exception when created. By default, False
Notes
The exception text is displayed when the exception is created and not when it is raised. We recommend creating the exception within the raise statement. E.g.
raise MsticpyUserException(arg1, arg2…)
Developer note: Any classes derived from MsticpyUserError should be named with an “Error” suffix to distinguish these from standard exception types.
- DEF_HELP_URI: ClassVar[tuple[str, str]] = ('DataProviders', 'https://msticpy.readthedocs.io/en/latest/data_acquisition/DataProviders.html')
- add_note()
Exception.add_note(note) – add a note to the exception
- args
- display_exception()
Output the exception HTML or text friendly exception.
- Return type:
None
- property help_uri: tuple[str, str] | str
Get the default help URI.
- classmethod no_display_exceptions()
Context manager to block exception display to IPython/stdout.
- Return type:
Generator[None, Any, None]
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class msticpy.data.drivers.prismacloud_driver.QueryArgs
Bases: TypedDict
Define optional arguments for Prisma Cloud queries.
- start_time
Start of the time range for the query, in ISO 8601 format.
- Type:
str, optional
- end_time
End of the time range for the query, in ISO 8601 format.
- Type:
str, optional
- unit
The unit of time for the range (e.g., “minute”, “hour”, “day”).
- Type:
str, optional
- amount
The quantity of the time unit for the range.
- Type:
int, optional
- limit
Maximum number of results to return.
- Type:
int, optional
- amount: int
- clear() → None. Remove all items from D.
- cloudtype: str
- copy() → a shallow copy of D
- end_time: str
- classmethod fromkeys(iterable, value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- items() → a set-like object providing a view on D's items
- keys() → a set-like object providing a view on D's keys
- limit: int
- pop(k[, d]) → v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- start_time: str
- unit: str
- update([E, ]**F) → None. Update D from mapping/iterable E and F.
If E is present and has a .keys() method, then does: for k in E.keys(): D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
- values() → an object providing a view on D's values
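The relationship between the relative fields (unit, amount) and the absolute ISO 8601 fields (start_time, end_time) of QueryArgs can be illustrated with a short sketch. The documentation does not state whether the driver derives one from the other, so this is purely illustrative. `QueryArgsSketch`, `relative_range`, and the unit-to-timedelta mapping are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import TypedDict


class QueryArgsSketch(TypedDict, total=False):
    """Hypothetical stand-in mirroring the documented QueryArgs keys."""

    start_time: str
    end_time: str
    unit: str
    amount: int
    limit: int
    cloudtype: str


# Assumed mapping from the documented unit names to timedelta keywords.
_UNIT_TO_KWARG = {"minute": "minutes", "hour": "hours", "day": "days"}


def relative_range(unit: str, amount: int, now: datetime) -> QueryArgsSketch:
    """Build query arguments covering the last `amount` units before `now`."""
    delta = timedelta(**{_UNIT_TO_KWARG[unit]: amount})
    return {
        "unit": unit,
        "amount": amount,
        "start_time": (now - delta).isoformat(),
        "end_time": now.isoformat(),
    }


args = relative_range("day", 7, datetime(2024, 1, 8, tzinfo=timezone.utc))
```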