Skip to content

Commit

Permalink
feat(spanner/spannertest): restructure column alteration implementati…
Browse files Browse the repository at this point in the history
…on (#3616)

Separate out validation from the alteration so that validation failures
won't leave a corrupted table. This will also permit other schema
changes to be more easily added in the future.

Permit adding and removing NOT NULL for column types other than
string/bytes, and enforce that it isn't permitted on primary key columns
or array-typed columns.
  • Loading branch information
dsymonds committed Jan 27, 2021
1 parent b96134f commit 176400b
Showing 1 changed file with 37 additions and 21 deletions.
58 changes: 37 additions & 21 deletions spanner/spannertest/db.go
Expand Up @@ -696,6 +696,9 @@ func (t *table) alterColumn(alt spansql.AlterColumn) *status.Status {
// Enable or disable commit timestamps in value and primary key columns.
// https://cloud.google.com/spanner/docs/schema-updates#supported-updates

// TODO: codes.InvalidArgument is used throughout here for reporting errors,
// but that has not been validated against the real Spanner.

sct, ok := alt.Alteration.(spansql.SetColumnType)
if !ok {
return status.Newf(codes.InvalidArgument, "unsupported ALTER COLUMN %s", alt.SQL())
Expand All @@ -706,43 +709,56 @@ func (t *table) alterColumn(alt spansql.AlterColumn) *status.Status {

ci, ok := t.colIndex[alt.Name]
if !ok {
// TODO: What's the right response code?
return status.Newf(codes.InvalidArgument, "unknown column %q", alt.Name)
}

// TODO: check if the column isn't a primary key or array types.
t.cols[ci].NotNull = sct.NotNull

// Check and make type transformations.
oldT, newT := t.cols[ci].Type, sct.Type
stringOrBytes := func(bt spansql.TypeBase) bool { return bt == spansql.String || bt == spansql.Bytes }

// TODO: We don't track whether commit timestamps are permitted on a per-column basis, so that's ignored.

// Change between STRING and BYTES is fine, as is increasing/decreasing the length limit.
// TODO: This should permit array conversions too.
// First phase: Check the validity of the change.
// TODO: Don't permit changes to allow commit timestamps.
if !t.cols[ci].NotNull && sct.NotNull {
// Adding NOT NULL is not permitted for primary key columns or array typed columns.
if ci < t.pkCols {
return status.Newf(codes.InvalidArgument, "cannot set NOT NULL on primary key column %q", alt.Name)
}
if oldT.Array {
return status.Newf(codes.InvalidArgument, "cannot set NOT NULL on array-typed column %q", alt.Name)
}
// Validate that there are no NULL values.
for _, row := range t.rows {
if row[ci] == nil {
return status.Newf(codes.InvalidArgument, "cannot set NOT NULL on column %q that contains NULL values", alt.Name)
}
}
}
var conv func(x interface{}) interface{}
if stringOrBytes(oldT.Base) && stringOrBytes(newT.Base) && !oldT.Array && !newT.Array {
// Change between STRING and BYTES is fine, as is increasing/decreasing the length limit.
// TODO: This should permit array conversions too.
// TODO: Validate data; length limit changes should be rejected if they'd lead to data loss, for instance.
var conv func(x interface{}) interface{}
if oldT.Base == spansql.Bytes && newT.Base == spansql.String {
conv = func(x interface{}) interface{} { return string(x.([]byte)) }
} else if oldT.Base == spansql.String && newT.Base == spansql.Bytes {
conv = func(x interface{}) interface{} { return []byte(x.(string)) }
}
if conv != nil {
for _, row := range t.rows {
if row[ci] != nil { // NULL stays as NULL.
row[ci] = conv(row[ci])
}
} else if oldT == newT {
// Same type; only NOT NULL changes.
} else { // TODO: Support other alterations.
return status.Newf(codes.InvalidArgument, "unsupported ALTER COLUMN %s", alt.SQL())
}

// Second phase: Make type transformations.
t.cols[ci].NotNull = sct.NotNull
t.cols[ci].Type = newT
if conv != nil {
for _, row := range t.rows {
if row[ci] != nil { // NULL stays as NULL.
row[ci] = conv(row[ci])
}
}
t.cols[ci].Type = newT
return nil
}

// TODO: Support other alterations.

return status.Newf(codes.InvalidArgument, "unsupported ALTER COLUMN %s", alt.SQL())
return nil
}

func (t *table) insertRow(rowNum int, r row) {
Expand Down

0 comments on commit 176400b

Please sign in to comment.