From 14125685ef63f890ee7de4f4fc0d0c6a840cdf37 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Thu, 15 Jan 2026 15:53:05 +0530 Subject: [PATCH 1/3] feat: add investigation management tools and update secops dependency. --- server/secops/pyproject.toml | 2 +- server/secops/secops_mcp/tools/__init__.py | 1 + .../tools/investigation_management.py | 502 ++++++++++++++++++ .../tests/test_secops_investigations_mcp.py | 201 +++++++ 4 files changed, 705 insertions(+), 1 deletion(-) create mode 100644 server/secops/secops_mcp/tools/investigation_management.py create mode 100644 server/secops/tests/test_secops_investigations_mcp.py diff --git a/server/secops/pyproject.toml b/server/secops/pyproject.toml index 3d37078..6f6c2ba 100644 --- a/server/secops/pyproject.toml +++ b/server/secops/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ dependencies = [ "httpx>=0.28.1", "mcp[cli]>=1.23.0", - "secops>=0.29.0", + "secops>=0.34.0", "google-auth>=2.45.0", "google-auth-httplib2>=0.3.0", "google-api-python-client>=2.187.0" diff --git a/server/secops/secops_mcp/tools/__init__.py b/server/secops/secops_mcp/tools/__init__.py index 0cfc387..f17e21a 100644 --- a/server/secops/secops_mcp/tools/__init__.py +++ b/server/secops/secops_mcp/tools/__init__.py @@ -26,3 +26,4 @@ from .udm_search import * from .search import * from .feed_management import * +from .investigation_management import * diff --git a/server/secops/secops_mcp/tools/investigation_management.py b/server/secops/secops_mcp/tools/investigation_management.py new file mode 100644 index 0000000..b3a512b --- /dev/null +++ b/server/secops/secops_mcp/tools/investigation_management.py @@ -0,0 +1,502 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Security Operations MCP tools for investigation management.""" + +import logging +from typing import Any, Dict, List, Optional + +from secops_mcp.server import get_chronicle_client, server + + +logger = logging.getLogger("secops-mcp") + + +@server.tool() +async def get_cases( + case_ids: List[str], + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Batch retrieve case details using the legacy batch API. + + Retrieves detailed information for multiple cases in a single request. + Supports up to 1000 case IDs per request. Returns case details including + priority, status, stage, and SOAR platform information. + + **Workflow Integration:** + - Use after retrieving alerts to get associated case details + - Essential for incident response workflows where AI helps analysts + navigate cases + - Can be used to check case status before creating new investigations + - Helps correlate alerts with existing case management workflows + + **Use Cases:** + - "Show me the case details for these alert IDs" + - "Get details for all high-priority cases" + - "What's the status of cases associated with these alerts?" + - "Retrieve case information for incident response" + + Args: + case_ids (List[str]): List of case IDs to retrieve (max 1000). + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary containing case details including display + name, priority, status, stage, and SOAR platform info. Returns + error message if retrieval fails. + + Next Steps (using MCP-enabled tools): + - Analyze case priority and status to determine next actions + - Use `list_investigations` or `fetch_associated_investigations` to + find related investigations + - Use `get_security_alerts` to find alerts associated with cases + - For high-priority cases, use `trigger_investigation` to create + new investigations + - Use entity lookup tools on indicators found in case details + """ + try: + if not case_ids: + return { + "error": "case_ids parameter is required and cannot be empty" + } + + if len(case_ids) > 1000: + return { + "error": ( + "Too many case IDs provided. Maximum is 1000, " + f"received {len(case_ids)}" + ) + } + + chronicle = get_chronicle_client(project_id, customer_id, region) + print(f"Retrieving details for {len(case_ids)} case(s)...") + + cases_response = chronicle.get_cases(case_ids) + + if not cases_response or not hasattr(cases_response, "cases"): + return {"message": "No cases found", "case_count": 0, "cases": []} + + print(f"Successfully retrieved {len(cases_response.cases)} case(s)") + return cases_response + + except Exception as e: + error_msg = f"Error retrieving cases: {str(e)}" + print(error_msg) + return {"error": error_msg} + + +@server.tool() +async def list_investigations( + page_size: int = 50, + page_token: Optional[str] = None, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """List all investigations in Chronicle instance. + + Retrieves a paginated list of all investigations with their status, + verdict, and confidence information. Supports pagination for large + result sets. + + **Workflow Integration:** + - Use to get an overview of all active investigations + - Essential for monitoring investigation status across the environment + - Can be used to identify investigations that need attention + - Helps track investigation progress and outcomes + + **Use Cases:** + - "List all active investigations from the past week" + - "Show me all investigations with high confidence verdicts" + - "What investigations are currently in progress?" + - "Get an overview of recent investigation activity" + + Args: + page_size (int): Number of investigations to return per page. + Defaults to 50. + page_token (Optional[str]): Token for pagination. Use the + next_page_token from previous response to get next page. + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary containing list of investigations with + their display names, status, verdict, confidence, and pagination + token. Returns error message if retrieval fails. + + Next Steps (using MCP-enabled tools): + - Use `get_investigation` to get detailed info for specific + investigations + - Use `fetch_associated_investigations` to find investigations + linked to alerts or cases + - For investigations needing action, use alert or case management + tools + - Use entity lookup tools on indicators found in investigations + """ + try: + chronicle = get_chronicle_client(project_id, customer_id, region) + print(f"Listing investigations (page_size={page_size})...") + + result = chronicle.list_investigations( + page_size=page_size, page_token=page_token + ) + + investigations = result.get("investigations", []) + print(f"Successfully retrieved {len(investigations)} investigation(s)") + return result + + except Exception as e: + error_msg = f"Error listing investigations: {str(e)}" + print(error_msg) + return {"error": error_msg} + + +@server.tool() +async def get_investigation( + investigation_id: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Retrieve specific investigation by ID. + + Gets detailed information for a specific investigation including status, + verdict, confidence, and associated metadata. + + **Workflow Integration:** + - Use after listing investigations to get detailed information + - Essential for reviewing investigation findings and recommendations + - Can be used to check investigation progress and status + - Helps analysts understand automated analysis results + + **Use Cases:** + - "Get details for investigation inv_123" + - "What's the verdict for this investigation?" + - "Show me the confidence score for this investigation" + - "Retrieve investigation findings and recommendations" + + Args: + investigation_id (str): The unique identifier of the investigation + to retrieve. + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary containing detailed investigation + information including display name, status, verdict, confidence, + and timestamps. Returns error message if retrieval fails. + + Next Steps (using MCP-enabled tools): + - Based on verdict, use alert or case management tools to update + status + - Use `fetch_associated_investigations` to find related + investigations + - For high-confidence verdicts, consider creating cases or alerts + - Use entity lookup tools on indicators found in investigation + """ + try: + if not investigation_id: + return { + "error": ( + "investigation_id parameter is required and cannot be " + "empty" + ) + } + + chronicle = get_chronicle_client(project_id, customer_id, region) + print(f"Retrieving investigation: {investigation_id}...") + + investigation = chronicle.get_investigation( + investigation_id=investigation_id + ) + + if not investigation: + return { + "error": f"Investigation not found: {investigation_id}", + "investigation_id": investigation_id, + } + + print(f"Successfully retrieved investigation: {investigation_id}") + return investigation + + except Exception as e: + error_msg = ( + f"Error retrieving investigation {investigation_id}: {str(e)}" + ) + print(error_msg) + return {"error": error_msg} + + +@server.tool() +async def trigger_investigation( + alert_id: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Create new investigation for a specific alert. + + Triggers automated investigation analysis for a given alert. Returns + the created investigation details including status and trigger type. + + **Workflow Integration:** + - Use after identifying high-priority alerts that need investigation + - Essential for initiating automated analysis and recommendations + - Can be used as part of incident response workflows + - Helps automate investigation processes for security alerts + + **Use Cases:** + - "Trigger an investigation for this high-priority alert" + - "Create an investigation for alert_123" + - "Start automated analysis for this suspicious alert" + - "Initiate investigation for this security event" + + Args: + alert_id (str): The unique identifier of the alert to investigate. + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary containing created investigation details + including name, status, and trigger type. Returns error message + if creation fails. + + Next Steps (using MCP-enabled tools): + - Use `get_investigation` to check investigation progress and + results + - Use `list_investigations` to see all investigations + - Based on investigation verdict, update alert status using + `do_update_security_alert` + - Use entity lookup tools on indicators found in investigation + - Consider creating a case if investigation confirms threat + """ + try: + if not alert_id: + return { + "error": "alert_id parameter is required and cannot be empty" + } + + chronicle = get_chronicle_client(project_id, customer_id, region) + print(f"Triggering investigation for alert: {alert_id}...") + + investigation = chronicle.trigger_investigation(alert_id=alert_id) + + if not investigation: + return { + "error": ( + f"Failed to trigger investigation for alert: {alert_id}" + ), + "alert_id": alert_id, + } + + result = { + "message": "Successfully triggered investigation", + "alert_id": alert_id, + "investigation": { + "name": investigation.get("name"), + "display_name": investigation.get("displayName"), + "status": investigation.get("status"), + "trigger_type": investigation.get("triggerType"), + "create_time": investigation.get("createTime"), + }, + } + + print(f"Successfully triggered investigation for alert: {alert_id}") + return result + + except Exception as e: + error_msg = ( + f"Error triggering investigation for alert {alert_id}: {str(e)}" + ) + print(error_msg) + return {"error": error_msg} + + +@server.tool() +async def fetch_associated_investigations( + detection_type: str, + alert_ids: Optional[List[str]] = None, + case_ids: Optional[List[str]] = None, + association_limit_per_detection: int = 5, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Retrieve investigations associated with alerts or cases. + + Fetches investigations linked to specific alerts or cases. Supports + filtering by detection type (ALERT or CASE) and returns investigation + associations with verdict information. + + **Workflow Integration:** + - Use to find all investigations related to specific alerts or cases + - Essential for understanding investigation history and outcomes + - Can be used to correlate multiple investigations + - Helps track investigation progress across related detections + + **Use Cases:** + - "What investigations are associated with this alert?" + - "Show me all investigations for these cases" + - "Find investigations related to alert_123 and alert_456" + - "Get investigation history for this case" + + Args: + detection_type (str): Type of detection to query. Valid values: + "ALERT", "CASE", "DETECTION_TYPE_ALERT", "DETECTION_TYPE_CASE". + alert_ids (Optional[List[str]]): List of alert IDs to query. + Required if detection_type is ALERT. + case_ids (Optional[List[str]]): List of case IDs to query. + Required if detection_type is CASE. + association_limit_per_detection (int): Maximum number of + investigations to return per detection. Defaults to 5. + project_id (Optional[str]): Google Cloud project ID. Defaults to + environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults to + environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", "europe"). + Defaults to environment configuration. + + Returns: + Dict[str, Any]: Dictionary containing investigation associations + grouped by detection ID, with verdict and confidence information. + Returns error message if retrieval fails. + + Next Steps (using MCP-enabled tools): + - Use `get_investigation` to get detailed info for specific + investigations + - Based on verdicts, use alert or case management tools to update + status + - Use `trigger_investigation` to create new investigations if + needed + - Use entity lookup tools on indicators found in investigations + """ + try: + detection_type_upper = detection_type.upper() + valid_types = [ + "ALERT", + "CASE", + "DETECTION_TYPE_ALERT", + "DETECTION_TYPE_CASE", + ] + + if detection_type_upper not in valid_types: + return { + "error": ( + f"Invalid detection_type: {detection_type}. " + f'Valid values: {", ".join(valid_types)}' + ) + } + + is_alert_type = detection_type_upper in [ + "ALERT", + "DETECTION_TYPE_ALERT", + ] + + if is_alert_type and not alert_ids: + return { + "error": ( + "alert_ids parameter is required when detection_type " + "is ALERT" + ) + } + + if not is_alert_type and not case_ids: + return { + "error": ( + "case_ids parameter is required when detection_type " + "is CASE" + ) + } + + chronicle = get_chronicle_client(project_id, customer_id, region) + + detection_label = "alert" if is_alert_type else "case" + ids = alert_ids if is_alert_type else case_ids + print( + f"Fetching investigations for {len(ids)} " + f"{detection_label}(s)..." + ) + + result = chronicle.fetch_associated_investigations( + detection_type=detection_type, + alert_ids=alert_ids, + case_ids=case_ids, + association_limit_per_detection=association_limit_per_detection, + ) + + associations_list = result.get("associationsList", {}) + + associations_dict = {} + total_investigations = 0 + + for detection_id, data in associations_list.items(): + investigations = data.get("investigations", []) + total_investigations += len(investigations) + + inv_list = [] + for inv in investigations: + inv_dict = { + "name": inv.get("name"), + "display_name": inv.get("displayName"), + "verdict": inv.get("verdict"), + "confidence": inv.get("confidence"), + "status": inv.get("status"), + } + inv_list.append(inv_dict) + + associations_dict[detection_id] = { + "investigation_count": len(inv_list), + "investigations": inv_list, + } + + response = { + "message": ( + f"Successfully retrieved investigations for " + f"{len(associations_dict)} {detection_label}(s)" + ), + "detection_type": detection_type, + "total_detections": len(associations_dict), + "total_investigations": total_investigations, + "associations": associations_dict, + } + + print( + f"Successfully retrieved {total_investigations} " + f"investigation(s) for {len(associations_dict)} " + f"{detection_label}(s)" + ) + return response + + except Exception as e: + error_msg = f"Error fetching associated investigations: {str(e)}" + print(error_msg) + return {"error": error_msg} diff --git a/server/secops/tests/test_secops_investigations_mcp.py b/server/secops/tests/test_secops_investigations_mcp.py new file mode 100644 index 0000000..e214d33 --- /dev/null +++ b/server/secops/tests/test_secops_investigations_mcp.py @@ -0,0 +1,201 @@ +"""Integration tests for Chronicle SecOps Investigation Management tools. + +These tests exercise the investigation management functionality including +cases, investigations, and their associations. They require proper +authentication and configuration to run. + +To run these tests: +1. Make sure you have created a config.json file in the tests directory + with your Chronicle credentials (see conftest.py for format) +2. Authenticate with Google Cloud using ADC: + gcloud auth application-default login +3. Run: pytest -xvs server/secops/tests/test_secops_investigations_mcp.py +""" + +from datetime import datetime, timedelta, timezone +from typing import Dict + +import pytest + +from secops_mcp.tools.investigation_management import ( + fetch_associated_investigations, + get_cases, + get_investigation, + list_investigations, + trigger_investigation, +) + + +class TestChronicleInvestigationsMCP: + """Test class for Chronicle Investigation Management MCP tools.""" + + @pytest.mark.asyncio + async def test_list_investigations( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing investigations. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_investigations( + page_size=10, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + + if "error" not in result: + assert "investigations" in result + assert isinstance(result["investigations"], list) + + @pytest.mark.asyncio + async def test_get_investigation( + self, chronicle_config: Dict[str, str], chronicle_client + ) -> None: + """Test getting a specific investigation. + + Args: + chronicle_config: Dictionary with Chronicle configuration + chronicle_client: Chronicle client fixture + """ + investigations_result = await list_investigations( + page_size=1, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + if "investigations" in investigations_result: + investigations = investigations_result["investigations"] + if investigations: + investigation_name = investigations[0].get("name") + if investigation_name: + investigation_id = investigation_name.split("/")[-1] + result = await get_investigation( + investigation_id=investigation_id, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + assert "error" not in result + assert "name" in result + + @pytest.mark.asyncio + async def test_get_cases( + self, chronicle_config: Dict[str, str], chronicle_client + ) -> None: + """Test getting cases using alert case IDs. + + Args: + chronicle_config: Dictionary with Chronicle configuration + chronicle_client: Chronicle client fixture + """ + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(days=30) + + alerts = chronicle_client.get_alerts( + start_time=start_time, + end_time=end_time, + max_alerts=10, + ) + + alert_list = alerts.get("alerts", {}).get("alerts", []) + case_ids = [ + alert.get("caseName") + for alert in alert_list + if alert.get("caseName") + ] + + if case_ids: + result = await get_cases( + case_ids=case_ids[:5], + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + if "error" not in result: + assert hasattr(result, "cases") + + @pytest.mark.asyncio + async def test_trigger_investigation( + self, chronicle_config: Dict[str, str], chronicle_client + ) -> None: + """Test triggering investigation for a real alert. + + Args: + chronicle_config: Dictionary with Chronicle configuration + chronicle_client: Chronicle client fixture + """ + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(days=30) + + alerts = chronicle_client.get_alerts( + start_time=start_time, + end_time=end_time, + max_alerts=5, + ) + + alert_list = alerts.get("alerts", {}).get("alerts", []) + + if alert_list: + alert_id = alert_list[0].get("name") + if alert_id: + result = await trigger_investigation( + alert_id=alert_id, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + if "error" not in result: + assert "investigation" in result + assert "alert_id" in result + assert result["alert_id"] == alert_id + + @pytest.mark.asyncio + async def test_fetch_associated_investigations( + self, chronicle_config: Dict[str, str], chronicle_client + ) -> None: + """Test fetching investigations associated with alerts. + + Args: + chronicle_config: Dictionary with Chronicle configuration + chronicle_client: Chronicle client fixture + """ + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(days=30) + + alerts = chronicle_client.get_alerts( + start_time=start_time, + end_time=end_time, + max_alerts=3, + ) + + alert_list = alerts.get("alerts", {}).get("alerts", []) + alert_ids = [ + alert.get("name") for alert in alert_list if alert.get("name") + ] + + if alert_ids: + result = await fetch_associated_investigations( + detection_type="ALERT", + alert_ids=alert_ids, + association_limit_per_detection=3, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + if "error" not in result: + assert "detection_type" in result + assert "associations" in result + assert isinstance(result["associations"], dict) From fdac1e39d930373e466163ad210b910e0e26dba3 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Fri, 16 Jan 2026 12:41:39 +0530 Subject: [PATCH 2/3] docs: add investigation management tools documentation --- docs/servers/secops_mcp.md | 49 ++++++++++++++++++++++++++++++++++++++ server/secops/README.md | 21 +++++++++++++++- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/docs/servers/secops_mcp.md b/docs/servers/secops_mcp.md index a2a5cfa..2145562 100644 --- a/docs/servers/secops_mcp.md +++ b/docs/servers/secops_mcp.md @@ -344,6 +344,55 @@ The service account or user credentials need the following Chronicle roles: - Zero-day exploitation ``` +- **`get_cases(case_ids, project_id=None, customer_id=None, region=None)`** + - **Description:** Batch retrieve case details using the legacy batch API. Supports up to 1000 case IDs. Returns case details including priority, status, stage, and SOAR platform info. + - **Parameters:** + - `case_ids` (required): List of case IDs to retrieve (max 1000). + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing case details with priority, status, stage, and SOAR platform information. + +- **`list_investigations(page_size=50, page_token=None, project_id=None, customer_id=None, region=None)`** + - **Description:** List all investigations in Chronicle instance. Returns investigation status, verdict, and confidence. Supports pagination. + - **Parameters:** + - `page_size` (optional): Number of investigations to return per page (default: 50). + - `page_token` (optional): Token for pagination from previous response. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing list of investigations with status, verdict, confidence, and pagination token. + +- **`get_investigation(investigation_id, project_id=None, customer_id=None, region=None)`** + - **Description:** Retrieve specific investigation by ID. Returns detailed investigation information including status and verdict. + - **Parameters:** + - `investigation_id` (required): The unique identifier of the investigation to retrieve. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing detailed investigation information including display name, status, verdict, confidence, and timestamps. + +- **`trigger_investigation(alert_id, project_id=None, customer_id=None, region=None)`** + - **Description:** Create new investigation for a specific alert. Returns created investigation details and trigger type. + - **Parameters:** + - `alert_id` (required): The unique identifier of the alert to investigate. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing created investigation details including name, status, and trigger type. + +- **`fetch_associated_investigations(detection_type, alert_ids=None, case_ids=None, association_limit_per_detection=5, project_id=None, customer_id=None, region=None)`** + - **Description:** Retrieve investigations associated with alerts or cases. Supports filtering by detection type (ALERT or CASE). Returns investigation associations with verdict information. + - **Parameters:** + - `detection_type` (required): Type of detection to query. Valid values: "ALERT", "CASE", "DETECTION_TYPE_ALERT", "DETECTION_TYPE_CASE". + - `alert_ids` (optional): List of alert IDs to query. Required if detection_type is ALERT. + - `case_ids` (optional): List of case IDs to query. Required if detection_type is CASE. + - `association_limit_per_detection` (optional): Maximum number of investigations to return per detection (default: 5). + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing investigation associations grouped by detection ID, with verdict and confidence information. + ## Usage Examples ### Example 1: Natural Language Security Event Search diff --git a/server/secops/README.md b/server/secops/README.md index 21e4c92..ab3bebe 100644 --- a/server/secops/README.md +++ b/server/secops/README.md @@ -120,6 +120,23 @@ Chronicle Security Operations suite. - **`generate_feed_secret(feed_id, project_id=None, customer_id=None, region=None)`** - Creates a new authentication secret for feeds that support authentication (e.g., HTTP feeds with basic auth). This replaces any existing secret. +### Investigation Management Tools + +- **`get_cases(case_ids, project_id=None, customer_id=None, region=None)`** + - Batch retrieve case details using the legacy batch API. Supports up to 1000 case IDs. Returns case details including priority, status, stage, and SOAR platform info. + +- **`list_investigations(page_size=50, page_token=None, project_id=None, customer_id=None, region=None)`** + - List all investigations in Chronicle instance. Returns investigation status, verdict, and confidence. Supports pagination. + +- **`get_investigation(investigation_id, project_id=None, customer_id=None, region=None)`** + - Retrieve specific investigation by ID. Returns detailed investigation information including status and verdict. + +- **`trigger_investigation(alert_id, project_id=None, customer_id=None, region=None)`** + - Create new investigation for a specific alert. Returns created investigation details and trigger type. + +- **`fetch_associated_investigations(detection_type, alert_ids=None, case_ids=None, association_limit_per_detection=5, project_id=None, customer_id=None, region=None)`** + - Retrieve investigations associated with alerts or cases. Supports filtering by detection type (ALERT or CASE). Returns investigation associations with verdict information. + ### API Capabilities The MCP server provides the following capabilities: @@ -136,7 +153,8 @@ The MCP server provides the following capabilities: 10. **Data Table Management**: Create and manage structured data tables for detection rules 11. **Reference List Management**: Create and manage reference lists for detection rules 12. **Feed Management**: Create, update, enable, disable, and delete data feeds -13. **UDM Search & Export**: Direct UDM querying, field value autocomplete, and CSV export +13. **Investigation Management**: Manage cases and investigations, trigger investigations, and fetch associated investigations +14. **UDM Search & Export**: Direct UDM querying, field value autocomplete, and CSV export ### Example @@ -151,6 +169,7 @@ These tools focus on core security operations tasks: - **Entity Analysis**: Use `lookup_entity` to investigate IPs, domains, hashes, and other indicators - **Rule Management**: Use `list_security_rules` and `search_security_rules` to manage detection rules - **Threat Intelligence**: Use `get_ioc_matches` and `get_threat_intel` for IOC analysis and AI-powered insights +- **Investigation Management**: Use `list_investigations`, `get_investigation`, `trigger_investigation`, `get_cases`, and `fetch_associated_investigations` to manage investigations and cases - **UDM Analysis & Export**: Use `search_udm`, `export_udm_search_csv`, and `find_udm_field_values` for direct UDM querying, data export, and field discovery ### Data Ingestion & Parsing Tools From af77e3ce93bf5c988d1df5a34a6dcc6016c9a8a1 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Wed, 21 Jan 2026 12:30:30 +0530 Subject: [PATCH 3/3] docs: remove flakey get_cases tool from investigation management. --- docs/servers/secops_mcp.md | 9 --- server/secops/README.md | 5 +- .../tools/investigation_management.py | 80 ------------------- .../tests/test_secops_investigations_mcp.py | 39 --------- 4 files changed, 1 insertion(+), 132 deletions(-) diff --git a/docs/servers/secops_mcp.md b/docs/servers/secops_mcp.md index 2145562..c73049c 100644 --- a/docs/servers/secops_mcp.md +++ b/docs/servers/secops_mcp.md @@ -344,15 +344,6 @@ The service account or user credentials need the following Chronicle roles: - Zero-day exploitation ``` -- **`get_cases(case_ids, project_id=None, customer_id=None, region=None)`** - - **Description:** Batch retrieve case details using the legacy batch API. Supports up to 1000 case IDs. Returns case details including priority, status, stage, and SOAR platform info. - - **Parameters:** - - `case_ids` (required): List of case IDs to retrieve (max 1000). - - `project_id` (optional): Google Cloud project ID (defaults to environment config). - - `customer_id` (optional): Chronicle customer ID (defaults to environment config). - - `region` (optional): Chronicle region (defaults to environment config or 'us'). - - **Returns:** Dictionary containing case details with priority, status, stage, and SOAR platform information. - - **`list_investigations(page_size=50, page_token=None, project_id=None, customer_id=None, region=None)`** - **Description:** List all investigations in Chronicle instance. Returns investigation status, verdict, and confidence. Supports pagination. - **Parameters:** diff --git a/server/secops/README.md b/server/secops/README.md index ab3bebe..af8baa4 100644 --- a/server/secops/README.md +++ b/server/secops/README.md @@ -122,9 +122,6 @@ Chronicle Security Operations suite. ### Investigation Management Tools -- **`get_cases(case_ids, project_id=None, customer_id=None, region=None)`** - - Batch retrieve case details using the legacy batch API. Supports up to 1000 case IDs. Returns case details including priority, status, stage, and SOAR platform info. - - **`list_investigations(page_size=50, page_token=None, project_id=None, customer_id=None, region=None)`** - List all investigations in Chronicle instance. Returns investigation status, verdict, and confidence. Supports pagination. @@ -169,7 +166,7 @@ These tools focus on core security operations tasks: - **Entity Analysis**: Use `lookup_entity` to investigate IPs, domains, hashes, and other indicators - **Rule Management**: Use `list_security_rules` and `search_security_rules` to manage detection rules - **Threat Intelligence**: Use `get_ioc_matches` and `get_threat_intel` for IOC analysis and AI-powered insights -- **Investigation Management**: Use `list_investigations`, `get_investigation`, `trigger_investigation`, `get_cases`, and `fetch_associated_investigations` to manage investigations and cases +- **Investigation Management**: Use `list_investigations`, `get_investigation`, `trigger_investigation`, and `fetch_associated_investigations` to manage investigations and cases - **UDM Analysis & Export**: Use `search_udm`, `export_udm_search_csv`, and `find_udm_field_values` for direct UDM querying, data export, and field discovery ### Data Ingestion & Parsing Tools diff --git a/server/secops/secops_mcp/tools/investigation_management.py b/server/secops/secops_mcp/tools/investigation_management.py index b3a512b..f419eee 100644 --- a/server/secops/secops_mcp/tools/investigation_management.py +++ b/server/secops/secops_mcp/tools/investigation_management.py @@ -22,86 +22,6 @@ logger = logging.getLogger("secops-mcp") -@server.tool() -async def get_cases( - case_ids: List[str], - project_id: Optional[str] = None, - customer_id: Optional[str] = None, - region: Optional[str] = None, -) -> Dict[str, Any]: - """Batch retrieve case details using the legacy batch API. - - Retrieves detailed information for multiple cases in a single request. - Supports up to 1000 case IDs per request. Returns case details including - priority, status, stage, and SOAR platform information. - - **Workflow Integration:** - - Use after retrieving alerts to get associated case details - - Essential for incident response workflows where AI helps analysts - navigate cases - - Can be used to check case status before creating new investigations - - Helps correlate alerts with existing case management workflows - - **Use Cases:** - - "Show me the case details for these alert IDs" - - "Get details for all high-priority cases" - - "What's the status of cases associated with these alerts?" - - "Retrieve case information for incident response" - - Args: - case_ids (List[str]): List of case IDs to retrieve (max 1000). - project_id (Optional[str]): Google Cloud project ID. Defaults to - environment configuration. - customer_id (Optional[str]): Chronicle customer ID. Defaults to - environment configuration. - region (Optional[str]): Chronicle region (e.g., "us", "europe"). - Defaults to environment configuration. - - Returns: - Dict[str, Any]: Dictionary containing case details including display - name, priority, status, stage, and SOAR platform info. Returns - error message if retrieval fails. - - Next Steps (using MCP-enabled tools): - - Analyze case priority and status to determine next actions - - Use `list_investigations` or `fetch_associated_investigations` to - find related investigations - - Use `get_security_alerts` to find alerts associated with cases - - For high-priority cases, use `trigger_investigation` to create - new investigations - - Use entity lookup tools on indicators found in case details - """ - try: - if not case_ids: - return { - "error": "case_ids parameter is required and cannot be empty" - } - - if len(case_ids) > 1000: - return { - "error": ( - "Too many case IDs provided. Maximum is 1000, " - f"received {len(case_ids)}" - ) - } - - chronicle = get_chronicle_client(project_id, customer_id, region) - print(f"Retrieving details for {len(case_ids)} case(s)...") - - cases_response = chronicle.get_cases(case_ids) - - if not cases_response or not hasattr(cases_response, "cases"): - return {"message": "No cases found", "case_count": 0, "cases": []} - - print(f"Successfully retrieved {len(cases_response.cases)} case(s)") - return cases_response - - except Exception as e: - error_msg = f"Error retrieving cases: {str(e)}" - print(error_msg) - return {"error": error_msg} - - @server.tool() async def list_investigations( page_size: int = 50, diff --git a/server/secops/tests/test_secops_investigations_mcp.py b/server/secops/tests/test_secops_investigations_mcp.py index e214d33..b840f21 100644 --- a/server/secops/tests/test_secops_investigations_mcp.py +++ b/server/secops/tests/test_secops_investigations_mcp.py @@ -19,7 +19,6 @@ from secops_mcp.tools.investigation_management import ( fetch_associated_investigations, - get_cases, get_investigation, list_investigations, trigger_investigation, @@ -85,44 +84,6 @@ async def test_get_investigation( assert "error" not in result assert "name" in result - @pytest.mark.asyncio - async def test_get_cases( - self, chronicle_config: Dict[str, str], chronicle_client - ) -> None: - """Test getting cases using alert case IDs. - - Args: - chronicle_config: Dictionary with Chronicle configuration - chronicle_client: Chronicle client fixture - """ - end_time = datetime.now(timezone.utc) - start_time = end_time - timedelta(days=30) - - alerts = chronicle_client.get_alerts( - start_time=start_time, - end_time=end_time, - max_alerts=10, - ) - - alert_list = alerts.get("alerts", {}).get("alerts", []) - case_ids = [ - alert.get("caseName") - for alert in alert_list - if alert.get("caseName") - ] - - if case_ids: - result = await get_cases( - case_ids=case_ids[:5], - project_id=chronicle_config["CHRONICLE_PROJECT_ID"], - customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], - region=chronicle_config["CHRONICLE_REGION"], - ) - - assert isinstance(result, dict) - if "error" not in result: - assert hasattr(result, "cases") - @pytest.mark.asyncio async def test_trigger_investigation( self, chronicle_config: Dict[str, str], chronicle_client