From 7f1ca340ab46c5e25a549ca6aee615e1c62130b5 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 30 Jan 2026 16:09:02 -0800 Subject: [PATCH 1/2] fix: correctly handle integer -> bigint conversion --- integration/schema_sync/dev.sh | 68 ++- integration/schema_sync/ecommerce_schema.sql | 225 +++++++ pgdog/src/backend/schema/sync/pg_dump.rs | 589 ++++++++++++------- 3 files changed, 663 insertions(+), 219 deletions(-) diff --git a/integration/schema_sync/dev.sh b/integration/schema_sync/dev.sh index 0092bf54..fbc607ae 100644 --- a/integration/schema_sync/dev.sh +++ b/integration/schema_sync/dev.sh @@ -1,9 +1,14 @@ #!/bin/bash set -e SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -PGDOG_BIN_PATH="${PGDOG_BIN:-${SCRIPT_DIR}/../../target/release/pgdog}" +PGDOG_BIN_PATH="${PGDOG_BIN:-${SCRIPT_DIR}/../../target/debug/pgdog}" pushd ${SCRIPT_DIR} +dropdb pgdog1 || true +dropdb pgdog2 || true +createdb pgdog1 +createdb pgdog2 + export PGPASSWORD=pgdog export PGUSER=pgdog export PGHOST=127.0.0.1 @@ -48,15 +53,70 @@ done # Expected content changes (without line numbers for portability) EXPECTED_CHANGES=$(cat < document_id bigint NOT NULL, +< event_id integer NOT NULL, +> event_id bigint NOT NULL, < flag_id integer NOT NULL, > flag_id bigint NOT NULL, -< setting_id integer NOT NULL, -> setting_id bigint NOT NULL, < override_id integer NOT NULL, > override_id bigint NOT NULL, < flag_id integer NOT NULL, > flag_id bigint NOT NULL, -EOF) +< notification_id integer NOT NULL, +> notification_id bigint NOT NULL, +< session_id integer NOT NULL, +> session_id bigint NOT NULL, +< session_id integer DEFAULT nextval('core.session_data_session_id_seq'::regclass) CONSTRAINT session_data_session_id_not_null NOT NULL, +> session_id bigint DEFAULT nextval('core.session_data_session_id_seq'::regclass) CONSTRAINT session_data_session_id_not_null NOT NULL, +< session_id integer DEFAULT nextval('core.session_data_session_id_seq'::regclass) CONSTRAINT session_data_session_id_not_null NOT NULL, +> session_id bigint DEFAULT nextval('core.session_data_session_id_seq'::regclass) CONSTRAINT session_data_session_id_not_null NOT NULL, +< session_id integer DEFAULT nextval('core.session_data_session_id_seq'::regclass) CONSTRAINT session_data_session_id_not_null NOT NULL, +> session_id bigint DEFAULT nextval('core.session_data_session_id_seq'::regclass) CONSTRAINT session_data_session_id_not_null NOT NULL, +< session_id integer DEFAULT nextval('core.session_data_session_id_seq'::regclass) CONSTRAINT session_data_session_id_not_null NOT NULL, +> session_id bigint DEFAULT nextval('core.session_data_session_id_seq'::regclass) CONSTRAINT session_data_session_id_not_null NOT NULL, +< setting_id integer NOT NULL, +> setting_id bigint NOT NULL, +< price_history_id integer NOT NULL, +> price_history_id bigint NOT NULL, +< category_id integer NOT NULL, +> category_id bigint NOT NULL, +< price_history_id integer DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) CONSTRAINT price_history_price_history_id_not_null NOT NULL, +> price_history_id bigint DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) CONSTRAINT price_history_price_history_id_not_null NOT NULL, +< category_id integer CONSTRAINT price_history_category_id_not_null NOT NULL, +> category_id bigint CONSTRAINT price_history_category_id_not_null NOT NULL, +< price_history_id integer DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) CONSTRAINT price_history_price_history_id_not_null NOT NULL, +> price_history_id bigint DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) CONSTRAINT price_history_price_history_id_not_null NOT NULL, +< category_id integer CONSTRAINT price_history_category_id_not_null NOT NULL, +> category_id bigint CONSTRAINT price_history_category_id_not_null NOT NULL, +< price_history_id integer DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) CONSTRAINT price_history_price_history_id_not_null NOT NULL, +> price_history_id bigint DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) CONSTRAINT price_history_price_history_id_not_null NOT NULL, +< category_id integer CONSTRAINT price_history_category_id_not_null NOT NULL, +> category_id bigint CONSTRAINT price_history_category_id_not_null NOT NULL, +< price_history_id integer DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) CONSTRAINT price_history_price_history_id_not_null NOT NULL, +> price_history_id bigint DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) CONSTRAINT price_history_price_history_id_not_null NOT NULL, +< category_id integer CONSTRAINT price_history_category_id_not_null NOT NULL, +> category_id bigint CONSTRAINT price_history_category_id_not_null NOT NULL, +< ticket_id integer NOT NULL, +> ticket_id bigint NOT NULL, +< ticket_id integer DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) CONSTRAINT ticket_queue_ticket_id_not_null NOT NULL, +> ticket_id bigint DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) CONSTRAINT ticket_queue_ticket_id_not_null NOT NULL, +< ticket_id integer DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) CONSTRAINT ticket_queue_ticket_id_not_null NOT NULL, +> ticket_id bigint DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) CONSTRAINT ticket_queue_ticket_id_not_null NOT NULL, +< ticket_id integer DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) CONSTRAINT ticket_queue_ticket_id_not_null NOT NULL, +> ticket_id bigint DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) CONSTRAINT ticket_queue_ticket_id_not_null NOT NULL, +< ticket_id integer DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) CONSTRAINT ticket_queue_ticket_id_not_null NOT NULL, +> ticket_id bigint DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) CONSTRAINT ticket_queue_ticket_id_not_null NOT NULL, +< ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_0_100 FOR VALUES FROM (0) TO (100); +> ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_0_100 FOR VALUES FROM ('0') TO ('100'); +< ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_100_200 FOR VALUES FROM (100) TO (200); +> ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_100_200 FOR VALUES FROM ('100') TO ('200'); +< ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_200_300 FOR VALUES FROM (200) TO (300); +> ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_200_300 FOR VALUES FROM ('200') TO ('300'); +< ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_300_plus FOR VALUES FROM (300) TO (MAXVALUE); +> ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_300_plus FOR VALUES FROM ('300') TO (MAXVALUE); +EOF +) diff source.sql destination.sql > diff.txt || true diff --git a/integration/schema_sync/ecommerce_schema.sql b/integration/schema_sync/ecommerce_schema.sql index c18e5e4c..836a7441 100644 --- a/integration/schema_sync/ecommerce_schema.sql +++ b/integration/schema_sync/ecommerce_schema.sql @@ -1014,3 +1014,228 @@ CREATE TABLE core.user_feature_overrides ( created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(user_id, flag_id) ); + +-- ============================================================================ +-- TABLE INHERITANCE WITH INTEGER PRIMARY KEYS +-- ============================================================================ + +-- Parent table for documents with integer primary key +CREATE TABLE core.documents ( + document_id SERIAL PRIMARY KEY, + title VARCHAR(255) NOT NULL, + content TEXT, + created_by BIGINT REFERENCES core.users(user_id), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Child table inheriting from documents +CREATE TABLE core.legal_documents ( + effective_date DATE NOT NULL, + expiration_date DATE, + jurisdiction VARCHAR(100), + is_signed BOOLEAN NOT NULL DEFAULT FALSE +) INHERITS (core.documents); + +-- Child table inheriting from documents +CREATE TABLE core.technical_documents ( + version VARCHAR(50) NOT NULL DEFAULT '1.0', + language VARCHAR(10) NOT NULL DEFAULT 'en', + is_draft BOOLEAN NOT NULL DEFAULT TRUE +) INHERITS (core.documents); + +-- Parent table for notifications with integer primary key +CREATE TABLE core.notifications ( + notification_id SERIAL PRIMARY KEY, + user_id BIGINT REFERENCES core.users(user_id) ON DELETE CASCADE, + message TEXT NOT NULL, + is_read BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Child table: email notifications +CREATE TABLE core.email_notifications ( + email_address VARCHAR(255) NOT NULL, + subject VARCHAR(255) NOT NULL, + sent_at TIMESTAMPTZ, + delivery_status VARCHAR(50) DEFAULT 'pending' +) INHERITS (core.notifications); + +-- Child table: push notifications +CREATE TABLE core.push_notifications ( + device_token VARCHAR(500) NOT NULL, + platform VARCHAR(20) NOT NULL, + sent_at TIMESTAMPTZ, + clicked_at TIMESTAMPTZ +) INHERITS (core.notifications); + +-- Parent table for audit events with integer primary key +CREATE TABLE audit.events ( + event_id SERIAL PRIMARY KEY, + event_type VARCHAR(50) NOT NULL, + entity_type VARCHAR(50), + entity_id BIGINT, + actor_id BIGINT REFERENCES core.users(user_id), + event_data JSONB DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Child table: security events +CREATE TABLE audit.security_events ( + ip_address INET, + user_agent TEXT, + severity VARCHAR(20) NOT NULL DEFAULT 'info', + was_blocked BOOLEAN NOT NULL DEFAULT FALSE +) INHERITS (audit.events); + +-- Child table: data change events +CREATE TABLE audit.data_change_events ( + old_values JSONB, + new_values JSONB, + changed_fields TEXT[] +) INHERITS (audit.events); + +-- ============================================================================ +-- MATERIALIZED VIEWS ON TABLES WITH INTEGER PRIMARY KEYS +-- ============================================================================ + +-- Materialized view on documents (integer PK table with inheritance) +CREATE MATERIALIZED VIEW analytics.document_summary AS +SELECT + d.document_id, + d.title, + d.created_by, + u.username as created_by_username, + d.created_at, + d.updated_at +FROM core.documents d +LEFT JOIN core.users u ON d.created_by = u.user_id +WITH DATA; + +CREATE UNIQUE INDEX idx_document_summary_pk ON analytics.document_summary(document_id); + +-- Materialized view on notifications (integer PK table with inheritance) +CREATE MATERIALIZED VIEW analytics.notification_stats AS +SELECT + n.notification_id, + n.user_id, + n.message, + n.is_read, + n.created_at +FROM core.notifications n +WITH DATA; + +CREATE UNIQUE INDEX idx_notification_stats_pk ON analytics.notification_stats(notification_id); + +-- Materialized view on audit events (integer PK table with inheritance) +CREATE MATERIALIZED VIEW analytics.event_summary AS +SELECT + e.event_id, + e.event_type, + e.entity_type, + e.entity_id, + e.actor_id, + u.username as actor_username, + e.created_at +FROM audit.events e +LEFT JOIN core.users u ON e.actor_id = u.user_id +WITH DATA; + +CREATE UNIQUE INDEX idx_event_summary_pk ON analytics.event_summary(event_id); + +-- Materialized view on feature_flags (existing integer PK table) +CREATE MATERIALIZED VIEW analytics.feature_flag_usage AS +SELECT + ff.flag_id, + ff.flag_name, + ff.is_enabled as default_enabled, + COUNT(ufo.override_id) as override_count, + COUNT(ufo.override_id) FILTER (WHERE ufo.is_enabled = TRUE) as enabled_override_count, + COUNT(ufo.override_id) FILTER (WHERE ufo.is_enabled = FALSE) as disabled_override_count +FROM core.feature_flags ff +LEFT JOIN core.user_feature_overrides ufo ON ff.flag_id = ufo.flag_id +GROUP BY ff.flag_id, ff.flag_name, ff.is_enabled +WITH DATA; + +CREATE UNIQUE INDEX idx_feature_flag_usage_pk ON analytics.feature_flag_usage(flag_id); + +-- ============================================================================ +-- PARTITIONED TABLES WITH INTEGER PRIMARY KEYS +-- ============================================================================ + +-- Partitioned table with integer PK (range partitioned by category) +CREATE TABLE inventory.price_history ( + price_history_id SERIAL, + product_id BIGINT NOT NULL REFERENCES inventory.products(product_id) ON DELETE CASCADE, + price NUMERIC(12,2) NOT NULL, + currency_code CHAR(3) NOT NULL DEFAULT 'USD', + effective_from TIMESTAMPTZ NOT NULL DEFAULT NOW(), + effective_to TIMESTAMPTZ, + category_id INTEGER NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (price_history_id, category_id) +) PARTITION BY RANGE (category_id); + +CREATE TABLE inventory.price_history_cat_0_100 PARTITION OF inventory.price_history + FOR VALUES FROM (0) TO (100); +CREATE TABLE inventory.price_history_cat_100_200 PARTITION OF inventory.price_history + FOR VALUES FROM (100) TO (200); +CREATE TABLE inventory.price_history_cat_200_300 PARTITION OF inventory.price_history + FOR VALUES FROM (200) TO (300); +CREATE TABLE inventory.price_history_cat_300_plus PARTITION OF inventory.price_history + FOR VALUES FROM (300) TO (MAXVALUE); + +CREATE INDEX idx_price_history_product ON inventory.price_history(product_id); +CREATE INDEX idx_price_history_effective ON inventory.price_history(effective_from, effective_to); + +-- Partitioned table with integer PK (hash partitioned) +CREATE TABLE core.session_data ( + session_id SERIAL, + user_id BIGINT REFERENCES core.users(user_id) ON DELETE CASCADE, + session_token VARCHAR(255) NOT NULL, + ip_address INET, + user_agent TEXT, + data JSONB DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + expires_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (session_id, user_id) +) PARTITION BY HASH (user_id); + +CREATE TABLE core.session_data_p0 PARTITION OF core.session_data + FOR VALUES WITH (MODULUS 4, REMAINDER 0); +CREATE TABLE core.session_data_p1 PARTITION OF core.session_data + FOR VALUES WITH (MODULUS 4, REMAINDER 1); +CREATE TABLE core.session_data_p2 PARTITION OF core.session_data + FOR VALUES WITH (MODULUS 4, REMAINDER 2); +CREATE TABLE core.session_data_p3 PARTITION OF core.session_data + FOR VALUES WITH (MODULUS 4, REMAINDER 3); + +CREATE INDEX idx_session_data_token ON core.session_data(session_token); +CREATE INDEX idx_session_data_expires ON core.session_data(expires_at); + +-- Partitioned table with integer PK (list partitioned by status) +CREATE TABLE sales.ticket_queue ( + ticket_id SERIAL, + user_id BIGINT REFERENCES core.users(user_id), + subject VARCHAR(255) NOT NULL, + description TEXT, + priority INTEGER NOT NULL DEFAULT 3, + status VARCHAR(20) NOT NULL, + assigned_to BIGINT REFERENCES core.users(user_id), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (ticket_id, status) +) PARTITION BY LIST (status); + +CREATE TABLE sales.ticket_queue_open PARTITION OF sales.ticket_queue + FOR VALUES IN ('open'); +CREATE TABLE sales.ticket_queue_in_progress PARTITION OF sales.ticket_queue + FOR VALUES IN ('in_progress'); +CREATE TABLE sales.ticket_queue_resolved PARTITION OF sales.ticket_queue + FOR VALUES IN ('resolved'); +CREATE TABLE sales.ticket_queue_closed PARTITION OF sales.ticket_queue + FOR VALUES IN ('closed'); + +CREATE INDEX idx_ticket_queue_user ON sales.ticket_queue(user_id); +CREATE INDEX idx_ticket_queue_assigned ON sales.ticket_queue(assigned_to); +CREATE INDEX idx_ticket_queue_priority ON sales.ticket_queue(priority, created_at); diff --git a/pgdog/src/backend/schema/sync/pg_dump.rs b/pgdog/src/backend/schema/sync/pg_dump.rs index 6c4bbc95..b0f5581a 100644 --- a/pgdog/src/backend/schema/sync/pg_dump.rs +++ b/pgdog/src/backend/schema/sync/pg_dump.rs @@ -8,10 +8,7 @@ use std::{ use lazy_static::lazy_static; use pg_query::{ - protobuf::{ - AlterTableCmd, AlterTableStmt, AlterTableType, ColumnDef, ConstrType, Constraint, - ObjectType, ParseResult, RangeVar, String as PgString, TypeName, - }, + protobuf::{AlterTableType, ConstrType, ObjectType, ParseResult, RangeVar, String as PgString}, Node, NodeEnum, }; use pgdog_config::QueryParserEngine; @@ -55,84 +52,6 @@ fn schema_name(relation: &RangeVar) -> &str { } } -/// Track primary key columns that are INTEGER types. -fn track_primary_keys<'a>( - cons: &'a Constraint, - table: &'a RangeVar, - column_types: &HashMap, &'a str>, - integer_primary_keys: &mut HashSet>, -) { - let schema = schema_name(table); - let table_name = table.relname.as_str(); - - for key in &cons.keys { - let Some(NodeEnum::String(PgString { sval })) = &key.node else { - continue; - }; - - let col_name = sval.as_str(); - let key = ColumnTypeKey { - schema, - table: table_name, - column: col_name, - }; - - if let Some(&type_name) = column_types.get(&key) { - if matches!( - type_name, - "int4" | "int2" | "serial" | "smallserial" | "integer" | "smallint" - ) { - integer_primary_keys.insert(Column { - name: col_name, - table: Some(table_name), - schema: Some(schema), - }); - } - } - } -} - -/// Track foreign key columns that reference integer primary keys. -fn track_foreign_keys<'a>( - cons: &'a Constraint, - fk_table: &'a RangeVar, - integer_primary_keys: &HashSet>, - integer_foreign_keys: &mut HashSet>, -) { - let Some(ref pk_table) = cons.pktable else { - return; - }; - - let pk_schema = schema_name(pk_table); - let pk_table_name = pk_table.relname.as_str(); - let fk_schema = schema_name(fk_table); - let fk_table_name = fk_table.relname.as_str(); - - for (pk_attr, fk_attr) in cons.pk_attrs.iter().zip(cons.fk_attrs.iter()) { - let ( - Some(NodeEnum::String(PgString { sval: pk_col })), - Some(NodeEnum::String(PgString { sval: fk_col })), - ) = (&pk_attr.node, &fk_attr.node) - else { - continue; - }; - - let pk_column = Column { - name: pk_col.as_str(), - table: Some(pk_table_name), - schema: Some(pk_schema), - }; - - if integer_primary_keys.contains(&pk_column) { - integer_foreign_keys.insert(Column { - name: fk_col.as_str(), - table: Some(fk_table_name), - schema: Some(fk_schema), - }); - } - } -} - use tokio::process::Command; #[derive(Debug, Clone)] @@ -317,13 +236,213 @@ impl<'a> From for Statement<'a> { } impl PgDumpOutput { + /// Get integer primary key columns (columns that are part of PRIMARY KEY + /// constraints and have integer types like int4, int2, serial, etc.). + pub fn integer_primary_key_columns(&self) -> HashSet> { + let column_types = self.column_types(); + let mut result = HashSet::new(); + + for stmt in &self.stmts.stmts { + let Some(ref node) = stmt.stmt else { + continue; + }; + let Some(NodeEnum::AlterTableStmt(ref alter_stmt)) = node.node else { + continue; + }; + + let Some(ref relation) = alter_stmt.relation else { + continue; + }; + + for cmd in &alter_stmt.cmds { + let Some(NodeEnum::AlterTableCmd(ref cmd)) = cmd.node else { + continue; + }; + + if cmd.subtype() != AlterTableType::AtAddConstraint { + continue; + } + + let Some(ref def) = cmd.def else { + continue; + }; + + let Some(NodeEnum::Constraint(ref cons)) = def.node else { + continue; + }; + + if cons.contype() != ConstrType::ConstrPrimary { + continue; + } + + let schema = schema_name(relation); + let table_name = relation.relname.as_str(); + + for key in &cons.keys { + let Some(NodeEnum::String(PgString { sval })) = &key.node else { + continue; + }; + + let col_name = sval.as_str(); + let type_key = ColumnTypeKey { + schema, + table: table_name, + column: col_name, + }; + + let is_integer = column_types + .get(&type_key) + .map(|t| { + matches!( + *t, + "int4" | "int2" | "serial" | "smallserial" | "integer" | "smallint" + ) + }) + .unwrap_or(false); + + if is_integer { + result.insert(Column { + name: col_name, + table: Some(table_name), + schema: Some(schema), + }); + } + } + } + } + + result + } + + /// Get integer foreign key columns (FK columns that reference integer PKs). + pub fn integer_foreign_key_columns(&self) -> HashSet> { + let integer_pks = self.integer_primary_key_columns(); + let mut result = HashSet::new(); + + for stmt in &self.stmts.stmts { + let Some(ref node) = stmt.stmt else { + continue; + }; + let Some(NodeEnum::AlterTableStmt(ref alter_stmt)) = node.node else { + continue; + }; + + let Some(ref fk_table) = alter_stmt.relation else { + continue; + }; + + for cmd in &alter_stmt.cmds { + let Some(NodeEnum::AlterTableCmd(ref cmd)) = cmd.node else { + continue; + }; + + if cmd.subtype() != AlterTableType::AtAddConstraint { + continue; + } + + let Some(ref def) = cmd.def else { + continue; + }; + + let Some(NodeEnum::Constraint(ref cons)) = def.node else { + continue; + }; + + if cons.contype() != ConstrType::ConstrForeign { + continue; + } + + let Some(ref pk_table) = cons.pktable else { + continue; + }; + + let pk_schema = schema_name(pk_table); + let pk_table_name = pk_table.relname.as_str(); + let fk_schema = schema_name(fk_table); + let fk_table_name = fk_table.relname.as_str(); + + for (pk_attr, fk_attr) in cons.pk_attrs.iter().zip(cons.fk_attrs.iter()) { + let ( + Some(NodeEnum::String(PgString { sval: pk_col })), + Some(NodeEnum::String(PgString { sval: fk_col })), + ) = (&pk_attr.node, &fk_attr.node) + else { + continue; + }; + + let pk_column = Column { + name: pk_col.as_str(), + table: Some(pk_table_name), + schema: Some(pk_schema), + }; + + if integer_pks.contains(&pk_column) { + result.insert(Column { + name: fk_col.as_str(), + table: Some(fk_table_name), + schema: Some(fk_schema), + }); + } + } + } + } + + result + } + + /// Get all column types from CREATE TABLE statements. + fn column_types(&self) -> HashMap, &str> { + let mut result = HashMap::new(); + + for stmt in &self.stmts.stmts { + let Some(ref node) = stmt.stmt else { + continue; + }; + let Some(NodeEnum::CreateStmt(ref create_stmt)) = node.node else { + continue; + }; + + let Some(ref relation) = create_stmt.relation else { + continue; + }; + + let schema = schema_name(relation); + let table_name = relation.relname.as_str(); + + for elt in &create_stmt.table_elts { + if let Some(NodeEnum::ColumnDef(col_def)) = &elt.node { + if let Some(ref type_name) = col_def.type_name { + if let Some(last_name) = type_name.names.last() { + if let Some(NodeEnum::String(PgString { sval })) = &last_name.node { + result.insert( + ColumnTypeKey { + schema, + table: table_name, + column: col_def.colname.as_str(), + }, + sval.as_str(), + ); + } + } + } + } + } + } + + result + } + /// Get schema statements to execute before data sync, /// e.g., CREATE TABLE, primary key. pub fn statements(&self, state: SyncState) -> Result>, Error> { let mut result = vec![]; - let mut integer_primary_keys = HashSet::>::new(); - let mut integer_foreign_keys = HashSet::>::new(); - let mut column_types: HashMap, &str> = HashMap::new(); + + // Get integer PK and FK columns that need bigint conversion + let columns_to_convert: HashSet> = self + .integer_primary_key_columns() + .union(&self.integer_foreign_key_columns()) + .copied() + .collect(); for stmt in &self.stmts.stmts { let (_, original_start) = self @@ -337,49 +456,53 @@ impl PgDumpOutput { if let Some(ref node) = stmt.stmt { if let Some(ref node) = node.node { match node { - NodeEnum::CreateStmt(stmt) => { - let sql = { - let mut stmt = stmt.clone(); - stmt.if_not_exists = true; - deparse_node(NodeEnum::CreateStmt(stmt))? - }; - - // Track column types for later PRIMARY KEY detection - if let Some(ref relation) = stmt.relation { - let schema = if relation.schemaname.is_empty() { - "public" + NodeEnum::CreateStmt(create_stmt) => { + let mut stmt = create_stmt.clone(); + stmt.if_not_exists = true; + + // Get table info + let (schema, table_name) = + if let Some(ref relation) = create_stmt.relation { + (schema_name(relation), relation.relname.as_str()) } else { - relation.schemaname.as_str() + ("public", "") }; - let table_name = relation.relname.as_str(); - - for elt in &stmt.table_elts { - if let Some(NodeEnum::ColumnDef(col_def)) = &elt.node { - if let Some(ref type_name) = col_def.type_name { - // Get the last element of the type name (e.g., "int4" from ["pg_catalog", "int4"]) - if let Some(last_name) = type_name.names.last() { - if let Some(NodeEnum::String(PgString { sval })) = - &last_name.node - { - column_types.insert( - ColumnTypeKey { - schema, - table: table_name, - column: col_def.colname.as_str(), - }, - sval.as_str(), - ); - } - } + + // Convert integer PK/FK columns to bigint + for elt in &mut stmt.table_elts { + if let Some(NodeEnum::ColumnDef(ref mut col_def)) = elt.node { + let col = Column { + name: col_def.colname.as_str(), + table: Some(table_name), + schema: Some(schema), + }; + + if columns_to_convert.contains(&col) { + if let Some(ref mut type_name) = col_def.type_name { + type_name.names = vec![ + Node { + node: Some(NodeEnum::String(PgString { + sval: "pg_catalog".to_owned(), + })), + }, + Node { + node: Some(NodeEnum::String(PgString { + sval: "int8".to_owned(), + })), + }, + ]; } } } } if state == SyncState::PreData { - // CREATE TABLE is always good. - let table = - stmt.relation.as_ref().map(Table::from).unwrap_or_default(); + let sql = deparse_node(NodeEnum::CreateStmt(stmt))?; + let table = create_stmt + .relation + .as_ref() + .map(Table::from) + .unwrap_or_default(); result.push(Statement::Table { table, sql }); } } @@ -428,21 +551,8 @@ impl PgDumpOutput { | ConstrType::ConstrNotnull | ConstrType::ConstrNull ) { - if cons.contype() - == ConstrType::ConstrPrimary - { - if let Some(ref relation) = - stmt.relation - { - track_primary_keys( - cons, - relation, - &column_types, - &mut integer_primary_keys, - ); - } - } - + // Integer PKs are already tracked and converted + // to bigint in CreateStmt handler if state == SyncState::PreData { result.push(Statement::Other { sql: original.to_string(), @@ -452,15 +562,8 @@ impl PgDumpOutput { } else if cons.contype() == ConstrType::ConstrForeign { - if let Some(ref relation) = stmt.relation { - track_foreign_keys( - cons, - relation, - &integer_primary_keys, - &mut integer_foreign_keys, - ); - } - + // FK columns referencing integer PKs are + // computed from fk_columns at the end if state == SyncState::PostData { result.push(Statement::Other { sql: original.to_string(), @@ -715,60 +818,6 @@ impl PgDumpOutput { } } - // Convert INTEGER primary keys and their referencing foreign keys to BIGINT - if state == SyncState::PreData { - for column in integer_primary_keys - .iter() - .chain(integer_foreign_keys.iter()) - { - let alter_stmt = AlterTableStmt { - relation: Some(RangeVar { - schemaname: column.schema.unwrap_or("public").to_owned(), - relname: column.table.unwrap_or_default().to_owned(), - inh: true, - relpersistence: "p".to_owned(), - ..Default::default() - }), - cmds: vec![Node { - node: Some(NodeEnum::AlterTableCmd(Box::new(AlterTableCmd { - subtype: AlterTableType::AtAlterColumnType.into(), - name: column.name.to_owned(), - def: Some(Box::new(Node { - node: Some(NodeEnum::ColumnDef(Box::new(ColumnDef { - type_name: Some(TypeName { - names: vec![ - Node { - node: Some(NodeEnum::String(PgString { - sval: "pg_catalog".to_owned(), - })), - }, - Node { - node: Some(NodeEnum::String(PgString { - sval: "int8".to_owned(), - })), - }, - ], - typemod: -1, - ..Default::default() - }), - ..Default::default() - }))), - })), - behavior: pg_query::protobuf::DropBehavior::DropRestrict.into(), - ..Default::default() - }))), - }], - objtype: ObjectType::ObjectTable.into(), - ..Default::default() - }; - let sql = deparse_node(NodeEnum::AlterTableStmt(alter_stmt))?; - result.push(Statement::Other { - sql, - idempotent: true, - }); - } - } - Ok(result) } @@ -933,6 +982,125 @@ ALTER TABLE ONLY public.users assert!(statements.is_empty()); } + #[test] + fn test_integer_primary_key_columns() { + let query = r#" +CREATE TABLE users (id INTEGER, name TEXT); +ALTER TABLE users ADD CONSTRAINT users_pkey PRIMARY KEY (id);"#; + + let output = PgDumpOutput { + stmts: parse(query).unwrap().protobuf, + original: query.to_owned(), + }; + + let pk_columns = output.integer_primary_key_columns(); + + // Should have one integer primary key column + assert_eq!(pk_columns.len(), 1); + assert!(pk_columns.contains(&Column { + name: "id", + table: Some("users"), + schema: Some("public"), + })); + } + + #[test] + fn test_non_integer_pk_excluded() { + let query = r#" +CREATE TABLE users (id UUID, name TEXT); +ALTER TABLE users ADD CONSTRAINT users_pkey PRIMARY KEY (id);"#; + + let output = PgDumpOutput { + stmts: parse(query).unwrap().protobuf, + original: query.to_owned(), + }; + + let pk_columns = output.integer_primary_key_columns(); + + // UUID primary key should not be included + assert_eq!(pk_columns.len(), 0); + } + + #[test] + fn test_integer_foreign_key_columns() { + let query = r#" +CREATE TABLE parent (id INTEGER, name TEXT); +CREATE TABLE child (id INTEGER, parent_id INTEGER); +ALTER TABLE parent ADD CONSTRAINT parent_pkey PRIMARY KEY (id); +ALTER TABLE child ADD CONSTRAINT child_parent_fk FOREIGN KEY (parent_id) REFERENCES parent(id);"#; + + let output = PgDumpOutput { + stmts: parse(query).unwrap().protobuf, + original: query.to_owned(), + }; + + let fk_columns = output.integer_foreign_key_columns(); + + // Should have one integer FK column + assert_eq!(fk_columns.len(), 1); + assert!(fk_columns.contains(&Column { + name: "parent_id", + table: Some("child"), + schema: Some("public"), + })); + } + + #[test] + fn test_integer_foreign_key_columns_composite() { + let query = r#" +CREATE TABLE parent (id1 INTEGER, id2 INTEGER, name TEXT); +CREATE TABLE child (id INTEGER, parent_id1 INTEGER, parent_id2 INTEGER); +ALTER TABLE parent ADD CONSTRAINT parent_pkey PRIMARY KEY (id1, id2); +ALTER TABLE child ADD CONSTRAINT child_parent_fk FOREIGN KEY (parent_id1, parent_id2) REFERENCES parent(id1, id2);"#; + + let output = PgDumpOutput { + stmts: parse(query).unwrap().protobuf, + original: query.to_owned(), + }; + + let fk_columns = output.integer_foreign_key_columns(); + + // Should have two integer FK columns + assert_eq!(fk_columns.len(), 2); + assert!(fk_columns.contains(&Column { + name: "parent_id1", + table: Some("child"), + schema: Some("public"), + })); + assert!(fk_columns.contains(&Column { + name: "parent_id2", + table: Some("child"), + schema: Some("public"), + })); + } + + #[test] + fn test_integer_primary_key_columns_composite() { + let query = r#" +CREATE TABLE order_items (order_id INTEGER, item_id INTEGER, quantity INTEGER); +ALTER TABLE order_items ADD CONSTRAINT order_items_pkey PRIMARY KEY (order_id, item_id);"#; + + let output = PgDumpOutput { + stmts: parse(query).unwrap().protobuf, + original: query.to_owned(), + }; + + let pk_columns = output.integer_primary_key_columns(); + + // Should have two integer primary key columns + assert_eq!(pk_columns.len(), 2); + assert!(pk_columns.contains(&Column { + name: "order_id", + table: Some("order_items"), + schema: Some("public"), + })); + assert!(pk_columns.contains(&Column { + name: "item_id", + table: Some("order_items"), + schema: Some("public"), + })); + } + #[test] fn test_bigint_rewrite() { let query = r#" @@ -945,20 +1113,17 @@ ALTER TABLE test ADD CONSTRAINT id_pkey PRIMARY KEY (id);"#; }; let statements = output.statements(SyncState::PreData).unwrap(); - assert_eq!(statements.len(), 3); + assert_eq!(statements.len(), 2); + // Integer PK column should be converted to bigint directly in CREATE TABLE assert_eq!( statements[0].deref(), - "CREATE TABLE IF NOT EXISTS test (id int, value text)" + "CREATE TABLE IF NOT EXISTS test (id bigint, value text)" ); assert_eq!( statements[1].deref(), "\nALTER TABLE test ADD CONSTRAINT id_pkey PRIMARY KEY (id)" ); - assert_eq!( - statements[2].deref(), - "ALTER TABLE public.test ALTER COLUMN id TYPE bigint" - ); } #[test] @@ -975,27 +1140,21 @@ ALTER TABLE child ADD CONSTRAINT child_parent_fk FOREIGN KEY (parent_id) REFEREN }; let statements = output.statements(SyncState::PreData).unwrap(); - assert_eq!(statements.len(), 5); + assert_eq!(statements.len(), 3); + // PK column converted to bigint in CREATE TABLE assert_eq!( statements[0].deref(), - "CREATE TABLE IF NOT EXISTS parent (id int, name text)" + "CREATE TABLE IF NOT EXISTS parent (id bigint, name text)" ); + // FK column also converted to bigint in CREATE TABLE assert_eq!( statements[1].deref(), - "CREATE TABLE IF NOT EXISTS child (id int, parent_id int)" + "CREATE TABLE IF NOT EXISTS child (id int, parent_id bigint)" ); assert_eq!( statements[2].deref(), "\nALTER TABLE parent ADD CONSTRAINT parent_pkey PRIMARY KEY (id)" ); - assert_eq!( - statements[3].deref(), - "ALTER TABLE public.parent ALTER COLUMN id TYPE bigint" - ); - assert_eq!( - statements[4].deref(), - "ALTER TABLE public.child ALTER COLUMN parent_id TYPE bigint" - ); } } From 56d227856f191dedd0fc043d87d2082bea6e19be Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 30 Jan 2026 16:18:32 -0800 Subject: [PATCH 2/2] letsgoo --- integration/schema_sync/dev.sh | 79 +++++++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 5 deletions(-) diff --git a/integration/schema_sync/dev.sh b/integration/schema_sync/dev.sh index fbc607ae..631f1731 100644 --- a/integration/schema_sync/dev.sh +++ b/integration/schema_sync/dev.sh @@ -51,8 +51,75 @@ for f in source.sql destination.sql; do sed -i.bak '/^\\unrestrict.*$/d' $f done -# Expected content changes (without line numbers for portability) -EXPECTED_CHANGES=$(cat < document_id bigint NOT NULL, +< event_id integer NOT NULL, +> event_id bigint NOT NULL, +< flag_id integer NOT NULL, +> flag_id bigint NOT NULL, +< override_id integer NOT NULL, +> override_id bigint NOT NULL, +< flag_id integer NOT NULL, +> flag_id bigint NOT NULL, +< notification_id integer NOT NULL, +> notification_id bigint NOT NULL, +< session_id integer NOT NULL, +> session_id bigint NOT NULL, +< session_id integer DEFAULT nextval('core.session_data_session_id_seq'::regclass) NOT NULL, +> session_id bigint DEFAULT nextval('core.session_data_session_id_seq'::regclass) NOT NULL, +< session_id integer DEFAULT nextval('core.session_data_session_id_seq'::regclass) NOT NULL, +> session_id bigint DEFAULT nextval('core.session_data_session_id_seq'::regclass) NOT NULL, +< session_id integer DEFAULT nextval('core.session_data_session_id_seq'::regclass) NOT NULL, +> session_id bigint DEFAULT nextval('core.session_data_session_id_seq'::regclass) NOT NULL, +< session_id integer DEFAULT nextval('core.session_data_session_id_seq'::regclass) NOT NULL, +> session_id bigint DEFAULT nextval('core.session_data_session_id_seq'::regclass) NOT NULL, +< setting_id integer NOT NULL, +> setting_id bigint NOT NULL, +< price_history_id integer NOT NULL, +> price_history_id bigint NOT NULL, +< category_id integer NOT NULL, +> category_id bigint NOT NULL, +< price_history_id integer DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) NOT NULL, +> price_history_id bigint DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) NOT NULL, +< category_id integer NOT NULL, +> category_id bigint NOT NULL, +< price_history_id integer DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) NOT NULL, +> price_history_id bigint DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) NOT NULL, +< category_id integer NOT NULL, +> category_id bigint NOT NULL, +< price_history_id integer DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) NOT NULL, +> price_history_id bigint DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) NOT NULL, +< category_id integer NOT NULL, +> category_id bigint NOT NULL, +< price_history_id integer DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) NOT NULL, +> price_history_id bigint DEFAULT nextval('inventory.price_history_price_history_id_seq'::regclass) NOT NULL, +< category_id integer NOT NULL, +> category_id bigint NOT NULL, +< ticket_id integer NOT NULL, +> ticket_id bigint NOT NULL, +< ticket_id integer DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) NOT NULL, +> ticket_id bigint DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) NOT NULL, +< ticket_id integer DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) NOT NULL, +> ticket_id bigint DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) NOT NULL, +< ticket_id integer DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) NOT NULL, +> ticket_id bigint DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) NOT NULL, +< ticket_id integer DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) NOT NULL, +> ticket_id bigint DEFAULT nextval('sales.ticket_queue_ticket_id_seq'::regclass) NOT NULL, +< ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_0_100 FOR VALUES FROM (0) TO (100); +> ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_0_100 FOR VALUES FROM ('0') TO ('100'); +< ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_100_200 FOR VALUES FROM (100) TO (200); +> ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_100_200 FOR VALUES FROM ('100') TO ('200'); +< ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_200_300 FOR VALUES FROM (200) TO (300); +> ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_200_300 FOR VALUES FROM ('200') TO ('300'); +< ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_300_plus FOR VALUES FROM (300) TO (MAXVALUE); +> ALTER TABLE ONLY inventory.price_history ATTACH PARTITION inventory.price_history_cat_300_plus FOR VALUES FROM ('300') TO (MAXVALUE); +EOF +) + +# Expected content changes for PostgreSQL 15+ (named NOT NULL constraints) +EXPECTED_PG15=$(cat < document_id bigint NOT NULL, < event_id integer NOT NULL, @@ -122,10 +189,12 @@ diff source.sql destination.sql > diff.txt || true # Extract just the content lines (< and >) for comparison ACTUAL_CHANGES=$(grep '^[<>]' diff.txt) -if [ "$ACTUAL_CHANGES" != "$EXPECTED_CHANGES" ]; then +if [ "$ACTUAL_CHANGES" != "$EXPECTED_PG14" ] && [ "$ACTUAL_CHANGES" != "$EXPECTED_PG15" ]; then echo "Schema diff does not match expected changes" - echo "=== Expected ===" - echo "$EXPECTED_CHANGES" + echo "=== Expected (PG < 15) ===" + echo "$EXPECTED_PG14" + echo "=== Expected (PG 15+) ===" + echo "$EXPECTED_PG15" echo "=== Actual ===" echo "$ACTUAL_CHANGES" exit 1