diff --git a/.gitignore b/.gitignore
index d206ab0..fc6f830 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,4 @@ dist/
 .tox/
 databricks_migration_tool.egg-info
 migrate.iml
+export_dir/
\ No newline at end of file
diff --git a/custom/README.md b/custom/README.md
new file mode 100644
index 0000000..3207b8e
--- /dev/null
+++ b/custom/README.md
@@ -0,0 +1,248 @@
+# Databricks UC Metastore Migration Guide
+
+This guide provides step-by-step instructions for exporting and importing Unity Catalog (UC) metastore data between Databricks workspaces, including table metadata and access control lists (ACLs).
+
+## Overview
+
+This migration process allows you to:
+- Export metastore metadata from a source workspace
+- Update S3 paths and prefixes for cross-region migration
+- Export table ACLs
+- Import the metastore and ACLs into a target workspace
+
+## Prerequisites
+
+1. **Clone the repository**
+   ```bash
+   git clone -b updates https://github.com/arvindh-km/databricks-migrate.git
+   cd databricks-migrate
+   ```
+   **Note:** Make sure to clone the `updates` branch, as it contains the latest migration scripts and fixes.
+
+2. **Install the Databricks CLI**
+   - Using Homebrew: `brew install databricks-cli`
+   - Or using curl, following the Databricks CLI installation docs
+
+3. **Set up a Python environment**
+   ```bash
+   python3 -m venv venv
+   source venv/bin/activate
+   pip install -r requirements.txt
+   ```
+
+## Initial Setup
+
+### Step 1: Configure Source Workspace
+
+1. **Create a Personal Access Token (PAT)**
+   - Navigate to your source Databricks workspace
+   - Go to User Settings → Access Tokens
+   - Generate a new token and save it securely
+
+2. **Configure a Databricks CLI profile**
+   ```bash
+   databricks configure --token --profile sg_dsp
+   ```
+   - Replace `sg_dsp` with a profile name of your choice
+   - When prompted, enter your workspace host URL and the PAT
+
+3. **Create migration clusters**
+   - Open `custom/create_clusters.py`
+   - Update the `profile` value to match your profile name
+   - Update the `email` field
+   - Run the script:
+     ```bash
+     python3 custom/create_clusters.py
+     ```
+   - **Important**: Wait for cluster creation to complete in the source workspace before proceeding
+   - If cluster creation fails due to workspace-specific init scripts or restrictions, update the script accordingly (see Troubleshooting)
+
+## Export Process
+
+### Step 2: Export Metastore
+
+Run the following command to export the metastore:
+
+```bash
+python3 migration_pipeline.py \
+    --profile <profile-name> \
+    --set-export-dir <export-dir> \
+    --cluster-name "metastore-migrate-mti" \
+    --export-pipeline \
+    --use-checkpoint \
+    --num-parallel 10 \
+    --retry-total 3 \
+    --retry-backoff 2 \
+    --keep-tasks metastore \
+    --session run1 \
+    --skip-failed \
+    --metastore-unicode \
+    --repair-metastore-tables \
+    --database <schema-name>
+```
+
+**Parameters to update:**
+- `<profile-name>`: Your Databricks CLI profile name
+- `<export-dir>`: Local directory path (e.g., `/export_dir/`)
+- `<schema-name>`: Schema/database name to export (omit `--database` to export all schemas)
+- `--cluster-name`: Should match the cluster created in Step 1
+
+**Output:** Metastore data will be exported to `/export_dir/run1/metastore/`
+
+### Step 3: Update S3 Paths and Prefixes
+
+Before importing, you need to update the S3 paths to point to the target region's buckets:
+
+1. Open `custom/metastore_s3.py`
+2. Update the following fields:
+   - `schemas`: Add all schemas you want to migrate
+   - `path`: Update to the metastore export path (e.g., `/export_dir/run1/metastore/`)
+   - `s3_buckets` dictionary: Update with the equivalent region-specific bucket names
+   - `uc_prefix` dictionary: Update with the required region-specific prefixes
+3. Save the file and run:
+   ```bash
+   python3 custom/metastore_s3.py
+   ```
+
+**What this does:**
+- Updates S3 paths from source-region (e.g., Singapore) to target-region (e.g., Mumbai) buckets
+- Removes UC-specific prefixes that prevent table creation for managed tables
+- Additional filters can be applied as needed, based on your bucket requirements
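+
+For reference, here is a minimal configuration sketch for a Singapore → Mumbai move, mirroring the defaults shipped in `custom/metastore_s3.py` (the schema list, export path, bucket pair, and prefix mapping below are illustrative — substitute your own values):
+
+```python
+# custom/metastore_s3.py -- values to review before running (illustrative)
+schemas = ['dsp']                        # schemas whose exported DDL files should be rewritten
+path = 'export_dir/run1/metastore/'      # metastore export path produced in Step 2
+
+# source-region bucket -> target-region bucket
+s3_buckets = {
+    's3://swiggy-data-science-platform/': 's3://swiggy-data-science-platform-mumbai/',
+}
+
+# UC-specific prefix -> replacement prefix that managed tables can be created over
+uc_prefix = {
+    '__unitystorage': 'uc_tables'
+}
+```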
+
+### Step 4: Export Table ACLs
+
+Export the table access control lists:
+
+```bash
+python3 migration_pipeline.py \
+    --profile <profile-name> \
+    --set-export-dir <export-dir> \
+    --cluster-name "table-acls-migrate-mti" \
+    --export-pipeline \
+    --use-checkpoint \
+    --num-parallel 10 \
+    --retry-total 3 \
+    --retry-backoff 2 \
+    --keep-tasks metastore_table_acls \
+    --session run1 \
+    --skip-failed \
+    --metastore-unicode \
+    --repair-metastore-tables \
+    --database <schema-name>
+```
+
+**Note:** Users and groups do not need to be exported separately. The ACL export works without them.
+
+**Output:** Table ACLs will be exported to `/export_dir/run1/table_acls/` as zip files (no need to decompress them)
+
+## Import Process
+
+### Step 5: Configure Target Workspace
+
+1. **Create a PAT token in the target workspace** (e.g., the Mumbai workspace)
+2. **Configure a Databricks CLI profile for the target workspace:**
+   ```bash
+   databricks configure --token --profile <target-profile-name>
+   ```
+3. **Create clusters in the target workspace:**
+   - Follow the Step 1 instructions, but use the target workspace profile and PAT
+
+### Step 6: Export and Import Catalog ACLs
+
+1. **Export catalog ACLs from the source workspace:**
+   - Clone the notebook `custom/export_catalog_acls.py` in your source workspace
+   - Update the `catalog` variable to match your catalog name (e.g., `'prod'`)
+   - Run the notebook to generate the catalog ACL export
+   - Copy the JSON output containing the GRANT commands
+
+2. **Import catalog ACLs to the target workspace:**
+   - Clone the notebook `custom/import_catalog_acls.py` in your target workspace
+   - Replace the `grant_cmds` list with the output from the previous step
+   - Run all cells in the notebook to apply the catalog ACLs
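+
+In essence, the export notebook turns each row returned by `SHOW GRANT ON CATALOG` into a `GRANT` statement, and the import notebook simply replays those statements with `spark.sql`. A condensed sketch of the round trip (the catalog name is illustrative, and in practice the list is copied between workspaces as JSON rather than run in one notebook):
+
+```python
+# Source workspace -- condensed from custom/export_catalog_acls.py
+catalog = 'prod'
+grant_cmds = [
+    f"GRANT {g.ActionType} ON CATALOG {catalog} TO {g.Principal}"
+    for g in spark.sql(f"SHOW GRANT ON catalog {catalog}").collect()
+]
+
+# Target workspace -- condensed from custom/import_catalog_acls.py
+for cmd in grant_cmds:  # e.g. "GRANT USE CATALOG ON CATALOG prod TO PROD_TEMP_READER"
+    spark.sql(cmd)
+```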
+
+### Step 7: Export and Import Schemas
+
+1. **Export schemas from the source workspace:**
+   - Clone the notebook `custom/export_schema_s3.py` in your source workspace
+   - Update the `schemas_to_filter` list with the schemas you want to migrate
+   - Run the notebook to generate the schema export
+   - Copy the JSON output containing the schema creation commands
+
+2. **Import schemas to the target workspace:**
+   - Clone the notebook `custom/import_schema_s3.py` in your target workspace
+   - Update the schema map using the output from the previous step
+   - Run all cells in the notebook to create the schemas
+
+### Step 8: Export and Import Schema ACLs
+
+1. **Export schema ACLs from the source workspace:**
+   - Clone the notebook `custom/export_schema_acls.py` in your source workspace
+   - Update the `schemas_to_filter` list with the schemas you want to export ACLs for (leave it empty to export all schemas)
+   - Run the notebook to generate the schema ACL export
+   - Copy the JSON output containing the schema ACL map
+
+2. **Import schema ACLs to the target workspace:**
+   - Clone the notebook `custom/import_schema_acls.py` in your target workspace
+   - Replace the `grant_cmds` dictionary with the JSON output from the previous step
+   - Run all cells in the notebook to apply the schema ACLs
+
+### Step 9: Import Metastore
+
+Import the metastore into the target workspace:
+
+```bash
+python3 migration_pipeline.py \
+    --profile <target-profile-name> \
+    --set-export-dir <export-dir> \
+    --cluster-name "metastore-migrate-mti" \
+    --import-pipeline \
+    --use-checkpoint \
+    --num-parallel 8 \
+    --retry-total 3 \
+    --retry-backoff 2 \
+    --keep-tasks metastore \
+    --session run1 \
+    --skip-failed \
+    --metastore-unicode \
+    --repair-metastore-tables \
+    --database <schema-name>
+```
+
+**Important:**
+- Use the target workspace profile (e.g., the Mumbai workspace profile)
+- Keep the same export directory that was used during export
+- Specify the schema you want to import
+- This creates tables on top of the updated S3 paths
+
+### Step 10: Import Table ACLs
+
+Update the table access control lists in the target workspace:
+
+```bash
+python3 migration_pipeline.py \
+    --profile <target-profile-name> \
+    --set-export-dir <export-dir> \
+    --cluster-name "metastore-migrate-mti" \
+    --import-pipeline \
+    --use-checkpoint \
+    --num-parallel 8 \
+    --retry-total 3 \
+    --retry-backoff 2 \
+    --keep-tasks metastore_table_acls \
+    --session run1 \
+    --skip-failed \
+    --metastore-unicode \
+    --repair-metastore-tables \
+    --database <schema-name>
+```
+
+**Parameters to update:**
+- `<target-profile-name>`: Target workspace profile (e.g., the Mumbai workspace profile)
+- `<schema-name>`: Schema for which you want to update access controls
+
+## Troubleshooting
+
+- **Cluster creation failures**: Update `custom/create_clusters.py` with workspace-specific init scripts and restrictions
+- **S3 path issues**: Verify that the bucket names and prefixes in `custom/metastore_s3.py` match your target region configuration
+- **Import failures**: Ensure the schemas are created in the target workspace before importing the metastore
+
diff --git a/custom/create_clusters.py b/custom/create_clusters.py
new file mode 100644
index 0000000..5f8d69a
--- /dev/null
+++ b/custom/create_clusters.py
@@ -0,0 +1,71 @@
+# Creates migration clusters in Databricks workspace for metastore and table ACLs migration
+import os
+import subprocess
+import json
+
+# Update these values before running
+profile = 'dsp'
+email = "arvindh.km@swiggy.in"
+
+# Cluster configuration for metastore migration
+cluster_config_for_metastore = {
+    "cluster_name": "metastore-migrate-mti",
+    "spark_version": "13.3.x-scala2.12",
+    "aws_attributes": {
+        "first_on_demand": 0,
+        "availability": "ON_DEMAND",
+        "zone_id": "auto",
+        "spot_bid_price_percent": 100,
+        "ebs_volume_count": 0
+    },
+    "node_type_id": "m6gd.large",
+    "driver_node_type_id": "r6gd.xlarge",
+    "autotermination_minutes": 30,
+    "enable_elastic_disk": True,
+    "single_user_name": email,
+    "enable_local_disk_encryption": False,
+    "data_security_mode": "DATA_SECURITY_MODE_DEDICATED",
+    "runtime_engine": "STANDARD",
+    "kind": "CLASSIC_PREVIEW",
+    "is_single_node": False,
+    "autoscale": {
+        "min_workers": 1,
+        "max_workers": 5
+    },
+    "apply_policy_default_values": False
+}
+
+result = subprocess.run(['databricks', '--profile', profile, 'clusters', 'create', '--json', json.dumps(cluster_config_for_metastore), '--no-wait'])
+
+print(result.returncode)
+
+# Cluster configuration for table ACLs migration
+cluster_config_for_table_acls = {
+    "cluster_name": "table-acls-migrate-mti",
+    "spark_version": "13.3.x-scala2.12",
+    "aws_attributes": {
+        "first_on_demand": 0,
+        "availability": "ON_DEMAND",
+        "zone_id": "auto",
+        "spot_bid_price_percent": 100,
+        "ebs_volume_count": 0
+    },
+    "node_type_id": "m6gd.large",
+    "driver_node_type_id": "r6gd.xlarge",
+    "autotermination_minutes": 30,
+    "enable_elastic_disk": True,
+    "enable_local_disk_encryption": False,
+    "data_security_mode": "DATA_SECURITY_MODE_STANDARD",
+    "runtime_engine": "STANDARD",
+    "kind": "CLASSIC_PREVIEW",
+    "is_single_node": False,
+    "autoscale": {
+        "min_workers": 1,
+        "max_workers": 5
+    },
+    "apply_policy_default_values": False
+}
+
+result = subprocess.run(['databricks', '--profile', profile, 'clusters', 'create', '--json', json.dumps(cluster_config_for_table_acls), '--no-wait'])
+
+print(result.returncode)
\ No newline at end of file
diff --git a/custom/export_catalog_acls.py b/custom/export_catalog_acls.py
new file mode 100644
index 0000000..f69964e
--- /dev/null
+++ b/custom/export_catalog_acls.py
@@ -0,0 +1,15 @@
+# Exports catalog ACLs as GRANT commands in JSON format
+import json
+
+# Update catalog name as needed
+catalog = 'prod'
+
+result = []
+
+for grant in spark.sql(f"SHOW GRANT ON catalog {catalog}").collect():
+    action = grant.ActionType
+    principal = grant.Principal
+    prod_cmd = f"GRANT {action} ON CATALOG {catalog} TO {principal}"
+    result.append(prod_cmd)
+
+print(json.dumps(result, indent=4))
\ No newline at end of file
diff --git a/custom/export_schema_acls.py b/custom/export_schema_acls.py
new file mode 100644
index 0000000..0c4295e
--- /dev/null
+++ b/custom/export_schema_acls.py
@@ -0,0 +1,19 @@
+'''
+Export schema ACLs from the source workspace as JSON.
+To filter for particular schemas, list the schema names (without the catalog) in schemas_to_filter. If the list is empty, all schemas are exported.
+'''
+import json
+
+schemas = [i.databaseName for i in spark.sql('show schemas in prod').collect()]
+
+schemas_to_filter = ['dsp']
+
+# Filter only when specific schemas are requested; an empty list keeps all schemas
+if schemas_to_filter:
+    schemas = [schema for schema in schemas if schema in schemas_to_filter]
+
+schema_acl_map = {}
+
+for schema in schemas:
+    schema_grants = [f"GRANT {grant.ActionType} ON SCHEMA {schema} TO {grant.Principal}" for grant in spark.sql(f"SHOW GRANT ON SCHEMA {schema}").collect() if grant.ObjectType == 'SCHEMA']
+    schema_acl_map[schema] = schema_grants
+
+print(json.dumps(schema_acl_map, indent=4))
\ No newline at end of file
diff --git a/custom/export_schema_s3.py b/custom/export_schema_s3.py
new file mode 100644
index 0000000..58816a4
--- /dev/null
+++ b/custom/export_schema_s3.py
@@ -0,0 +1,21 @@
+# Exports schema creation commands with S3 locations
+import json
+
+schemas = [i.databaseName for i in spark.sql('show schemas in prod').collect()]
+
+schema_commands = {}
+
+# Update with schemas to export (an empty list exports all schemas)
+schemas_to_filter = ['data_science_prod']
+if schemas_to_filter:
+    schemas = [schema for schema in schemas if schema in schemas_to_filter]
+
+for schema in schemas:
+    schema_desc = spark.sql(f'describe schema {schema}').collect()
+    catalog, name, location = None, None, None
+    for i in schema_desc:
+        if i.database_description_item == 'Catalog Name': catalog = i.database_description_value
+        if i.database_description_item == 'Namespace Name': name = i.database_description_value
+        if i.database_description_item == 'RootLocation': location = i.database_description_value
+    schema_commands[schema] = f"CREATE SCHEMA IF NOT EXISTS {catalog}.{name} LOCATION '{location}'"
+
+print(json.dumps(schema_commands, indent=4))
\ No newline at end of file
diff --git a/custom/import_catalog_acls.py b/custom/import_catalog_acls.py
new file mode 100644
index 0000000..f78702f
--- /dev/null
+++ b/custom/import_catalog_acls.py @@ -0,0 +1,18 @@ +# Imports catalog ACLs from exported GRANT commands +# Replace this list with output from export_catalog_acls.py +grant_cmds = ['GRANT USE CATALOG ON CATALOG prod TO PROD_CDC_OBSERVABILITY_READER', + 'GRANT USE CATALOG ON CATALOG prod TO PROD_DATA_SCIENCE_READER', + 'GRANT USE CATALOG ON CATALOG prod TO PROD_FINANCECIRCUSRECOVERY_READER', + 'GRANT USE CATALOG ON CATALOG prod TO PROD_OSIRIS_WRITER', + 'GRANT USE CATALOG ON CATALOG prod TO PROD_TEMP_READER', + 'GRANT USE CATALOG ON CATALOG prod TO PROD_DASHCATALOG_READER', + 'GRANT USE CATALOG ON CATALOG prod TO PROD_ALCHEMIST_WRITER'] + +for command in grant_cmds: + try: + spark.sql(command) + except Exception as e: + print(f"Error importing catalog ACL: {e}") + continue + +print("Catalog ACLs imported successfully") \ No newline at end of file diff --git a/custom/import_schema_acls.py b/custom/import_schema_acls.py new file mode 100644 index 0000000..caa2563 --- /dev/null +++ b/custom/import_schema_acls.py @@ -0,0 +1,17 @@ +''' +Import schema ACLs from JSON to Target UC. +Use the JSON logged from export_schema_acls.py to import schema ACLs. +''' +grant_cmds = { + "dsp": ["GRANT SELECT ON SCHEMA dsp TO PROD_DSP_READER"] +} + +for schema, commands in grant_cmds.items(): + for command in commands: + try: + spark.sql(command) + except Exception as e: + print(f"Error importing schema ACLs for {schema}: {e}") + continue + +print("Schema ACLs imported successfully") \ No newline at end of file diff --git a/custom/import_schema_s3.py b/custom/import_schema_s3.py new file mode 100644 index 0000000..541b774 --- /dev/null +++ b/custom/import_schema_s3.py @@ -0,0 +1,177 @@ +# Imports schemas to target workspace using CREATE SCHEMA commands +# Replace schema_map with output from export_schema_s3.py +import json + +schema_map = { + "actioning": "CREATE SCHEMA IF NOT EXISTS prod.actioning LOCATION 's3://cdc-prod-data/'", + "ads": "CREATE SCHEMA IF NOT EXISTS prod.ads LOCATION 's3://cdc-prod-data/'", + "ads_mumbai": "CREATE SCHEMA IF NOT EXISTS prod.ads_mumbai LOCATION 's3://cdc-prod-data/'", + "alchemist": "CREATE SCHEMA IF NOT EXISTS prod.alchemist LOCATION 's3://cdc-prod-data/delta/mysql'", + "alchemist_archive": "CREATE SCHEMA IF NOT EXISTS prod.alchemist_archive LOCATION 's3://cdc-prod-data/'", + "anakin": "CREATE SCHEMA IF NOT EXISTS prod.anakin LOCATION 's3://swiggy-ci-anakin/'", + "analytics": "CREATE SCHEMA IF NOT EXISTS prod.analytics LOCATION 's3://data-platform-analytics/analytics_apg_dnd'", + "analytics_adhoc": "CREATE SCHEMA IF NOT EXISTS prod.analytics_adhoc LOCATION 's3://data-platform-analytics-adhoc/'", + "analytics_adhoc_test": "CREATE SCHEMA IF NOT EXISTS prod.analytics_adhoc_test LOCATION 's3://data-platform-analytics-adhoc/'", + "analytics_people_im": "CREATE SCHEMA IF NOT EXISTS prod.analytics_people_im LOCATION 's3://data-platform-analytics/analytics-people-im'", + "analytics_pharmeasy": "CREATE SCHEMA IF NOT EXISTS prod.analytics_pharmeasy LOCATION 's3://data-platform-analytics/analytics-prod/pharmeasy'", + "analytics_prod": "CREATE SCHEMA IF NOT EXISTS prod.analytics_prod LOCATION 's3://data-platform-analytics/analytics-prod'", + "analytics_spark": "CREATE SCHEMA IF NOT EXISTS prod.analytics_spark LOCATION 's3://data-platform-analytics/analytics_apg_spark'", + "analytics_spark_1": "CREATE SCHEMA IF NOT EXISTS prod.analytics_spark_1 LOCATION 's3://data-platform-analytics/analytics_apg_spark_1'", + "analytics_zflow_uat": "CREATE SCHEMA IF NOT EXISTS prod.analytics_zflow_uat 
LOCATION 'None'", + "anobis": "CREATE SCHEMA IF NOT EXISTS prod.anobis LOCATION 's3://cdc-prod-data/'", + "anobis_brt": "CREATE SCHEMA IF NOT EXISTS prod.anobis_brt LOCATION 's3://data-platform-analytics-adhoc/'", + "anobis_prod": "CREATE SCHEMA IF NOT EXISTS prod.anobis_prod LOCATION 's3://data-platform-analytics/anobis-prod'", + "anobis_streams": "CREATE SCHEMA IF NOT EXISTS prod.anobis_streams LOCATION 's3://data-platform-delta/'", + "appsflyer_push": "CREATE SCHEMA IF NOT EXISTS prod.appsflyer_push LOCATION 's3://af-datalocker-swiggy-reports/'", + "badges": "CREATE SCHEMA IF NOT EXISTS prod.badges LOCATION 's3://cdc-prod-data/'", + "carousel": "CREATE SCHEMA IF NOT EXISTS prod.carousel LOCATION 's3://cdc-prod-data/delta/mysql'", + "catalog": "CREATE SCHEMA IF NOT EXISTS prod.catalog LOCATION 's3://cdc-prod-data/'", + "catalog2": "CREATE SCHEMA IF NOT EXISTS prod.catalog2 LOCATION 's3://data-platform-uat/catalog2'", + "cdc_ddb": "CREATE SCHEMA IF NOT EXISTS prod.cdc_ddb LOCATION 's3://cdc-prod-data/delta/ddb'", + "cdc_ddb_stringified": "CREATE SCHEMA IF NOT EXISTS prod.cdc_ddb_stringified LOCATION 's3://cdc-prod-data/'", + "cdc_delta": "CREATE SCHEMA IF NOT EXISTS prod.cdc_delta LOCATION 's3://cdc-prod-data/'", + "cdc_e2_ow_consumer_prod": "CREATE SCHEMA IF NOT EXISTS prod.cdc_e2_ow_consumer_prod LOCATION 's3://cdc-prod-data/overwatch_etl/cdc_e2_ow_consumer_prod.db'", + "cdc_e2_ow_etl_prod": "CREATE SCHEMA IF NOT EXISTS prod.cdc_e2_ow_etl_prod LOCATION 's3://cdc-prod-data/overwatch_etl/cdc_e2_ow_etl_prod.db'", + "cdc_e2_ow_etl_s3_prod_latest": "CREATE SCHEMA IF NOT EXISTS prod.cdc_e2_ow_etl_s3_prod_latest LOCATION 's3://cdc-prod-data/'", + "cdc_e2_ow_etl_s3_prod_latest_testing": "CREATE SCHEMA IF NOT EXISTS prod.cdc_e2_ow_etl_s3_prod_latest_testing LOCATION 's3://cdc-prod-data/'", + "cdc_observability": "CREATE SCHEMA IF NOT EXISTS prod.cdc_observability LOCATION 's3://cdc-prod-data/'", + "cdc_prod": "CREATE SCHEMA IF NOT EXISTS prod.cdc_prod LOCATION 's3://cdc-prod-data/'", + "cdc_shallow": "CREATE SCHEMA IF NOT EXISTS prod.cdc_shallow LOCATION 'None'", + "cep": "CREATE SCHEMA IF NOT EXISTS prod.cep LOCATION 's3://data-platform-analytics/cep'", + "checkout": "CREATE SCHEMA IF NOT EXISTS prod.checkout LOCATION 's3://cdc-prod-data/'", + "checkout_ist": "CREATE SCHEMA IF NOT EXISTS prod.checkout_ist LOCATION 's3://cdc-prod-data/delta/mysql/checkout_ist/swiggy_order_management'", + "clm": "CREATE SCHEMA IF NOT EXISTS prod.clm LOCATION 's3://cdc-prod-data/'", + "cloudmenu": "CREATE SCHEMA IF NOT EXISTS prod.cloudmenu LOCATION 's3://cdc-prod-data/'", + "cms": "CREATE SCHEMA IF NOT EXISTS prod.cms LOCATION 's3://cdc-prod-data/'", + "cmsswiggy": "CREATE SCHEMA IF NOT EXISTS prod.cmsswiggy LOCATION 's3://cdc-prod-data/delta/mysql/cmsswiggy/swiggy'", + "compaction": "CREATE SCHEMA IF NOT EXISTS prod.compaction LOCATION 's3://data-platform-uat/'", + "compass_prod": "CREATE SCHEMA IF NOT EXISTS prod.compass_prod LOCATION 's3://cdc-prod-data/'", + "conaro": "CREATE SCHEMA IF NOT EXISTS prod.conaro LOCATION 's3://cdc-prod-data/delta/mysql/conaro/conaro'", + "connect": "CREATE SCHEMA IF NOT EXISTS prod.connect LOCATION 's3://cdc-prod-data/'", + "crew": "CREATE SCHEMA IF NOT EXISTS prod.crew LOCATION 's3://cdc-prod-data/'", + "dash_erp_engg": "CREATE SCHEMA IF NOT EXISTS prod.dash_erp_engg LOCATION 's3://cdc-prod-data/'", + "dashbaseoms": "CREATE SCHEMA IF NOT EXISTS prod.dashbaseoms LOCATION 's3://cdc-prod-data/'", + "dashcatalog": "CREATE SCHEMA IF NOT EXISTS prod.dashcatalog LOCATION 
's3://data-platform-uat/dashcatalog'", + "dashcorelogistics": "CREATE SCHEMA IF NOT EXISTS prod.dashcorelogistics LOCATION 's3://cdc-prod-data/'", + "dashcorelogistics_view_v1": "CREATE SCHEMA IF NOT EXISTS prod.dashcorelogistics_view_v1 LOCATION 's3://data-platform-fact/modifiable/views'", + "dashorderaudit": "CREATE SCHEMA IF NOT EXISTS prod.dashorderaudit LOCATION 's3://cdc-prod-data/'", + "dashrating": "CREATE SCHEMA IF NOT EXISTS prod.dashrating LOCATION 's3://cdc-prod-data/'", + "data_engg": "CREATE SCHEMA IF NOT EXISTS prod.data_engg LOCATION 's3://data-platform-analytics/streams-custom-ingestion/delta'", + "data_platform": "CREATE SCHEMA IF NOT EXISTS prod.data_platform LOCATION 's3://cdc-prod-data/'", + "data_platform_retool": "CREATE SCHEMA IF NOT EXISTS prod.data_platform_retool LOCATION 's3://data-platform-retool/'", + "data_science": "CREATE SCHEMA IF NOT EXISTS prod.data_science LOCATION 's3://data-platform-analytics/datascience_apg_dnd'", + "data_science_dev": "CREATE SCHEMA IF NOT EXISTS prod.data_science_dev LOCATION 's3://swiggy-qubole/DS-temp/delta_tables_dev'", + "data_science_prod": "CREATE SCHEMA IF NOT EXISTS prod.data_science_prod LOCATION 's3://swiggy-ds-prod/prod_data/delta_tables_prod'", + "db_alchemist_archive": "CREATE SCHEMA IF NOT EXISTS prod.db_alchemist_archive LOCATION 's3://cdc-prod-data/'", + "de": "CREATE SCHEMA IF NOT EXISTS prod.de LOCATION 's3://cdc-prod-data/delta/mysql/delivery_service/swiggy'", + "de_surge": "CREATE SCHEMA IF NOT EXISTS prod.de_surge LOCATION 's3://cdc-prod-data/'", + "default": "CREATE SCHEMA IF NOT EXISTS prod.default LOCATION 'None'", + "degradation": "CREATE SCHEMA IF NOT EXISTS prod.degradation LOCATION 's3://cdc-prod-data/'", + "delivery_cluster": "CREATE SCHEMA IF NOT EXISTS prod.delivery_cluster LOCATION 's3://cdc-prod-data/delta/mysql/delivery_cluster/cluster'", + "delivery_service": "CREATE SCHEMA IF NOT EXISTS prod.delivery_service LOCATION 's3://cdc-prod-data/'", + "deticketing": "CREATE SCHEMA IF NOT EXISTS prod.deticketing LOCATION 's3://cdc-prod-data/'", + "dineout": "CREATE SCHEMA IF NOT EXISTS prod.dineout LOCATION 's3://cdc-prod-data/'", + "dp_uat": "CREATE SCHEMA IF NOT EXISTS prod.dp_uat LOCATION 's3://data-platform-uat/hive'", + "driver_payout": "CREATE SCHEMA IF NOT EXISTS prod.driver_payout LOCATION 's3://cdc-prod-data/delta/mysql/driver-payout/payout'", + "dsp": "CREATE SCHEMA IF NOT EXISTS prod.dsp LOCATION 's3://swiggy-data-science-platform/dsp'", + "dsp_costs": "CREATE SCHEMA IF NOT EXISTS prod.dsp_costs LOCATION 's3://swiggy-data-science-platform/dsp_cost'", + "dsp_delta": "CREATE SCHEMA IF NOT EXISTS prod.dsp_delta LOCATION 's3://swiggy-data-science-platform/delta_model_tables'", + "dsp_vidura": "CREATE SCHEMA IF NOT EXISTS prod.dsp_vidura LOCATION 's3://cdc-prod-data/'", + "ev_lead_gen_base": "CREATE SCHEMA IF NOT EXISTS prod.ev_lead_gen_base LOCATION 's3://dnd-ops-strategy/'", + "excel": "CREATE SCHEMA IF NOT EXISTS prod.excel LOCATION 's3://data-platform-fact/excel'", + "excel_delta": "CREATE SCHEMA IF NOT EXISTS prod.excel_delta LOCATION 'None'", + "fact": "CREATE SCHEMA IF NOT EXISTS prod.fact LOCATION 's3://data-platform-fact/modifiable/delta'", + "fact_delta_staging": "CREATE SCHEMA IF NOT EXISTS prod.fact_delta_staging LOCATION 's3://data-platform-fact/modifiable/delta_staging'", + "facts_spark_prod": "CREATE SCHEMA IF NOT EXISTS prod.facts_spark_prod LOCATION 's3://data-platform-fact/modifiable/delta'", + "fas": "CREATE SCHEMA IF NOT EXISTS prod.fas LOCATION 
's3://swiggy-smart-business-prod/'", + "features": "CREATE SCHEMA IF NOT EXISTS prod.features LOCATION 's3://data-platform-analytics/warehouse/features'", + "features_delta": "CREATE SCHEMA IF NOT EXISTS prod.features_delta LOCATION 's3://swiggy-data-science-platform/delta_feature_tables'", + "fin": "CREATE SCHEMA IF NOT EXISTS prod.fin LOCATION 's3://cdc-prod-data/'", + "finance_cash_archive": "CREATE SCHEMA IF NOT EXISTS prod.finance_cash_archive LOCATION 's3://cdc-prod-data/'", + "finance_payout": "CREATE SCHEMA IF NOT EXISTS prod.finance_payout LOCATION 's3://cdc-prod-data/delta/mysql/finance_payout'", + "financecircusrecovery": "CREATE SCHEMA IF NOT EXISTS prod.financecircusrecovery LOCATION 's3://cdc-prod-data/delta/mysql/financecircusrecovery/financecircusrecovery'", + "financenodalremit": "CREATE SCHEMA IF NOT EXISTS prod.financenodalremit LOCATION 's3://cdc-prod-data/'", + "financeraw": "CREATE SCHEMA IF NOT EXISTS prod.financeraw LOCATION 's3://cdc-prod-data/'", + "financereconpayout": "CREATE SCHEMA IF NOT EXISTS prod.financereconpayout LOCATION 's3://cdc-prod-data/'", + "fincash": "CREATE SCHEMA IF NOT EXISTS prod.fincash LOCATION 's3://cdc-prod-data/'", + "fleetplanning": "CREATE SCHEMA IF NOT EXISTS prod.fleetplanning LOCATION 's3://cdc-prod-data/'", + "foodtooling": "CREATE SCHEMA IF NOT EXISTS prod.foodtooling LOCATION 's3://cdc-prod-data/'", + "forge": "CREATE SCHEMA IF NOT EXISTS prod.forge LOCATION 's3://cdc-prod-data/'", + "gamification": "CREATE SCHEMA IF NOT EXISTS prod.gamification LOCATION 's3://cdc-prod-data/'", + "gamooga": "CREATE SCHEMA IF NOT EXISTS prod.gamooga LOCATION 's3://cdc-prod-data/'", + "genesys": "CREATE SCHEMA IF NOT EXISTS prod.genesys LOCATION 's3://appflow-crm-external-client-data-ingestion/genesys'", + "growthpacks": "CREATE SCHEMA IF NOT EXISTS prod.growthpacks LOCATION 's3://cdc-prod-data/'", + "helpcenter": "CREATE SCHEMA IF NOT EXISTS prod.helpcenter LOCATION 's3://cdc-prod-data/'", + "hive": "CREATE SCHEMA IF NOT EXISTS prod.hive LOCATION 's3://cdc-prod-data/'", + "information_schema": "CREATE SCHEMA IF NOT EXISTS prod.information_schema LOCATION 'None'", + "intelligence_platform": "CREATE SCHEMA IF NOT EXISTS prod.intelligence_platform LOCATION 's3://cdc-prod-data/'", + "irctcgateway_uat": "CREATE SCHEMA IF NOT EXISTS prod.irctcgateway_uat LOCATION 'None'", + "klaxon": "CREATE SCHEMA IF NOT EXISTS prod.klaxon LOCATION 's3://cdc-prod-data/'", + "knowledge": "CREATE SCHEMA IF NOT EXISTS prod.knowledge LOCATION 's3://data-platform-fact/knowledge'", + "mpdb": "CREATE SCHEMA IF NOT EXISTS prod.mpdb LOCATION 'None'", + "olap": "CREATE SCHEMA IF NOT EXISTS prod.olap LOCATION 's3://data-platform-fact/cube'", + "opt": "CREATE SCHEMA IF NOT EXISTS prod.opt LOCATION 's3://data-platform-fact/orc_logs'", + "oracle_erp_fusion": "CREATE SCHEMA IF NOT EXISTS prod.oracle_erp_fusion LOCATION 's3://oracle-erp-fusion/'", + "osiris": "CREATE SCHEMA IF NOT EXISTS prod.osiris LOCATION 's3://cdc-prod-data/'", + "paas": "CREATE SCHEMA IF NOT EXISTS prod.paas LOCATION 's3://cdc-prod-data/'", + "picker_slave": "CREATE SCHEMA IF NOT EXISTS prod.picker_slave LOCATION 's3://cdc-prod-data/delta/mysql/picker_slave/dash_pickers'", + "pickermaster": "CREATE SCHEMA IF NOT EXISTS prod.pickermaster LOCATION 's3://cdc-prod-data/'", + "pimcore": "CREATE SCHEMA IF NOT EXISTS prod.pimcore LOCATION 's3://cdc-prod-data/s3:/cdc-prod-data/delta/mysql/pimcore'", + "placingfsm": "CREATE SCHEMA IF NOT EXISTS prod.placingfsm LOCATION 's3://cdc-prod-data/'", + "presentation_presentation": 
"CREATE SCHEMA IF NOT EXISTS prod.presentation_presentation LOCATION 's3://cdc-prod-data/schema-metastore/dms'", + "priceparitydb": "CREATE SCHEMA IF NOT EXISTS prod.priceparitydb LOCATION 's3://cdc-prod-data/'", + "proto": "CREATE SCHEMA IF NOT EXISTS prod.proto LOCATION 's3://cdc-prod-data/'", + "quality_gates": "CREATE SCHEMA IF NOT EXISTS prod.quality_gates LOCATION 's3://data-platform-quality-gates/metrics'", + "raw": "CREATE SCHEMA IF NOT EXISTS prod.raw LOCATION 's3://data-platform-fact/raw'", + "rcc": "CREATE SCHEMA IF NOT EXISTS prod.rcc LOCATION 's3://cdc-prod-data/'", + "realtime": "CREATE SCHEMA IF NOT EXISTS prod.realtime LOCATION 's3://data-platform-delta/delta_logs'", + "rill": "CREATE SCHEMA IF NOT EXISTS prod.rill LOCATION 's3://data-platform-rill-prod/'", + "rill_uat": "CREATE SCHEMA IF NOT EXISTS prod.rill_uat LOCATION 's3://data-platform-rill-uat/rilldelta'", + "rms": "CREATE SCHEMA IF NOT EXISTS prod.rms LOCATION 's3://cdc-prod-data/'", + "rng": "CREATE SCHEMA IF NOT EXISTS prod.rng LOCATION 's3://cdc-prod-data/'", + "rng_pricing": "CREATE SCHEMA IF NOT EXISTS prod.rng_pricing LOCATION 's3://cdc-prod-data/'", + "rng_stress": "CREATE SCHEMA IF NOT EXISTS prod.rng_stress LOCATION 's3://cdc-prod-data/delta/mysql/rng_stress/rng_stress'", + "s3_inventory_data": "CREATE SCHEMA IF NOT EXISTS prod.s3_inventory_data LOCATION 's3://data-platform-analytics/'", + "salesforce": "CREATE SCHEMA IF NOT EXISTS prod.salesforce LOCATION 's3://appflow-crm-external-client-data-ingestion/'", + "salesforce_temp": "CREATE SCHEMA IF NOT EXISTS prod.salesforce_temp LOCATION 's3://data-platform-analytics-adhoc/salesforce_temp'", + "sand": "CREATE SCHEMA IF NOT EXISTS prod.sand LOCATION 's3://cdc-prod-data/'", + "sandusers": "CREATE SCHEMA IF NOT EXISTS prod.sandusers LOCATION 's3://cdc-prod-data/'", + "scmmp": "CREATE SCHEMA IF NOT EXISTS prod.scmmp LOCATION 's3://scm-movement-planning/'", + "scmmp_test": "CREATE SCHEMA IF NOT EXISTS prod.scmmp_test LOCATION 's3://scm-movement-planning/'", + "secure_cdc": "CREATE SCHEMA IF NOT EXISTS prod.secure_cdc LOCATION 's3://secure-cdc-prod-data/'", + "singular": "CREATE SCHEMA IF NOT EXISTS prod.singular LOCATION 's3://singular-s3-exports-swiggy/'", + "snd": "CREATE SCHEMA IF NOT EXISTS prod.snd LOCATION 's3://cdc-prod-data/'", + "sno": "CREATE SCHEMA IF NOT EXISTS prod.sno LOCATION 's3://cdc-prod-data/'", + "ssb": "CREATE SCHEMA IF NOT EXISTS prod.ssb LOCATION 's3://swiggy-smart-business-prod/'", + "streams_delta": "CREATE SCHEMA IF NOT EXISTS prod.streams_delta LOCATION 's3://data-platform-delta/delta_logs'", + "streams_json": "CREATE SCHEMA IF NOT EXISTS prod.streams_json LOCATION 's3://data-platform-json/json_logs/daily'", + "streams_shallow": "CREATE SCHEMA IF NOT EXISTS prod.streams_shallow LOCATION 'None'", + "student_rewards": "CREATE SCHEMA IF NOT EXISTS prod.student_rewards LOCATION 's3://secure-cdc-prod-data/delta/student_id_verification'", + "suprdaily": "CREATE SCHEMA IF NOT EXISTS prod.suprdaily LOCATION 's3://cdc-prod-data/suprdaily'", + "svcplatform": "CREATE SCHEMA IF NOT EXISTS prod.svcplatform LOCATION 's3://cdc-prod-data/'", + "swiggy_cc_services": "CREATE SCHEMA IF NOT EXISTS prod.swiggy_cc_services LOCATION 's3://cdc-prod-data/'", + "swiggy_events": "CREATE SCHEMA IF NOT EXISTS prod.swiggy_events LOCATION 's3://cdc-prod-data/'", + "swiggykms": "CREATE SCHEMA IF NOT EXISTS prod.swiggykms LOCATION 's3://cdc-prod-data/'", + "swiggylms": "CREATE SCHEMA IF NOT EXISTS prod.swiggylms LOCATION 's3://cdc-prod-data/'", + "swiggysuper": "CREATE 
SCHEMA IF NOT EXISTS prod.swiggysuper LOCATION 's3://cdc-prod-data/'", + "swiss": "CREATE SCHEMA IF NOT EXISTS prod.swiss LOCATION 's3://cdc-prod-data/'", + "td": "CREATE SCHEMA IF NOT EXISTS prod.td LOCATION 's3://cdc-prod-data/'", + "temp": "CREATE SCHEMA IF NOT EXISTS prod.temp LOCATION 'None'", + "tns": "CREATE SCHEMA IF NOT EXISTS prod.tns LOCATION 's3://fna-fact-repo-bucket/delta'", + "tokude": "CREATE SCHEMA IF NOT EXISTS prod.tokude LOCATION 's3://cdc-prod-data/'", + "topic": "CREATE SCHEMA IF NOT EXISTS prod.topic LOCATION 's3://data-platform-fact/topic'", + "transformer": "CREATE SCHEMA IF NOT EXISTS prod.transformer LOCATION 's3://data-platform-analytics/'", + "transformer_uat": "CREATE SCHEMA IF NOT EXISTS prod.transformer_uat LOCATION 's3://data-platform-analytics/'", + "ugc": "CREATE SCHEMA IF NOT EXISTS prod.ugc LOCATION 's3://cdc-prod-data/'", + "vendor": "CREATE SCHEMA IF NOT EXISTS prod.vendor LOCATION 's3://cdc-prod-data/'", + "vendor_reporting_pii": "CREATE SCHEMA IF NOT EXISTS prod.vendor_reporting_pii LOCATION 's3://secure-cdc-prod-data/'", + "vinculum": "CREATE SCHEMA IF NOT EXISTS prod.vinculum LOCATION 's3://cdc-prod-data/'", + "vinculum_dev": "CREATE SCHEMA IF NOT EXISTS prod.vinculum_dev LOCATION 's3://cdc-prod-data/'", + "vinculum_swiggy_gamma": "CREATE SCHEMA IF NOT EXISTS prod.vinculum_swiggy_gamma LOCATION 's3://cdc-prod-data/'", + "vinculum_swiggy_gamma_dev": "CREATE SCHEMA IF NOT EXISTS prod.vinculum_swiggy_gamma_dev LOCATION 's3://cdc-prod-data/'", + "weightify": "CREATE SCHEMA IF NOT EXISTS prod.weightify LOCATION 's3://data-platform-analytics/weightify'", + "xp_prod": "CREATE SCHEMA IF NOT EXISTS prod.xp_prod LOCATION 's3://cdc-prod-data/'" +} + +for schema in schema_map: + spark.sql(schema_map[schema]) \ No newline at end of file diff --git a/custom/metastore_s3.py b/custom/metastore_s3.py new file mode 100644 index 0000000..a265479 --- /dev/null +++ b/custom/metastore_s3.py @@ -0,0 +1,32 @@ +# Updates S3 bucket paths and UC prefixes in exported metastore files for cross-region migration +import os + +# Update these values before running +schemas = ['dsp'] +path = 'export_dir/test/run2/metastore/' + +# Map source region buckets to target region buckets +s3_buckets = { + 's3://swiggy-data-science-platform/': 's3://swiggy-data-science-platform-mumbai/', + 's3://swiggy-qubole/': 's3://swiggy-qubole-mumbai/', +} +# Map UC prefixes to target prefixes (removes UC-specific prefixes for Managed Tables) +uc_prefix = { + '__unitystorage': 'uc_tables' +} + +for schema in schemas: + file_names = [f for f in os.listdir(path+schema) if os.path.isfile(os.path.join(path+schema, f))] + for i in file_names: + with open(path+schema+'/'+i, 'r', encoding='utf-8') as file: + content = file.read() + for bucket in s3_buckets: + if bucket in content: + content = content.replace(bucket, s3_buckets[bucket]) + break + for prefix in uc_prefix: + if prefix in content: + content = content.replace(prefix, uc_prefix[prefix]) + break + with open(path+schema+'/'+i, 'w', encoding='utf-8') as _f: + _f.write(content) \ No newline at end of file diff --git a/dbclient/HiveClient.py b/dbclient/HiveClient.py index 3980b7f..ea00267 100644 --- a/dbclient/HiveClient.py +++ b/dbclient/HiveClient.py @@ -533,7 +533,14 @@ def log_table_ddl(self, cid, ec_id, db_name, table_name, metastore_dir, error_lo :param has_unicode: export to a file if this flag is true :return: True for success, False for error """ - set_ddl_str_cmd = f'ddl_str = spark.sql("show create table 
{db_name}.{table_name}").collect()[0][0]' + set_ddl_str_cmd = f''' +ddl_str = spark.sql("show create table {db_name}.{table_name}").collect()[0][0] +if 'LOCATION' not in ddl_str: + path = spark.sql("DESCRIBE DETAIL {db_name}.{table_name}").select("location").collect()[0][0] + ddl_str = ddl_str.replace("""TBLPROPERTIES (""","LOCATION '"+path+ """'\nTBLPROPERTIES (""",1) +else: + ddl_str = ddl_str +''' ddl_str_resp = self.submit_command(cid, ec_id, set_ddl_str_cmd) if ddl_str_resp['resultType'] != 'text': @@ -561,9 +568,17 @@ def log_table_ddl(self, cid, ec_id, db_name, table_name, metastore_dir, error_lo return False # read that data using the dbfs rest endpoint which can handle 2MB of text easily read_args = {'path': '/tmp/migration/tmp_export_ddl.txt'} - read_resp = self.get('/dbfs/read', read_args) + offSet = 0 + length = 999999 + data_res = '' + while True: + read_resp = self.get(f'/dbfs/read?length={length}&offset={offSet}', read_args) + data_res += read_resp.get('data') + if int(read_resp.get('bytes_read')) >= length: + offSet += length + else: break with open(table_ddl_path, "w", encoding="utf-8") as fp: - fp.write(base64.b64decode(read_resp.get('data')).decode('utf-8')) + fp.write(base64.b64decode(data_res).decode('utf-8')) return True else: export_ddl_cmd = 'print(ddl_str)' diff --git a/dbclient/JobsClient.py b/dbclient/JobsClient.py index 0d4dace..061c58a 100644 --- a/dbclient/JobsClient.py +++ b/dbclient/JobsClient.py @@ -31,8 +31,8 @@ def get_jobs_list(self, print_json=False): # 'tasks' field) on API 2.0. res = self.get("/jobs/list", print_json, version='2.0') for job in res.get('jobs', []): - jobsById[job.get('job_id')] = job - + if job.get('settings', {}).get('schedule', {}).get('pause_status', '') == 'UNPAUSED' or job.get('settings', {}).get('continuous', {}).get('pause_status', '') == 'UNPAUSED': + jobsById[job.get('job_id')] = job limit = 25 # max limit supported by the API offset = 0 has_more = True @@ -46,7 +46,8 @@ def get_jobs_list(self, print_json=False): for job in res.get('jobs', []): jobId = job.get('job_id') # only replaces "real" MULTI_TASK jobs, as they contain the task definitions. 
- if jobsById[jobId]['settings'].get('format') == 'MULTI_TASK': + presentInJobsById = jobsById.get(jobId, None) + if presentInJobsById and jobsById[jobId]['settings'].get('format') == 'MULTI_TASK': jobsById[jobId] = job return jobsById.values() diff --git a/dbclient/TableACLsClient.py b/dbclient/TableACLsClient.py index 412d8ae..5be1610 100644 --- a/dbclient/TableACLsClient.py +++ b/dbclient/TableACLsClient.py @@ -219,7 +219,7 @@ def interpret_notebook_run_metadata(self, notebook_run_metadata): return notebook_exit_value - def export_table_acls(self, db_name='', table_alcs_dir='table_acls/'): + def export_table_acls(self, db_name='', table_alcs_dir='table_acls/', cluster_name=None): """Exports all table ACLs or just for a single database :param db_name: if set to empty strins, export ACLs for all databases @@ -229,7 +229,10 @@ def export_table_acls(self, db_name='', table_alcs_dir='table_acls/'): # TODO check whether this logic supports unicode (metadata had to do something to support it # as the IAM role is not used for Table ACLS, only metastore access required - cid = self.launch_cluster(iam_role=None, enable_table_acls=True) + if cluster_name: + cid = self.start_cluster_by_name(cluster_name) + else: + cid = self.launch_cluster(iam_role=None, enable_table_acls=True) self.wait_for_cluster(cid) user_name = self.get_current_username(must_be_admin=True) diff --git a/dbclient/parser.py b/dbclient/parser.py index e6ad312..f19735d 100644 --- a/dbclient/parser.py +++ b/dbclient/parser.py @@ -586,4 +586,10 @@ def get_pipeline_parser() -> argparse.ArgumentParser: parser.add_argument('--bypass-secret-acl', action='store_true', default=False, help='Use to set the initial principal for secrets in standard-tier workspaces') + parser.add_argument('--database', action='store', default=None, + help='Database name to export for the metastore and table ACLs. 
Single database name supported')
+
+    parser.add_argument('--iam', action='store',
+                        help='IAM Instance Profile to export metastore entries')
+
     return parser
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..42b34ff
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+requests==2.32.5
+mlflow==3.9.0
+sqlparse==0.5.5
+pandas==2.3.3
\ No newline at end of file
diff --git a/tasks/tasks.py b/tasks/tasks.py
index 593b10e..be84499 100644
--- a/tasks/tasks.py
+++ b/tasks/tasks.py
@@ -293,8 +293,15 @@ def __init__(self, client_config, checkpoint_service, args, skip=False):
         self.args = args
 
     def run(self):
+        print("Arguments:")
+        print(self.args)
         hive_c = HiveClient(self.client_config, self.checkpoint_service)
-        hive_c.export_hive_metastore(cluster_name=self.args.cluster_name,
+        if self.args.database is not None:
+            # export only a single database with a given iam role
+            database_name = self.args.database
+            hive_c.export_database(database_name, self.args.cluster_name, self.args.iam, has_unicode=self.args.metastore_unicode)
+        else:
+            hive_c.export_hive_metastore(cluster_name=self.args.cluster_name,
                                      has_unicode=self.args.metastore_unicode)
@@ -333,7 +340,10 @@ def __init__(self, client_config, args, checkpoint_service, skip=False):
 
     def run(self):
         table_acls_c = TableACLsClient(self.client_config, self.checkpoint_service)
-        notebook_exit_value = table_acls_c.export_table_acls(db_name='')
+        if self.args.database is not None:
+            notebook_exit_value = table_acls_c.export_table_acls(db_name=self.args.database, cluster_name=self.args.cluster_name)
+        else:
+            notebook_exit_value = table_acls_c.export_table_acls(db_name='', cluster_name=self.args.cluster_name)
         if notebook_exit_value['num_errors'] == 0:
             print("Table ACL export completed successfully without errors")
         elif notebook_exit_value['num_errors'] == -1: