-
Notifications
You must be signed in to change notification settings - Fork 1.5k
chore: Use an optimized schema for Cassandra #513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -226,27 +226,27 @@ func (c *Cassandra) Run(migration io.Reader) error { | |
| } | ||
|
|
||
| func (c *Cassandra) SetVersion(version int, dirty bool) error { | ||
| query := `TRUNCATE "` + c.config.MigrationsTable + `"` | ||
| if err := c.session.Query(query).Exec(); err != nil { | ||
| return &database.Error{OrigErr: err, Query: []byte(query)} | ||
| } | ||
|
|
||
| // Also re-write the schema version for nil dirty versions to prevent | ||
| // empty schema version for failed down migration on the first migration | ||
| // See: https://github.com/golang-migrate/migrate/issues/330 | ||
| if version >= 0 || (version == database.NilVersion && dirty) { | ||
| query = `INSERT INTO "` + c.config.MigrationsTable + `" (version, dirty) VALUES (?, ?)` | ||
| query := `UPDATE "` + c.config.MigrationsTable + `" SET version = ?, dirty = ? WHERE dummy = 1` | ||
| if err := c.session.Query(query, version, dirty).Exec(); err != nil { | ||
| return &database.Error{OrigErr: err, Query: []byte(query)} | ||
| } | ||
| } else { | ||
| query := `DELETE FROM "` + c.config.MigrationsTable + `" WHERE dummy = 1` | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this necessary?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is to reproduce the original behavior of the TRUNCATE that were present before, which would have left the database empty if the condition wasn't met |
||
| if err := c.session.Query(query).Exec(); err != nil { | ||
| return &database.Error{OrigErr: err, Query: []byte(query)} | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Return current keyspace version | ||
| func (c *Cassandra) Version() (version int, dirty bool, err error) { | ||
| query := `SELECT version, dirty FROM "` + c.config.MigrationsTable + `" LIMIT 1` | ||
| query := `SELECT version, dirty FROM "` + c.config.MigrationsTable + `" WHERE dummy = 1 LIMIT 1` | ||
| err = c.session.Query(query).Scan(&version, &dirty) | ||
| switch { | ||
| case err == gocql.ErrNotFound: | ||
|
|
@@ -296,16 +296,71 @@ func (c *Cassandra) ensureVersionTable() (err error) { | |
| } | ||
| }() | ||
|
|
||
| err = c.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))", c.config.MigrationsTable)).Exec() | ||
| err = c.createVersionTable() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if _, _, err = c.Version(); err != nil { | ||
| var count int8 | ||
| err = c.session.Query(`SELECT COUNT(*) FROM system_schema.columns WHERE keyspace_name = '` + c.config.KeyspaceName + `' AND table_name = '` + c.config.MigrationsTable + `' AND column_name = 'dummy'`).Scan(&count) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not move this into
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've put it here so in normal circumstances, this query is only performed if there is an error (which might indicate the table needs an update). Moving this in |
||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if count == 0 { | ||
| if err = c.upgradeVersionTable(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if _, _, err = c.Version(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // upgradeVersionTable is responsible for upgrading legacy migrate users from the | ||
| // older version table schema to the new one without disruption. | ||
| // See: https://github.com/golang-migrate/migrate/issues/455 | ||
| func (c *Cassandra) upgradeVersionTable() (err error) { | ||
| version := -1 | ||
| dirty := false | ||
| skip := false | ||
|
|
||
| query := `SELECT version, dirty FROM "` + c.config.MigrationsTable + `" LIMIT 1` | ||
| if err = c.session.Query(query).Scan(&version, &dirty); err != nil { | ||
| if err == gocql.ErrNotFound { | ||
| skip = true | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Upgrades should only be performed when we detect that an upgrade is needed. Given that, we should error on attempted schema upgrade failures. e.g. failing to fetch the previous applied migration version & status when upgrading the cassandra schema table is a stopping error condition Also, the previous version and status should be logged so subsequent failures can be manually recovered.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Given the previous implementation, the table may have been left in an empty state since the table is always emptied when
That's a valid point, I'll add some logging |
||
| } else { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| if err = c.session.Query(`DROP TABLE "` + c.config.MigrationsTable + `"`).Exec(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err = c.createVersionTable(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if !skip { | ||
| if err = c.SetVersion(version, dirty); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (c *Cassandra) createVersionTable() (err error) { | ||
| return c.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (dummy tinyint, version bigint, dirty boolean, PRIMARY KEY(dummy))", c.config.MigrationsTable)).Exec() | ||
| } | ||
|
|
||
| // ParseConsistency wraps gocql.ParseConsistency | ||
| // to return an error instead of a panicking. | ||
| func parseConsistency(consistencyStr string) (consistency gocql.Consistency, err error) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,19 +3,16 @@ package cassandra | |
| import ( | ||
| "context" | ||
| "fmt" | ||
| "github.com/golang-migrate/migrate/v4" | ||
| "strconv" | ||
| "testing" | ||
| ) | ||
|
|
||
| import ( | ||
| "github.com/dhui/dktest" | ||
| "github.com/gocql/gocql" | ||
| ) | ||
| "github.com/golang-migrate/migrate/v4" | ||
|
|
||
| import ( | ||
| dt "github.com/golang-migrate/migrate/v4/database/testing" | ||
| "github.com/golang-migrate/migrate/v4/dktesting" | ||
|
|
||
| _ "github.com/golang-migrate/migrate/v4/source/file" | ||
| ) | ||
|
|
||
|
|
@@ -52,7 +49,7 @@ func isReady(ctx context.Context, c dktest.ContainerInfo) bool { | |
| } | ||
| defer p.Close() | ||
| // Create keyspace for tests | ||
| if err = p.Query("CREATE KEYSPACE testks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor':1}").Exec(); err != nil { | ||
| if err = p.Query("CREATE KEYSPACE testks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}").Exec(); err != nil { | ||
| return false | ||
| } | ||
| return true | ||
|
|
@@ -104,3 +101,69 @@ func TestMigrate(t *testing.T) { | |
| dt.TestMigrate(t, m) | ||
| }) | ||
| } | ||
|
|
||
| func TestSchemaVersionV2Migration(t *testing.T) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for adding a test! |
||
| dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { | ||
| ip, portStr, err := c.Port(9042) | ||
| if err != nil { | ||
| t.Fatal("Unable to get mapped port:", err) | ||
| } | ||
|
|
||
| port, err := strconv.Atoi(portStr) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| addr := fmt.Sprintf("cassandra://%v:%v/testks", ip, portStr) | ||
|
|
||
| // Setup fake legacy version table | ||
| cluster := gocql.NewCluster(ip) | ||
| cluster.Port = port | ||
| cluster.Keyspace = "testks" | ||
| cluster.Consistency = gocql.All | ||
| sess, err := cluster.CreateSession() | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| defer sess.Close() | ||
| if err = sess.Query("CREATE TABLE schema_migrations (version bigint, dirty boolean, PRIMARY KEY(version))").Exec(); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if err = sess.Query("INSERT INTO schema_migrations (version, dirty) VALUES (20210301171700, false)").Exec(); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| // Init driver | ||
| p := &Cassandra{} | ||
| d, err := p.Open(addr) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| defer func() { | ||
| if err := d.Close(); err != nil { | ||
| t.Error(err) | ||
| } | ||
| }() | ||
|
|
||
| // Migration should have occurred; we need to check that the `dummy` field now exists, and that data has been preserved. | ||
| var count int8 | ||
| err = sess.Query("SELECT COUNT(*) FROM system_schema.columns WHERE keyspace_name = 'testks' AND table_name = 'schema_migrations' AND column_name = 'dummy';").Scan(&count) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| if count != 1 { | ||
| t.Error("Expected column dummy to be present in the schema_migrations table") | ||
| } | ||
|
|
||
| ver, dirty, err := d.Version() | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| if ver != 20210301171700 || dirty { | ||
| t.Error("Expected data to be preserved after migration") | ||
| } | ||
| }) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drop the mention of scylla unless there are tests against it.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought this would be fine since MariaDB is also mentioned despite not being tested as of now; I'll add tests