diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index c08596d4..de510b55 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -1314,12 +1314,24 @@ func (j *Job) fullSync() error { // check table exists to ensure the idempotent if exist, err := j.IDest.CheckTableExistsByName(alias); err != nil { + log.Warnf("the tmp swap table %s is not exists return ", alias) return err } else if exist { log.Infof("fullsync swap table with alias, table: %s, alias: %s", targetName, alias) - swap := false // drop the old table - if err := j.IDest.ReplaceTable(alias, targetName, swap); err != nil { - return err + if target_exist, err2 := j.IDest.CheckTableExistsByName(targetName); err2 != nil { + log.Warnf("the dest table %s is check exists error ", targetName) + } else if target_exist { + log.Warnf("the dest table %s already exists, start replace", + targetName) + swap := false // drop the old table + if err := j.IDest.ReplaceTable(alias, targetName, swap); err != nil { + return err + } + } else { + log.Warnf("the dest table %s is not exists, start rename", targetName) + if err := j.IDest.RenameTableWithName(alias, targetName); err != nil { + return err + } } } else { log.Infof("fullsync the table alias has been swapped, table: %s, alias: %s", targetName, alias)