Skip to content

Commit

Permalink
fix: database schema version (#11868)
Browse files Browse the repository at this point in the history
  • Loading branch information
d-bytebase committed Apr 29, 2024
1 parent 174716c commit bc865e6
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 31 deletions.
12 changes: 0 additions & 12 deletions backend/runner/schemasync/syncer.go
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/bytebase/bytebase/backend/plugin/db"
"github.com/bytebase/bytebase/backend/store"
"github.com/bytebase/bytebase/backend/store/model"
"github.com/bytebase/bytebase/backend/utils"
storepb "github.com/bytebase/bytebase/proto/generated-go/store"
)

Expand Down Expand Up @@ -339,24 +338,13 @@ func (s *Syncer) SyncDatabaseSchema(ctx context.Context, database *store.Databas
}
setClassificationAndUserCommentFromComment(databaseMetadata)

var patchSchemaVersion *model.Version
if force {
// When there are too many databases, this might have performance issue.
schemaVersion, err := utils.GetLatestSchemaVersion(ctx, s.store, instance.UID, database.UID, databaseMetadata.Name)
if err != nil {
return errors.Wrapf(err, "failed to get latest schema version for database %q", database.DatabaseName)
}
patchSchemaVersion = &schemaVersion
}

syncStatus := api.OK
ts := time.Now().Unix()
if _, err := s.store.UpdateDatabase(ctx, &store.UpdateDatabaseMessage{
InstanceID: database.InstanceID,
DatabaseName: database.DatabaseName,
SyncState: &syncStatus,
SuccessfulSyncTimeTs: &ts,
SchemaVersion: patchSchemaVersion,
MetadataUpsert: &storepb.DatabaseMetadata{
LastSyncTime: timestamppb.New(time.Unix(ts, 0)),
},
Expand Down
22 changes: 21 additions & 1 deletion backend/runner/taskrun/database_create_executor.go
Expand Up @@ -498,7 +498,7 @@ func (*DatabaseCreateExecutor) getSchemaFromPeerTenantDatabase(ctx context.Conte
return nil, model.Version{}, "", err
}
defer driver.Close(ctx)
schemaVersion, err := utils.GetLatestSchemaVersion(ctx, stores, similarDBInstance.UID, similarDB.UID, similarDB.DatabaseName)
schemaVersion, err := getLatestDoneSchemaVersion(ctx, stores, similarDBInstance.UID, similarDB.UID, similarDB.DatabaseName)
if err != nil {
return nil, model.Version{}, "", errors.Wrapf(err, "failed to get migration history for database %q", similarDB.DatabaseName)
}
Expand All @@ -510,6 +510,26 @@ func (*DatabaseCreateExecutor) getSchemaFromPeerTenantDatabase(ctx context.Conte
return similarDB, schemaVersion, schemaBuf.String(), nil
}

// GetLatestDoneSchemaVersion gets the latest schema version for a database.
func getLatestDoneSchemaVersion(ctx context.Context, stores *store.Store, instanceID int, databaseID int, databaseName string) (model.Version, error) {
// TODO(d): support semantic versioning.
limit := 1
done := db.Done
history, err := stores.ListInstanceChangeHistory(ctx, &store.FindInstanceChangeHistoryMessage{
InstanceID: &instanceID,
DatabaseID: &databaseID,
Status: &done,
Limit: &limit,
})
if err != nil {
return model.Version{}, errors.Wrapf(err, "failed to get migration history for database %q", databaseName)
}
if len(history) == 0 {
return model.Version{}, nil
}
return history[0].Version, nil
}

func getPeerTenantDatabase(databaseMatrix [][]*store.DatabaseMessage, environmentID string) *store.DatabaseMessage {
var similarDB *store.DatabaseMessage
// We try to use an existing tenant with the same environment, if possible.
Expand Down
4 changes: 4 additions & 0 deletions backend/store/instance_change_history.go
Expand Up @@ -65,6 +65,7 @@ type FindInstanceChangeHistoryMessage struct {
DatabaseID *int
SheetID *int
Source *db.MigrationSource
Status *db.MigrationStatus
Version *model.Version
TypeList []db.MigrationType
ResourcesFilter *string
Expand Down Expand Up @@ -467,6 +468,9 @@ func (s *Store) ListInstanceChangeHistory(ctx context.Context, find *FindInstanc
if v := find.Source; v != nil {
where, args = append(where, fmt.Sprintf("instance_change_history.source = $%d", len(args)+1)), append(args, *v)
}
if v := find.Status; v != nil {
where, args = append(where, fmt.Sprintf("instance_change_history.status = $%d", len(args)+1)), append(args, *v)
}
if v := find.Version; v != nil {
storedVersion, err := find.Version.Marshal()
if err != nil {
Expand Down
18 changes: 0 additions & 18 deletions backend/utils/utils.go
Expand Up @@ -39,24 +39,6 @@ import (
v1pb "github.com/bytebase/bytebase/proto/generated-go/v1"
)

// GetLatestSchemaVersion gets the latest schema version for a database.
func GetLatestSchemaVersion(ctx context.Context, stores *store.Store, instanceID int, databaseID int, databaseName string) (model.Version, error) {
// TODO(d): support semantic versioning.
limit := 1
history, err := stores.ListInstanceChangeHistory(ctx, &store.FindInstanceChangeHistoryMessage{
InstanceID: &instanceID,
DatabaseID: &databaseID,
Limit: &limit,
})
if err != nil {
return model.Version{}, errors.Wrapf(err, "failed to get migration history for database %q", databaseName)
}
if len(history) == 0 {
return model.Version{}, nil
}
return history[0].Version, nil
}

// DataSourceFromInstanceWithType gets a typed data source from an instance.
func DataSourceFromInstanceWithType(instance *store.InstanceMessage, dataSourceType api.DataSourceType) *store.DataSourceMessage {
for _, dataSource := range instance.DataSources {
Expand Down

0 comments on commit bc865e6

Please sign in to comment.