-
Notifications
You must be signed in to change notification settings - Fork 38
Enable active check #4096
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/active-active
Are you sure you want to change the base?
Enable active check #4096
Conversation
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing touches🧪 Generate unit tests (beta)
Tip Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
|
Summary of ChangesHello @Leavrth, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a robust Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new tool, multi-cluster-consistency-checker, to verify data consistency across multiple TiCDC clusters. A security audit identified two medium-severity information exposure vulnerabilities: the exposure of S3 credentials in initialization output and the logging of full database records (potentially containing PII or secrets) at the Error level when inconsistencies are detected. Recommendations include masking credentials in URIs and reducing the log level or detail for database records. Furthermore, critical issues were found regarding capturing loop variables in goroutines, which could lead to race conditions and incorrect behavior. There are also opportunities to improve code maintainability by refactoring duplicated logic in the checker and fixing a minor issue in the configuration parsing.
| for upstreamClusterID, downstreamCheckpointWatcherMap := range t.checkpointWatcher { | ||
| for downstreamClusterID, checkpointWatcher := range downstreamCheckpointWatcherMap { | ||
| mincheckpointTs := max(minCheckpointTsMap[upstreamClusterID][downstreamClusterID], maxTimeWindowRightBoundary) | ||
| eg.Go(func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goroutine inside this loop (L135) captures loop variables upstreamClusterID, downstreamClusterID, and checkpointWatcher. This can lead to race conditions and incorrect behavior as the goroutines may not process the intended cluster/watcher pair. You should create copies of these variables within the loop before passing them to the goroutine.
| for clusterID, triplet := range t.timeWindowTriplet { | ||
| minTimeWindowRightBoundary := max(maxCheckpointTs[clusterID], maxPDTimestampAfterCheckpointTs[clusterID], triplet[2].NextMinLeftBoundary) | ||
| s3Watcher := t.s3Watcher[clusterID] | ||
| eg.Go(func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| for otherClusterID := range t.pdClients { | ||
| if otherClusterID == clusterID { | ||
| continue | ||
| } | ||
| pdClient := t.pdClients[otherClusterID] | ||
| eg.Go(func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| for schemaPathKey, filePath := range newVersionPaths { | ||
| eg.Go(func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goroutine inside this loop (L163) captures loop variables schemaPathKey and filePath. This can lead to race conditions and incorrect behavior as the goroutines may not process the intended schema/file. You should create copies of these variables within the loop before passing them to the goroutine.
| for schema, tables := range c.tables { | ||
| for _, table := range tables { | ||
| eg.Go(func() error { | ||
| newVersions, err := c.discoverAndDownloadNewTableVersions(egCtx, schema, table) | ||
| if err != nil { | ||
| return errors.Trace(err) | ||
| } | ||
| maxVersion := uint64(0) | ||
| for _, version := range newVersions { | ||
| maxVersion = max(maxVersion, version) | ||
| eg.Go(func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| for name, cluster := range cfg.Clusters { | ||
| fmt.Printf(" Cluster: %s\n", name) | ||
| fmt.Printf(" PD Address: %s\n", cluster.PDAddr) | ||
| fmt.Printf(" S3 Sink URI: %s\n", cluster.S3SinkURI) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The application prints the raw S3SinkURI to standard output during initialization. If this URI contains sensitive credentials (such as AWS access keys or secret keys), they will be exposed in the terminal and any logs capturing the output. This violates the principle of not logging sensitive information. It is recommended to mask credentials in the URI before printing.
| fmt.Printf(" S3 Sink URI: %s\n", cluster.S3SinkURI) | |
| fmt.Printf(" S3 Sink URI: %s\n", util.MaskSensitiveDataInURI(clusterConfig.S3SinkURI)) |
| log.Error("LWW violation detected", | ||
| zap.String("clusterID", c.clusterID), | ||
| zap.Any("entry", entry), | ||
| zap.Any("record", r)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The application logs full database records (including primary keys and column values) when data inconsistencies or violations are detected (see also lines 237, 242, 265, 272, and 291). If the database contains sensitive information or Personally Identifiable Information (PII), this data will be written to the application logs in plain text at the Error level. This could lead to unauthorized exposure of sensitive data if logs are not strictly secured. Consider logging only the primary keys or using a Debug level for full record details.
| func (cd *clusterDataChecker) dataLossDetection(checker *DataChecker) { | ||
| for _, upstreamDataCache := range cd.timeWindowDataCaches[1].upstreamDataCache { | ||
| for _, record := range upstreamDataCache { | ||
| for downstreamClusterID, checkpointTs := range cd.timeWindowDataCaches[1].checkpointTs { | ||
| if record.CommitTs <= checkpointTs { | ||
| continue | ||
| } | ||
| downstreamRecord, skipped := checker.FindClusterDownstreamData(downstreamClusterID, record.Pk, record.CommitTs) | ||
| if skipped { | ||
| continue | ||
| } | ||
| if downstreamRecord == nil { | ||
| // data loss detected | ||
| log.Error("data loss detected", | ||
| zap.String("upstreamClusterID", cd.clusterID), | ||
| zap.String("downstreamClusterID", downstreamClusterID), | ||
| zap.Any("record", record)) | ||
| cd.report.AddDataLossItem(downstreamClusterID, string(record.Pk), record.OriginTs, record.CommitTs, false) | ||
| } else if !record.EqualDownstreamRecord(downstreamRecord) { | ||
| // data inconsistent detected | ||
| log.Error("data inconsistent detected", | ||
| zap.String("upstreamClusterID", cd.clusterID), | ||
| zap.String("downstreamClusterID", downstreamClusterID), | ||
| zap.Any("record", record)) | ||
| cd.report.AddDataLossItem(downstreamClusterID, string(record.Pk), record.OriginTs, record.CommitTs, true) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| for _, upstreamDataCache := range cd.timeWindowDataCaches[2].upstreamDataCache { | ||
| for _, record := range upstreamDataCache { | ||
| for downstreamClusterID, checkpointTs := range cd.timeWindowDataCaches[2].checkpointTs { | ||
| if record.CommitTs > checkpointTs { | ||
| continue | ||
| } | ||
| downstreamRecord, skipped := checker.FindClusterDownstreamData(downstreamClusterID, record.Pk, record.CommitTs) | ||
| if skipped { | ||
| continue | ||
| } | ||
| if downstreamRecord == nil { | ||
| // data loss detected | ||
| log.Error("data loss detected", | ||
| zap.String("upstreamClusterID", cd.clusterID), | ||
| zap.String("downstreamClusterID", downstreamClusterID), | ||
| zap.Any("record", record)) | ||
| cd.report.AddDataLossItem(downstreamClusterID, string(record.Pk), record.OriginTs, record.CommitTs, false) | ||
| } else if !record.EqualDownstreamRecord(downstreamRecord) { | ||
| // data inconsistent detected | ||
| log.Error("data inconsistent detected", | ||
| zap.String("upstreamClusterID", cd.clusterID), | ||
| zap.String("downstreamClusterID", downstreamClusterID), | ||
| zap.Any("record", record)) | ||
| cd.report.AddDataLossItem(downstreamClusterID, string(record.Pk), record.OriginTs, record.CommitTs, true) | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic inside the two main loops of dataLossDetection is very similar. The main difference is the condition for checking record.CommitTs against checkpointTs. This duplication can be reduced by extracting the common logic into a helper function. This would improve readability and maintainability.
| if keyStr != "global" && keyStr != "clusters" { | ||
| unknownKeys = append(unknownKeys, keyStr) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic to filter unknown configuration keys seems incorrect. The condition if keyStr != "global" && keyStr != "clusters" will only filter out the exact keys "global" and "clusters", but not keys under them like "global.foo". Since meta.Undecoded() returns all keys that were not decoded into the struct, all of them should be considered unknown. You can probably remove this if condition and report all undecoded keys.
unknownKeys = append(unknownKeys, keyStr)Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note