Commit

add row_id to cassandra for long polling (#25)
* add row_id to cassandra for long polling

* change method signature

* updated changelog and adding migration script

* updated case

* merge after func with get

* Apply suggestions from code review

Co-Authored-By: kristinaspring <kmspring57@gmail.com>

Co-authored-by: kristinaspring <kmspring57@gmail.com>
kcajmagic and kristinapathak committed Jan 21, 2020
1 parent 601199a commit ad6e08e
Showing 9 changed files with 237 additions and 24 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.5.0]
- Added cassandra row_id with TIMEUUID for long-polling [#25](https://github.com/xmidt-org/codex-db/pull/25)

## [v0.4.0]
- Modified retry package to use backoff package for exponential backoffs on retries [#21](https://github.com/xmidt-org/codex-db/pull/21)
- Added automated releases using travis [#22](https://github.com/xmidt-org/codex-db/pull/22)
@@ -40,7 +43,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [v0.1.0]
- Initial creation, moved from: https://github.com/xmidt-org/codex-deploy

[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.4.0..HEAD
[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.5.0..HEAD
[v0.5.0]: https://github.com/xmidt-org/codex-db/compare/v0.4.0..v0.5.0
[v0.4.0]: https://github.com/xmidt-org/codex-db/compare/v0.3.3..v0.4.0
[v0.3.3]: https://github.com/xmidt-org/codex-db/compare/v0.3.2..v0.3.3
[v0.3.2]: https://github.com/xmidt-org/codex-db/compare/v0.3.1..v0.3.2
20 changes: 13 additions & 7 deletions README.md
@@ -34,13 +34,14 @@ This repo is a library of packages. There is no installation.
```cassandraql
CREATE KEYSPACE IF NOT EXISTS devices;
CREATE TABLE devices.events (device_id varchar,
record_type int,
birthdate bigint,
deathdate bigint,
data blob,
nonce blob,
alg varchar,
kid varchar,
record_type INT,
birthdate BIGINT,
deathdate BIGINT,
data BLOB,
nonce BLOB,
alg VARCHAR,
kid VARCHAR,
row_id TIMEUUID,
PRIMARY KEY (device_id, birthdate, record_type))
WITH CLUSTERING ORDER BY (birthdate DESC, record_type ASC)
AND default_time_to_live = 2768400
@@ -50,6 +51,11 @@ CREATE INDEX search_by_record_type ON devices.events
WITH CLUSTERING ORDER BY (record_type ASC, birthdate DESC)
AND default_time_to_live = 2768400
AND transactions = {'enabled': 'false', 'consistency_level':'user_enforced'};
CREATE INDEX search_by_row_id ON devices.events
(device_id, row_id)
WITH CLUSTERING ORDER BY (row_id DESC)
AND default_time_to_live = 2768400
AND transactions = {'enabled': 'false', 'consistency_level':'user_enforced'};
CREATE TABLE devices.blacklist (device_id varchar PRIMARY KEY, reason varchar);
```
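
For context, a long-polling read against this schema takes roughly the shape below (the same query the cassandra driver builds in `cassandra/executer.go`); the device id and TIMEUUID literal are illustrative placeholders:

```cassandraql
-- Illustrative only: fetch up to 5 records newer than a previously seen row_id.
SELECT device_id, record_type, birthdate, deathdate, data, nonce, alg, kid, row_id
FROM devices.events
WHERE device_id = 'example-device-id' AND row_id > cb9629c8-3256-11ea-91fe-6b6aedd62e7b
LIMIT 5;
```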

19 changes: 19 additions & 0 deletions cassandra/README.md
@@ -0,0 +1,19 @@
# Cassandra DB driver
This implementation is geared towards YugabyteDB.

# Migration from v0.4.0 to v0.5.0
v0.5.0 adds a row_id column, a TIMEUUID that serves as a simplistic state hash.
Since row_id can be null, Gungnir works with both database schemas, but the Gungnir
db driver will need to be updated in order to do long polling. Svalinn is not
backwards compatible, as the insert statement has changed to include the TIMEUUID.

The following is the migration script from v0.4.0 to v0.5.0:
```cassandraql
ALTER TABLE devices.events ADD row_id TIMEUUID;
CREATE INDEX search_by_row_id ON devices.events
(device_id, row_id)
WITH CLUSTERING ORDER BY (row_id DESC)
AND default_time_to_live = 2768400
AND transactions = {'enabled': 'false', 'consistency_level':'user_enforced'};
```
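
To show how a consumer such as Gungnir might use the new column for long polling, here is a minimal sketch built on the `GetRecords` and `GetStateHash` methods added in `cassandra/db.go` below. It assumes the import path `github.com/xmidt-org/codex-db` and a `*cassandra.Connection` constructed elsewhere; the package name, polling loop, interval, and `handleRecords` stub are illustrative and not part of this change.

```go
// Hypothetical long-polling consumer; only GetRecords and GetStateHash come from codex-db.
package poller

import (
	"log"
	"time"

	db "github.com/xmidt-org/codex-db"
	"github.com/xmidt-org/codex-db/cassandra"
)

// handleRecords stands in for application-specific processing.
func handleRecords(records []db.Record) {
	log.Printf("received %d records", len(records))
}

// PollDevice repeatedly asks for records newer than the last observed state hash.
func PollDevice(conn *cassandra.Connection, deviceID string, limit int) {
	stateHash := "" // an empty hash fetches records without a row_id filter
	for {
		records, err := conn.GetRecords(deviceID, limit, stateHash)
		if err != nil {
			log.Printf("getting records failed: %v", err)
		} else if len(records) > 0 {
			// Remember the newest row_id so the next query only returns newer rows.
			if hash, hashErr := conn.GetStateHash(records); hashErr == nil {
				stateHash = hash
			}
			handleRecords(records)
		}
		time.Sleep(5 * time.Second) // polling interval chosen for illustration
	}
}
```
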
46 changes: 41 additions & 5 deletions cassandra/db.go
@@ -50,7 +50,7 @@ type Config struct {
// Database aka Keyspace for cassandra
Database string

//OpTimeout
// OpTimeout
OpTimeout time.Duration

// SSLRootCert used for enabling tls to the cluster. SSLKey, and SSLCert must also be set.
@@ -145,8 +145,14 @@ func validateConfig(config *Config) {
}

// GetRecords returns a list of records for a given device. If stateHash is not empty, only records with a row_id newer than stateHash are returned.
func (c *Connection) GetRecords(deviceID string, limit int) ([]db.Record, error) {
deviceInfo, err := c.finder.findRecords(limit, "WHERE device_id=?", deviceID)
func (c *Connection) GetRecords(deviceID string, limit int, stateHash string) ([]db.Record, error) {
filterString := "WHERE device_id=?"
items := []interface{}{deviceID}
if stateHash != "" {
filterString = "WHERE device_id = ? AND row_id > ?"
items = []interface{}{deviceID, stateHash}
}
deviceInfo, err := c.finder.findRecords(limit, filterString, items...)
if err != nil {
c.measures.SQLQueryFailureCount.With(db.TypeLabel, db.ReadType).Add(1.0)
return []db.Record{}, emperror.WrapWith(err, "Getting records from database failed", "device id", deviceID)
@@ -157,8 +163,14 @@ func (c *Connection) GetRecords(deviceID string, limit int) ([]db.Record, error)
}

// GetRecordsOfType returns a list of records for a given device and event type. If stateHash is not empty, only records with a row_id newer than stateHash are returned.
func (c *Connection) GetRecordsOfType(deviceID string, limit int, eventType db.EventType) ([]db.Record, error) {
deviceInfo, err := c.finder.findRecords(limit, "WHERE device_id = ? AND record_type = ?", deviceID, eventType)
func (c *Connection) GetRecordsOfType(deviceID string, limit int, eventType db.EventType, stateHash string) ([]db.Record, error) {
filterString := "WHERE device_id = ? AND record_type = ?"
items := []interface{}{deviceID, eventType}
if stateHash != "" {
filterString = "WHERE device_id = ? AND record_type = ? AND row_id > ?"
items = []interface{}{deviceID, eventType, stateHash}
}
deviceInfo, err := c.finder.findRecords(limit, filterString, items...)
if err != nil {
c.measures.SQLQueryFailureCount.With(db.TypeLabel, db.ReadType).Add(1.0)
return []db.Record{}, emperror.WrapWith(err, "Getting records from database failed", "device id", deviceID)
@@ -168,6 +180,30 @@ func (c *Connection) GetRecordsOfType(deviceID string, limit int, eventType db.E
return deviceInfo, nil
}

// GetStateHash returns the state hash (row_id) of the most recently added record in the given list.
func (c *Connection) GetStateHash(records []db.Record) (string, error) {
if len(records) == 0 {
return "", errors.New("record slice is empty")
} else if len(records) == 1 && records[0].RowID != "" {
return records[0].RowID, nil
}
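// Use the UUID derived from the zero time as a sentinel meaning no valid row_id has been seen yet.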
original := gocql.UUIDFromTime(time.Time{})
latest := original
for _, elem := range records {
uuid, err := gocql.ParseUUID(elem.RowID)
if err != nil {
continue
}
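// A TIMEUUID embeds its creation time, so the record with the latest embedded timestamp is the newest.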
if uuid.Time().UnixNano() > latest.Time().UnixNano() {
latest = uuid
}
}
if latest == original {
return "", errors.New("no hash found")
}
return latest.String(), nil
}

// GetBlacklist returns a list of blacklisted devices.
func (c *Connection) GetBlacklist() (list []blacklist.BlackListedItem, err error) {
list, err = c.findList.findBlacklist()
142 changes: 140 additions & 2 deletions cassandra/db_test.go
@@ -89,7 +89,7 @@ func TestGetRecords(t *testing.T) {
p.Assert(t, SQLQuerySuccessCounter)(xmetricstest.Value(0.0))
p.Assert(t, SQLQueryFailureCounter)(xmetricstest.Value(0.0))

records, err := dbConnection.GetRecords(tc.deviceID, 5)
records, err := dbConnection.GetRecords(tc.deviceID, 5, "")
mockObj.AssertExpectations(t)
p.Assert(t, SQLQuerySuccessCounter, db.TypeLabel, db.ReadType)(xmetricstest.Value(tc.expectedSuccessMetric))
p.Assert(t, SQLQueryFailureCounter, db.TypeLabel, db.ReadType)(xmetricstest.Value(tc.expectedFailureMetric))
@@ -157,7 +157,145 @@ func TestGetRecordsOfType(t *testing.T) {
p.Assert(t, SQLQueryFailureCounter)(xmetricstest.Value(0.0))
p.Assert(t, SQLReadRecordsCounter)(xmetricstest.Value(0.0))

records, err := dbConnection.GetRecordsOfType(tc.deviceID, 5, tc.eventType)
records, err := dbConnection.GetRecordsOfType(tc.deviceID, 5, tc.eventType, "")
mockObj.AssertExpectations(t)
p.Assert(t, SQLQuerySuccessCounter, db.TypeLabel, db.ReadType)(xmetricstest.Value(tc.expectedSuccessMetric))
p.Assert(t, SQLQueryFailureCounter, db.TypeLabel, db.ReadType)(xmetricstest.Value(tc.expectedFailureMetric))
p.Assert(t, SQLReadRecordsCounter)(xmetricstest.Value(float64(len(tc.expectedRecords))))
if tc.expectedErr == nil || err == nil {
assert.Equal(tc.expectedErr, err)
} else {
assert.Contains(err.Error(), tc.expectedErr.Error())
}
assert.Equal(tc.expectedRecords, records)
})
}
}

func TestGetLatestHash(t *testing.T) {
tests := []struct {
description string
expectedHash string
records []db.Record
hasError bool
}{
{
description: "empty list",
expectedHash: "",
records: []db.Record{},
hasError: true,
},
{
description: "one record",
expectedHash: "cb9629c8-3256-11ea-91fe-6b6aedd62e7b",
records: []db.Record{{RowID: "cb9629c8-3256-11ea-91fe-6b6aedd62e7b"}},
hasError: false,
},
{
description: "multiple records",
expectedHash: "cb962de2-3256-11ea-91fe-6b6aedd62e7b",
records: []db.Record{{RowID: "cb962ad6-3256-11ea-91fe-6b6aedd62e7b"}, {RowID: "cb9629c8-3256-11ea-91fe-6b6aedd62e7b"}, {RowID: "cb962de2-3256-11ea-91fe-6b6aedd62e7b"}},
hasError: false,
},
{
description: "multiple records with last empty",
expectedHash: "cb962de2-3256-11ea-91fe-6b6aedd62e7b",
records: []db.Record{{RowID: "cb962ad6-3256-11ea-91fe-6b6aedd62e7b"}, {RowID: "cb9629c8-3256-11ea-91fe-6b6aedd62e7b"}, {RowID: "cb962de2-3256-11ea-91fe-6b6aedd62e7b"}, {}},
hasError: false,
},
{
description: "multiple records with first empty",
expectedHash: "cb962de2-3256-11ea-91fe-6b6aedd62e7b",
records: []db.Record{{}, {RowID: "cb962ad6-3256-11ea-91fe-6b6aedd62e7b"}, {RowID: "cb9629c8-3256-11ea-91fe-6b6aedd62e7b"}, {RowID: "cb962de2-3256-11ea-91fe-6b6aedd62e7b"}, {}},
hasError: false,
},
{
description: "empty record",
expectedHash: "",
records: []db.Record{{}},
hasError: true,
},
}

for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
assert := assert.New(t)
mockObj := new(mockFinder)
p := xmetricstest.NewProvider(nil, Metrics)
m := NewMeasures(p)
dbConnection := Connection{
measures: m,
finder: mockObj,
}

hash, err := dbConnection.GetStateHash(tc.records)
if tc.hasError {
assert.Error(err)
} else {
assert.NoError(err)
}
assert.Equal(tc.expectedHash, hash)
mockObj.AssertExpectations(t)

})
}

}

func TestGetRecordsAfter(t *testing.T) {
tests := []struct {
description string
deviceID string
hash string
expectedRecords []db.Record
expectedSuccessMetric float64
expectedFailureMetric float64
expectedErr error
expectedCalls int
}{
{
description: "Success",
deviceID: "1234",
hash: "123",
expectedRecords: []db.Record{
{
Type: 1,
DeviceID: "1234",
},
},
expectedSuccessMetric: 1.0,
expectedErr: nil,
expectedCalls: 1,
},
{
description: "Get Error",
deviceID: "1234",
expectedRecords: []db.Record{},
expectedFailureMetric: 1.0,
expectedErr: errors.New("test Get error"),
expectedCalls: 1,
},
}

for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
assert := assert.New(t)
mockObj := new(mockFinder)
p := xmetricstest.NewProvider(nil, Metrics)
m := NewMeasures(p)
dbConnection := Connection{
measures: m,
finder: mockObj,
}
if tc.expectedCalls > 0 {
marshaledRecords, err := json.Marshal(tc.expectedRecords)
assert.Nil(err)
mockObj.On("findRecords", mock.Anything, mock.Anything, mock.Anything).Return(marshaledRecords, tc.expectedErr).Times(tc.expectedCalls)
}
p.Assert(t, SQLQuerySuccessCounter)(xmetricstest.Value(0.0))
p.Assert(t, SQLQueryFailureCounter)(xmetricstest.Value(0.0))
p.Assert(t, SQLReadRecordsCounter)(xmetricstest.Value(0.0))

records, err := dbConnection.GetRecords(tc.deviceID, 5, tc.hash)
mockObj.AssertExpectations(t)
p.Assert(t, SQLQuerySuccessCounter, db.TypeLabel, db.ReadType)(xmetricstest.Value(tc.expectedSuccessMetric))
p.Assert(t, SQLQueryFailureCounter, db.TypeLabel, db.ReadType)(xmetricstest.Value(tc.expectedFailureMetric))
9 changes: 6 additions & 3 deletions cassandra/executer.go
@@ -66,11 +66,12 @@ func (b *dbDecorator) findRecords(limit int, filter string, where ...interface{}
nonce []byte
alg string
kid string
rowid string
)

iter := b.session.Query(fmt.Sprintf("SELECT device_id, record_type, birthdate, deathdate, data, nonce, alg, kid FROM devices.events %s LIMIT ?", filter), append(where, limit)...).Iter()
iter := b.session.Query(fmt.Sprintf("SELECT device_id, record_type, birthdate, deathdate, data, nonce, alg, kid, row_id FROM devices.events %s LIMIT ?", filter), append(where, limit)...).Iter()

for iter.Scan(&device, &eventType, &birthdate, &deathdate, &data, &nonce, &alg, &kid) {
for iter.Scan(&device, &eventType, &birthdate, &deathdate, &data, &nonce, &alg, &kid, &rowid) {
records = append(records, db.Record{
DeviceID: device,
Type: db.EventType(eventType),
@@ -80,6 +81,7 @@ func (b *dbDecorator) findRecords(limit int, filter string, where ...interface{}
Nonce: nonce,
Alg: alg,
KID: kid,
RowID: rowid,
})
// clear out vars https://github.com/gocql/gocql/issues/1348
device = ""
@@ -90,6 +92,7 @@
nonce = []byte{}
alg = ""
kid = ""
rowid = ""
}

err := iter.Close()
@@ -142,7 +145,7 @@ func (b *dbDecorator) insert(records []db.Record) (int, error) {

for _, record := range records {
// There can be no spaces here for some reason; otherwise the database returns an error.
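// In the updated statement, row_id is populated server-side via now(), which generates a TIMEUUID at insert time.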
batch.Query("INSERT INTO devices.events (device_id, record_type, birthdate, deathdate, data, nonce, alg, kid) VALUES (?, ?, ?, ?, ?, ?, ?, ?);",
batch.Query("INSERT INTO devices.events (device_id, record_type, birthdate, deathdate, data, nonce, alg, kid, row_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, now());",
record.DeviceID,
record.Type,
record.BirthDate,
6 changes: 4 additions & 2 deletions db.go
@@ -45,6 +45,7 @@ type Record struct {
Nonce []byte `json:"nonce" bson:"nonce"`
Alg string `json:"alg" bson:"alg"`
KID string `json:"kid" bson:"kid" gorm:"Column:kid"`
RowID string `json:"rowid"`
}

// RecordToDelete is the information needed to get out of the database in order
@@ -75,6 +76,7 @@ type Pruner interface {
// RecordGetter is something that can get records, including only getting records of a
// certain type.
type RecordGetter interface {
GetRecords(deviceID string, limit int) ([]Record, error)
GetRecordsOfType(deviceID string, limit int, eventType EventType) ([]Record, error)
GetRecords(deviceID string, limit int, stateHash string) ([]Record, error)
GetRecordsOfType(deviceID string, limit int, eventType EventType, stateHash string) ([]Record, error)
GetStateHash(records []Record) (string, error)
}
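
Since both drivers changed in lockstep with the widened interface, a compile-time assertion is a handy way to confirm they still satisfy it. A minimal sketch, assuming the import paths match the repository layout (not part of this PR):

```go
// Illustrative compile-time check that both drivers implement db.RecordGetter
// after the signature changes.
package main

import (
	db "github.com/xmidt-org/codex-db"
	"github.com/xmidt-org/codex-db/cassandra"
	"github.com/xmidt-org/codex-db/postgresql"
)

var (
	_ db.RecordGetter = (*cassandra.Connection)(nil)
	_ db.RecordGetter = (*postgresql.Connection)(nil)
)

func main() {}
```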
9 changes: 7 additions & 2 deletions postgresql/db.go
@@ -249,7 +249,7 @@ func (c *Connection) setupMetrics() {
}

// GetRecords returns a list of records for a given device.
func (c *Connection) GetRecords(deviceID string, limit int) ([]db.Record, error) {
func (c *Connection) GetRecords(deviceID string, limit int, stateHash string) ([]db.Record, error) {
var (
deviceInfo []db.Record
)
@@ -264,7 +264,7 @@ func (c *Connection) GetRecords(deviceID string, limit int) ([]db.Record, error)
}

// GetRecordsOfType returns a list of records for a given device and event type.
func (c *Connection) GetRecordsOfType(deviceID string, limit int, eventType db.EventType) ([]db.Record, error) {
func (c *Connection) GetRecordsOfType(deviceID string, limit int, eventType db.EventType, stateHash string) ([]db.Record, error) {
var (
deviceInfo []db.Record
)
@@ -278,6 +278,11 @@ func (c *Connection) GetRecordsOfType(deviceID string, limit int, eventType db.E
return deviceInfo, nil
}

// GetStateHash returns a hash for the latest record added to the database. It is not yet implemented for PostgreSQL.
func (c *Connection) GetStateHash(records []db.Record) (string, error) {
panic("not implemented")
}

// GetRecordsToDelete returns a list of record ids and deathdates not past a
// given date.
func (c *Connection) GetRecordsToDelete(shard int, limit int, deathDate int64) ([]db.RecordToDelete, error) {
