dialect/sql/schema: file based type store (#2644)

* dialect/sql/schema: file based type store

This PR adds support for a file based type storage when using versioned migrations. The file called `.ent_types` is written to the migration directory alongside the migration files and will be kept in sync for every migration file generation run.

In order to not break existing code, where the type storage might differ for different deployment, global unique ID mut be enabled by using a new option. This will also be raised as an error to the user when attempting to use versioned migrations and global unique ID.

Documentation will be added to this PR once feedback on the code is gathered.

* apply CR

* fix tests

* change format of types file to exclude it from atlas.sum file

* docs and drift test

* apply CR
This commit is contained in:
Jannik Clausen
2022-06-15 16:10:15 +02:00
committed by GitHub
parent 195be2d98d
commit 7017cbc898
12 changed files with 416 additions and 42 deletions

View File

@@ -9,6 +9,8 @@ import (
"database/sql"
"errors"
"fmt"
"io/fs"
"io/ioutil"
"sort"
"strings"
@@ -271,16 +273,32 @@ func WithSumFile() MigrateOption {
}
}
// WithUniversalID instructs atlas to use a file based type store when
// global unique ids are enabled. For more information see the setupAtlas method on Migrate.
//
// ATTENTION:
// The file based PK range store is not backward compatible, since the allocated ranges were computed
// dynamically when computing the diff between a deployed database and the current schema. In cases where there
// exist multiple deployments, the allocated ranges for the same type might be different from each other,
// depending on when the deployment took part.
func WithUniversalID() MigrateOption {
return func(m *Migrate) {
m.universalID = true
m.atlas.typeStoreConsent = true
}
}
type (
// atlasOptions describes the options for atlas.
atlasOptions struct {
enabled bool
diff []DiffHook
apply []ApplyHook
skip ChangeKind
dir migrate.Dir
fmt migrate.Formatter
genSum bool
enabled bool
diff []DiffHook
apply []ApplyHook
skip ChangeKind
dir migrate.Dir
fmt migrate.Formatter
genSum bool
typeStoreConsent bool
}
// atBuilder must be implemented by the different drivers in
@@ -293,9 +311,12 @@ type (
atIncrementC(*schema.Table, *schema.Column)
atIncrementT(*schema.Table, int64)
atIndex(*Index, *schema.Table, *schema.Index) error
atTypeRangeSQL(t ...string) string
}
)
var errConsent = errors.New("sql/schema: use WithUniversalID() instead of WithGlobalUniqueID(true) when using WithDir(): https://entgo.io/docs/migrate#universal-ids")
func (m *Migrate) setupAtlas() error {
// Using one of the Atlas options, opt-in to Atlas migration.
if !m.atlas.enabled && (m.atlas.skip != NoChange || len(m.atlas.diff) > 0 || len(m.atlas.apply) > 0) || m.atlas.dir != nil {
@@ -326,6 +347,16 @@ func (m *Migrate) setupAtlas() error {
if m.atlas.dir != nil && m.atlas.fmt == nil {
m.atlas.fmt = sqltool.GolangMigrateFormatter
}
if m.universalID && m.atlas.dir != nil {
// If global unique ids and a migration directory is given, enable the file based type store for pk ranges.
m.typeStore = &dirTypeStore{dir: m.atlas.dir}
// To guard the user against a possible bug due to backward incompatibility, the file based type store must
// be enabled by an option. For more information see the comment of WithUniversalID function.
if !m.atlas.typeStoreConsent {
return errConsent
}
m.atlas.diff = append(m.atlas.diff, m.ensureTypeTable)
}
return nil
}
@@ -537,6 +568,54 @@ func (m *Migrate) aIndexes(b atBuilder, t1 *Table, t2 *schema.Table) error {
return nil
}
func (m *Migrate) ensureTypeTable(next Differ) Differ {
return DiffFunc(func(current, desired *schema.Schema) ([]schema.Change, error) {
// If there is a types table but no types file yet, the user most likely
// switched from online migration to migration files.
if len(m.dbTypeRanges) == 0 {
var (
at = schema.NewTable(TypeTable)
et = NewTable(TypeTable).
AddPrimary(&Column{Name: "id", Type: field.TypeUint, Increment: true}).
AddColumn(&Column{Name: "type", Type: field.TypeString, Unique: true})
)
m.atTable(et, at)
if err := m.aColumns(m, et, at); err != nil {
return nil, err
}
if err := m.aIndexes(m, et, at); err != nil {
return nil, err
}
desired.Tables = append(desired.Tables, at)
}
// If there is a drift between the types stored in the database and the ones stored in the file,
// stop diffing, as this is potentially destructive. This will most likely happen on the first diffing
// after moving from online-migration to versioned migrations if the "old" ent types are not in sync with
// the deterministic ones computed by the new engine.
if len(m.dbTypeRanges) > 0 && len(m.fileTypeRanges) > 0 && !equal(m.fileTypeRanges, m.dbTypeRanges) {
return nil, fmt.Errorf(
"type allocation range drift detected: %v <> %v: see %s for more information",
m.dbTypeRanges, m.fileTypeRanges,
"https://entgo.io/docs/versioned-migrations#moving-from-auto-migration-to-versioned-migrations",
)
}
changes, err := next.Diff(current, desired)
if err != nil {
return nil, err
}
if len(m.dbTypeRanges) > 0 && len(m.fileTypeRanges) == 0 {
// Override the types file created in the diff process with the "old" allocated types ranges.
if err := m.typeStore.(*dirTypeStore).save(m.dbTypeRanges); err != nil {
return nil, err
}
// Change the type range allocations since they will be added to the migration files when
// writing the migration plan to migration files.
m.typeRanges = m.dbTypeRanges
}
return changes, nil
})
}
func setAtChecks(t1 *Table, t2 *schema.Table) {
if check := t1.Annotation.Check; check != "" {
t2.AddChecks(&schema.Check{
@@ -574,3 +653,62 @@ func descIndexes(idx *Index) map[string]bool {
}
return descs
}
const entTypes = ".ent_types"
// dirTypeStore stores and read pk information from a text file stored alongside generated versioned migrations.
// This behaviour is enabled automatically when using versioned migrations.
type dirTypeStore struct {
dir migrate.Dir
}
const atlasDirective = "atlas:sum ignore\n"
// load the types from the types file.
func (s *dirTypeStore) load(context.Context, dialect.ExecQuerier) ([]string, error) {
f, err := s.dir.Open(entTypes)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("reading types file: %w", err)
}
if errors.Is(err, fs.ErrNotExist) {
return nil, nil
}
defer f.Close()
c, err := ioutil.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("reading types file: %w", err)
}
return strings.Split(strings.TrimPrefix(string(c), atlasDirective), ","), nil
}
// add a new type entry to the types file.
func (s *dirTypeStore) add(ctx context.Context, conn dialect.ExecQuerier, t string) error {
ts, err := s.load(ctx, conn)
if err != nil {
return fmt.Errorf("adding type %q: %w", t, err)
}
return s.save(append(ts, t))
}
// save takes the given allocation range and writes them to the types file.
// The types file will be overridden.
func (s *dirTypeStore) save(ts []string) error {
if err := s.dir.WriteFile(entTypes, []byte(atlasDirective+strings.Join(ts, ","))); err != nil {
return fmt.Errorf("writing types file: %w", err)
}
return nil
}
var _ typeStore = (*dirTypeStore)(nil)
func equal(s1, s2 []string) bool {
if len(s1) != len(s2) {
return false
}
for i := range s1 {
if s1[i] != s2[i] {
return false
}
}
return true
}

View File

@@ -0,0 +1,33 @@
// Copyright 2019-present Facebook Inc. All rights reserved.
// This source code is licensed under the Apache 2.0 license found
// in the LICENSE file in the root directory of this source tree.
package schema
import (
"context"
"os"
"path/filepath"
"testing"
"ariga.io/atlas/sql/migrate"
"github.com/stretchr/testify/require"
)
func TestDirTypeStore(t *testing.T) {
ex := []string{"a", "b", "c"}
p := t.TempDir()
d, err := migrate.NewLocalDir(p)
require.NoError(t, err)
s := &dirTypeStore{d}
require.NoError(t, s.save(ex))
require.FileExists(t, filepath.Join(p, entTypes))
c, err := os.ReadFile(filepath.Join(p, entTypes))
require.NoError(t, err)
require.Contains(t, string(c), atlasDirective)
ac, err := s.load(context.Background(), nil)
require.NoError(t, err)
require.Equal(t, ex, ac)
}

View File

@@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
"strings"
"ariga.io/atlas/sql/migrate"
"entgo.io/ent/dialect"
@@ -114,7 +115,9 @@ type Migrate struct {
atlas *atlasOptions // migrate with atlas.
typeRanges []string // types order by their range.
hooks []Hook // hooks to apply before creation
typeStore typeStore
typeStore typeStore // the typeStore to read and save type ranges
fileTypeRanges []string // used internally by ensureTypeTable hook
dbTypeRanges []string // used internally by ensureTypeTable hook
}
// NewMigrate create a migration structure for the given SQL driver.
@@ -163,34 +166,18 @@ func (m *Migrate) Create(ctx context.Context, tables ...*Table) error {
return creator.Create(ctx, tables...)
}
// Diff compares the state read from the StateReader with the state defined by Ent.
// Changes will be written to migration files by the configures Planner.
// Diff compares the state read from the connected database with the state defined by Ent.
// Changes will be written to migration files by the configured Planner.
func (m *Migrate) Diff(ctx context.Context, tables ...*Table) error {
return m.NamedDiff(ctx, "changes", tables...)
}
// NamedDiff compares the state read from the StateReader with the state defined by Ent.
// Changes will be written to migration files by the configures Planner.
// NamedDiff compares the state read from the connected database with the state defined by Ent.
// Changes will be written to migration files by the configured Planner.
func (m *Migrate) NamedDiff(ctx context.Context, name string, tables ...*Table) error {
if m.atlas.dir == nil {
return errors.New("no migration directory given")
}
if err := m.init(ctx, m); err != nil {
return err
}
if m.universalID {
if err := m.types(ctx, m); err != nil {
return err
}
}
plan, err := m.atDiff(ctx, m, name, tables...)
if err != nil {
return err
}
// Skip if the plan has no changes.
if len(plan.Changes) == 0 {
return nil
}
opts := []migrate.PlannerOption{
migrate.WithFormatter(m.atlas.fmt),
}
@@ -202,6 +189,46 @@ func (m *Migrate) NamedDiff(ctx context.Context, name string, tables ...*Table)
} else {
opts = append(opts, migrate.DisableChecksum())
}
if err := m.init(ctx, m); err != nil {
return err
}
if m.universalID {
if err := m.types(ctx, m); err != nil {
return err
}
m.fileTypeRanges = m.typeRanges
ex, err := m.tableExist(ctx, m, TypeTable)
if err != nil {
return err
}
if ex {
m.dbTypeRanges, err = (&dbTypeStore{m}).load(ctx, m)
if err != nil {
return err
}
}
defer func() {
m.fileTypeRanges = nil
m.dbTypeRanges = nil
}()
}
plan, err := m.atDiff(ctx, m, name, tables...)
if err != nil {
return err
}
if m.universalID {
newTypes := m.typeRanges[len(m.dbTypeRanges):]
if len(newTypes) > 0 {
plan.Changes = append(plan.Changes, &migrate.Change{
Cmd: m.atTypeRangeSQL(newTypes...),
Comment: fmt.Sprintf("add pk ranges for %s tables", strings.Join(newTypes, ",")),
})
}
}
// Skip if the plan has no changes.
if len(plan.Changes) == 0 {
return nil
}
return migrate.NewPlanner(nil, m.atlas.dir, opts...).WritePlan(plan)
}

View File

@@ -6,13 +6,17 @@ package schema
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"text/template"
"time"
"ariga.io/atlas/sql/migrate"
"ariga.io/atlas/sql/schema"
"entgo.io/ent/schema/field"
"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
@@ -80,7 +84,7 @@ func TestMigrate_Diff(t *testing.T) {
m, err := NewMigrate(db, WithDir(d))
require.NoError(t, err)
require.NoError(t, m.Diff(context.Background(), &Table{Name: "users"}))
v := time.Now().Format("20060102150405")
v := time.Now().UTC().Format("20060102150405")
requireFileEqual(t, filepath.Join(p, v+"_changes.up.sql"), "-- create \"users\" table\nCREATE TABLE `users` (, PRIMARY KEY ());\n")
requireFileEqual(t, filepath.Join(p, v+"_changes.down.sql"), "-- reverse: create \"users\" table\nDROP TABLE `users`;\n")
require.NoFileExists(t, filepath.Join(p, "atlas.sum"))
@@ -97,6 +101,106 @@ func TestMigrate_Diff(t *testing.T) {
require.FileExists(t, filepath.Join(p, "atlas.sum"))
require.NoError(t, d.WriteFile("tmp.sql", nil))
require.ErrorIs(t, m.Diff(context.Background(), &Table{Name: "users"}), migrate.ErrChecksumMismatch)
// Test type store.
idCol := []*Column{{Name: "id", Type: field.TypeInt, Increment: true}}
p = t.TempDir()
d, err = migrate.NewLocalDir(p)
require.NoError(t, err)
f, err := migrate.NewTemplateFormatter(
template.Must(template.New("").Parse("{{ .Name }}.sql")),
template.Must(template.New("").Parse(
`{{ range .Changes }}{{ printf "%s;\n" .Cmd }}{{ end }}`,
)),
)
require.NoError(t, err)
// If using global unique ID and versioned migrations,
// consent for the file based type store has to be given explicitly.
_, err = NewMigrate(db, WithDir(d), WithGlobalUniqueID(true))
require.ErrorIs(t, err, errConsent)
require.Contains(t, err.Error(), "WithUniversalID")
require.Contains(t, err.Error(), "WithGlobalUniqueID")
require.Contains(t, err.Error(), "WithDir")
m, err = NewMigrate(db, WithFormatter(f), WithDir(d), WithUniversalID(), WithSumFile())
require.NoError(t, err)
require.IsType(t, &dirTypeStore{}, m.typeStore)
require.NoError(t, m.Diff(context.Background(),
&Table{Name: "users", Columns: idCol, PrimaryKey: idCol},
&Table{Name: "groups", Columns: idCol, PrimaryKey: idCol},
))
requireFileEqual(t, filepath.Join(p, ".ent_types"), atlasDirective+"users,groups")
changesSQL := strings.Join([]string{
"CREATE TABLE `users` (`id` integer NOT NULL PRIMARY KEY AUTOINCREMENT);",
"CREATE TABLE `groups` (`id` integer NOT NULL PRIMARY KEY AUTOINCREMENT);",
fmt.Sprintf("INSERT INTO sqlite_sequence (name, seq) VALUES (\"groups\", %d);", 1<<32),
"CREATE TABLE `ent_types` (`id` integer NOT NULL PRIMARY KEY AUTOINCREMENT, `type` text NOT NULL);",
"CREATE UNIQUE INDEX `ent_types_type_key` ON `ent_types` (`type`);",
"INSERT INTO `ent_types` (`type`) VALUES ('users'), ('groups');", "",
}, "\n")
requireFileEqual(t, filepath.Join(p, "changes.sql"), changesSQL)
// types file cannot be part of the sum file.
require.FileExists(t, filepath.Join(p, "atlas.sum"))
sum, err := os.ReadFile(filepath.Join(p, "atlas.sum"))
require.NoError(t, err)
require.NotContains(t, string(sum), ".ent_types")
// Adding another node will result in a new entry to the TypeTable (without actually creating it).
_, err = db.ExecContext(context.Background(), changesSQL, nil, nil)
require.NoError(t, err)
require.NoError(t, m.NamedDiff(context.Background(), "changes_2", &Table{Name: "pets", Columns: idCol, PrimaryKey: idCol}))
requireFileEqual(t, filepath.Join(p, ".ent_types"), atlasDirective+"users,groups,pets")
requireFileEqual(t,
filepath.Join(p, "changes_2.sql"), strings.Join([]string{
"CREATE TABLE `pets` (`id` integer NOT NULL PRIMARY KEY AUTOINCREMENT);",
fmt.Sprintf("INSERT INTO sqlite_sequence (name, seq) VALUES (\"pets\", %d);", 2<<32),
"INSERT INTO `ent_types` (`type`) VALUES ('pets');", "",
}, "\n"))
// types file cannot be part of the sum file.
require.FileExists(t, filepath.Join(p, "atlas.sum"))
sum, err = os.ReadFile(filepath.Join(p, "atlas.sum"))
require.NoError(t, err)
require.NotContains(t, string(sum), ".ent_types")
// Checksum will be updated as well.
require.NoError(t, migrate.Validate(d))
// Running diff against an existing database without having a types file yet
// will result in the types file respect the "old" order of pk allocations.
switchAllocs := func(one, two string) {
for _, stmt := range []string{
"DELETE FROM `ent_types`;",
fmt.Sprintf("INSERT INTO `ent_types` (`type`) VALUES ('%s'), ('%s');", one, two),
} {
_, err = db.ExecContext(context.Background(), stmt)
require.NoError(t, err)
}
}
switchAllocs("groups", "users")
p = t.TempDir()
d, err = migrate.NewLocalDir(p)
require.NoError(t, err)
m, err = NewMigrate(db, WithFormatter(f), WithDir(d), WithUniversalID())
require.NoError(t, err)
require.NoError(t, m.Diff(context.Background(),
&Table{Name: "users", Columns: idCol, PrimaryKey: idCol},
&Table{Name: "groups", Columns: idCol, PrimaryKey: idCol},
))
requireFileEqual(t, filepath.Join(p, ".ent_types"), atlasDirective+"groups,users")
require.NoFileExists(t, filepath.Join(p, "changes.sql"))
// Drifts in the types file and types database will be detected,
switchAllocs("users", "groups")
require.ErrorContains(t, m.Diff(context.Background()), fmt.Sprintf(
"type allocation range drift detected: %v <> %v: see %s for more information",
[]string{"users", "groups"},
[]string{"groups", "users"},
"https://entgo.io/docs/versioned-migrations#moving-from-auto-migration-to-versioned-migrations",
))
}
func requireFileEqual(t *testing.T, name, contents string) {

View File

@@ -970,3 +970,10 @@ func indexType(idx *Index, d string) (string, bool) {
}
return "", false
}
func (MySQL) atTypeRangeSQL(ts ...string) string {
for i := range ts {
ts[i] = fmt.Sprintf("('%s')", ts[i])
}
return fmt.Sprintf("INSERT INTO `%s` (`type`) VALUES %s", TypeTable, strings.Join(ts, ", "))
}

View File

@@ -784,3 +784,10 @@ func (d *Postgres) atIndex(idx1 *Index, t2 *schema.Table, idx2 *schema.Index) er
}
return nil
}
func (Postgres) atTypeRangeSQL(ts ...string) string {
for i := range ts {
ts[i] = fmt.Sprintf("('%s')", ts[i])
}
return fmt.Sprintf(`INSERT INTO "%s" ("type") VALUES %s`, TypeTable, strings.Join(ts, ", "))
}

View File

@@ -449,3 +449,10 @@ func (d *SQLite) atIndex(idx1 *Index, t2 *schema.Table, idx2 *schema.Index) erro
}
return nil
}
func (SQLite) atTypeRangeSQL(ts ...string) string {
for i := range ts {
ts[i] = fmt.Sprintf("('%s')", ts[i])
}
return fmt.Sprintf("INSERT INTO `%s` (`type`) VALUES %s", TypeTable, strings.Join(ts, ", "))
}