Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Database drivers run migrations. [Add a new database?](database/driver.go)
* [PostgreSQL](database/postgres)
* [Redshift](database/redshift)
* [Ql](database/ql)
* [Cassandra](database/cassandra)
* [Cassandra/ Scylla](database/cassandra)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop the mention of scylla unless there are tests against it.

Copy link
Author

@cyyynthia cyyynthia Mar 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this would be fine since MariaDB is also mentioned despite not being tested as of now; I'll add tests

* [SQLite](database/sqlite3) ([todo #165](https://github.com/mattes/migrate/issues/165))
* [SQLCipher](database/sqlcipher)
* [MySQL/ MariaDB](database/mysql)
Expand Down
71 changes: 63 additions & 8 deletions database/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,27 +226,27 @@ func (c *Cassandra) Run(migration io.Reader) error {
}

func (c *Cassandra) SetVersion(version int, dirty bool) error {
query := `TRUNCATE "` + c.config.MigrationsTable + `"`
if err := c.session.Query(query).Exec(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}

// Also re-write the schema version for nil dirty versions to prevent
// empty schema version for failed down migration on the first migration
// See: https://github.com/golang-migrate/migrate/issues/330
if version >= 0 || (version == database.NilVersion && dirty) {
query = `INSERT INTO "` + c.config.MigrationsTable + `" (version, dirty) VALUES (?, ?)`
query := `UPDATE "` + c.config.MigrationsTable + `" SET version = ?, dirty = ? WHERE dummy = 1`
if err := c.session.Query(query, version, dirty).Exec(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
} else {
query := `DELETE FROM "` + c.config.MigrationsTable + `" WHERE dummy = 1`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to reproduce the original behavior of the TRUNCATE that were present before, which would have left the database empty if the condition wasn't met

if err := c.session.Query(query).Exec(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}

return nil
}

// Return current keyspace version
func (c *Cassandra) Version() (version int, dirty bool, err error) {
query := `SELECT version, dirty FROM "` + c.config.MigrationsTable + `" LIMIT 1`
query := `SELECT version, dirty FROM "` + c.config.MigrationsTable + `" WHERE dummy = 1 LIMIT 1`
err = c.session.Query(query).Scan(&version, &dirty)
switch {
case err == gocql.ErrNotFound:
Expand Down Expand Up @@ -296,16 +296,71 @@ func (c *Cassandra) ensureVersionTable() (err error) {
}
}()

err = c.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))", c.config.MigrationsTable)).Exec()
err = c.createVersionTable()
if err != nil {
return err
}
if _, _, err = c.Version(); err != nil {
var count int8
err = c.session.Query(`SELECT COUNT(*) FROM system_schema.columns WHERE keyspace_name = '` + c.config.KeyspaceName + `' AND table_name = '` + c.config.MigrationsTable + `' AND column_name = 'dummy'`).Scan(&count)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not move this into createVersionTable() and make createVersionTable() call upgradeVersionTable()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've put it here so in normal circumstances, this query is only performed if there is an error (which might indicate the table needs an update). Moving this in createVersionTable would mean there'd always be 2 calls done instead of 1

if err != nil {
return err
}

if count == 0 {
if err = c.upgradeVersionTable(); err != nil {
return err
}

if _, _, err = c.Version(); err != nil {
return err
}

return nil
}
return err
}
return nil
}

// upgradeVersionTable is responsible for upgrading legacy migrate users from the
// older version table schema to the new one without disruption.
// See: https://github.com/golang-migrate/migrate/issues/455
func (c *Cassandra) upgradeVersionTable() (err error) {
version := -1
dirty := false
skip := false

query := `SELECT version, dirty FROM "` + c.config.MigrationsTable + `" LIMIT 1`
if err = c.session.Query(query).Scan(&version, &dirty); err != nil {
if err == gocql.ErrNotFound {
skip = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upgrades should only be performed when we detect that an upgrade is needed.

Given that, we should error on attempted schema upgrade failures. e.g. failing to fetch the previous applied migration version & status when upgrading the cassandra schema table is a stopping error condition

Also, the previous version and status should be logged so subsequent failures can be manually recovered.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

failing to fetch the previous applied migration version

Given the previous implementation, the table may have been left in an empty state since the table is always emptied when SetVersion is called, but only filled back if a condition is met (and if it is not, no error is returned). This is why I don't error if it can't find previous version and instead just skip adding back the data

Also, the previous version and status should be logged so subsequent failures can be manually recovered.

That's a valid point, I'll add some logging

} else {
return err
}
}

if err = c.session.Query(`DROP TABLE "` + c.config.MigrationsTable + `"`).Exec(); err != nil {
return err
}

if err = c.createVersionTable(); err != nil {
return err
}

if !skip {
if err = c.SetVersion(version, dirty); err != nil {
return err
}
}

return nil
}

func (c *Cassandra) createVersionTable() (err error) {
return c.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (dummy tinyint, version bigint, dirty boolean, PRIMARY KEY(dummy))", c.config.MigrationsTable)).Exec()
}

// ParseConsistency wraps gocql.ParseConsistency
// to return an error instead of a panicking.
func parseConsistency(consistencyStr string) (consistency gocql.Consistency, err error) {
Expand Down
75 changes: 69 additions & 6 deletions database/cassandra/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@ package cassandra
import (
"context"
"fmt"
"github.com/golang-migrate/migrate/v4"
"strconv"
"testing"
)

import (
"github.com/dhui/dktest"
"github.com/gocql/gocql"
)
"github.com/golang-migrate/migrate/v4"

import (
dt "github.com/golang-migrate/migrate/v4/database/testing"
"github.com/golang-migrate/migrate/v4/dktesting"

_ "github.com/golang-migrate/migrate/v4/source/file"
)

Expand Down Expand Up @@ -52,7 +49,7 @@ func isReady(ctx context.Context, c dktest.ContainerInfo) bool {
}
defer p.Close()
// Create keyspace for tests
if err = p.Query("CREATE KEYSPACE testks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor':1}").Exec(); err != nil {
if err = p.Query("CREATE KEYSPACE testks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}").Exec(); err != nil {
return false
}
return true
Expand Down Expand Up @@ -104,3 +101,69 @@ func TestMigrate(t *testing.T) {
dt.TestMigrate(t, m)
})
}

func TestSchemaVersionV2Migration(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding a test!
Also add a test for the case where upgradeVersionTable() is a noop.

dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, portStr, err := c.Port(9042)
if err != nil {
t.Fatal("Unable to get mapped port:", err)
}

port, err := strconv.Atoi(portStr)
if err != nil {
t.Fatal(err)
}

addr := fmt.Sprintf("cassandra://%v:%v/testks", ip, portStr)

// Setup fake legacy version table
cluster := gocql.NewCluster(ip)
cluster.Port = port
cluster.Keyspace = "testks"
cluster.Consistency = gocql.All
sess, err := cluster.CreateSession()
if err != nil {
t.Fatal(err)
}

defer sess.Close()
if err = sess.Query("CREATE TABLE schema_migrations (version bigint, dirty boolean, PRIMARY KEY(version))").Exec(); err != nil {
t.Fatal(err)
}
if err = sess.Query("INSERT INTO schema_migrations (version, dirty) VALUES (20210301171700, false)").Exec(); err != nil {
t.Fatal(err)
}

// Init driver
p := &Cassandra{}
d, err := p.Open(addr)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()

// Migration should have occurred; we need to check that the `dummy` field now exists, and that data has been preserved.
var count int8
err = sess.Query("SELECT COUNT(*) FROM system_schema.columns WHERE keyspace_name = 'testks' AND table_name = 'schema_migrations' AND column_name = 'dummy';").Scan(&count)
if err != nil {
t.Fatal(err)
}

if count != 1 {
t.Error("Expected column dummy to be present in the schema_migrations table")
}

ver, dirty, err := d.Version()
if err != nil {
t.Fatal(err)
}

if ver != 20210301171700 || dirty {
t.Error("Expected data to be preserved after migration")
}
})
}