Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner/spannertest): restructure column alteration implementation #3616

Merged
merged 1 commit into from Jan 27, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 37 additions & 21 deletions spanner/spannertest/db.go
Expand Up @@ -696,6 +696,9 @@ func (t *table) alterColumn(alt spansql.AlterColumn) *status.Status {
// Enable or disable commit timestamps in value and primary key columns.
// https://cloud.google.com/spanner/docs/schema-updates#supported-updates

// TODO: codes.InvalidArgument is used throughout here for reporting errors,
// but that has not been validated against the real Spanner.

sct, ok := alt.Alteration.(spansql.SetColumnType)
if !ok {
return status.Newf(codes.InvalidArgument, "unsupported ALTER COLUMN %s", alt.SQL())
Expand All @@ -706,43 +709,56 @@ func (t *table) alterColumn(alt spansql.AlterColumn) *status.Status {

ci, ok := t.colIndex[alt.Name]
if !ok {
// TODO: What's the right response code?
return status.Newf(codes.InvalidArgument, "unknown column %q", alt.Name)
}

// TODO: check if the column isn't a primary key or array types.
t.cols[ci].NotNull = sct.NotNull

// Check and make type transformations.
oldT, newT := t.cols[ci].Type, sct.Type
stringOrBytes := func(bt spansql.TypeBase) bool { return bt == spansql.String || bt == spansql.Bytes }

// TODO: We don't track whether commit timestamps are permitted on a per-column basis, so that's ignored.

// Change between STRING and BYTES is fine, as is increasing/decreasing the length limit.
// TODO: This should permit array conversions too.
// First phase: Check the validity of the change.
// TODO: Don't permit changes to allow commit timestamps.
if !t.cols[ci].NotNull && sct.NotNull {
// Adding NOT NULL is not permitted for primary key columns or array typed columns.
if ci < t.pkCols {
return status.Newf(codes.InvalidArgument, "cannot set NOT NULL on primary key column %q", alt.Name)
}
if oldT.Array {
return status.Newf(codes.InvalidArgument, "cannot set NOT NULL on array-typed column %q", alt.Name)
}
// Validate that there are no NULL values.
for _, row := range t.rows {
if row[ci] == nil {
return status.Newf(codes.InvalidArgument, "cannot set NOT NULL on column %q that contains NULL values", alt.Name)
}
}
}
var conv func(x interface{}) interface{}
if stringOrBytes(oldT.Base) && stringOrBytes(newT.Base) && !oldT.Array && !newT.Array {
// Change between STRING and BYTES is fine, as is increasing/decreasing the length limit.
// TODO: This should permit array conversions too.
// TODO: Validate data; length limit changes should be rejected if they'd lead to data loss, for instance.
var conv func(x interface{}) interface{}
if oldT.Base == spansql.Bytes && newT.Base == spansql.String {
conv = func(x interface{}) interface{} { return string(x.([]byte)) }
} else if oldT.Base == spansql.String && newT.Base == spansql.Bytes {
conv = func(x interface{}) interface{} { return []byte(x.(string)) }
}
if conv != nil {
for _, row := range t.rows {
if row[ci] != nil { // NULL stays as NULL.
row[ci] = conv(row[ci])
}
} else if oldT == newT {
// Same type; only NOT NULL changes.
} else { // TODO: Support other alterations.
return status.Newf(codes.InvalidArgument, "unsupported ALTER COLUMN %s", alt.SQL())
}

// Second phase: Make type transformations.
t.cols[ci].NotNull = sct.NotNull
t.cols[ci].Type = newT
if conv != nil {
for _, row := range t.rows {
if row[ci] != nil { // NULL stays as NULL.
row[ci] = conv(row[ci])
}
}
t.cols[ci].Type = newT
return nil
}

// TODO: Support other alterations.

return status.Newf(codes.InvalidArgument, "unsupported ALTER COLUMN %s", alt.SQL())
return nil
}

func (t *table) insertRow(rowNum int, r row) {
Expand Down