From 54eb2b5fa5cb8d9af996472d36d8d21b1ef45f9b Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Thu, 29 Jan 2026 17:53:41 +0530 Subject: [PATCH 1/2] feat: Add curated rules management tools for Chronicle detection rules --- server/secops/secops_mcp/tools/__init__.py | 1 + .../tools/curated_rules_management.py | 690 ++++++++++++++++++ .../tests/test_secops_curated_rules_tools.py | 445 +++++++++++ 3 files changed, 1136 insertions(+) create mode 100644 server/secops/secops_mcp/tools/curated_rules_management.py create mode 100644 server/secops/tests/test_secops_curated_rules_tools.py diff --git a/server/secops/secops_mcp/tools/__init__.py b/server/secops/secops_mcp/tools/__init__.py index 0cfc387..384adfa 100644 --- a/server/secops/secops_mcp/tools/__init__.py +++ b/server/secops/secops_mcp/tools/__init__.py @@ -26,3 +26,4 @@ from .udm_search import * from .search import * from .feed_management import * +from .curated_rules_management import * diff --git a/server/secops/secops_mcp/tools/curated_rules_management.py b/server/secops/secops_mcp/tools/curated_rules_management.py new file mode 100644 index 0000000..cb848fc --- /dev/null +++ b/server/secops/secops_mcp/tools/curated_rules_management.py @@ -0,0 +1,690 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Security Operations MCP tools for curated rules management.""" + +import logging +from datetime import datetime +from typing import Any, Dict, Optional + +from secops_mcp.server import get_chronicle_client, server + + +logger = logging.getLogger("secops-mcp") + + +@server.tool() +async def list_curated_rules( + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, + page_size: int = 100, + page_token: Optional[str] = None, + as_list: bool = False, +) -> Dict[str, Any]: + """List all curated detection rules available in Chronicle. + + Retrieves pre-built detection rules provided by Google Chronicle + that can be enabled and deployed without custom development. + Curated rules cover common security threats and attack patterns. + + **Workflow Integration:** + - Discover available pre-built detection content from Chronicle. + - Identify relevant curated rules for specific threat scenarios. + - Review rule descriptions and coverage before deployment. + - Complement custom detection rules with Google-maintained content. + + **Use Cases:** + - Browse available curated rules for ransomware detection. + - Find curated rules covering specific MITRE ATT&CK techniques. + - Identify pre-built rules for cloud security monitoring. + - Review available detection content before enabling rule sets. + - Audit currently available curated detection capabilities. + + Args: + project_id (Optional[str]): Google Cloud project ID. Defaults + to environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults + to environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", + "europe"). Defaults to environment configuration. + page_size (int): Maximum number of rules to return per page. + Defaults to 100. + page_token (Optional[str]): Token for retrieving next page of + results. + as_list (bool): If True, automatically paginate and return all + rules as a list. Defaults to False. + + Returns: + Dict[str, Any]: Response containing curated rules and + pagination metadata. Structure includes: + - curatedRules: List of curated rule objects + - nextPageToken: Token for pagination (if more results) + Returns error structure if the API call fails. + + Next Steps (using MCP-enabled tools): + - Review specific rule details using `get_curated_rule`. + - Enable relevant rules via `update_curated_rule_set_deployment`. + - Search for detections using `search_curated_detections`. + - Deploy rule sets containing multiple curated rules. + """ + try: + logger.info(f"Listing curated rules (page_size={page_size})") + + chronicle = get_chronicle_client(project_id, customer_id, region) + + if as_list: + rules = chronicle.list_curated_rules(as_list=True) + return {"curatedRules": rules} + else: + return chronicle.list_curated_rules( + page_size=page_size, page_token=page_token + ) + + except Exception as e: + logger.error(f"Error listing curated rules: {str(e)}", exc_info=True) + return {"error": str(e), "curatedRules": []} + + +@server.tool() +async def get_curated_rule( + rule_id: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Retrieve specific curated rule details by rule ID. + + Fetches the complete definition and metadata for a specific + Google-curated detection rule. Provides detailed information + about rule logic, severity, and detection capabilities. + + **Workflow Integration:** + - Review complete details of a specific curated rule. + - Understand detection logic before enabling the rule. + - Analyze rule metadata including severity and coverage. + - Evaluate rule fit for your security monitoring needs. + + **Use Cases:** + - Review details of a curated rule identified from search. + - Understand what threats a specific curated rule detects. + - Analyze rule logic before enabling it in production. + - Document curated rule capabilities for security teams. + - Compare multiple curated rules for similar threats. + + Args: + rule_id (str): Unique identifier of the curated rule to + retrieve. Example: "ur_ttp_lol_Atbroker" + project_id (Optional[str]): Google Cloud project ID. Defaults + to environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults + to environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", + "europe"). Defaults to environment configuration. + + Returns: + Dict[str, Any]: Complete curated rule information including: + - Rule definition and metadata + - Description and severity + - Detection logic details + Returns error structure if the API call fails. + + Next Steps (using MCP-enabled tools): + - Enable the rule via `update_curated_rule_set_deployment`. + - Search for historical detections using + `search_curated_detections`. + - Review related rules in the same rule set. + - Configure alerting settings for the rule. + """ + try: + logger.info(f"Retrieving curated rule: {rule_id}") + + chronicle = get_chronicle_client(project_id, customer_id, region) + rule = chronicle.get_curated_rule(rule_id) + + logger.info(f"Successfully retrieved curated rule: {rule_id}") + return rule + + except Exception as e: + logger.error( + f"Error retrieving curated rule {rule_id}: {str(e)}", + exc_info=True, + ) + return { + "error": f"Error retrieving curated rule: {str(e)}", + "rule": {}, + } + + +@server.tool() +async def get_curated_rule_by_name( + display_name: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Find curated rule by display name. + + Searches for a curated rule matching the specified display name. + Note: This performs a linear scan of all curated rules and may + be inefficient for large rule sets. + + **Workflow Integration:** + - Find curated rules by their human-readable names. + - Locate specific rules when only the display name is known. + - Search for rules mentioned in security documentation. + + **Use Cases:** + - Find "Atbroker.exe Abuse" rule by name. + - Locate specific rules referenced in threat intelligence. + - Search for rules by threat technique names. + + Args: + display_name (str): Display name of the curated rule to find. + Example: "Atbroker.exe Abuse" + project_id (Optional[str]): Google Cloud project ID. Defaults + to environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults + to environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", + "europe"). Defaults to environment configuration. + + Returns: + Dict[str, Any]: Curated rule information if found. Returns + error structure if not found or API call fails. + + Next Steps (using MCP-enabled tools): + - Enable the rule via `update_curated_rule_set_deployment`. + - Review rule details using `get_curated_rule`. + - Search for detections using `search_curated_detections`. + """ + try: + logger.info(f"Searching for curated rule by name: {display_name}") + + chronicle = get_chronicle_client(project_id, customer_id, region) + rule = chronicle.get_curated_rule_by_name(display_name) + + if rule: + logger.info(f"Found curated rule: {display_name}") + return rule + else: + return { + "error": f"Curated rule not found: {display_name}", + "rule": {}, + } + + except Exception as e: + logger.error( + f"Error finding curated rule by name {display_name}: " f"{str(e)}", + exc_info=True, + ) + return { + "error": f"Error finding curated rule: {str(e)}", + "rule": {}, + } + + +@server.tool() +async def search_curated_detections( + rule_id: str, + start_time: str, + end_time: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, + list_basis: Optional[str] = None, + alert_state: Optional[str] = None, + page_size: int = 100, + page_token: Optional[str] = None, +) -> Dict[str, Any]: + """Search detections generated by a specific curated rule. + + Retrieves detections from a curated rule within a specified time + range. Useful for investigating threats detected by Google-curated + detection content and analyzing rule effectiveness. + + **Workflow Integration:** + - Investigate threats detected by curated rules. + - Analyze curated rule effectiveness and alert volume. + - Review historical detections for specific threat patterns. + - Scope incidents based on curated rule matches. + + **Use Cases:** + - Retrieve all detections from a curated ransomware rule. + - Analyze alert frequency for specific curated detections. + - Investigate threats detected by cloud security rules. + - Filter detections by alert state (ALERTING/NOT_ALERTING). + - Review detections from recently enabled curated rules. + + Args: + rule_id (str): Unique identifier of the curated rule. Example: + "ur_ttp_GCP_MassSecretDeletion" + start_time (str): Start of search time range in ISO 8601 + format. Example: "2025-01-20T00:00:00Z" + end_time (str): End of search time range in ISO 8601 format. + Example: "2025-01-27T23:59:59Z" + project_id (Optional[str]): Google Cloud project ID. Defaults + to environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults + to environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", + "europe"). Defaults to environment configuration. + list_basis (Optional[str]): Basis for listing detections. + Valid values: "DETECTION_TIME", "CREATED_TIME". + alert_state (Optional[str]): Filter by alert state. Valid + values: "ALERTING", "NOT_ALERTING". + page_size (int): Maximum number of detections to return. + Defaults to 100. + page_token (Optional[str]): Token for retrieving next page. + + Returns: + Dict[str, Any]: Response containing detections and pagination + metadata. Structure includes: + - curatedDetections: List of detection objects + - nextPageToken: Token for pagination (if more results) + Returns error structure if the API call fails. + + Next Steps (using MCP-enabled tools): + - Analyze detection details and associated events. + - Enrich indicators using threat intelligence tools. + - Create or update incidents in case management systems. + - Review entity details using `lookup_entity`. + - Investigate related UDM events using `search_udm`. + """ + try: + logger.info( + f"Searching curated detections for rule {rule_id} " + f"from {start_time} to {end_time}" + ) + + chronicle = get_chronicle_client(project_id, customer_id, region) + + start_dt = datetime.fromisoformat(start_time.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(end_time.replace("Z", "+00:00")) + + result = chronicle.search_curated_detections( + rule_id=rule_id, + start_time=start_dt, + end_time=end_dt, + list_basis=list_basis, + alert_state=alert_state, + page_size=page_size, + page_token=page_token, + ) + + detection_count = len(result.get("curatedDetections", [])) + logger.info(f"Found {detection_count} curated detections") + + return result + + except ValueError as ve: + logger.error( + f"Invalid time format for curated detections search: " f"{str(ve)}", + exc_info=True, + ) + return { + "error": f"Invalid time format: {str(ve)}", + "curatedDetections": [], + } + except Exception as e: + logger.error( + f"Error searching curated detections for rule {rule_id}: " + f"{str(e)}", + exc_info=True, + ) + return { + "error": str(e), + "curatedDetections": [], + } + + +@server.tool() +async def list_curated_rule_sets( + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, + page_size: int = 100, + page_token: Optional[str] = None, + as_list: bool = False, +) -> Dict[str, Any]: + """List all curated rule sets available in Chronicle. + + Retrieves collections of related curated rules grouped by threat + category, data source, or detection objective. Rule sets can be + enabled together for comprehensive coverage. + + **Workflow Integration:** + - Discover available curated rule set collections. + - Identify rule sets relevant to your data sources. + - Review rule set coverage for specific threat categories. + - Enable multiple related rules simultaneously. + + **Use Cases:** + - Browse available rule sets for Azure security monitoring. + - Find rule sets covering ransomware detection. + - Identify rule sets for specific cloud platforms. + - Review rule set organization and structure. + - Audit available pre-built detection content. + + Args: + project_id (Optional[str]): Google Cloud project ID. Defaults + to environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults + to environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", + "europe"). Defaults to environment configuration. + page_size (int): Maximum number of rule sets to return. + Defaults to 100. + page_token (Optional[str]): Token for retrieving next page. + as_list (bool): If True, automatically paginate and return all + rule sets as a list. Defaults to False. + + Returns: + Dict[str, Any]: Response containing rule sets and pagination + metadata. Structure includes: + - curatedRuleSets: List of rule set objects + - nextPageToken: Token for pagination (if more results) + Returns error structure if the API call fails. + + Next Steps (using MCP-enabled tools): + - Review specific rule set details using + `get_curated_rule_set`. + - Enable rule sets via + `update_curated_rule_set_deployment`. + - Check deployment status using + `list_curated_rule_set_deployments`. + - Review rules within a set using `list_curated_rules`. + """ + try: + logger.info(f"Listing curated rule sets (page_size={page_size})") + + chronicle = get_chronicle_client(project_id, customer_id, region) + + if as_list: + rule_sets = chronicle.list_curated_rule_sets(as_list=True) + return {"curatedRuleSets": rule_sets} + else: + result = chronicle.list_curated_rule_sets( + page_size=page_size, page_token=page_token + ) + return result + + except Exception as e: + logger.error( + f"Error listing curated rule sets: {str(e)}", + exc_info=True, + ) + return {"error": str(e), "curatedRuleSets": []} + + +@server.tool() +async def get_curated_rule_set( + rule_set_id: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Retrieve specific curated rule set details by ID. + + Fetches complete configuration and metadata for a specific curated + rule set. Provides information about the rules included in the set + and deployment options. + + **Workflow Integration:** + - Review detailed information about a specific rule set. + - Understand which rules are included in the set. + - Analyze rule set coverage before deployment. + - Plan rule set deployment strategy. + + **Use Cases:** + - Review rules included in "Azure - Network" rule set. + - Understand coverage of GCP security monitoring rule set. + - Analyze rule set configuration before enabling. + - Document deployed rule set capabilities. + - Compare multiple rule sets for similar coverage. + + Args: + rule_set_id (str): Unique identifier of the curated rule set. + Example: "00ad672e-ebb3-0dd1-2a4d-99bd7c5e5f93" + project_id (Optional[str]): Google Cloud project ID. Defaults + to environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults + to environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", + "europe"). Defaults to environment configuration. + + Returns: + Dict[str, Any]: Complete rule set information including: + - Rule set configuration and metadata + - Included rules and coverage + - Deployment options and settings + Returns error structure if the API call fails. + + Next Steps (using MCP-enabled tools): + - Enable the rule set via + `update_curated_rule_set_deployment`. + - Review individual rules using `get_curated_rule`. + - Check deployment status using + `list_curated_rule_set_deployments`. + - Configure alerting settings for the rule set. + """ + try: + logger.info(f"Retrieving curated rule set: {rule_set_id}") + + chronicle = get_chronicle_client(project_id, customer_id, region) + rule_set = chronicle.get_curated_rule_set(rule_set_id) + + logger.info(f"Successfully retrieved curated rule set: {rule_set_id}") + return rule_set + + except Exception as e: + logger.error( + f"Error retrieving curated rule set {rule_set_id}: " f"{str(e)}", + exc_info=True, + ) + return { + "error": f"Error retrieving curated rule set: {str(e)}", + "ruleSet": {}, + } + + +@server.tool() +async def list_curated_rule_set_deployments( + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, + page_size: int = 100, + page_token: Optional[str] = None, + as_list: bool = False, +) -> Dict[str, Any]: + """List deployment status of all curated rule sets. + + Retrieves deployment configuration for curated rule sets, + including enabled status, precision level (broad/precise), and + alerting configuration. Essential for managing curated content. + + **Workflow Integration:** + - Monitor which curated rule sets are currently enabled. + - Review alerting configuration for deployed rule sets. + - Audit detection coverage from curated content. + - Identify rule sets that need configuration updates. + + **Use Cases:** + - Check which curated rule sets are currently enabled. + - Review alerting settings for deployed rule sets. + - Identify rule sets configured for broad vs precise detection. + - Audit curated detection coverage across the environment. + - Find rule sets that need alerting enabled. + + Args: + project_id (Optional[str]): Google Cloud project ID. Defaults + to environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults + to environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", + "europe"). Defaults to environment configuration. + page_size (int): Maximum number of deployments to return. + Defaults to 100. + page_token (Optional[str]): Token for retrieving next page. + as_list (bool): If True, automatically paginate and return all + deployments as a list. Defaults to False. + + Returns: + Dict[str, Any]: Response containing deployments and pagination + metadata. Structure includes: + - curatedRuleSetDeployments: List of deployment objects + - nextPageToken: Token for pagination (if more results) + Each deployment includes enabled status, precision level, + and alerting configuration. + Returns error structure if the API call fails. + + Next Steps (using MCP-enabled tools): + - Update deployment settings using + `update_curated_rule_set_deployment`. + - Review rule set details using `get_curated_rule_set`. + - Enable alerting for specific rule sets. + - Adjust precision levels based on environment needs. + """ + try: + logger.info( + f"Listing curated rule set deployments " f"(page_size={page_size})" + ) + + chronicle = get_chronicle_client(project_id, customer_id, region) + + if as_list: + deployments = chronicle.list_curated_rule_set_deployments( + as_list=True + ) + return {"curatedRuleSetDeployments": deployments} + else: + result = chronicle.list_curated_rule_set_deployments( + page_size=page_size, page_token=page_token + ) + return result + + except Exception as e: + logger.error( + f"Error listing curated rule set deployments: {str(e)}", + exc_info=True, + ) + return {"error": str(e), "curatedRuleSetDeployments": []} + + +@server.tool() +async def update_curated_rule_set_deployment( + category_id: str, + rule_set_id: str, + precision: str, + enabled: bool, + alerting: bool, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> Dict[str, Any]: + """Update deployment configuration for a curated rule set. + + Enables or disables a curated rule set, configures precision level + (broad or precise), and controls alerting settings. Essential for + managing curated detection content deployment. + + **Workflow Integration:** + - Enable curated rule sets for specific threat coverage. + - Configure precision levels to balance coverage and noise. + - Enable or disable alerting for rule sets. + - Adjust detection settings based on operational needs. + + **Use Cases:** + - Enable the "Azure - Network" rule set with precise detection. + - Turn on alerting for GCP security rule set. + - Switch rule set from broad to precise detection mode. + - Disable rule sets that generate excessive false positives. + - Enable comprehensive cloud security monitoring. + + **Precision Modes:** + - **broad**: More detections, potentially higher false positive + rate. Better for comprehensive threat hunting. + - **precise**: Fewer detections, lower false positive rate. + Better for production alerting. + + Args: + category_id (str): Category ID of the rule set. Example: + "110fa43d-7165-2355-1985-a63b7cdf90e8" + rule_set_id (str): Unique identifier of the rule set. Example: + "00ad672e-ebb3-0dd1-2a4d-99bd7c5e5f93" + precision (str): Detection precision level. Valid values: + "broad", "precise" + enabled (bool): Whether to enable the rule set for detection. + alerting (bool): Whether to enable alerting for detections. + project_id (Optional[str]): Google Cloud project ID. Defaults + to environment configuration. + customer_id (Optional[str]): Chronicle customer ID. Defaults + to environment configuration. + region (Optional[str]): Chronicle region (e.g., "us", + "europe"). Defaults to environment configuration. + + Returns: + Dict[str, Any]: Updated deployment configuration including + status and settings. Returns error structure if the API + call fails. + + Next Steps (using MCP-enabled tools): + - Verify deployment status using + `list_curated_rule_set_deployments`. + - Monitor detections using `search_curated_detections`. + - Review generated alerts using `get_security_alerts`. + - Adjust settings based on alert volume and accuracy. + """ + try: + logger.info( + f"Updating curated rule set deployment: {rule_set_id} " + f"(enabled={enabled}, precision={precision}, " + f"alerting={alerting})" + ) + + chronicle = get_chronicle_client(project_id, customer_id, region) + + valid_precision_values = ["broad", "precise"] + if precision not in valid_precision_values: + error_msg = ( + f"Invalid precision value: {precision}. " + f"Must be one of {valid_precision_values}" + ) + logger.error(error_msg) + return {"error": error_msg} + + deployment_config = { + "category_id": category_id, + "rule_set_id": rule_set_id, + "precision": precision, + "enabled": enabled, + "alerting": alerting, + } + + result = chronicle.update_curated_rule_set_deployment(deployment_config) + + logger.info( + f"Successfully updated curated rule set deployment: " + f"{rule_set_id}" + ) + return result + + except Exception as e: + logger.error( + f"Error updating curated rule set deployment " + f"{rule_set_id}: {str(e)}", + exc_info=True, + ) + return { + "error": (f"Error updating curated rule set deployment: {str(e)}"), + } diff --git a/server/secops/tests/test_secops_curated_rules_tools.py b/server/secops/tests/test_secops_curated_rules_tools.py new file mode 100644 index 0000000..5fb3d3f --- /dev/null +++ b/server/secops/tests/test_secops_curated_rules_tools.py @@ -0,0 +1,445 @@ +"""Integration tests for Chronicle SecOps MCP curated rules tools. + +These tests exercise the curated rules management functionality by making +actual API calls to the Chronicle service. They require proper +authentication and configuration to run. + +To run these tests: +1. Create a config.json file in the tests directory with your Chronicle + credentials (see conftest.py for format) +2. Authenticate with Google Cloud using ADC: + gcloud auth application-default login + OR set SECOPS_SA_PATH environment variable: + export SECOPS_SA_PATH=/path/to/service-account.json +3. Run: pytest -xvs server/secops/tests/test_secops_curated_rules_tools.py +""" + +from datetime import datetime, timedelta, timezone +from typing import Dict + +import pytest + +from secops_mcp.tools.curated_rules_management import ( + get_curated_rule, + get_curated_rule_by_name, + list_curated_rule_set_deployments, + list_curated_rule_sets, + list_curated_rules, + get_curated_rule_set, + search_curated_detections, + update_curated_rule_set_deployment, +) + + +class TestCuratedRulesManagement: + """Test class for Chronicle curated rules management tools.""" + + @pytest.mark.asyncio + async def test_list_curated_rules( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing curated rules. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_curated_rules( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=10, + ) + + assert isinstance(result, dict) + assert "curatedRules" in result + + if result.get("curatedRules"): + assert isinstance(result["curatedRules"], list) + first_rule = result["curatedRules"][0] + assert isinstance(first_rule, dict) + + @pytest.mark.asyncio + async def test_list_curated_rules_as_list( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing curated rules with as_list parameter. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_curated_rules( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + as_list=True, + ) + + assert isinstance(result, dict) + assert "curatedRules" in result + assert isinstance(result["curatedRules"], list) + + @pytest.mark.asyncio + async def test_get_curated_rule( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test retrieving a specific curated rule. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + list_result = await list_curated_rules( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=1, + ) + + if not list_result.get("curatedRules"): + pytest.skip("No curated rules available to test") + + rule_id = list_result["curatedRules"][0].get("name", "").split("/")[-1] + + result = await get_curated_rule( + rule_id=rule_id, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + if "error" not in result: + assert "name" in result or len(result) > 0 + + @pytest.mark.asyncio + async def test_get_curated_rule_by_name( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test finding a curated rule by display name. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + list_result = await list_curated_rules( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=1, + ) + + if not list_result.get("curatedRules"): + pytest.skip("No curated rules available to test") + + display_name = list_result["curatedRules"][0].get("displayName", "") + + if not display_name: + pytest.skip("No display name available for testing") + + result = await get_curated_rule_by_name( + display_name=display_name, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + + @pytest.mark.asyncio + async def test_search_curated_detections( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test searching detections from curated rules. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + list_result = await list_curated_rules( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=1, + ) + + if not list_result.get("curatedRules"): + pytest.skip("No curated rules available to test") + + rule_id = list_result["curatedRules"][0].get("name", "").split("/")[-1] + + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(days=7) + + result = await search_curated_detections( + rule_id=rule_id, + start_time=start_time.isoformat().replace("+00:00", "Z"), + end_time=end_time.isoformat().replace("+00:00", "Z"), + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=10, + ) + + assert isinstance(result, dict) + if not result or "curatedDetections" not in result: + pytest.skip( + "No detections found for the rule in the time range" + ) + assert isinstance(result["curatedDetections"], list) + + @pytest.mark.asyncio + async def test_list_curated_rule_sets( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing curated rule sets. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_curated_rule_sets( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=10, + ) + + assert isinstance(result, dict) + assert "curatedRuleSets" in result + + if result.get("curatedRuleSets"): + assert isinstance(result["curatedRuleSets"], list) + first_set = result["curatedRuleSets"][0] + assert isinstance(first_set, dict) + + @pytest.mark.asyncio + async def test_list_curated_rule_sets_as_list( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing curated rule sets with as_list parameter. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_curated_rule_sets( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + as_list=True, + ) + + assert isinstance(result, dict) + assert "curatedRuleSets" in result + assert isinstance(result["curatedRuleSets"], list) + + @pytest.mark.asyncio + async def test_get_curated_rule_set( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test retrieving a specific curated rule set. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + list_result = await list_curated_rule_sets( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=1, + ) + + if not list_result.get("curatedRuleSets"): + pytest.skip("No curated rule sets available to test") + + rule_set_id = ( + list_result["curatedRuleSets"][0].get("name", "").split("/")[-1] + ) + + result = await get_curated_rule_set( + rule_set_id=rule_set_id, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + if "error" not in result: + assert "name" in result or len(result) > 0 + + @pytest.mark.asyncio + async def test_list_curated_rule_set_deployments( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing curated rule set deployments. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_curated_rule_set_deployments( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=10, + ) + + assert isinstance(result, dict) + assert "curatedRuleSetDeployments" in result + + if result.get("curatedRuleSetDeployments"): + assert isinstance(result["curatedRuleSetDeployments"], list) + first_deployment = result["curatedRuleSetDeployments"][0] + assert isinstance(first_deployment, dict) + + @pytest.mark.asyncio + async def test_list_curated_rule_set_deployments_as_list( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test listing deployments with as_list parameter. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await list_curated_rule_set_deployments( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + as_list=True, + ) + + assert isinstance(result, dict) + assert "curatedRuleSetDeployments" in result + assert isinstance(result["curatedRuleSetDeployments"], list) + + @pytest.mark.asyncio + async def test_update_curated_rule_set_deployment( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test updating curated rule set deployment configuration. + + This test captures the original deployment state, updates to + opposite values to verify the change works, then restores the + original state. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + rule_sets_result = await list_curated_rule_sets( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + page_size=1, + ) + + if not rule_sets_result.get("curatedRuleSets"): + pytest.skip("No curated rule sets available") + + first_rule_set = rule_sets_result["curatedRuleSets"][0] + rule_set_name = first_rule_set.get("name", "") + name_parts = rule_set_name.split("/") + + try: + category_index = name_parts.index("curatedRuleSetCategories") + category_id = name_parts[category_index + 1] + rule_set_id = name_parts[-1] + except (ValueError, IndexError): + pytest.skip( + "Unable to extract category_id or rule_set_id " "from name" + ) + + deployment_found = False + original_precision = None + original_enabled = None + original_alerting = None + + for precision in ["precise", "broad"]: + deployments = await list_curated_rule_set_deployments( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + for deployment in deployments.get("curatedRuleSetDeployments", []): + if rule_set_id in deployment.get( + "name", "" + ) and precision in deployment.get("name", ""): + original_enabled = deployment.get("enabled", False) + original_alerting = deployment.get("alerting", False) + original_precision = precision + deployment_found = True + break + + if deployment_found: + break + + if not deployment_found or original_enabled is None: + pytest.skip(f"No deployment found for rule set {rule_set_id}") + + try: + result = await update_curated_rule_set_deployment( + category_id=category_id, + rule_set_id=rule_set_id, + precision=original_precision, + enabled=not original_enabled, + alerting=not original_alerting, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + if "error" not in result: + assert "name" in result or len(result) > 0 + + verify_result = await list_curated_rule_set_deployments( + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + for deployment in verify_result.get( + "curatedRuleSetDeployments", [] + ): + if rule_set_id in deployment.get( + "name", "" + ) and original_precision in deployment.get("name", ""): + if "enabled" in deployment: + assert deployment.get("enabled") == ( + not original_enabled + ) + if "alerting" in deployment: + assert deployment.get("alerting") == ( + not original_alerting + ) + break + + finally: + await update_curated_rule_set_deployment( + category_id=category_id, + rule_set_id=rule_set_id, + precision=original_precision, + enabled=original_enabled, + alerting=original_alerting, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + @pytest.mark.asyncio + async def test_update_curated_rule_set_deployment_invalid_precision( + self, chronicle_config: Dict[str, str] + ) -> None: + """Test update with invalid precision value. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await update_curated_rule_set_deployment( + category_id="test-category", + rule_set_id="test-rule-set", + precision="invalid", + enabled=True, + alerting=False, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert isinstance(result, dict) + assert "error" in result + assert "Invalid precision value" in result["error"] From 192b0386ee7dd7aee3f3e00021a6b2f2b216e557 Mon Sep 17 00:00:00 2001 From: Mihir Vala <179564180+mihirvala-crestdata@users.noreply.github.com> Date: Fri, 30 Jan 2026 18:01:24 +0530 Subject: [PATCH 2/2] chore: updated docs --- docs/servers/secops_mcp.md | 169 +++++++++++++++++++++++++++++++++++++ server/secops/README.md | 37 +++++++- 2 files changed, 205 insertions(+), 1 deletion(-) diff --git a/docs/servers/secops_mcp.md b/docs/servers/secops_mcp.md index a2a5cfa..c6bb8be 100644 --- a/docs/servers/secops_mcp.md +++ b/docs/servers/secops_mcp.md @@ -344,6 +344,112 @@ The service account or user credentials need the following Chronicle roles: - Zero-day exploitation ``` +- **`list_curated_rules(project_id=None, customer_id=None, region=None, page_size=100, page_token=None, as_list=False)`** + - **Description:** List all curated detection rules available in Chronicle. Retrieves pre-built detection rules provided by Google that cover common security threats and attack patterns. + - **Parameters:** + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - `page_size` (optional): Maximum number of rules to return per page (default: 100). + - `page_token` (optional): Token for retrieving next page of results. + - `as_list` (optional): If True, automatically paginate and return all rules as a list (default: False). + - **Returns:** Dictionary containing curated rules and pagination metadata. + - **Return Example:** + ```json + { + "curatedRules": [ + { + "name": "projects/.../curatedRules/ur_ttp_lol_Atbroker", + "displayName": "Atbroker.exe Abuse", + "description": "Detects abuse of Windows Atbroker.exe", + "severity": "MEDIUM", + "type": "MULTI_EVENT" + } + ], + "nextPageToken": "token-for-next-page" + } + ``` + +- **`get_curated_rule(rule_id, project_id=None, customer_id=None, region=None)`** + - **Description:** Retrieve specific curated rule details by rule ID. Fetches complete definition and metadata for a specific Google-curated detection rule. + - **Parameters:** + - `rule_id` (required): Unique identifier of the curated rule. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing complete curated rule information. + +- **`get_curated_rule_by_name(display_name, project_id=None, customer_id=None, region=None)`** + - **Description:** Find curated rule by display name. Searches for a curated rule matching the specified human-readable name. + - **Parameters:** + - `display_name` (required): Display name of the curated rule to find. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing curated rule information if found. + +- **`search_curated_detections(rule_id, start_time, end_time, project_id=None, customer_id=None, region=None, list_basis=None, alert_state=None, page_size=100, page_token=None)`** + - **Description:** Search detections generated by a specific curated rule within a time range. Useful for investigating threats detected by Google-curated detection content. + - **Parameters:** + - `rule_id` (required): Unique identifier of the curated rule. + - `start_time` (required): Start of search time range in ISO 8601 format (e.g., "2025-01-20T00:00:00Z"). + - `end_time` (required): End of search time range in ISO 8601 format. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - `list_basis` (optional): Basis for listing detections ("DETECTION_TIME" or "CREATED_TIME"). + - `alert_state` (optional): Filter by alert state ("ALERTING" or "NOT_ALERTING"). + - `page_size` (optional): Maximum number of detections to return (default: 100). + - `page_token` (optional): Token for retrieving next page. + - **Returns:** Dictionary containing detections and pagination metadata. + +- **`list_curated_rule_sets(project_id=None, customer_id=None, region=None, page_size=100, page_token=None, as_list=False)`** + - **Description:** List all curated rule sets available in Chronicle. Retrieves collections of related curated rules grouped by threat category or data source. + - **Parameters:** + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - `page_size` (optional): Maximum number of rule sets to return (default: 100). + - `page_token` (optional): Token for retrieving next page. + - `as_list` (optional): If True, automatically paginate and return all rule sets as a list (default: False). + - **Returns:** Dictionary containing rule sets and pagination metadata. + +- **`get_curated_rule_set(rule_set_id, project_id=None, customer_id=None, region=None)`** + - **Description:** Retrieve specific curated rule set details by ID. Provides information about rules included in the set and deployment options. + - **Parameters:** + - `rule_set_id` (required): Unique identifier of the curated rule set. + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing complete rule set information. + +- **`list_curated_rule_set_deployments(project_id=None, customer_id=None, region=None, page_size=100, page_token=None, as_list=False)`** + - **Description:** List deployment status of all curated rule sets. Shows enabled status, precision level (broad/precise), and alerting configuration. + - **Parameters:** + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - `page_size` (optional): Maximum number of deployments to return (default: 100). + - `page_token` (optional): Token for retrieving next page. + - `as_list` (optional): If True, automatically paginate and return all deployments as a list (default: False). + - **Returns:** Dictionary containing deployment configurations with enabled status, precision level, and alerting settings. + +- **`update_curated_rule_set_deployment(category_id, rule_set_id, precision, enabled, alerting, project_id=None, customer_id=None, region=None)`** + - **Description:** Update deployment configuration for a curated rule set. Enables/disables rule sets, configures precision level (broad or precise), and controls alerting settings. + - **Parameters:** + - `category_id` (required): Category ID of the rule set. + - `rule_set_id` (required): Unique identifier of the rule set. + - `precision` (required): Detection precision level ("broad" or "precise"). + - `enabled` (required): Whether to enable the rule set for detection (boolean). + - `alerting` (required): Whether to enable alerting for detections (boolean). + - `project_id` (optional): Google Cloud project ID (defaults to environment config). + - `customer_id` (optional): Chronicle customer ID (defaults to environment config). + - `region` (optional): Chronicle region (defaults to environment config or 'us'). + - **Returns:** Dictionary containing updated deployment configuration. + - **Note:** Precision modes: + - **broad**: More detections, potentially higher false positive rate. Better for comprehensive threat hunting. + - **precise**: Fewer detections, lower false positive rate. Better for production alerting. + ## Usage Examples ### Example 1: Natural Language Security Event Search @@ -443,6 +549,69 @@ Their TTPs include: The group continues to evolve tactics and primarily targets financial institutions, cryptocurrency exchanges, and organizations in sectors of strategic interest to North Korea. ``` +### Example 5: Curated Rules Management Workflow + +``` +User: What pre-built detection rules does Chronicle offer for Azure security monitoring? + +AI uses: list_curated_rule_sets(as_list=True) + +AI: I found several curated rule sets for Azure security monitoring: + +1. "Azure - Network" - Detects network-based threats in Azure environments +2. "Azure - Authentication" - Monitors suspicious authentication patterns +3. "Azure - Storage" - Detects unusual storage access patterns + +Would you like to enable any of these? + +User: Yes, enable the Azure Network rule set with precise detection mode and alerting turned on. + +AI uses: list_curated_rule_sets() to get rule set details +AI extracts category_id and rule_set_id from the rule set information + +AI uses: update_curated_rule_set_deployment( + category_id="110fa43d-7165-2355-1985-a63b7cdf90e8", + rule_set_id="00ad672e-ebb3-0dd1-2a4d-99bd7c5e5f93", + precision="precise", + enabled=True, + alerting=True +) + +AI: Successfully enabled the "Azure - Network" rule set with: +- Precision: Precise (lower false positives, better for production alerting) +- Enabled: Yes +- Alerting: Yes + +The rule set is now actively monitoring your Azure network traffic. + +User: Can you show me what detections this rule set has generated in the last 7 days? + +AI uses: get_curated_rule_set(rule_set_id="00ad672e-ebb3-0dd1-2a4d-99bd7c5e5f93") +AI gets the rule IDs from the rule set + +AI uses: search_curated_detections( + rule_id="ur_azure_network_suspicious_traffic", + start_time="2025-01-20T00:00:00Z", + end_time="2025-01-27T23:59:59Z" +) + +AI: The Azure Network rule set generated 3 detections in the last 7 days: + +1. Suspicious outbound connection from Azure VM to known C2 server + Detection Time: 2025-01-25T14:23:45Z + Affected Asset: azure-vm-prod-01 + +2. Unusual data transfer volume to external Azure Storage + Detection Time: 2025-01-24T09:15:22Z + Affected Asset: azure-app-service-02 + +3. Port scanning activity from Azure container instance + Detection Time: 2025-01-22T16:48:31Z + Affected Asset: azure-container-instance-03 + +Would you like me to investigate any of these detections further? +``` + ## Additional Resources - [Chronicle API Documentation](https://cloud.google.com/chronicle/docs/reference/rest) diff --git a/server/secops/README.md b/server/secops/README.md index 21e4c92..05cc604 100644 --- a/server/secops/README.md +++ b/server/secops/README.md @@ -120,6 +120,32 @@ Chronicle Security Operations suite. - **`generate_feed_secret(feed_id, project_id=None, customer_id=None, region=None)`** - Creates a new authentication secret for feeds that support authentication (e.g., HTTP feeds with basic auth). This replaces any existing secret. +### Curated Rules Management Tools + +- **`list_curated_rules(project_id=None, customer_id=None, region=None, page_size=100, page_token=None, as_list=False)`** + - List all curated detection rules available in Chronicle. Retrieves pre-built detection rules provided by Google that cover common security threats and attack patterns. + +- **`get_curated_rule(rule_id, project_id=None, customer_id=None, region=None)`** + - Retrieve specific curated rule details by rule ID. Fetches complete definition and metadata for a specific Google-curated detection rule. + +- **`get_curated_rule_by_name(display_name, project_id=None, customer_id=None, region=None)`** + - Find curated rule by display name. Searches for a curated rule matching the specified human-readable name. + +- **`search_curated_detections(rule_id, start_time, end_time, project_id=None, customer_id=None, region=None, list_basis=None, alert_state=None, page_size=100, page_token=None)`** + - Search detections generated by a specific curated rule within a time range. Useful for investigating threats detected by Google-curated detection content. + +- **`list_curated_rule_sets(project_id=None, customer_id=None, region=None, page_size=100, page_token=None, as_list=False)`** + - List all curated rule sets available in Chronicle. Retrieves collections of related curated rules grouped by threat category or data source. + +- **`get_curated_rule_set(rule_set_id, project_id=None, customer_id=None, region=None)`** + - Retrieve specific curated rule set details by ID. Provides information about rules included in the set and deployment options. + +- **`list_curated_rule_set_deployments(project_id=None, customer_id=None, region=None, page_size=100, page_token=None, as_list=False)`** + - List deployment status of all curated rule sets. Shows enabled status, precision level (broad/precise), and alerting configuration. + +- **`update_curated_rule_set_deployment(category_id, rule_set_id, precision, enabled, alerting, project_id=None, customer_id=None, region=None)`** + - Update deployment configuration for a curated rule set. Enables/disables rule sets, configures precision level (broad or precise), and controls alerting settings. + ### API Capabilities The MCP server provides the following capabilities: @@ -136,7 +162,8 @@ The MCP server provides the following capabilities: 10. **Data Table Management**: Create and manage structured data tables for detection rules 11. **Reference List Management**: Create and manage reference lists for detection rules 12. **Feed Management**: Create, update, enable, disable, and delete data feeds -13. **UDM Search & Export**: Direct UDM querying, field value autocomplete, and CSV export +13. **Curated Rules Management**: Discover, retrieve, and manage Google-curated detection content and rule set deployments +14. **UDM Search & Export**: Direct UDM querying, field value autocomplete, and CSV export ### Example @@ -151,6 +178,7 @@ These tools focus on core security operations tasks: - **Entity Analysis**: Use `lookup_entity` to investigate IPs, domains, hashes, and other indicators - **Rule Management**: Use `list_security_rules` and `search_security_rules` to manage detection rules - **Threat Intelligence**: Use `get_ioc_matches` and `get_threat_intel` for IOC analysis and AI-powered insights +- **Curated Rules Management**: Use curated rules management tools to discover, enable, and configure Google-maintained detection content - **UDM Analysis & Export**: Use `search_udm`, `export_udm_search_csv`, and `find_udm_field_values` for direct UDM querying, data export, and field discovery ### Data Ingestion & Parsing Tools @@ -167,6 +195,13 @@ These tools help you maintain reference data for enhanced detections: - **Reference Lists**: Use for simple lists of values (e.g., IP addresses, domains, usernames) - **Detection Enhancement**: Both data tables and reference lists can be referenced in detection rules to make them more dynamic and maintainable +### Curated Detection Management Tools +These tools help you leverage Google-curated detection content: +- **Discovery**: Use `list_curated_rules` and `list_curated_rule_sets` to browse available pre-built detection content +- **Deployment**: Use `update_curated_rule_set_deployment` to enable rule sets with configurable precision levels (broad/precise) +- **Investigation**: Use `search_curated_detections` to analyze threats detected by curated rules +- **Configuration**: Manage alerting settings and precision tuning for optimal detection coverage + ## Configuration ### MCP Server Configuration