Skip to content

Commit

Permalink
fix get record status type (#27)
Browse files Browse the repository at this point in the history
* fixed get record status type

* improved cassandra connect logic
  • Loading branch information
kcajmagic committed Jan 29, 2020
1 parent ad6e08e commit 1cf7df3
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 5 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.5.1]
- Fixed get record of status type [#27](https://github.com/xmidt-org/codex-db/pull/27)
- Improved cassandra connect logic [#27](https://github.com/xmidt-org/codex-db/pull/27)

## [v0.5.0]
- Added cassandra row_id with TIMEUUID for long-polling [#25](https://github.com/xmidt-org/codex-db/pull/25)

Expand Down Expand Up @@ -43,7 +47,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [v0.1.0]
- Initial creation, moved from: https://github.com/xmidt-org/codex-deploy

[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.5.0..HEAD
[Unreleased]: https://github.com/xmidt-org/codex-db/compare/v0.5.1..HEAD
[v0.5.1]: https://github.com/xmidt-org/codex-db/compare/v0.5.0..v0.5.1
[v0.5.0]: https://github.com/xmidt-org/codex-db/compare/v0.4.0..v0.5.0
[v0.4.0]: https://github.com/xmidt-org/codex-db/compare/v0.3.3..v0.4.0
[v0.3.3]: https://github.com/xmidt-org/codex-db/compare/v0.3.2..v0.3.3
Expand Down
37 changes: 34 additions & 3 deletions cassandra/db.go
Expand Up @@ -39,8 +39,11 @@ var (
)

const (
defaultOpTimeout = time.Duration(10) * time.Second
defaultDatabase = "devices"
defaultOpTimeout = time.Duration(10) * time.Second
defaultDatabase = "devices"
defaultNumRetries = 0
defaultWaitTimeMult = 1
defaultMaxNumberConnsPerHost = 2
)

type Config struct {
Expand Down Expand Up @@ -68,6 +71,15 @@ type Config struct {
Username string
// Password to authenticate into the cluster. Username must also be provided.
Password string

// NumRetries for connecting to the db
NumRetries int

// WaitTimeMult the amount of time to wait before retrying to connect to the db
WaitTimeMult time.Duration

// MaxConnsPerHost max number of connections per host
MaxConnsPerHost int
}

type Connection struct {
Expand Down Expand Up @@ -95,6 +107,8 @@ func CreateDbConnection(config Config, provider provider.Provider, health *healt
clusterConfig.Consistency = gocql.LocalQuorum
clusterConfig.Keyspace = config.Database
clusterConfig.Timeout = config.OpTimeout
// let retry package handle it
clusterConfig.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: 1}
// setup ssl
if config.SSLRootCert != "" && config.SSLCert != "" && config.SSLKey != "" {
clusterConfig.SslOpts = &gocql.SslOptions{
Expand All @@ -118,6 +132,14 @@ func CreateDbConnection(config Config, provider provider.Provider, health *healt
}

conn, err := connectWithMetrics(clusterConfig, dbConn.measures)

// retry if it fails
waitTime := 1 * time.Second
for attempt := 0; attempt < config.NumRetries && err != nil; attempt++ {
time.Sleep(waitTime)
conn, err = connectWithMetrics(clusterConfig, dbConn.measures)
waitTime = waitTime * config.WaitTimeMult
}
if err != nil {
return &Connection{}, emperror.WrapWith(err, "Connecting to database failed", "hosts", config.Hosts)
}
Expand All @@ -142,6 +164,15 @@ func validateConfig(config *Config) {
if config.Database == "" {
config.Database = defaultDatabase
}
if config.NumRetries < 0 {
config.NumRetries = defaultNumRetries
}
if config.WaitTimeMult < 1 {
config.WaitTimeMult = defaultWaitTimeMult
}
if config.MaxConnsPerHost <= 0 {
config.MaxConnsPerHost = defaultMaxNumberConnsPerHost
}
}

// GetRecords returns a list of records for a given device.
Expand Down Expand Up @@ -170,7 +201,7 @@ func (c *Connection) GetRecordsOfType(deviceID string, limit int, eventType db.E
filterString = "WHERE device_id = ? AND record_type = ? AND row_id > ?"
items = []interface{}{deviceID, eventType, stateHash}
}
deviceInfo, err := c.finder.findRecords(limit, filterString, items)
deviceInfo, err := c.finder.findRecords(limit, filterString, items...)
if err != nil {
c.measures.SQLQueryFailureCount.With(db.TypeLabel, db.ReadType).Add(1.0)
return []db.Record{}, emperror.WrapWith(err, "Getting records from database failed", "device id", deviceID)
Expand Down
1 change: 0 additions & 1 deletion cassandra/executer.go
Expand Up @@ -178,5 +178,4 @@ func connect(clusterConfig *gocql.ClusterConfig) (*dbDecorator, error) {
}

return &dbDecorator{session: session}, nil

}

0 comments on commit 1cf7df3

Please sign in to comment.