Skip to content

Commit

Permalink
lightning: skip CREATE DATABASE when downstream exists, and return er…
Browse files Browse the repository at this point in the history
…ror when parse failed (#51801) (#52065)

close #51800
  • Loading branch information
ti-chi-bot committed Apr 2, 2024
1 parent e4108d2 commit 9815b45
Show file tree
Hide file tree
Showing 23 changed files with 358 additions and 210 deletions.
7 changes: 7 additions & 0 deletions DEPS.bzl
Expand Up @@ -5483,6 +5483,13 @@ def go_deps():
sum = "h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=",
version = "v1.2.0",
)
go_repository(
name = "org_uber_go_mock",
build_file_proto_mode = "disable",
importpath = "go.uber.org/mock",
sum = "h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=",
version = "v0.4.0",
)
go_repository(
name = "org_uber_go_multierr",
build_file_proto_mode = "disable_global",
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/BUILD.bazel
Expand Up @@ -34,10 +34,10 @@ go_test(
"//br/pkg/mock",
"//parser/mysql",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_golang_mock//gomock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_mock//gomock",
],
)
8 changes: 8 additions & 0 deletions br/pkg/lightning/backend/backend.go
Expand Up @@ -127,6 +127,10 @@ type CheckCtx struct {

// TargetInfoGetter defines the interfaces to get target information.
type TargetInfoGetter interface {
// FetchRemoteDBModels obtains the models of all databases. Currently, only
// the database name is filled.
FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)

// FetchRemoteTableModels obtains the models of all tables given the schema
// name. The returned table info does not need to be precise if the encoder,
// is not requiring them, but must at least fill in the following fields for
Expand Down Expand Up @@ -285,6 +289,10 @@ func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string)
return be.abstract.FetchRemoteTableModels(ctx, schemaName)
}

func (be Backend) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
return be.abstract.FetchRemoteDBModels(ctx)
}

func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/backend_test.go
Expand Up @@ -7,7 +7,6 @@ import (
"time"

gmysql "github.com/go-sql-driver/mysql"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
Expand All @@ -16,18 +15,19 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/mock/gomock"
)

type backendSuite struct {
controller *gomock.Controller
mockBackend *mock.MockBackend
mockBackend *mock.MockAbstractBackend
backend backend.Backend
ts uint64
}

func createBackendSuite(c gomock.TestReporter) *backendSuite {
controller := gomock.NewController(c)
mockBackend := mock.NewMockBackend(controller)
mockBackend := mock.NewMockAbstractBackend(controller)
return &backendSuite{
controller: controller,
mockBackend: mockBackend,
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/BUILD.bazel
Expand Up @@ -128,7 +128,6 @@ go_test(
"@com_github_cockroachdb_pebble//sstable",
"@com_github_coreos_go_semver//semver",
"@com_github_docker_go_units//:go-units",
"@com_github_golang_mock//gomock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand All @@ -142,6 +141,7 @@ go_test(
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
],
)
9 changes: 9 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Expand Up @@ -247,6 +247,11 @@ func NewTargetInfoGetter(tls *common.TLS, g glue.Glue, pdCli pd.Client) backend.
}
}

// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface.
func (g *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
return tikv.FetchRemoteDBModelsFromTLS(ctx, g.tls)
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `TargetInfoGetter` interface.
func (g *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down Expand Up @@ -1950,6 +1955,10 @@ func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.Che
return local.targetInfoGetter.CheckRequirements(ctx, checkCtx)
}

func (local *local) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
return local.targetInfoGetter.FetchRemoteDBModels(ctx)
}

func (local *local) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return local.targetInfoGetter.FetchRemoteTableModels(ctx, schemaName)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local_check_test.go
Expand Up @@ -19,12 +19,12 @@ import (
"testing"

"github.com/coreos/go-semver/semver"
"github.com/golang/mock/gomock"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestCheckRequirementsTiFlash(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/noop/noop.go
Expand Up @@ -93,6 +93,10 @@ func (b noopBackend) CheckRequirements(context.Context, *backend.CheckCtx) error
return nil
}

func (b noopBackend) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
return nil, nil
}

// FetchRemoteTableModels obtains the models of all tables given the schema
// name. The returned table info does not need to be precise if the encoder,
// is not requiring them, but must at least fill in the following fields for
Expand Down
36 changes: 36 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb.go
Expand Up @@ -126,6 +126,38 @@ func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter {
}
}

// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface.
func (b *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
results := []*model.DBInfo{}
logger := log.FromContext(ctx)
s := common.SQLWithRetry{
DB: b.db,
Logger: logger,
}
err := s.Transact(ctx, "fetch db models", func(_ context.Context, tx *sql.Tx) error {
results = results[:0]

rows, e := tx.Query("SHOW DATABASES")
if e != nil {
return e
}
defer rows.Close()

for rows.Next() {
var dbName string
if e := rows.Scan(&dbName); e != nil {
return e
}
dbInfo := &model.DBInfo{
Name: model.NewCIStr(dbName),
}
results = append(results, dbInfo)
}
return rows.Err()
})
return results, err
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `backend.TargetInfoGetter` interface.
// TODO: refactor
Expand Down Expand Up @@ -753,6 +785,10 @@ func (be *tidbBackend) execStmts(ctx context.Context, stmtTasks []stmtTask, tabl
return nil
}

func (be *tidbBackend) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
return be.targetInfoGetter.FetchRemoteDBModels(ctx)
}

func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return be.targetInfoGetter.FetchRemoteTableModels(ctx, schemaName)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/BUILD.bazel
Expand Up @@ -155,7 +155,6 @@ go_test(
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_golang_mock//gomock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand All @@ -173,6 +172,7 @@ go_test(
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
],
)
10 changes: 5 additions & 5 deletions br/pkg/lightning/restore/chunk_restore_test.go
Expand Up @@ -25,7 +25,6 @@ import (
"sync"
"testing"

"github.com/golang/mock/gomock"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
Expand All @@ -50,6 +49,7 @@ import (
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"
)

type chunkRestoreSuite struct {
Expand Down Expand Up @@ -95,7 +95,7 @@ func (s *chunkRestoreSuite) TearDownTest() {
func (s *chunkRestoreSuite) TestDeliverLoopCancel() {
controller := gomock.NewController(s.T())
defer controller.Finish()
mockBackend := mock.NewMockBackend(controller)
mockBackend := mock.NewMockAbstractBackend(controller)
mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes()

rc := &Controller{backend: backend.MakeBackend(mockBackend)}
Expand All @@ -113,7 +113,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData() {

controller := gomock.NewController(s.T())
defer controller.Finish()
mockBackend := mock.NewMockBackend(controller)
mockBackend := mock.NewMockAbstractBackend(controller)
importer := backend.MakeBackend(mockBackend)

mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2)
Expand Down Expand Up @@ -168,7 +168,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() {

controller := gomock.NewController(s.T())
defer controller.Finish()
mockBackend := mock.NewMockBackend(controller)
mockBackend := mock.NewMockAbstractBackend(controller)
importer := backend.MakeBackend(mockBackend)

mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2)
Expand Down Expand Up @@ -627,7 +627,7 @@ func (s *chunkRestoreSuite) TestRestore() {

controller := gomock.NewController(s.T())
defer controller.Finish()
mockBackend := mock.NewMockBackend(controller)
mockBackend := mock.NewMockAbstractBackend(controller)
importer := backend.MakeBackend(mockBackend)

mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2)
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/lightning/restore/get_pre_info.go
Expand Up @@ -86,6 +86,8 @@ type PreRestoreInfoGetter interface {

// TargetInfoGetter defines the operations to get information from target.
type TargetInfoGetter interface {
// FetchRemoteDBModels fetches the database structures from the remote target.
FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)
// FetchRemoteTableModels fetches the table structures from the remote target.
FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)
// CheckVersionRequirements performs the check whether the target satisfies the version requirements.
Expand Down Expand Up @@ -153,6 +155,11 @@ func NewTargetInfoGetterImpl(
}, nil
}

// FetchRemoteDBModels implements TargetInfoGetter.
func (g *TargetInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
return g.backend.FetchRemoteDBModels(ctx)
}

// FetchRemoteTableModels fetches the table structures from the remote target.
// It implements the TargetInfoGetter interface.
func (g *TargetInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down Expand Up @@ -785,6 +792,12 @@ func (p *PreRestoreInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName
return p.targetInfoGetter.IsTableEmpty(ctx, schemaName, tableName)
}

// FetchRemoteDBModels fetches the database structures from the remote target.
// It implements the PreImportInfoGetter interface.
func (p *PreRestoreInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
return p.targetInfoGetter.FetchRemoteDBModels(ctx)
}

// FetchRemoteTableModels fetches the table structures from the remote target.
// It implements the PreRestoreInfoGetter interface.
func (p *PreRestoreInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/lightning/restore/mock/mock.go
Expand Up @@ -211,6 +211,15 @@ func (t *MockTargetInfo) SetTableInfo(schemaName string, tableName string, tblIn
t.dbTblInfoMap[schemaName][tableName] = tblInfo
}

// FetchRemoteDBModels implements the TargetInfoGetter interface.
func (t *MockTargetInfo) FetchRemoteDBModels(_ context.Context) ([]*model.DBInfo, error) {
resultInfos := []*model.DBInfo{}
for dbName := range t.dbTblInfoMap {
resultInfos = append(resultInfos, &model.DBInfo{Name: model.NewCIStr(dbName)})
}
return resultInfos, nil
}

// FetchRemoteTableModels fetches the table structures from the remote target.
// It implements the TargetInfoGetter interface.
func (t *MockTargetInfo) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down

0 comments on commit 9815b45

Please sign in to comment.