Skip to content

Conversation

@Leavrth
Copy link

@Leavrth Leavrth commented Jan 30, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

None

Leavrth added 10 commits January 9, 2026 23:57
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 30, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign asddongmen for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added first-time-contributor Indicates that the PR was contributed by an external member and is a first-time contributor. release-note-none Denotes a PR that doesn't merit a release note. labels Jan 30, 2026
@coderabbitai
Copy link

coderabbitai bot commented Jan 30, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Tip

Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@pingcap-cla-assistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jan 30, 2026
@gemini-code-assist
Copy link

Summary of Changes

Hello @Leavrth, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a robust multi-cluster-consistency-checker tool designed to ensure data integrity across distributed TiCDC environments. It provides a comprehensive framework for monitoring active-active replication scenarios by consuming and analyzing data from S3 sinks. The tool systematically identifies potential data discrepancies, such as loss, redundancy, and LWW violations, offering critical insights into the health of multi-cluster setups.

Highlights

  • New Tool Introduction: A new command-line tool, multi-cluster-consistency-checker, has been introduced to monitor and verify data consistency across multiple TiCDC clusters.
  • Consistency Checking Logic: The tool includes sophisticated logic to detect data loss, data redundancy, and Last-Write-Wins (LWW) violations by comparing data streams from different clusters within defined time windows.
  • S3 Data Consumption: It features a Consumer component capable of discovering and downloading schema and DML files from S3 sinks, and a TableParser to process CSV data into structured records.
  • Dynamic Checkpoint Monitoring: The CheckpointWatcher and S3Watcher components enable dynamic monitoring of changefeed checkpoints in etcd and S3, ensuring checks are performed on up-to-date data.
  • Configurable Operation: The checker is highly configurable via a TOML file, allowing users to specify log levels, report directories, tables to monitor, and details for each cluster involved in the consistency check.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new tool, multi-cluster-consistency-checker, to verify data consistency across multiple TiCDC clusters. A security audit identified two medium-severity information exposure vulnerabilities: the exposure of S3 credentials in initialization output and the logging of full database records (potentially containing PII or secrets) at the Error level when inconsistencies are detected. Recommendations include masking credentials in URIs and reducing the log level or detail for database records. Furthermore, critical issues were found regarding capturing loop variables in goroutines, which could lead to race conditions and incorrect behavior. There are also opportunities to improve code maintainability by refactoring duplicated logic in the checker and fixing a minor issue in the configuration parsing.

Comment on lines +132 to +135
for upstreamClusterID, downstreamCheckpointWatcherMap := range t.checkpointWatcher {
for downstreamClusterID, checkpointWatcher := range downstreamCheckpointWatcherMap {
mincheckpointTs := max(minCheckpointTsMap[upstreamClusterID][downstreamClusterID], maxTimeWindowRightBoundary)
eg.Go(func() error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The goroutine inside this loop (L135) captures loop variables upstreamClusterID, downstreamClusterID, and checkpointWatcher. This can lead to race conditions and incorrect behavior as the goroutines may not process the intended cluster/watcher pair. You should create copies of these variables within the loop before passing them to the goroutine.

Comment on lines +168 to +171
for clusterID, triplet := range t.timeWindowTriplet {
minTimeWindowRightBoundary := max(maxCheckpointTs[clusterID], maxPDTimestampAfterCheckpointTs[clusterID], triplet[2].NextMinLeftBoundary)
s3Watcher := t.s3Watcher[clusterID]
eg.Go(func() error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The goroutine inside this loop (L171) captures loop variables clusterID and triplet. This can lead to race conditions and incorrect behavior. You should create copies of these variables within the loop before passing them to the goroutine.

Comment on lines +234 to +239
for otherClusterID := range t.pdClients {
if otherClusterID == clusterID {
continue
}
pdClient := t.pdClients[otherClusterID]
eg.Go(func() error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The goroutine inside this loop (L239) captures loop variables otherClusterID and pdClient. This can lead to race conditions and incorrect behavior. You should create copies of these variables within the loop before passing them to the goroutine.

Comment on lines 162 to 163
for schemaPathKey, filePath := range newVersionPaths {
eg.Go(func() error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The goroutine inside this loop (L163) captures loop variables schemaPathKey and filePath. This can lead to race conditions and incorrect behavior as the goroutines may not process the intended schema/file. You should create copies of these variables within the loop before passing them to the goroutine.

Comment on lines 421 to 431
for schema, tables := range c.tables {
for _, table := range tables {
eg.Go(func() error {
newVersions, err := c.discoverAndDownloadNewTableVersions(egCtx, schema, table)
if err != nil {
return errors.Trace(err)
}
maxVersion := uint64(0)
for _, version := range newVersions {
maxVersion = max(maxVersion, version)
eg.Go(func() error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The nested goroutines here capture loop variables schema, table (L423), and version (L431). This can lead to race conditions and incorrect behavior. You should create copies of these variables within each loop before passing them to the goroutines.

for name, cluster := range cfg.Clusters {
fmt.Printf(" Cluster: %s\n", name)
fmt.Printf(" PD Address: %s\n", cluster.PDAddr)
fmt.Printf(" S3 Sink URI: %s\n", cluster.S3SinkURI)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The application prints the raw S3SinkURI to standard output during initialization. If this URI contains sensitive credentials (such as AWS access keys or secret keys), they will be exposed in the terminal and any logs capturing the output. This violates the principle of not logging sensitive information. It is recommended to mask credentials in the URI before printing.

Suggested change
fmt.Printf(" S3 Sink URI: %s\n", cluster.S3SinkURI)
fmt.Printf(" S3 Sink URI: %s\n", util.MaskSensitiveDataInURI(clusterConfig.S3SinkURI))

Comment on lines +63 to +66
log.Error("LWW violation detected",
zap.String("clusterID", c.clusterID),
zap.Any("entry", entry),
zap.Any("record", r))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The application logs full database records (including primary keys and column values) when data inconsistencies or violations are detected (see also lines 237, 242, 265, 272, and 291). If the database contains sensitive information or Personally Identifiable Information (PII), this data will be written to the application logs in plain text at the Error level. This could lead to unauthorized exposure of sensitive data if logs are not strictly secured. Consider logging only the primary keys or using a Debug level for full record details.

Comment on lines 224 to 280
func (cd *clusterDataChecker) dataLossDetection(checker *DataChecker) {
for _, upstreamDataCache := range cd.timeWindowDataCaches[1].upstreamDataCache {
for _, record := range upstreamDataCache {
for downstreamClusterID, checkpointTs := range cd.timeWindowDataCaches[1].checkpointTs {
if record.CommitTs <= checkpointTs {
continue
}
downstreamRecord, skipped := checker.FindClusterDownstreamData(downstreamClusterID, record.Pk, record.CommitTs)
if skipped {
continue
}
if downstreamRecord == nil {
// data loss detected
log.Error("data loss detected",
zap.String("upstreamClusterID", cd.clusterID),
zap.String("downstreamClusterID", downstreamClusterID),
zap.Any("record", record))
cd.report.AddDataLossItem(downstreamClusterID, string(record.Pk), record.OriginTs, record.CommitTs, false)
} else if !record.EqualDownstreamRecord(downstreamRecord) {
// data inconsistent detected
log.Error("data inconsistent detected",
zap.String("upstreamClusterID", cd.clusterID),
zap.String("downstreamClusterID", downstreamClusterID),
zap.Any("record", record))
cd.report.AddDataLossItem(downstreamClusterID, string(record.Pk), record.OriginTs, record.CommitTs, true)
}
}
}
}
for _, upstreamDataCache := range cd.timeWindowDataCaches[2].upstreamDataCache {
for _, record := range upstreamDataCache {
for downstreamClusterID, checkpointTs := range cd.timeWindowDataCaches[2].checkpointTs {
if record.CommitTs > checkpointTs {
continue
}
downstreamRecord, skipped := checker.FindClusterDownstreamData(downstreamClusterID, record.Pk, record.CommitTs)
if skipped {
continue
}
if downstreamRecord == nil {
// data loss detected
log.Error("data loss detected",
zap.String("upstreamClusterID", cd.clusterID),
zap.String("downstreamClusterID", downstreamClusterID),
zap.Any("record", record))
cd.report.AddDataLossItem(downstreamClusterID, string(record.Pk), record.OriginTs, record.CommitTs, false)
} else if !record.EqualDownstreamRecord(downstreamRecord) {
// data inconsistent detected
log.Error("data inconsistent detected",
zap.String("upstreamClusterID", cd.clusterID),
zap.String("downstreamClusterID", downstreamClusterID),
zap.Any("record", record))
cd.report.AddDataLossItem(downstreamClusterID, string(record.Pk), record.OriginTs, record.CommitTs, true)
}
}
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic inside the two main loops of dataLossDetection is very similar. The main difference is the condition for checking record.CommitTs against checkpointTs. This duplication can be reduced by extracting the common logic into a helper function. This would improve readability and maintainability.

Comment on lines +113 to +115
if keyStr != "global" && keyStr != "clusters" {
unknownKeys = append(unknownKeys, keyStr)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic to filter unknown configuration keys seems incorrect. The condition if keyStr != "global" && keyStr != "clusters" will only filter out the exact keys "global" and "clusters", but not keys under them like "global.foo". Since meta.Undecoded() returns all keys that were not decoded into the struct, all of them should be considered unknown. You can probably remove this if condition and report all undecoded keys.

unknownKeys = append(unknownKeys, keyStr)

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

first-time-contributor Indicates that the PR was contributed by an external member and is a first-time contributor. release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant