Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner/spannertest): implement RowDeletionPolicy in spannertest #4961

Merged
merged 4 commits into from Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 53 additions & 4 deletions spanner/spannertest/db.go
Expand Up @@ -56,10 +56,11 @@ type table struct {
// Information about the table columns.
// They are reordered on table creation so the primary key columns come first.
cols []colInfo
colIndex map[spansql.ID]int // col name to index
origIndex map[spansql.ID]int // original index of each column upon construction
pkCols int // number of primary key columns (may be 0)
pkDesc []bool // whether each primary key column is in descending order
colIndex map[spansql.ID]int // col name to index
origIndex map[spansql.ID]int // original index of each column upon construction
pkCols int // number of primary key columns (may be 0)
pkDesc []bool // whether each primary key column is in descending order
rdw *spansql.RowDeletionPolicy // RowDeletionPolicy of this table (may be nil)

// Rows are stored in primary key order.
rows []row
Expand Down Expand Up @@ -301,6 +302,7 @@ func (d *database) ApplyDDL(stmt spansql.DDLStmt) *status.Status {
return status.Newf(codes.InvalidArgument, "primary key column %q not in table", col)
}
}
t.rdw = stmt.RowDeletionPolicy
d.tables[stmt.Name] = t
return nil
case *spansql.CreateIndex:
Expand Down Expand Up @@ -359,6 +361,21 @@ func (d *database) ApplyDDL(stmt spansql.DDLStmt) *status.Status {
return st
}
return nil
case spansql.AddRowDeletionPolicy:
if st := t.addRowDeletionPolicy(alt); st.Code() != codes.OK {
return st
}
return nil
case spansql.ReplaceRowDeletionPolicy:
if st := t.replaceRowDeletionPolicy(alt); st.Code() != codes.OK {
return st
}
return nil
case spansql.DropRowDeletionPolicy:
if st := t.dropRowDeletionPolicy(alt); st.Code() != codes.OK {
return st
}
return nil
}
}

Expand Down Expand Up @@ -823,6 +840,38 @@ func (t *table) alterColumn(alt spansql.AlterColumn) *status.Status {
return nil
}

func (t *table) addRowDeletionPolicy(ard spansql.AddRowDeletionPolicy) *status.Status {
_, ok := t.colIndex[ard.RowDeletionPolicy.Column]
if !ok {
return status.Newf(codes.InvalidArgument, "unknown column %q", ard.RowDeletionPolicy.Column)
}
if t.rdw != nil {
return status.New(codes.InvalidArgument, "table already has a row deletion policy")
}
t.rdw = &ard.RowDeletionPolicy
return nil
}

func (t *table) replaceRowDeletionPolicy(ard spansql.ReplaceRowDeletionPolicy) *status.Status {
_, ok := t.colIndex[ard.RowDeletionPolicy.Column]
if !ok {
return status.Newf(codes.InvalidArgument, "unknown column %q", ard.RowDeletionPolicy.Column)
}
if t.rdw == nil {
return status.New(codes.InvalidArgument, "table does not have a row deletion policy")
}
t.rdw = &ard.RowDeletionPolicy
return nil
}

func (t *table) dropRowDeletionPolicy(ard spansql.DropRowDeletionPolicy) *status.Status {
if t.rdw == nil {
return status.New(codes.InvalidArgument, "table does not have a row deletion policy")
}
t.rdw = nil
return nil
}

func (t *table) insertRow(rowNum int, r row) {
t.rows = append(t.rows, nil)
copy(t.rows[rowNum+1:], t.rows[rowNum:])
Expand Down
40 changes: 40 additions & 0 deletions spanner/spannertest/integration_test.go
Expand Up @@ -1413,6 +1413,46 @@ func TestIntegration_Views(t *testing.T) {
}
}

func TestIntegration_RowDeletionPolicy(t *testing.T) {
_, adminClient, _, cleanup := makeClient(t)
defer cleanup()

if err := updateDDL(t, adminClient,
`CREATE TABLE WithRowDeletionPolicy (
Id INT64,
Value STRING(MAX),
DelTimestamp TIMESTAMP,
) PRIMARY KEY (Id), ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`,
`CREATE TABLE WithoutRowDeletionPolicy (
Id INT64,
Value STRING(MAX),
DelTimestamp TIMESTAMP,
) PRIMARY KEY (Id)`); err != nil {
t.Fatalf("Create tables: %v", err)
}
// These should succeed.
if err := updateDDL(t, adminClient, `ALTER TABLE WithRowDeletionPolicy REPLACE ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`); err != nil {
t.Fatalf("Replacing row deletion policy: %v", err)
}
if err := updateDDL(t, adminClient, `ALTER TABLE WithRowDeletionPolicy DROP ROW DELETION POLICY`); err != nil {
t.Fatalf("Dropping row deletion policy: %v", err)
}
if err := updateDDL(t, adminClient, `ALTER TABLE WithRowDeletionPolicy ADD ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`); err != nil {
t.Fatalf("Adding row deletion policy: %v", err)
}

// These should fail.
if err := updateDDL(t, adminClient, `ALTER TABLE WithoutRowDeletionPolicy REPLACE ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`); err == nil {
t.Fatalf("Missing error for replacing row deletion policy")
}
if err := updateDDL(t, adminClient, `ALTER TABLE WithoutRowDeletionPolicy DROP ROW DELETION POLICY`); err == nil {
t.Fatalf("Missing error for dropping row deletion policy")
}
if err := updateDDL(t, adminClient, `ALTER TABLE WithRowDeletionPolicy ADD ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`); err == nil {
t.Fatalf("Missing error for adding row deletion policy")
}
}

func dropTable(t *testing.T, adminClient *dbadmin.DatabaseAdminClient, table string) error {
t.Helper()
err := updateDDL(t, adminClient, "DROP TABLE "+table)
Expand Down