Skip to content

Commit

Permalink
feat(spanner/spannertest): implement RowDeletionPolicy in spannertest (
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Oct 8, 2021
1 parent e84e408 commit 7800a33
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 4 deletions.
57 changes: 53 additions & 4 deletions spanner/spannertest/db.go
Expand Up @@ -56,10 +56,11 @@ type table struct {
// Information about the table columns.
// They are reordered on table creation so the primary key columns come first.
cols []colInfo
colIndex map[spansql.ID]int // col name to index
origIndex map[spansql.ID]int // original index of each column upon construction
pkCols int // number of primary key columns (may be 0)
pkDesc []bool // whether each primary key column is in descending order
colIndex map[spansql.ID]int // col name to index
origIndex map[spansql.ID]int // original index of each column upon construction
pkCols int // number of primary key columns (may be 0)
pkDesc []bool // whether each primary key column is in descending order
rdw *spansql.RowDeletionPolicy // RowDeletionPolicy of this table (may be nil)

// Rows are stored in primary key order.
rows []row
Expand Down Expand Up @@ -301,6 +302,7 @@ func (d *database) ApplyDDL(stmt spansql.DDLStmt) *status.Status {
return status.Newf(codes.InvalidArgument, "primary key column %q not in table", col)
}
}
t.rdw = stmt.RowDeletionPolicy
d.tables[stmt.Name] = t
return nil
case *spansql.CreateIndex:
Expand Down Expand Up @@ -359,6 +361,21 @@ func (d *database) ApplyDDL(stmt spansql.DDLStmt) *status.Status {
return st
}
return nil
case spansql.AddRowDeletionPolicy:
if st := t.addRowDeletionPolicy(alt); st.Code() != codes.OK {
return st
}
return nil
case spansql.ReplaceRowDeletionPolicy:
if st := t.replaceRowDeletionPolicy(alt); st.Code() != codes.OK {
return st
}
return nil
case spansql.DropRowDeletionPolicy:
if st := t.dropRowDeletionPolicy(alt); st.Code() != codes.OK {
return st
}
return nil
}
}

Expand Down Expand Up @@ -823,6 +840,38 @@ func (t *table) alterColumn(alt spansql.AlterColumn) *status.Status {
return nil
}

func (t *table) addRowDeletionPolicy(ard spansql.AddRowDeletionPolicy) *status.Status {
_, ok := t.colIndex[ard.RowDeletionPolicy.Column]
if !ok {
return status.Newf(codes.InvalidArgument, "unknown column %q", ard.RowDeletionPolicy.Column)
}
if t.rdw != nil {
return status.New(codes.InvalidArgument, "table already has a row deletion policy")
}
t.rdw = &ard.RowDeletionPolicy
return nil
}

func (t *table) replaceRowDeletionPolicy(ard spansql.ReplaceRowDeletionPolicy) *status.Status {
_, ok := t.colIndex[ard.RowDeletionPolicy.Column]
if !ok {
return status.Newf(codes.InvalidArgument, "unknown column %q", ard.RowDeletionPolicy.Column)
}
if t.rdw == nil {
return status.New(codes.InvalidArgument, "table does not have a row deletion policy")
}
t.rdw = &ard.RowDeletionPolicy
return nil
}

func (t *table) dropRowDeletionPolicy(ard spansql.DropRowDeletionPolicy) *status.Status {
if t.rdw == nil {
return status.New(codes.InvalidArgument, "table does not have a row deletion policy")
}
t.rdw = nil
return nil
}

func (t *table) insertRow(rowNum int, r row) {
t.rows = append(t.rows, nil)
copy(t.rows[rowNum+1:], t.rows[rowNum:])
Expand Down
40 changes: 40 additions & 0 deletions spanner/spannertest/integration_test.go
Expand Up @@ -1413,6 +1413,46 @@ func TestIntegration_Views(t *testing.T) {
}
}

func TestIntegration_RowDeletionPolicy(t *testing.T) {
_, adminClient, _, cleanup := makeClient(t)
defer cleanup()

if err := updateDDL(t, adminClient,
`CREATE TABLE WithRowDeletionPolicy (
Id INT64,
Value STRING(MAX),
DelTimestamp TIMESTAMP,
) PRIMARY KEY (Id), ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`,
`CREATE TABLE WithoutRowDeletionPolicy (
Id INT64,
Value STRING(MAX),
DelTimestamp TIMESTAMP,
) PRIMARY KEY (Id)`); err != nil {
t.Fatalf("Create tables: %v", err)
}
// These should succeed.
if err := updateDDL(t, adminClient, `ALTER TABLE WithRowDeletionPolicy REPLACE ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`); err != nil {
t.Fatalf("Replacing row deletion policy: %v", err)
}
if err := updateDDL(t, adminClient, `ALTER TABLE WithRowDeletionPolicy DROP ROW DELETION POLICY`); err != nil {
t.Fatalf("Dropping row deletion policy: %v", err)
}
if err := updateDDL(t, adminClient, `ALTER TABLE WithRowDeletionPolicy ADD ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`); err != nil {
t.Fatalf("Adding row deletion policy: %v", err)
}

// These should fail.
if err := updateDDL(t, adminClient, `ALTER TABLE WithoutRowDeletionPolicy REPLACE ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`); err == nil {
t.Fatalf("Missing error for replacing row deletion policy")
}
if err := updateDDL(t, adminClient, `ALTER TABLE WithoutRowDeletionPolicy DROP ROW DELETION POLICY`); err == nil {
t.Fatalf("Missing error for dropping row deletion policy")
}
if err := updateDDL(t, adminClient, `ALTER TABLE WithRowDeletionPolicy ADD ROW DELETION POLICY ( OLDER_THAN ( DelTimestamp, INTERVAL 30 DAY ))`); err == nil {
t.Fatalf("Missing error for adding row deletion policy")
}
}

func dropTable(t *testing.T, adminClient *dbadmin.DatabaseAdminClient, table string) error {
t.Helper()
err := updateDDL(t, adminClient, "DROP TABLE "+table)
Expand Down

0 comments on commit 7800a33

Please sign in to comment.