Skip to content
This repository has been archived by the owner on Jun 28, 2018. It is now read-only.

Commit

Permalink
Add Cassandra Enhancements
Browse files Browse the repository at this point in the history
* Authentication functionality,
* Execution of multiple statements in one migration file
  • Loading branch information
syndbg committed Sep 15, 2017
1 parent 804f9c2 commit 455d6f5
Showing 1 changed file with 33 additions and 14 deletions.
47 changes: 33 additions & 14 deletions database/cassandra/cassandra.go
Expand Up @@ -2,13 +2,15 @@ package cassandra

import (
"fmt"
"github.com/gocql/gocql"
"github.com/mattes/migrate/database"
"io"
"io/ioutil"
nurl "net/url"
"github.com/gocql/gocql"
"time"
"github.com/mattes/migrate/database"
"regexp"
"strconv"
"strings"
"time"
)

func init() {
Expand All @@ -20,8 +22,8 @@ var DefaultMigrationsTable = "schema_migrations"
var dbLocked = false

var (
ErrNilConfig = fmt.Errorf("no config")
ErrNoKeyspace = fmt.Errorf("no keyspace provided")
ErrNilConfig = fmt.Errorf("no config")
ErrNoKeyspace = fmt.Errorf("no keyspace provided")
ErrDatabaseDirty = fmt.Errorf("database is dirty")
)

Expand All @@ -35,7 +37,7 @@ type Cassandra struct {
isLocked bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
config *Config
}

func (p *Cassandra) Open(url string) (database.Driver, error) {
Expand All @@ -59,11 +61,23 @@ func (p *Cassandra) Open(url string) (database.Driver, error) {
MigrationsTable: migrationsTable,
}

username := u.User.Username()
if len(username) == 0 {
username = "cassandra"
}
password, _ := u.User.Password()
if len(password) == 0 {
password = "cassandra"
}

cluster := gocql.NewCluster(u.Host)
cluster.Keyspace = u.Path[1:len(u.Path)]
cluster.Consistency = gocql.All
cluster.Timeout = 1 * time.Minute

cluster.Authenticator = gocql.PasswordAuthenticator{
Username: username,
Password: password,
}

// Retrieve query string configuration
if len(u.Query().Get("consistency")) > 0 {
Expand Down Expand Up @@ -111,7 +125,7 @@ func (p *Cassandra) Close() error {
}

func (p *Cassandra) Lock() error {
if (dbLocked) {
if dbLocked {
return database.ErrLocked
}
dbLocked = true
Expand All @@ -130,9 +144,17 @@ func (p *Cassandra) Run(migration io.Reader) error {
}
// run migration
query := string(migr[:])
if err := p.session.Query(query).Exec(); err != nil {
// TODO: cast to Cassandra error and get line number
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
matches := regexp.MustCompile(`(?m:;$)`).Split(query, -1)
for _, match := range matches {
trimmedMatch := strings.Trim(match, " \t\r\n")
if len(trimmedMatch) == 0 {
continue
}

if err := p.session.Query(trimmedMatch).Exec(); err != nil {
// TODO: cast to Cassandra error and get line number
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
}
}

return nil
Expand All @@ -153,7 +175,6 @@ func (p *Cassandra) SetVersion(version int, dirty bool) error {
return nil
}


// Return current keyspace version
func (p *Cassandra) Version() (version int, dirty bool, err error) {
query := `SELECT version, dirty FROM "` + p.config.MigrationsTable + `" LIMIT 1`
Expand Down Expand Up @@ -191,7 +212,6 @@ func (p *Cassandra) Drop() error {
return nil
}


// Ensure version table exists
func (p *Cassandra) ensureVersionTable() error {
err := p.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))", p.config.MigrationsTable)).Exec()
Expand All @@ -204,7 +224,6 @@ func (p *Cassandra) ensureVersionTable() error {
return nil
}


// ParseConsistency wraps gocql.ParseConsistency
// to return an error instead of a panicking.
func parseConsistency(consistencyStr string) (consistency gocql.Consistency, err error) {
Expand Down

0 comments on commit 455d6f5

Please sign in to comment.