dialect/sql/schema: support providing alternate schema for postgres (#1172)

This commit is contained in:
Ariel Mashraki
2021-01-14 16:55:37 +02:00
committed by GitHub
parent 8420a64be3
commit 91f7e3363a
2 changed files with 35 additions and 24 deletions

View File

@@ -17,6 +17,7 @@ import (
// Postgres is a postgres migration driver.
type Postgres struct {
dialect.Driver
schema string
version string
}
@@ -53,7 +54,7 @@ func (d *Postgres) tableExist(ctx context.Context, tx dialect.Tx, name string) (
query, args := sql.Dialect(dialect.Postgres).
Select(sql.Count("*")).From(sql.Table("tables").Schema("information_schema")).
Where(sql.And(
sql.EQ("table_schema", sql.Raw("CURRENT_SCHEMA()")),
sql.EQ("table_schema", d.tableSchema()),
sql.EQ("table_name", name),
)).Query()
return exist(ctx, tx, query, args...)
@@ -64,7 +65,7 @@ func (d *Postgres) fkExist(ctx context.Context, tx dialect.Tx, name string) (boo
query, args := sql.Dialect(dialect.Postgres).
Select(sql.Count("*")).From(sql.Table("table_constraints").Schema("information_schema")).
Where(sql.And(
sql.EQ("table_schema", sql.Raw("CURRENT_SCHEMA()")),
sql.EQ("table_schema", d.tableSchema()),
sql.EQ("constraint_type", "FOREIGN KEY"),
sql.EQ("constraint_name", name),
)).Query()
@@ -90,7 +91,7 @@ func (d *Postgres) table(ctx context.Context, tx dialect.Tx, name string) (*Tabl
Select("column_name", "data_type", "is_nullable", "column_default", "udt_name").
From(sql.Table("columns").Schema("information_schema")).
Where(sql.And(
sql.EQ("table_schema", sql.Raw("CURRENT_SCHEMA()")),
sql.EQ("table_schema", d.tableSchema()),
sql.EQ("table_name", name),
)).Query()
if err := tx.Query(ctx, query, args, rows); err != nil {
@@ -165,14 +166,21 @@ WHERE t.oid = idx.indrelid
AND a.attrelid = t.oid
AND a.attnum = ANY(idx.indkey)
AND t.relkind = 'r'
AND n.nspname = CURRENT_SCHEMA()
AND n.nspname = %s
AND t.relname = '%s'
ORDER BY index_name, seq_in_index;
`
// indexesQuery returns the query (and its placeholders) for getting table indexes.
func (d *Postgres) indexesQuery(table string) (string, []interface{}) {
expr, args := d.tableSchema().Query()
return fmt.Sprintf(indexesQuery, expr, table), args
}
func (d *Postgres) indexes(ctx context.Context, tx dialect.Tx, table string) (Indexes, error) {
rows := &sql.Rows{}
if err := tx.Query(ctx, fmt.Sprintf(indexesQuery, table), []interface{}{}, rows); err != nil {
query, args := d.indexesQuery(table)
if err := tx.Query(ctx, query, args, rows); err != nil {
return nil, fmt.Errorf("querying indexes for table %s: %v", table, err)
}
defer rows.Close()
@@ -397,7 +405,7 @@ func (d *Postgres) dropIndex(ctx context.Context, tx dialect.Tx, idx *Index, tab
query, args := sql.Dialect(dialect.Postgres).
Select(sql.Count("*")).From(sql.Table("table_constraints").Schema("information_schema")).
Where(sql.And(
sql.EQ("table_schema", sql.Raw("CURRENT_SCHEMA()")),
sql.EQ("table_schema", d.tableSchema()),
sql.EQ("constraint_type", "UNIQUE"),
sql.EQ("constraint_name", name),
)).
@@ -438,7 +446,10 @@ func (d *Postgres) renameIndex(t *Table, old, new *Index) sql.Querier {
// tableSchema returns the query for getting the table schema.
func (d *Postgres) tableSchema() sql.Querier {
return sql.Raw("(CURRENT_SCHEMA())")
if d.schema != "" {
return sql.Expr("?", d.schema)
}
return sql.Raw("CURRENT_SCHEMA()")
}
// alterColumns returns the queries for applying the columns change-set.