dialect/sql/sqlgraph: avoid creating tx blocks for single statement create operation

For example, in PostgreSQL, every statement is executed within a transaction. Therefore, we can avoid creating transaction
blocks manually (group of statements surrounded by BEGIN and COMMIT) for CreateNode operation with single SQL statement.

Benchmark was improved from:

	(2000 Inserts)   8.41s      4206748 ns/op    4595 B/op    115 allocs/op

To:

	(2000 Inserts)   4.66s      2330222 ns/op    4107 B/op    104 allocs/op
This commit is contained in:
Ariel Mashraki
2021-08-20 16:24:33 +03:00
committed by Ariel Mashraki
parent f326c7dfd0
commit ea67be12a4
2 changed files with 53 additions and 56 deletions

View File

@@ -377,16 +377,9 @@ type (
// CreateNode applies the CreateSpec on the graph.
func CreateNode(ctx context.Context, drv dialect.Driver, spec *CreateSpec) error {
tx, err := drv.Tx(ctx)
if err != nil {
return err
}
gr := graph{tx: tx, builder: sql.Dialect(drv.Dialect())}
gr := graph{tx: drv, builder: sql.Dialect(drv.Dialect())}
cr := &creator{CreateSpec: spec, graph: gr}
if err := cr.node(ctx, tx); err != nil {
return rollback(tx, err)
}
return tx.Commit()
return cr.node(ctx, drv)
}
// BatchCreate applies the BatchCreateSpec on the graph.
@@ -722,7 +715,7 @@ func (u *updater) nodes(ctx context.Context, tx dialect.ExecQuerier) (int, error
ids []driver.Value
addEdges = EdgeSpecs(u.Edges.Add).GroupRel()
clearEdges = EdgeSpecs(u.Edges.Clear).GroupRel()
multiple = u.hasExternalEdges(addEdges, clearEdges)
multiple = hasExternalEdges(addEdges, clearEdges)
update = u.builder.Update(u.Node.Table).Schema(u.Node.Schema)
selector = u.builder.Select(u.Node.ID.Column).
From(u.builder.Table(u.Node.Table).Schema(u.Node.Schema)).
@@ -793,23 +786,6 @@ func (u *updater) setExternalEdges(ctx context.Context, ids []driver.Value, addE
return nil
}
func (*updater) hasExternalEdges(addEdges, clearEdges map[Rel][]*EdgeSpec) bool {
// M2M edges reside in a join-table, and O2M edges reside
// in the M2O table (the entity that holds the FK).
if len(clearEdges[M2M]) > 0 || len(addEdges[M2M]) > 0 ||
len(clearEdges[O2M]) > 0 || len(addEdges[O2M]) > 0 {
return true
}
for _, edges := range [][]*EdgeSpec{clearEdges[O2O], addEdges[O2O]} {
for _, e := range edges {
if !e.Inverse {
return true
}
}
}
return false
}
// setTableColumns sets the table columns and foreign_keys used in insert.
func (u *updater) setTableColumns(update *sql.UpdateBuilder, addEdges, clearEdges map[Rel][]*EdgeSpec) error {
// Avoid multiple assignments to the same column.
@@ -879,25 +855,43 @@ type creator struct {
*BatchCreateSpec
}
func (c *creator) node(ctx context.Context, tx dialect.ExecQuerier) error {
func (c *creator) node(ctx context.Context, drv dialect.Driver) error {
var (
edges = EdgeSpecs(c.Edges).GroupRel()
insert = c.builder.Insert(c.Table).Schema(c.Schema).Default()
)
// Set and create the node.
if err := c.setTableColumns(insert, edges); err != nil {
return err
}
if err := c.insert(ctx, tx, insert); err != nil {
return fmt.Errorf("insert node to table %q: %w", c.Table, err)
}
if err := c.graph.addM2MEdges(ctx, []driver.Value{c.ID.Value}, edges[M2M]); err != nil {
tx, err := c.mayTx(ctx, drv, edges)
if err != nil {
return err
}
if err := c.graph.addFKEdges(ctx, []driver.Value{c.ID.Value}, append(edges[O2M], edges[O2O]...)); err != nil {
return err
if err := func() error {
if err := c.insert(ctx, insert); err != nil {
return err
}
if err := c.graph.addM2MEdges(ctx, []driver.Value{c.ID.Value}, edges[M2M]); err != nil {
return err
}
return c.graph.addFKEdges(ctx, []driver.Value{c.ID.Value}, append(edges[O2M], edges[O2O]...))
}(); err != nil {
return rollback(tx, err)
}
return nil
return tx.Commit()
}
// mayTx opens a new transaction if the create operation spans across multiple statements.
func (c *creator) mayTx(ctx context.Context, drv dialect.Driver, edges map[Rel][]*EdgeSpec) (dialect.Tx, error) {
if !hasExternalEdges(edges, nil) {
return dialect.NopTx(drv), nil
}
tx, err := drv.Tx(ctx)
if err != nil {
return nil, err
}
c.tx = tx
return tx, nil
}
func (c *creator) nodes(ctx context.Context, tx dialect.ExecQuerier) error {
@@ -973,7 +967,7 @@ func (c *creator) setTableColumns(insert *sql.InsertBuilder, edges map[Rel][]*Ed
}
// insert inserts the node to its table and sets its ID if it wasn't provided by the user.
func (c *creator) insert(ctx context.Context, tx dialect.ExecQuerier, insert *sql.InsertBuilder) error {
func (c *creator) insert(ctx context.Context, insert *sql.InsertBuilder) error {
if opts := c.CreateSpec.OnConflict; len(opts) > 0 {
insert.OnConflict(opts...)
c.ensureLastInsertID(insert)
@@ -986,10 +980,10 @@ func (c *creator) insert(ctx context.Context, tx dialect.ExecQuerier, insert *sq
// database, and we need to get back the database id field.
if len(c.CreateSpec.OnConflict) == 0 {
query, args := insert.Query()
return tx.Exec(ctx, query, args, &res)
return c.tx.Exec(ctx, query, args, &res)
}
}
return c.insertLastID(ctx, tx, insert.Returning(c.ID.Column))
return c.insertLastID(ctx, insert.Returning(c.ID.Column))
}
// ensureLastInsertID ensures the LAST_INSERT_ID was added to the
@@ -1241,6 +1235,23 @@ func (g *graph) addFKEdges(ctx context.Context, ids []driver.Value, edges []*Edg
return nil
}
func hasExternalEdges(addEdges, clearEdges map[Rel][]*EdgeSpec) bool {
// M2M edges reside in a join-table, and O2M edges reside
// in the M2O table (the entity that holds the FK).
if len(clearEdges[M2M]) > 0 || len(addEdges[M2M]) > 0 ||
len(clearEdges[O2M]) > 0 || len(addEdges[O2M]) > 0 {
return true
}
for _, edges := range [][]*EdgeSpec{clearEdges[O2O], addEdges[O2O]} {
for _, e := range edges {
if !e.Inverse {
return true
}
}
}
return false
}
// setTableColumns is shared between updater and creator.
func setTableColumns(fields []*FieldSpec, edges map[Rel][]*EdgeSpec, set func(string, driver.Value)) (err error) {
for _, fi := range fields {
@@ -1268,7 +1279,7 @@ func setTableColumns(fields []*FieldSpec, edges map[Rel][]*EdgeSpec, set func(st
}
// insertLastID invokes the insert query on the transaction and returns the LastInsertID.
func (c *creator) insertLastID(ctx context.Context, tx dialect.ExecQuerier, insert *sql.InsertBuilder) error {
func (c *creator) insertLastID(ctx context.Context, insert *sql.InsertBuilder) error {
query, args := insert.Query()
if err := insert.Err(); err != nil {
return err
@@ -1276,7 +1287,7 @@ func (c *creator) insertLastID(ctx context.Context, tx dialect.ExecQuerier, inse
// MySQL does not support the "RETURNING" clause.
if insert.Dialect() != dialect.MySQL {
rows := &sql.Rows{}
if err := tx.Query(ctx, query, args, rows); err != nil {
if err := c.tx.Query(ctx, query, args, rows); err != nil {
return err
}
defer rows.Close()
@@ -1294,7 +1305,7 @@ func (c *creator) insertLastID(ctx context.Context, tx dialect.ExecQuerier, inse
}
// MySQL.
var res sql.Result
if err := tx.Exec(ctx, query, args, &res); err != nil {
if err := c.tx.Exec(ctx, query, args, &res); err != nil {
return err
}
// If the ID field is not numeric (e.g. string),