dialect/sql/sqlgraph: avoid creating tx blocks for single statement batch-create operation (#2272)

For example, in PostgreSQL, every statement is executed within a transaction. Therefore, we can avoid creating transaction
blocks manually (group of statements surrounded by BEGIN and COMMIT) for CreateNode operation with single SQL statement.

Benchmark was improved from:

	(4000 BatchInserts)    17.16s      4289178 ns/op  412893 B/op   4913 allocs/op

To:

	(4000 BatchInserts)   9.16s      2288807 ns/op  412142 B/op   4899 allocs/op
This commit is contained in:
Ariel Mashraki
2022-01-15 23:04:37 +02:00
committed by GitHub
parent 3a426390de
commit b70754d12f
2 changed files with 120 additions and 23 deletions

View File

@@ -379,16 +379,9 @@ func CreateNode(ctx context.Context, drv dialect.Driver, spec *CreateSpec) error
// BatchCreate applies the BatchCreateSpec on the graph.
func BatchCreate(ctx context.Context, drv dialect.Driver, spec *BatchCreateSpec) error {
tx, err := drv.Tx(ctx)
if err != nil {
return err
}
gr := graph{tx: tx, builder: sql.Dialect(drv.Dialect())}
gr := graph{tx: drv, builder: sql.Dialect(drv.Dialect())}
cr := &batchCreator{BatchCreateSpec: spec, graph: gr}
if err := cr.nodes(ctx, tx); err != nil {
return rollback(tx, err)
}
return tx.Commit()
return cr.nodes(ctx, drv)
}
type (
@@ -954,7 +947,7 @@ type batchCreator struct {
*BatchCreateSpec
}
func (c *batchCreator) nodes(ctx context.Context, tx dialect.ExecQuerier) error {
func (c *batchCreator) nodes(ctx context.Context, drv dialect.Driver) error {
if len(c.Nodes) == 0 {
return nil
}
@@ -1001,21 +994,43 @@ func (c *batchCreator) nodes(ctx context.Context, tx dialect.ExecQuerier) error
}
insert.Values(vs...)
}
if err := c.batchInsert(ctx, tx, insert); err != nil {
return fmt.Errorf("insert nodes to table %q: %w", c.Nodes[0].Table, err)
}
if err := c.batchAddM2M(ctx, c.BatchCreateSpec); err != nil {
tx, err := c.mayTx(ctx, drv)
if err != nil {
return err
}
// FKs that exist in different tables can't be updated in batch (using the CASE
// statement), because we rely on RowsAffected to check if the FK column is NULL.
for _, node := range c.Nodes {
edges := EdgeSpecs(node.Edges).GroupRel()
if err := c.graph.addFKEdges(ctx, []driver.Value{node.ID.Value}, append(edges[O2M], edges[O2O]...)); err != nil {
c.tx = tx
if err := func() error {
if err := c.batchInsert(ctx, tx, insert); err != nil {
return fmt.Errorf("insert nodes to table %q: %w", c.Nodes[0].Table, err)
}
if err := c.batchAddM2M(ctx, c.BatchCreateSpec); err != nil {
return err
}
// FKs that exist in different tables can't be updated in batch (using the CASE
// statement), because we rely on RowsAffected to check if the FK column is NULL.
for _, node := range c.Nodes {
edges := EdgeSpecs(node.Edges).GroupRel()
if err := c.graph.addFKEdges(ctx, []driver.Value{node.ID.Value}, append(edges[O2M], edges[O2O]...)); err != nil {
return err
}
}
return nil
}(); err != nil {
return rollback(tx, err)
}
return nil
return tx.Commit()
}
// mayTx opens a new transaction if the create operation spans across multiple statements.
func (c *batchCreator) mayTx(ctx context.Context, drv dialect.Driver) (dialect.Tx, error) {
for _, node := range c.Nodes {
for _, edge := range node.Edges {
if isExternalEdge(edge) {
return drv.Tx(ctx)
}
}
}
return dialect.NopTx(drv), nil
}
// batchInsert inserts a batch of nodes to their table and sets their ID if it was not provided by the user.
@@ -1205,8 +1220,8 @@ func (g *graph) clearFKEdges(ctx context.Context, ids []driver.Value, edges []*E
func (g *graph) addFKEdges(ctx context.Context, ids []driver.Value, edges []*EdgeSpec) error {
id := ids[0]
if len(ids) > 1 && len(edges) != 0 {
// O2M and O2O edges are defined by a FK in the "other" table.
// Therefore, ids[i+1] will override ids[i] which is invalid.
// O2M and non-inverse O2O edges are defined by a FK in the "other"
// table. Therefore, ids[i+1] will override ids[i] which is invalid.
return fmt.Errorf("unable to link FK edge to more than 1 node: %v", ids)
}
for _, edge := range edges {
@@ -1258,6 +1273,12 @@ func hasExternalEdges(addEdges, clearEdges map[Rel][]*EdgeSpec) bool {
return false
}
// isExternalEdge reports if the given edge requires an UPDATE
// or an INSERT to other table.
func isExternalEdge(e *EdgeSpec) bool {
return e.Rel == M2M || e.Rel == O2M || e.Rel == O2O && !e.Inverse
}
// setTableColumns is shared between updater and creator.
func setTableColumns(fields []*FieldSpec, edges map[Rel][]*EdgeSpec, set func(string, driver.Value)) (err error) {
for _, fi := range fields {

View File

@@ -1284,10 +1284,86 @@ func TestBatchCreate(t *testing.T) {
},
},
expect: func(m sqlmock.Sqlmock) {
m.ExpectBegin()
m.ExpectExec(escape("INSERT INTO `users` (`active`, `age`, `name`) VALUES (?, ?, ?), (?, ?, ?) ON DUPLICATE KEY UPDATE `active` = `users`.`active`, `age` = `users`.`age`, `name` = `users`.`name`")).
WithArgs(false, 32, "a8m", true, 30, "nati").
WillReturnResult(sqlmock.NewResult(10, 2))
},
},
{
name: "no tx",
spec: &BatchCreateSpec{
Nodes: []*CreateSpec{
{
Table: "users",
ID: &FieldSpec{Column: "id", Type: field.TypeInt},
Fields: []*FieldSpec{
{Column: "age", Type: field.TypeInt, Value: 32},
{Column: "name", Type: field.TypeString, Value: "a8m"},
{Column: "active", Type: field.TypeBool, Value: false},
},
Edges: []*EdgeSpec{
{Rel: M2O, Table: "company", Columns: []string{"workplace_id"}, Target: &EdgeTarget{Nodes: []driver.Value{2}}},
{Rel: O2O, Inverse: true, Table: "users", Columns: []string{"best_friend_id"}, Target: &EdgeTarget{Nodes: []driver.Value{3}, IDSpec: &FieldSpec{Column: "id"}}},
},
},
{
Table: "users",
ID: &FieldSpec{Column: "id", Type: field.TypeInt},
Fields: []*FieldSpec{
{Column: "age", Type: field.TypeInt, Value: 30},
{Column: "name", Type: field.TypeString, Value: "nati"},
},
Edges: []*EdgeSpec{
{Rel: M2O, Table: "company", Columns: []string{"workplace_id"}, Target: &EdgeTarget{Nodes: []driver.Value{2}}},
{Rel: O2O, Inverse: true, Table: "users", Columns: []string{"best_friend_id"}, Target: &EdgeTarget{Nodes: []driver.Value{4}, IDSpec: &FieldSpec{Column: "id"}}},
},
},
},
},
expect: func(m sqlmock.Sqlmock) {
// Insert nodes with FKs.
m.ExpectExec(escape("INSERT INTO `users` (`active`, `age`, `best_friend_id`, `name`, `workplace_id`) VALUES (?, ?, ?, ?, ?), (?, ?, ?, ?, ?)")).
WithArgs(false, 32, 3, "a8m", 2, nil, 30, 4, "nati", 2).
WillReturnResult(sqlmock.NewResult(10, 2))
},
},
{
name: "with tx",
spec: &BatchCreateSpec{
Nodes: []*CreateSpec{
{
Table: "users",
ID: &FieldSpec{Column: "id", Type: field.TypeInt},
Fields: []*FieldSpec{
{Column: "name", Type: field.TypeString, Value: "a8m"},
},
Edges: []*EdgeSpec{
{Rel: O2O, Table: "cards", Columns: []string{"owner_id"}, Target: &EdgeTarget{Nodes: []driver.Value{3}, IDSpec: &FieldSpec{Column: "id"}}},
},
},
{
Table: "users",
ID: &FieldSpec{Column: "id", Type: field.TypeInt},
Fields: []*FieldSpec{
{Column: "name", Type: field.TypeString, Value: "nati"},
},
Edges: []*EdgeSpec{
{Rel: O2O, Table: "cards", Columns: []string{"owner_id"}, Target: &EdgeTarget{Nodes: []driver.Value{4}, IDSpec: &FieldSpec{Column: "id"}}},
},
},
},
},
expect: func(m sqlmock.Sqlmock) {
m.ExpectBegin()
m.ExpectExec(escape("INSERT INTO `users` (`name`) VALUES (?), (?)")).
WithArgs("a8m", "nati").
WillReturnResult(sqlmock.NewResult(10, 2))
m.ExpectExec(escape("UPDATE `cards` SET `owner_id` = ? WHERE `id` = ? AND `owner_id` IS NULL")).
WithArgs(10 /* LAST_INSERT_ID() */, 3).
WillReturnResult(sqlmock.NewResult(1, 1))
m.ExpectExec(escape("UPDATE `cards` SET `owner_id` = ? WHERE `id` = ? AND `owner_id` IS NULL")).
WithArgs(11 /* LAST_INSERT_ID() + 1 */, 4).
WillReturnResult(sqlmock.NewResult(1, 1))
m.ExpectCommit()
},
},