diff --git a/dm/common/common.go b/dm/common/common.go index e8dde43435..6da35f7c6a 100644 --- a/dm/common/common.go +++ b/dm/common/common.go @@ -64,6 +64,9 @@ var ( // ShardDDLPessimismOperationKeyAdapter is used to store shard DDL operation in pessimistic model. // k/v: Encode(task-name, source-id) -> shard DDL operation. ShardDDLPessimismOperationKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/operation/") + // ShardDDLPessimismDDLsKeyAdapter is used to store last done DDLs in pessimistic model. + // k/v: Encode(task-name, downSchema, downTable) -> DDLs. + ShardDDLPessimismDDLsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/ddls/") // ShardDDLOptimismSourceTablesKeyAdapter is used to store INITIAL upstream schema & table names when starting the subtask. // In other words, if any Info for this subtask exists, we should obey source tables in the Info. @@ -100,7 +103,7 @@ func keyAdapterKeysLen(s KeyAdapter) int { ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter, ShardDDLOptimismSourceTablesKeyAdapter: return 2 - case ShardDDLOptimismInitSchemaKeyAdapter: + case ShardDDLOptimismInitSchemaKeyAdapter, ShardDDLPessimismDDLsKeyAdapter: return 3 case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter: return 4 diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 1dd27ddff7..514f9305cd 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -775,7 +775,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { schema, table = "foo", "bar" ID = fmt.Sprintf("%s-`%s`.`%s`", taskName, schema, table) i11 = pessimism.NewInfo(taskName, sources[0], schema, table, DDLs) - op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false) + op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false, false) ) _, err = pessimism.PutInfo(etcdTestCli, i11) c.Assert(err, check.IsNil) diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index 072b13c452..754242c593 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -147,8 +147,13 @@ func (p *Pessimist) buildLocks(etcdCli *clientv3.Client) (int64, int64, error) { } p.logger.Info("get history shard DDL lock operation", zap.Reflect("operation", opm), zap.Int64("revision", rev2)) + latestDoneDDLsMap, _, err := pessimism.GetAllLatestDoneDDLs(etcdCli) + if err != nil { + return 0, 0, err + } + // recover the shard DDL lock based on history shard DDL info & lock operation. - err = p.recoverLocks(ifm, opm) + err = p.recoverLocks(ifm, opm, latestDoneDDLsMap) if err != nil { // only log the error, and don't return it to forbid the startup of the DM-master leader. // then these unexpected locks can be handled by the user. @@ -291,7 +296,7 @@ func (p *Pessimist) UnlockLock(ctx context.Context, id, replaceOwner string, for // 2. check whether has resolved before (this often should not happen). if lock.IsResolved() { - err := p.removeLock(lock) + err := p.removeLockPutDDLs(lock) if err != nil { return err } @@ -399,19 +404,26 @@ func (p *Pessimist) RemoveMetaData(task string) error { } // clear meta data in etcd - _, err = pessimism.DeleteInfosOperationsByTask(p.cli, task) - return err + if _, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task); err != nil { + return err + } + p.lk.RemoveLatestDoneDDLsByTask(task) + return nil } // recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation. -func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, opm map[string]map[string]pessimism.Operation) error { +func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, + opm map[string]map[string]pessimism.Operation, latestDoneDDLsMap map[string]map[string]map[string][]string) error { + // add all last done ddls. + p.lk.AddAllLatestDoneDDLs(latestDoneDDLsMap) + // construct locks based on the shard DDL info. for task, ifs := range ifm { sources := p.taskSources(task) // if no operation exists for the lock, we let the smallest (lexicographical order) source as the owner of the lock. // if any operation exists for the lock, we let the source with `exec=true` as the owner of the lock (the logic is below). for _, info := range pessimismInfoMapToSlice(ifs) { - _, _, _, err := p.lk.TrySync(info, sources) + _, _, _, err := p.lk.TrySync(p.cli, info, sources) if err != nil { return err } @@ -471,7 +483,7 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I p.logger.Info("receive a shard DDL info", zap.Stringer("info", info)) p.infoOpMu.Lock() - lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task)) + lockID, synced, remain, err := p.lk.TrySync(p.cli, info, p.taskSources(info.Task)) if err != nil { p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err)) // currently, only DDL mismatch will cause error @@ -530,7 +542,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis if lock.IsResolved() { p.logger.Info("the lock for the shard DDL lock operation has been resolved", zap.Stringer("operation", op)) // remove all operations for this shard DDL lock. - err := p.removeLock(lock) + err := p.removeLockPutDDLs(lock) if err != nil { p.logger.Error("fail to delete the shard DDL lock operations", zap.String("lock", lock.ID), log.ShortError(err)) metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock) @@ -576,11 +588,7 @@ func (p *Pessimist) handleLock(lockID, source string) error { if lock.IsResolved() { // remove all operations for this shard DDL lock. // this is to handle the case where dm-master exit before deleting operations for them. - err := p.removeLock(lock) - if err != nil { - return err - } - return nil + return p.removeLockPutDDLs(lock) } // check whether the owner has done. @@ -601,7 +609,7 @@ func (p *Pessimist) handleLock(lockID, source string) error { // putOpForOwner PUTs the shard DDL lock operation for the owner into etcd. func (p *Pessimist) putOpForOwner(lock *pessimism.Lock, owner string, skipDone bool) error { - op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false) + op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false, false) rev, succ, err := pessimism.PutOperations(p.cli, skipDone, op) if err != nil { return err @@ -625,7 +633,7 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s ops := make([]pessimism.Operation, 0, len(sources)) for _, source := range sources { - ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false)) + ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false)) } rev, succ, err := pessimism.PutOperations(p.cli, skipDone, ops...) @@ -636,10 +644,10 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s return nil } -// removeLock removes the lock in memory and its information in etcd. -func (p *Pessimist) removeLock(lock *pessimism.Lock) error { +// removeLockPutDDLs removes the lock in memory and its information and put ddls in etcd. +func (p *Pessimist) removeLockPutDDLs(lock *pessimism.Lock) error { // remove all operations for this shard DDL lock. - if err := p.deleteOps(lock); err != nil { + if err := p.deleteOpsPutDDLs(lock); err != nil { return err } @@ -672,23 +680,24 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error { } }) p.lk.RemoveLock(lock.ID) + p.lk.AddLatestDoneDDLs(lock.ID, lock.DDLs) return nil } // deleteOps DELETEs shard DDL lock operations relative to the lock. -func (p *Pessimist) deleteOps(lock *pessimism.Lock) error { +func (p *Pessimist) deleteOpsPutDDLs(lock *pessimism.Lock) error { ready := lock.Ready() ops := make([]pessimism.Operation, 0, len(ready)) for source := range ready { // When deleting operations, we do not verify the value of the operation now, // so simply set `exec=false` and `done=true`. - ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true)) + ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false)) } - rev, err := pessimism.DeleteOperations(p.cli, ops...) + rev, err := pessimism.DeleteOperationsPutDDLs(p.cli, lock.ID, ops, lock.DDLs) if err != nil { return err } - p.logger.Info("delete shard DDL lock operations", zap.String("lock", lock.ID), zap.Int64("revision", rev)) + p.logger.Info("delete shard DDL lock operations and put latest done ddls", zap.String("lock", lock.ID), zap.Int64("revision", rev), zap.Strings("ddls", lock.DDLs)) return err } @@ -704,7 +713,7 @@ func (p *Pessimist) deleteInfosOps(lock *pessimism.Lock) error { for source := range ready { // When deleting operations, we do not verify the value of the operation now, // so simply set `exec=false` and `done=true`. - ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true)) + ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false)) } rev, err := pessimism.DeleteInfosOperations(p.cli, infos, ops) @@ -785,7 +794,7 @@ func (p *Pessimist) waitNonOwnerToBeDone(ctx context.Context, lock *pessimism.Lo // we still put `skip` operations for waitSources one more time with `skipDone=true`. ops := make([]pessimism.Operation, 0, len(waitSources)) for _, source := range waitSources { - ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false)) + ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false)) } rev, succ, err := pessimism.PutOperations(p.cli, true, ops...) if err != nil { diff --git a/dm/master/shardddl/pessimist_test.go b/dm/master/shardddl/pessimist_test.go index 8e796c0e3e..de469a3e3c 100644 --- a/dm/master/shardddl/pessimist_test.go +++ b/dm/master/shardddl/pessimist_test.go @@ -305,7 +305,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) { c.Assert(p.Locks()[ID2].IsDone(source2), IsFalse) // mark exec operation for one non-owner as `done` (and delete the info). - op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true) + op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true, false) done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op22c, i22) c.Assert(err, IsNil) c.Assert(done, IsTrue) @@ -325,7 +325,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) { // mark skip operation for the non-owner as `done` (and delete the info). // the lock should become resolved and deleted. - op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true) + op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true, false) done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op23c, i23) c.Assert(err, IsNil) c.Assert(done, IsTrue) @@ -844,7 +844,7 @@ func (t *testPessimist) TestMeetEtcdCompactError(c *C) { ID1 = fmt.Sprintf("%s-`%s`.`%s`", task1, schema, table) i11 = pessimism.NewInfo(task1, source1, schema, table, DDLs) i12 = pessimism.NewInfo(task1, source2, schema, table, DDLs) - op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false) + op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false, false) revCompacted int64 infoCh chan pessimism.Info diff --git a/pkg/shardddl/pessimism/ddls.go b/pkg/shardddl/pessimism/ddls.go new file mode 100644 index 0000000000..4e6c214cc4 --- /dev/null +++ b/pkg/shardddl/pessimism/ddls.go @@ -0,0 +1,84 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pessimism + +import ( + "context" + "encoding/json" + + "go.etcd.io/etcd/clientv3" + + "github.com/pingcap/dm/dm/common" + "github.com/pingcap/dm/pkg/etcdutil" +) + +// putLatestDoneDDLsOp returns a PUT etcd operation for latest done ddls. +// This operation should often be sent by DM-master. +func putLatestDoneDDLsOp(task, downSchema, downTable string, ddls []string) (clientv3.Op, error) { + data, err := json.Marshal(ddls) + if err != nil { + return clientv3.Op{}, err + } + key := common.ShardDDLPessimismDDLsKeyAdapter.Encode(task, downSchema, downTable) + + return clientv3.OpPut(key, string(data)), nil +} + +// PutLatestDoneDDLs puts the last done shard DDL ddls into etcd. +func PutLatestDoneDDLs(cli *clientv3.Client, task, downSchema, downTable string, ddls []string) (int64, error) { + putOp, err := putLatestDoneDDLsOp(task, downSchema, downTable, ddls) + if err != nil { + return 0, err + } + _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, putOp) + return rev, err +} + +// GetAllLatestDoneDDLs gets all last done shard DDL ddls in etcd currently. +// k/v: task -> downSchema -> downTable -> DDLs +// This function should often be called by DM-master. +func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string]map[string]map[string][]string, int64, error) { + ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout) + defer cancel() + + resp, err := cli.Get(ctx, common.ShardDDLPessimismDDLsKeyAdapter.Path(), clientv3.WithPrefix()) + if err != nil { + return nil, 0, err + } + + ddlsMap := make(map[string]map[string]map[string][]string, len(resp.Kvs)) + for _, kv := range resp.Kvs { + var ddls []string + if err2 := json.Unmarshal(kv.Value, &ddls); err2 != nil { + return nil, 0, err2 + } + keys, err2 := common.ShardDDLPessimismDDLsKeyAdapter.Decode(string(kv.Key)) + if err2 != nil { + return nil, 0, err2 + } + task := keys[0] + downSchema := keys[1] + downTable := keys[2] + + if _, ok := ddlsMap[task]; !ok { + ddlsMap[task] = make(map[string]map[string][]string) + } + if _, ok := ddlsMap[task][downSchema]; !ok { + ddlsMap[task][downSchema] = make(map[string][]string) + } + ddlsMap[task][downSchema][downTable] = ddls + } + + return ddlsMap, resp.Header.Revision, nil +} diff --git a/pkg/shardddl/pessimism/ddls_test.go b/pkg/shardddl/pessimism/ddls_test.go new file mode 100644 index 0000000000..69dff0cbe0 --- /dev/null +++ b/pkg/shardddl/pessimism/ddls_test.go @@ -0,0 +1,58 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pessimism + +import ( + . "github.com/pingcap/check" + + "github.com/pingcap/dm/pkg/utils" +) + +func (t *testForEtcd) TestDDLsEtcd(c *C) { + defer clearTestInfoOperation(c) + + var ( + ID1 = "test1-`foo`.`bar`" + ID2 = "test2-`foo`.`bar`" + task1, downSchema1, downTable1 = utils.ExtractAllFromLockID(ID1) + task2, downSchema2, downTable2 = utils.ExtractAllFromLockID(ID2) + DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} + DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"} + ) + + // put the same keys twice. + rev1, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs1) + c.Assert(err, IsNil) + rev2, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs1) + c.Assert(err, IsNil) + c.Assert(rev2, Greater, rev1) + + // put another DDLs + rev3, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs2) + c.Assert(err, IsNil) + c.Assert(rev3, Greater, rev2) + + // put for another lock + rev4, err := PutLatestDoneDDLs(etcdTestCli, task2, downSchema2, downTable2, DDLs1) + c.Assert(err, IsNil) + c.Assert(rev4, Greater, rev3) + + // get all ddls. + latestDoneDDLsMap, rev5, err := GetAllLatestDoneDDLs(etcdTestCli) + c.Assert(err, IsNil) + c.Assert(rev5, Equals, rev4) + c.Assert(latestDoneDDLsMap, HasLen, 2) + c.Assert(latestDoneDDLsMap[task1][downSchema1][downTable1], DeepEquals, DDLs2) + c.Assert(latestDoneDDLsMap[task2][downSchema2][downTable2], DeepEquals, DDLs1) +} diff --git a/pkg/shardddl/pessimism/info_test.go b/pkg/shardddl/pessimism/info_test.go index c55f706ee9..c79fb527be 100644 --- a/pkg/shardddl/pessimism/info_test.go +++ b/pkg/shardddl/pessimism/info_test.go @@ -170,7 +170,7 @@ func (t *testForEtcd) TestPutInfoIfOpNotDone(c *C) { DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} ID = fmt.Sprintf("%s-%s", task, dbutil.TableName(schema, table)) info = NewInfo(task, source, schema, table, DDLs) - op = NewOperation(ID, task, source, DDLs, false, false) + op = NewOperation(ID, task, source, DDLs, false, false, false) ) // put info success because no operation exist. diff --git a/pkg/shardddl/pessimism/keeper.go b/pkg/shardddl/pessimism/keeper.go index 50427b1b53..b6634e68cc 100644 --- a/pkg/shardddl/pessimism/keeper.go +++ b/pkg/shardddl/pessimism/keeper.go @@ -16,6 +16,8 @@ package pessimism import ( "sync" + "go.etcd.io/etcd/clientv3" + "github.com/pingcap/dm/dm/master/metrics" "github.com/pingcap/dm/pkg/utils" ) @@ -23,19 +25,21 @@ import ( // LockKeeper used to keep and handle DDL lock conveniently. // The lock information do not need to be persistent, and can be re-constructed from the shard DDL info. type LockKeeper struct { - mu sync.RWMutex - locks map[string]*Lock // lockID -> Lock + mu sync.RWMutex + locks map[string]*Lock // lockID -> Lock + latestDoneDDLs map[string]map[string]map[string][]string // task -> downSchema -> downTable -> ddls } // NewLockKeeper creates a new LockKeeper instance. func NewLockKeeper() *LockKeeper { return &LockKeeper{ - locks: make(map[string]*Lock), + locks: make(map[string]*Lock), + latestDoneDDLs: make(map[string]map[string]map[string][]string), } } // TrySync tries to sync the lock. -func (lk *LockKeeper) TrySync(info Info, sources []string) (string, bool, int, error) { +func (lk *LockKeeper) TrySync(cli *clientv3.Client, info Info, sources []string) (string, bool, int, error) { var ( lockID = genDDLLockID(info) l *Lock @@ -50,10 +54,53 @@ func (lk *LockKeeper) TrySync(info Info, sources []string) (string, bool, int, e l = lk.locks[lockID] } - synced, remain, err := l.TrySync(info.Source, info.DDLs, sources) + synced, remain, err := l.TrySync(cli, info.Source, info.DDLs, sources, lk.GetLatestDoneDDLs(info.Task, info.Schema, info.Table)) return lockID, synced, remain, err } +// AddAllLatestDoneDDLs add all last done ddls. +func (lk *LockKeeper) AddAllLatestDoneDDLs(latestDoneDDLs map[string]map[string]map[string][]string) { + lk.mu.Lock() + defer lk.mu.Unlock() + lk.latestDoneDDLs = latestDoneDDLs +} + +// AddLatestDoneDDLs add last done ddls by lockID. +func (lk *LockKeeper) AddLatestDoneDDLs(lockID string, ddls []string) { + lk.mu.Lock() + defer lk.mu.Unlock() + task, downSchema, downTable := utils.ExtractAllFromLockID(lockID) + if _, ok := lk.latestDoneDDLs[task]; !ok { + lk.latestDoneDDLs[task] = make(map[string]map[string][]string) + } + if _, ok := lk.latestDoneDDLs[task][downSchema]; !ok { + lk.latestDoneDDLs[task][downSchema] = make(map[string][]string) + } + lk.latestDoneDDLs[task][downSchema][downTable] = ddls +} + +// RemoveLatestDoneDDLsByTask remove last done ddls by task. +func (lk *LockKeeper) RemoveLatestDoneDDLsByTask(task string) { + lk.mu.Lock() + defer lk.mu.Unlock() + delete(lk.latestDoneDDLs, task) +} + +// GetLatestDoneDDLs gets last done ddls by lockID. +func (lk *LockKeeper) GetLatestDoneDDLs(task, downSchema, downTable string) []string { + if _, ok := lk.latestDoneDDLs[task]; !ok { + return nil + } + if _, ok := lk.latestDoneDDLs[task][downSchema]; !ok { + return nil + } + latestDoneDDLs, ok := lk.latestDoneDDLs[task][downSchema][downTable] + if !ok { + return nil + } + return latestDoneDDLs +} + // RemoveLock removes a lock. func (lk *LockKeeper) RemoveLock(lockID string) bool { lk.mu.Lock() diff --git a/pkg/shardddl/pessimism/keeper_test.go b/pkg/shardddl/pessimism/keeper_test.go index da532c9438..a9a680f0bf 100644 --- a/pkg/shardddl/pessimism/keeper_test.go +++ b/pkg/shardddl/pessimism/keeper_test.go @@ -37,19 +37,19 @@ func (t *testLockKeeper) TestLockKeeper(c *C) { ) // lock with 2 sources. - lockID1, synced, remain, err := lk.TrySync(info11, []string{source1, source2}) + lockID1, synced, remain, err := lk.TrySync(etcdTestCli, info11, []string{source1, source2}) c.Assert(err, IsNil) c.Assert(lockID1, Equals, "task1-`foo`.`bar`") c.Assert(synced, IsFalse) c.Assert(remain, Equals, 1) - lockID1, synced, remain, err = lk.TrySync(info12, []string{source1, source2}) + lockID1, synced, remain, err = lk.TrySync(etcdTestCli, info12, []string{source1, source2}) c.Assert(err, IsNil) c.Assert(lockID1, Equals, "task1-`foo`.`bar`") c.Assert(synced, IsTrue) c.Assert(remain, Equals, 0) // lock with only 1 source. - lockID2, synced, remain, err := lk.TrySync(info21, []string{source1}) + lockID2, synced, remain, err := lk.TrySync(etcdTestCli, info21, []string{source1}) c.Assert(err, IsNil) c.Assert(lockID2, Equals, "task2-`foo`.`bar`") c.Assert(synced, IsTrue) diff --git a/pkg/shardddl/pessimism/lock.go b/pkg/shardddl/pessimism/lock.go index 034dde731f..c3a44b9638 100644 --- a/pkg/shardddl/pessimism/lock.go +++ b/pkg/shardddl/pessimism/lock.go @@ -16,7 +16,11 @@ package pessimism import ( "sync" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + "github.com/pingcap/dm/dm/master/metrics" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) @@ -64,13 +68,47 @@ func NewLock(id, task, owner string, ddls, sources []string) *Lock { // TrySync tries to sync the lock, does decrease on remain, re-entrant. // new upstream sources may join when the DDL lock is in syncing, // so we need to merge these new sources. -func (l *Lock) TrySync(caller string, ddls, sources []string) (bool, int, error) { +func (l *Lock) TrySync(cli *clientv3.Client, caller string, ddls, sources, latestDoneDDLs []string) (bool, int, error) { l.mu.Lock() defer l.mu.Unlock() // check DDL statement first. if !utils.CompareShardingDDLs(ddls, l.DDLs) { - return l.remain <= 0, l.remain, terror.ErrMasterShardingDDLDiff.Generate(l.DDLs, ddls) + curIdempotent := utils.CompareShardingDDLs(latestDoneDDLs, ddls) + // handle conflict + if !curIdempotent && !utils.CompareShardingDDLs(latestDoneDDLs, l.DDLs) { + return l.remain <= 0, l.remain, terror.ErrMasterShardingDDLDiff.Generate(l.DDLs, ddls) + } + + // current ddls idempotent, skip it. + if curIdempotent { + log.L().Warn("conflict ddls equals last done ddls, skip it", zap.Strings("ddls", ddls), zap.String("source", caller)) + _, _, err := PutOperations(cli, true, NewOperation(l.ID, l.Task, caller, latestDoneDDLs, false, false, true)) + return l.remain <= 0, l.remain, err + } + + // other sources' ddls idempotent, skip them. + readySources := make([]string, 0, len(l.ready)) + ops := make([]Operation, 0, len(l.ready)) + for source, isReady := range l.ready { + if isReady { + readySources = append(readySources, source) + ops = append(ops, NewOperation(l.ID, l.Task, source, latestDoneDDLs, false, false, true)) + } + } + + log.L().Warn("conflict ddls equals last done ddls, skip them", zap.Strings("ddls", ddls), zap.Strings("sources", readySources)) + if _, _, err := PutOperations(cli, true, ops...); err != nil { + return l.remain <= 0, l.remain, err + } + + // revert ready + for _, source := range readySources { + l.ready[source] = false + l.remain++ + } + l.Owner = caller + l.DDLs = ddls } // try to merge any newly joined sources. diff --git a/pkg/shardddl/pessimism/lock_test.go b/pkg/shardddl/pessimism/lock_test.go index 59f1cb7562..5dab158e8f 100644 --- a/pkg/shardddl/pessimism/lock_test.go +++ b/pkg/shardddl/pessimism/lock_test.go @@ -40,7 +40,7 @@ func (t *testLock) TestLock(c *C) { l1 := NewLock(ID, task, source1, DDLs, []string{source1}) // DDLs mismatch. - synced, remain, err := l1.TrySync(source1, DDLs[1:], []string{source1}) + synced, remain, err := l1.TrySync(etcdTestCli, source1, DDLs[1:], []string{source1}, nil) c.Assert(terror.ErrMasterShardingDDLDiff.Equal(err), IsTrue) c.Assert(synced, IsFalse) c.Assert(remain, Equals, 1) @@ -51,7 +51,7 @@ func (t *testLock) TestLock(c *C) { c.Assert(l1.IsResolved(), IsFalse) // synced. - synced, remain, err = l1.TrySync(source1, DDLs, []string{source1}) + synced, remain, err = l1.TrySync(etcdTestCli, source1, DDLs, []string{source1}, nil) c.Assert(err, IsNil) c.Assert(synced, IsTrue) c.Assert(remain, Equals, 0) @@ -70,7 +70,7 @@ func (t *testLock) TestLock(c *C) { l2 := NewLock(ID, task, source1, DDLs, []string{source1, source2}) // join a new source. - synced, remain, err = l2.TrySync(source1, DDLs, []string{source2, source3}) + synced, remain, err = l2.TrySync(etcdTestCli, source1, DDLs, []string{source2, source3}, nil) c.Assert(err, IsNil) c.Assert(synced, IsFalse) c.Assert(remain, Equals, 2) @@ -81,7 +81,7 @@ func (t *testLock) TestLock(c *C) { }) // sync other sources. - synced, remain, err = l2.TrySync(source2, DDLs, []string{}) + synced, remain, err = l2.TrySync(etcdTestCli, source2, DDLs, []string{}, nil) c.Assert(err, IsNil) c.Assert(synced, IsFalse) c.Assert(remain, Equals, 1) @@ -90,7 +90,7 @@ func (t *testLock) TestLock(c *C) { source2: true, source3: false, }) - synced, remain, err = l2.TrySync(source3, DDLs, nil) + synced, remain, err = l2.TrySync(etcdTestCli, source3, DDLs, nil, nil) c.Assert(err, IsNil) c.Assert(synced, IsTrue) c.Assert(remain, Equals, 0) diff --git a/pkg/shardddl/pessimism/operation.go b/pkg/shardddl/pessimism/operation.go index 8520e345ca..7e5e9eee12 100644 --- a/pkg/shardddl/pessimism/operation.go +++ b/pkg/shardddl/pessimism/operation.go @@ -35,6 +35,7 @@ type Operation struct { DDLs []string `json:"ddls"` // DDL statements Exec bool `json:"exec"` // execute or skip the DDL statements Done bool `json:"done"` // whether the `Exec` operation has done + Skip bool `json:"skip"` // Whether worker skip this operation // only used to report to the caller of the watcher, do not marsh it. // if it's true, it means the Operation has been deleted in etcd. @@ -42,7 +43,7 @@ type Operation struct { } // NewOperation creates a new Operation instance. -func NewOperation(id, task, source string, ddls []string, exec, done bool) Operation { +func NewOperation(id, task, source string, ddls []string, exec, done, skip bool) Operation { return Operation{ ID: id, Task: task, @@ -50,6 +51,7 @@ func NewOperation(id, task, source string, ddls []string, exec, done bool) Opera DDLs: ddls, Exec: exec, Done: done, + Skip: skip, } } diff --git a/pkg/shardddl/pessimism/operation_test.go b/pkg/shardddl/pessimism/operation_test.go index 1e37381d92..0511b742a1 100644 --- a/pkg/shardddl/pessimism/operation_test.go +++ b/pkg/shardddl/pessimism/operation_test.go @@ -25,11 +25,11 @@ import ( func (t *testForEtcd) TestOperationJSON(c *C) { o1 := NewOperation("test-ID", "test", "mysql-replica-1", []string{ "ALTER TABLE bar ADD COLUMN c1 INT", - }, true, false) + }, true, false, false) j, err := o1.toJSON() c.Assert(err, IsNil) - c.Assert(j, Equals, `{"id":"test-ID","task":"test","source":"mysql-replica-1","ddls":["ALTER TABLE bar ADD COLUMN c1 INT"],"exec":true,"done":false}`) + c.Assert(j, Equals, `{"id":"test-ID","task":"test","source":"mysql-replica-1","ddls":["ALTER TABLE bar ADD COLUMN c1 INT"],"exec":true,"done":false,"skip":false}`) c.Assert(j, Equals, o1.String()) o2, err := operationFromJSON(j) @@ -49,10 +49,10 @@ func (t *testForEtcd) TestOperationEtcd(c *C) { source2 = "mysql-replica-2" source3 = "mysql-replica-3" DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} - op11 = NewOperation(ID1, task1, source1, DDLs, true, false) - op12 = NewOperation(ID1, task1, source2, DDLs, true, false) - op13 = NewOperation(ID1, task1, source3, DDLs, true, false) - op21 = NewOperation(ID2, task2, source1, DDLs, false, true) + op11 = NewOperation(ID1, task1, source1, DDLs, true, false, false) + op12 = NewOperation(ID1, task1, source2, DDLs, true, false, false) + op13 = NewOperation(ID1, task1, source3, DDLs, true, false, false) + op21 = NewOperation(ID2, task2, source1, DDLs, false, true, false) ) // put the same keys twice. diff --git a/pkg/shardddl/pessimism/ops.go b/pkg/shardddl/pessimism/ops.go index eb6b4f5a4e..c4bde54f04 100644 --- a/pkg/shardddl/pessimism/ops.go +++ b/pkg/shardddl/pessimism/ops.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/pkg/etcdutil" + "github.com/pingcap/dm/pkg/utils" ) // TODO(csuzhangxc): assign terror code before merged into the master branch. @@ -60,12 +61,32 @@ func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation) return rev, err } -// DeleteInfosOperationsByTask deletes the shard DDL infos and operations of a specified task in etcd. +// DeleteOperationsPutDDLs deletes the shard DDL operations and add latest done DDLs in etcd. +// This function should often be called by DM-master when calling UnlockDDL and the lock is resolved. +func DeleteOperationsPutDDLs(cli *clientv3.Client, lockID string, ops []Operation, ddls []string) (int64, error) { + etcdOps := make([]clientv3.Op, 0, len(ops)) + for _, op := range ops { + etcdOps = append(etcdOps, deleteOperationOp(op)) + } + + task, downSchema, downTable := utils.ExtractAllFromLockID(lockID) + putOp, err := putLatestDoneDDLsOp(task, downSchema, downTable, ddls) + if err != nil { + return 0, err + } + + etcdOps = append(etcdOps, putOp) + _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, etcdOps...) + return rev, err +} + +// DeleteInfosOperationsDDLsByTask deletes the shard DDL infos and operations of a specified task in etcd. // This function should often be called by DM-master when deleting ddl meta data. -func DeleteInfosOperationsByTask(cli *clientv3.Client, task string) (int64, error) { +func DeleteInfosOperationsDDLsByTask(cli *clientv3.Client, task string) (int64, error) { opsDel := make([]clientv3.Op, 0, 2) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismInfoKeyAdapter.Encode(task), clientv3.WithPrefix())) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismOperationKeyAdapter.Encode(task), clientv3.WithPrefix())) + opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismDDLsKeyAdapter.Encode(task), clientv3.WithPrefix())) _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, opsDel...) return rev, err } diff --git a/pkg/shardddl/pessimism/ops_test.go b/pkg/shardddl/pessimism/ops_test.go index 7519175ad2..c9725d6aca 100644 --- a/pkg/shardddl/pessimism/ops_test.go +++ b/pkg/shardddl/pessimism/ops_test.go @@ -25,7 +25,7 @@ func (t *testForEtcd) TestPutOperationDeleteInfo(c *C) { source = "mysql-replica-1" DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} info = NewInfo(task, source, "foo", "bar", DDLs) - op = NewOperation("test-ID", task, source, DDLs, true, false) + op = NewOperation("test-ID", task, source, DDLs, true, false, false) ) // put info. diff --git a/pkg/utils/common.go b/pkg/utils/common.go index 65ef2711f7..b9b5d267c1 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -193,6 +193,16 @@ func ExtractDBAndTableFromLockID(lockID string) (string, string) { return strs[2], strs[3] } +// ExtractAllFromLockID extract task, downSchema, downTable from lockID. +func ExtractAllFromLockID(lockID string) (string, string, string) { + strs := lockIDPattern.FindStringSubmatch(lockID) + // strs should be [full-lock-ID, task, db, table] if successful matched + if len(strs) < 4 { + return "", "", "" + } + return strs[1], strs[2], strs[3] +} + // NonRepeatStringsEqual is used to compare two un-ordered, non-repeat-element string slice is equal. func NonRepeatStringsEqual(a, b []string) bool { if len(a) != len(b) { diff --git a/pkg/utils/common_test.go b/pkg/utils/common_test.go index 5b2b8de634..b0830ef9be 100644 --- a/pkg/utils/common_test.go +++ b/pkg/utils/common_test.go @@ -246,6 +246,13 @@ func (s *testCommonSuite) TestDDLLockID(c *C) { c.Assert(ID, Equals, "test-`d``b`.`tb``l`") c.Assert(ExtractTaskFromLockID(ID), Equals, task) + ID = GenDDLLockID("test-task", "d-b", "tb-l") + c.Assert(ID, Equals, "test-task-`d-b`.`tb-l`") + t, ds, dt := ExtractAllFromLockID(ID) + c.Assert(t, Equals, "test-task") + c.Assert(ds, Equals, "d-b") + c.Assert(dt, Equals, "tb-l") + // invalid ID c.Assert(ExtractTaskFromLockID("invalid-lock-id"), Equals, "") } diff --git a/syncer/shardddl/pessimist.go b/syncer/shardddl/pessimist.go index 524c8aee24..46adeb974a 100644 --- a/syncer/shardddl/pessimist.go +++ b/syncer/shardddl/pessimist.go @@ -129,6 +129,12 @@ func (p *Pessimist) GetOperation(ctx context.Context, info pessimism.Info, rev i } } +// DeleteInfosOperations deletes the shard DDL infos and operations in etcd. +// This function should often be called by DM-worker when skip operation. +func (p *Pessimist) DeleteInfosOperations(infos []pessimism.Info, ops []pessimism.Operation) (int64, error) { + return pessimism.DeleteInfosOperations(p.cli, infos, ops) +} + // DoneOperationDeleteInfo marks the shard DDL lock operation as done and delete the shard DDL info. func (p *Pessimist) DoneOperationDeleteInfo(op pessimism.Operation, info pessimism.Info) error { op.Done = true // mark the operation as `done`. diff --git a/syncer/shardddl/pessimist_test.go b/syncer/shardddl/pessimist_test.go index 6b01e8da8e..5f33d0806e 100644 --- a/syncer/shardddl/pessimist_test.go +++ b/syncer/shardddl/pessimist_test.go @@ -65,7 +65,7 @@ func (t *testPessimist) TestPessimist(c *C) { schema, table = "foo", "bar" DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} ID = "task-`foo`.`bar`" - op = pessimism.NewOperation(ID, task, source, DDLs, true, false) + op = pessimism.NewOperation(ID, task, source, DDLs, true, false, false) logger = log.L() p = NewPessimist(&logger, etcdTestCli, task, source) diff --git a/syncer/syncer.go b/syncer/syncer.go index 1b7dc2ca51..33829e3f87 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -980,8 +980,15 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, tctx.L().Warn("skip shard DDL handle in pessimistic shard mode", zap.Strings("ddl", sqlJob.ddls)) case shardPessimistOp == nil: err = terror.ErrWorkerDDLLockOpNotFound.Generate(shardInfo) + case shardPessimistOp.Skip: + tctx.L().Warn("skip shard DDL operation in pessimistic shard mode", zap.Strings("ddl", sqlJob.ddls)) + _, err = s.pessimist.DeleteInfosOperations([]pessimism.Info{*shardInfo}, []pessimism.Operation{*shardPessimistOp}) default: err = s.pessimist.DoneOperationDeleteInfo(*shardPessimistOp, *shardInfo) + failpoint.Inject("ErrorAfterOpDone", func() { + tctx.L().Warn("error after operation done", zap.Strings("DDL", sqlJob.ddls), zap.String("failpoint", "ErrorAfterOpDone")) + err = errors.Errorf("error after operation done") + }) } case config.ShardOptimistic: shardInfo := s.optimist.PendingInfo() diff --git a/tests/shardddl2/run.sh b/tests/shardddl2/run.sh index 7bfaacc72f..92b518059e 100644 --- a/tests/shardddl2/run.sh +++ b/tests/shardddl2/run.sh @@ -463,7 +463,7 @@ function DM_058() { run_case 058 "double-source-optimistic" "init_table 111 211 212" "clean_table" "optimistic" } -function DM_059_CASE { +function DM_059_CASE() { run_sql_source1 "insert into ${shardddl1}.${tb1} (id) values(1);" run_sql_source2 "insert into ${shardddl1}.${tb1} (id) values(2);" run_sql_source2 "insert into ${shardddl1}.${tb2} (id) values(3);" @@ -665,7 +665,7 @@ function DM_067() { run_case 067 "double-source-optimistic" "init_table 111 211 212" "clean_table" "optimistic" } -function DM_068_CASE { +function DM_068_CASE() { run_sql_source1 "alter table ${shardddl1}.${tb1} modify id datetime default now();" run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,now());" run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,now());" @@ -694,7 +694,7 @@ function DM_068() { "clean_table" "optimistic" } -function DM_ADD_DROP_COLUMNS_CASE { +function DM_ADD_DROP_COLUMNS_CASE() { # add cols run_sql_source1 "alter table ${shardddl1}.${tb1} add column col1 int, add column col2 int, add column col3 int;" run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,now(),1,1,1);" @@ -767,7 +767,7 @@ function DM_ADD_DROP_COLUMNS() { "clean_table" "optimistic" } -function DM_COLUMN_INDEX_CASE { +function DM_COLUMN_INDEX_CASE() { # add col and index run_sql_source1 "alter table ${shardddl1}.${tb1} add column col3 int, add index idx_col1(col1);" run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1,1,1);" @@ -1030,6 +1030,78 @@ function DM_DROP_COLUMN_ALL_DONE() { "clean_table" "optimistic" } +function DM_PESSIMISTIC_LAST_DONE_CASE() { + run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(1,'aaa');" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,'bbb');" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # get worker of source1 + w="1" + got=$(grep "mysql-replica-01" $WORK_DIR/worker1/log/dm-worker.log | wc -l) + if [[ "$got" = "0" ]]; then + w="2" + fi + + restart_worker $w "github.com/pingcap/dm/syncer/ErrorAfterOpDone=return()" + + run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;" + # make sure source1 put info + check_log_contain_with_retry "putted shard DDL info" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "error after operation done" 1 + + # case1: source2 put info, source1 reput last done ddl. + if [[ "$2" == "1" ]]; then + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "ALTER TABLE .* ADD COLUMN" 2 + fi + + restart_master + restart_worker $w "" + + # case2: source1 reput last done ddl, source2 put info. + if [[ "$2" == "2" ]]; then + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "ALTER TABLE .* DROP COLUMN" 2 + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + fi + + # conflict happen, skip source1's drop column statement + check_log_contain_with_retry "ddls equals last done ddls, skip" $WORK_DIR/master/log/dm-master.log + check_log_contain_with_retry "skip shard DDL operation in pessimistic shard mode" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + + restart_master + + run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + + run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,'ccc');" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(4,'ddd');" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 3 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml +} + +function DM_PESSIMISTIC_LAST_DONE() { + run_case PESSIMISTIC_LAST_DONE "double-source-pessimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key);\"; \ + run_sql_source2 \"create table ${shardddl1}.${tb1} (a int primary key);\"" \ + "clean_table" "pessimistic" 1 + + run_case PESSIMISTIC_LAST_DONE "double-source-pessimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key);\"; \ + run_sql_source2 \"create table ${shardddl1}.${tb1} (a int primary key);\"" \ + "clean_table" "pessimistic" 2 +} + function run() { init_cluster init_database @@ -1048,6 +1120,7 @@ function run() { DM_INIT_SCHEMA DM_DROP_COLUMN_EXEC_ERROR DM_DROP_COLUMN_ALL_DONE + DM_PESSIMISTIC_LAST_DONE } cleanup_data $shardddl