Skip to content

Commit

Permalink
[CORE-2246] Fix testing deadlock, remove dead code (#9938)
Browse files Browse the repository at this point in the history
This testing deadlock is unlikely to be triggered, but should still be
addressed.

The dead code was revealed in the course of my audit for code which
might leak database connections.
  • Loading branch information
robert-uhl committed Apr 9, 2024
1 parent 4442cf6 commit 3af8c1d
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 70 deletions.
19 changes: 0 additions & 19 deletions src/internal/collection/etcd_collection.go
Expand Up @@ -516,25 +516,6 @@ func (c *etcdReadOnlyCollection) List(val proto.Message, opts *Options, f func(k
})
}

// ListRev returns objects sorted based on the options passed in. f will be called
// with each key and the create-revision of the key, val will contain the
// corresponding value. Val is not an argument to f because that would require
// f to perform a cast before it could be used. You can break out of iteration
// by returning errutil.ErrBreak.
func (c *etcdReadOnlyCollection) ListRev(val proto.Message, opts *Options, f func(key string, createRev int64) error) error {
span, _ := tracing.AddSpanToAnyExisting(c.ctx, "/etcd.RO/List", "col", c.prefix)
defer tracing.FinishAnySpan(span)
if err := watch.CheckType(c.template, val); err != nil {
return err
}
return c.list(c.prefix, &c.limit, opts, func(kv *mvccpb.KeyValue) error {
if err := proto.Unmarshal(kv.Value, val); err != nil {
return errors.EnsureStack(err)
}
return f(strings.TrimPrefix(string(kv.Key), c.prefix), kv.CreateRevision)
})
}

func (c *etcdReadOnlyCollection) list(prefix string, limitPtr *int64, opts *Options, f func(*mvccpb.KeyValue) error) error {
return listRevision(c, prefix, limitPtr, opts, f)
}
Expand Down
47 changes: 0 additions & 47 deletions src/internal/collection/postgres_collection.go
Expand Up @@ -505,53 +505,6 @@ func (c *postgresReadWriteCollection) List(val proto.Message, opts *Options, f f
})
}

func (c *postgresReadOnlyCollection) listRev(withFields map[string]string, val proto.Message, opts *Options, f func(string, int64) error) error {
fakeRev := int64(0)
lastTimestamp := time.Time{}

updateRev := func(t time.Time) {
if t.After(lastTimestamp) {
lastTimestamp = t
fakeRev++
}
}

return c.list(withFields, opts, func(m *model) error {
if err := proto.Unmarshal(m.Proto, val); err != nil {
return errors.EnsureStack(err)
}

if opts.Target == SortByCreateRevision {
updateRev(m.CreatedAt)
} else if opts.Target == SortByModRevision {
updateRev(m.UpdatedAt)
}

return f(m.Key, fakeRev)
})
}

// ListRev emulates the behavior of etcd collection's ListRev, but doesn't
// reproduce it exactly. The revisions returned are not from the database -
// postgres uses 32-bit transaction ids and doesn't include one for the creating
// transaction of a row. So, we fake a revision id by sorting rows by their
// create/update timestamp and incrementing a fake revision id every time the
// timestamp changes. Note that the etcd implementation always returns the
// create revision, but that only works here if you also sort by the create
// revision.
func (c *postgresReadOnlyCollection) ListRev(val proto.Message, opts *Options, f func(string, int64) error) error {
return c.listRev(nil, val, opts, f)
}

// GetRevByIndex is identical to ListRev except that it filters the results
// according to a predicate on the given index.
func (c *postgresReadOnlyCollection) GetRevByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string, int64) error) error {
if err := c.validateIndex(index); err != nil {
return err
}
return c.listRev(map[string]string{indexFieldName(index): indexVal}, val, opts, f)
}

func (c *postgresReadOnlyCollection) Count() (int64, error) {
query := fmt.Sprintf("select count(*) from collections.%s", c.table)
row := c.db.QueryRowContext(c.ctx, query)
Expand Down
3 changes: 0 additions & 3 deletions src/internal/collection/types.go
Expand Up @@ -125,7 +125,6 @@ type ReadOnlyCollection interface {
Get(key interface{}, val proto.Message) error
GetByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string) error) error
List(val proto.Message, opts *Options, f func(string) error) error
ListRev(val proto.Message, opts *Options, f func(string, int64) error) error
Count() (int64, error)
Watch(opts ...watch.Option) (watch.Watcher, error)
WatchF(f func(*watch.Event) error, opts ...watch.Option) error
Expand All @@ -138,8 +137,6 @@ type ReadOnlyCollection interface {
type PostgresReadOnlyCollection interface {
ReadOnlyCollection

GetRevByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string, int64) error) error

// GetUniqueByIndex is identical to GetByIndex except it is an error if
// exactly one row is not found.
// TODO: decide if we should merge this with GetByIndex and use an `Options`.
Expand Down
2 changes: 1 addition & 1 deletion src/internal/storage/track/postgres_tracker_test.go
Expand Up @@ -20,7 +20,7 @@ func TestPostgresTracker(t *testing.T) {
db := dockertestenv.NewTestDB(t)
ctx := context.Background()
err := dbutil.WithTx(ctx, db, func(cbCtx context.Context, tx *pachsql.Tx) error {
db.MustExec("CREATE SCHEMA storage")
tx.MustExec("CREATE SCHEMA storage")
return track.SetupPostgresTrackerV0(cbCtx, tx)
})
require.NoError(t, err)
Expand Down

0 comments on commit 3af8c1d

Please sign in to comment.