From 9f10b544e19fbeef225d8569239534e23c50821f Mon Sep 17 00:00:00 2001 From: "Giau. Tran Minh" Date: Tue, 5 May 2026 10:16:37 +0000 Subject: [PATCH] WIP: more logic --- dialect/sql/schema/mysql.go | 2 + dialect/sql/schema/postgres.go | 2 + dialect/sql/schema/sqlite.go | 2 + entc/gen/template/client.tmpl | 27 ++++ entc/gen/template/dialect/gremlin/decode.tmpl | 4 + entc/gen/template/dialect/sql/create.tmpl | 58 +++++++- entc/gen/template/dialect/sql/update.tmpl | 42 +++++- entc/gen/template/ent.tmpl | 12 +- entc/gen/template/mutation.tmpl | 2 + entc/gen/type.go | 4 +- entc/integration/blob/blob.go | 10 ++ entc/integration/ent/client.go | 27 ++++ entc/integration/ent/document.go | 24 ++- entc/integration/ent/document_create.go | 140 ++++++------------ entc/integration/ent/document_update.go | 39 +++-- entc/integration/gremlin/ent/client.go | 27 ++++ entc/integration/gremlin/ent/document.go | 40 ++--- schema/field/type.go | 2 +- 18 files changed, 325 insertions(+), 139 deletions(-) diff --git a/dialect/sql/schema/mysql.go b/dialect/sql/schema/mysql.go index 7f30af792..47aff25d5 100644 --- a/dialect/sql/schema/mysql.go +++ b/dialect/sql/schema/mysql.go @@ -135,6 +135,8 @@ func (d *MySQL) atTypeC(c1 *Column, c2 *schema.Column) error { switch c1.Type { case field.TypeBool: t = &schema.BoolType{T: "boolean"} + case field.TypeBlob: + return fmt.Errorf("blob fields are not stored in the database") case field.TypeInt8: t = &schema.IntegerType{T: mysql.TypeTinyInt} case field.TypeUint8: diff --git a/dialect/sql/schema/postgres.go b/dialect/sql/schema/postgres.go index 364b081c3..77ea1c57a 100644 --- a/dialect/sql/schema/postgres.go +++ b/dialect/sql/schema/postgres.go @@ -113,6 +113,8 @@ func (d *Postgres) atTypeC(c1 *Column, c2 *schema.Column) error { } var t schema.Type switch c1.Type { + case field.TypeBlob: + return fmt.Errorf("blob fields are not stored in the database") case field.TypeBool: t = &schema.BoolType{T: postgres.TypeBoolean} case field.TypeUint8, field.TypeInt8, field.TypeInt16: diff --git a/dialect/sql/schema/sqlite.go b/dialect/sql/schema/sqlite.go index 33708e881..944c0f80e 100644 --- a/dialect/sql/schema/sqlite.go +++ b/dialect/sql/schema/sqlite.go @@ -114,6 +114,8 @@ func (d *SQLite) atTypeC(c1 *Column, c2 *schema.Column) error { } var t schema.Type switch c1.Type { + case field.TypeBlob: + return fmt.Errorf("blob fields are not stored in the database") case field.TypeBool: t = &schema.BoolType{T: "bool"} case field.TypeInt8, field.TypeUint8, field.TypeInt16, field.TypeUint16, field.TypeInt32, diff --git a/entc/gen/template/client.tmpl b/entc/gen/template/client.tmpl index 2ea13f4f4..74b327bd2 100644 --- a/entc/gen/template/client.tmpl +++ b/entc/gen/template/client.tmpl @@ -163,11 +163,18 @@ func Driver(driver dialect.Driver) Option { // Blob defines the interface for blob storage operations. // Implementations should return [io/fs.ErrNotExist] (or an error wrapping it) // from NewReader when the requested key does not exist. +// +// Blob writes are not transactional with the database. If a bulk operation +// fails partway through, already-written blobs will be cleaned up on a +// best-effort basis using the Delete method. type Blob interface { // NewReader opens a reader for the given key. NewReader(ctx context.Context, key string) (io.ReadCloser, error) // NewWriter opens a writer for the given key. NewWriter(ctx context.Context, key string) (io.WriteCloser, error) + // Delete removes the blob stored at key. It should return nil + // if the key does not exist. + Delete(ctx context.Context, key string) error // Close releases any resources held by the bucket. Close() error } @@ -188,6 +195,26 @@ func WithBlobOpeners(openers BlobOpeners) Option { c.blobOpeners = openers } } + +// blobReadCloser wraps an io.ReadCloser to also close the parent bucket on Close. +type blobReadCloser struct { + io.ReadCloser + bucket Blob +} + +func (r *blobReadCloser) Close() error { + return errors.Join(r.ReadCloser.Close(), r.bucket.Close()) +} + +// blobWriteCloser wraps an io.WriteCloser to also close the parent bucket on Close. +type blobWriteCloser struct { + io.WriteCloser + bucket Blob +} + +func (w *blobWriteCloser) Close() error { + return errors.Join(w.WriteCloser.Close(), w.bucket.Close()) +} {{- end }} // Open opens a database/sql.DB specified by the driver name and diff --git a/entc/gen/template/dialect/gremlin/decode.tmpl b/entc/gen/template/dialect/gremlin/decode.tmpl index f5cc43e5d..40ddc3035 100644 --- a/entc/gen/template/dialect/gremlin/decode.tmpl +++ b/entc/gen/template/dialect/gremlin/decode.tmpl @@ -19,6 +19,7 @@ func ({{ $receiver }} *{{ $.Name }}) FromResponse(res *gremlin.Response) error { var {{ $scan }} struct { ID {{ $.ID.Type }} `json:"id,omitempty"` {{ range $f := $.Fields }} + {{- if $f.IsBlob }}{{ continue }}{{ end }} {{- $f.StructField }} {{ if and $f.IsTime (not $f.HasGoType) }}int64{{ else }}{{ if $f.NillableValue }}*{{ end }}{{ $f.Type }}{{ end }} `json:"{{ $f.StorageKey }},omitempty"` {{ end }} } @@ -27,6 +28,7 @@ func ({{ $receiver }} *{{ $.Name }}) FromResponse(res *gremlin.Response) error { } {{ $receiver }}.ID = {{ $scan }}.ID {{- range $i, $f := $.Fields }} + {{- if $f.IsBlob }}{{ continue }}{{ end }} {{- if and $f.IsTime (not $f.HasGoType) }} {{- if $f.Nillable }} v{{ $i }} := time.Unix(0, {{ $scan }}.{{ $f.StructField }}) @@ -56,6 +58,7 @@ func ({{ $receiver }} *{{ $slice }}) FromResponse(res *gremlin.Response) error { var {{ $scan }} []struct { ID {{ $.ID.Type }} `json:"id,omitempty"` {{ range $f := $.Fields }} + {{- if $f.IsBlob }}{{ continue }}{{ end }} {{- $f.StructField }} {{ if and $f.IsTime (not $f.HasGoType) }}int64{{ else }}{{ if $f.NillableValue }}*{{ end }}{{ $f.Type }}{{ end }} `json:"{{ $f.StorageKey }},omitempty"` {{ end }} } @@ -65,6 +68,7 @@ func ({{ $receiver }} *{{ $slice }}) FromResponse(res *gremlin.Response) error { for _, v := range {{ $scan }} { node := &{{ $.Name }}{ID: v.ID} {{- range $i, $f := $.Fields }} + {{- if $f.IsBlob }}{{ continue }}{{ end }} {{- if and $f.IsTime (not $f.HasGoType) }} {{- if $f.Nillable }} v{{ $i }} := time.Unix(0, v.{{ $f.StructField }}) diff --git a/entc/gen/template/dialect/sql/create.tmpl b/entc/gen/template/dialect/sql/create.tmpl index 1d6f2a3a6..e8546a7bb 100644 --- a/entc/gen/template/dialect/sql/create.tmpl +++ b/entc/gen/template/dialect/sql/create.tmpl @@ -78,26 +78,66 @@ func ({{ $receiver }} *{{ $builder }}) sqlSave(ctx context.Context) (*{{ $.Name {{ $mutation }}.done = true {{- end }} {{- if $.HasBlobFields }} - {{- range $f := $.BlobFields }} + {{- $blobFields := $.BlobFields }} + {{- if gt (len $blobFields) 1 }} + type blobWritten struct { field string; key string } + var _blobWritten []blobWritten + _blobCleanup := func(ctx context.Context) error { + var errs []error + for _, bw := range _blobWritten { + b, err := {{ $mutation }}.blobOpeners.{{ $.Name }}(ctx, bw.field) + if err != nil { + errs = append(errs, err) + continue + } + errs = append(errs, b.Delete(ctx, bw.key), b.Close()) + } + return errors.Join(errs...) + } + {{- end }} + {{- range $i, $f := $blobFields }} if r, ok := {{ $mutation }}.{{ $f.StructField }}(); ok { b, err := {{ $mutation }}.blobOpeners.{{ $.Name }}(ctx, {{ $.Package }}.{{ $f.Constant }}) if err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: opening blob bucket for {{ $f.Name }}: %w", err), _blobCleanup(ctx)) + {{- else }} return nil, fmt.Errorf("{{ $pkg }}: opening blob bucket for {{ $f.Name }}: %w", err) + {{- end }} } key, err := _node.{{ $f.StructField }}Key(ctx) if err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: blob key for {{ $f.Name }}: %w", err), b.Close(), _blobCleanup(ctx)) + {{- else }} return nil, errors.Join(fmt.Errorf("{{ $pkg }}: blob key for {{ $f.Name }}: %w", err), b.Close()) + {{- end }} } w, err := b.NewWriter(ctx, key) if err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: creating writer for {{ $f.Name }}: %w", err), b.Close(), _blobCleanup(ctx)) + {{- else }} return nil, errors.Join(fmt.Errorf("{{ $pkg }}: creating writer for {{ $f.Name }}: %w", err), b.Close()) + {{- end }} } if _, err := io.Copy(w, r); err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: writing blob for {{ $f.Name }}: %w", err), w.Close(), b.Close(), _blobCleanup(ctx)) + {{- else }} return nil, errors.Join(fmt.Errorf("{{ $pkg }}: writing blob for {{ $f.Name }}: %w", err), w.Close(), b.Close()) + {{- end }} } if err := errors.Join(w.Close(), b.Close()); err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: closing blob for {{ $f.Name }}: %w", err), _blobCleanup(ctx)) + {{- else }} return nil, fmt.Errorf("{{ $pkg }}: closing blob for {{ $f.Name }}: %w", err) + {{- end }} } + {{- if gt (len $blobFields) 1 }} + _blobWritten = append(_blobWritten, blobWritten{field: {{ $.Package }}.{{ $f.Constant }}, key: key}) + {{- end }} } {{- end }} {{- end }} @@ -202,6 +242,7 @@ func ({{ $receiver }} *{{ $builder }}) Save(ctx context.Context) ([]*{{ $.Name } {{- if $.HasBlobFields }} {{- range $f := $.BlobFields }} var _blob{{ $f.StructField }} Blob + var _blobKeys{{ $f.StructField }} []string {{- end }} closeBlobs := func() error { var errs []error @@ -212,6 +253,18 @@ func ({{ $receiver }} *{{ $builder }}) Save(ctx context.Context) ([]*{{ $.Name } {{- end }} return errors.Join(errs...) } + cleanupBlobs := func(ctx context.Context) error { + var errs []error + {{- range $f := $.BlobFields }} + if _blob{{ $f.StructField }} != nil { + for _, key := range _blobKeys{{ $f.StructField }} { + errs = append(errs, _blob{{ $f.StructField }}.Delete(ctx, key)) + } + } + {{- end }} + errs = append(errs, closeBlobs()) + return errors.Join(errs...) + } {{- end }} for i := range {{ $receiver }}.builders { func(i int, root context.Context) { @@ -309,6 +362,7 @@ func ({{ $receiver }} *{{ $builder }}) Save(ctx context.Context) ([]*{{ $.Name } if err := w.Close(); err != nil { return nil, fmt.Errorf("{{ $pkg }}: closing writer for {{ $f.Name }}: %w", err) } + _blobKeys{{ $f.StructField }} = append(_blobKeys{{ $f.StructField }}, key) } {{- end }} return nodes[i], nil @@ -322,7 +376,7 @@ func ({{ $receiver }} *{{ $builder }}) Save(ctx context.Context) ([]*{{ $.Name } if len(mutators) > 0 { if _, err := mutators[0].Mutate(ctx, {{ $receiver }}.builders[0].mutation); err != nil { {{- if $.HasBlobFields }} - return nil, errors.Join(err, closeBlobs()) + return nil, errors.Join(err, cleanupBlobs(ctx)) {{- else }} return nil, err {{- end }} diff --git a/entc/gen/template/dialect/sql/update.tmpl b/entc/gen/template/dialect/sql/update.tmpl index 3f8e180f6..1e52c5a7b 100644 --- a/entc/gen/template/dialect/sql/update.tmpl +++ b/entc/gen/template/dialect/sql/update.tmpl @@ -188,26 +188,66 @@ func ({{ $receiver }} *{{ $builder }}) sqlSave(ctx context.Context) (_node {{ if } {{ $mutation }}.done = true {{- if and $one $.HasBlobFields }} - {{- range $f := $.BlobFields }} + {{- $blobFields := $.BlobFields }} + {{- if gt (len $blobFields) 1 }} + type blobWritten struct { field string; key string } + var _blobWritten []blobWritten + _blobCleanup := func(ctx context.Context) error { + var errs []error + for _, bw := range _blobWritten { + b, err := {{ $mutation }}.blobOpeners.{{ $.Name }}(ctx, bw.field) + if err != nil { + errs = append(errs, err) + continue + } + errs = append(errs, b.Delete(ctx, bw.key), b.Close()) + } + return errors.Join(errs...) + } + {{- end }} + {{- range $i, $f := $blobFields }} if r, ok := {{ $mutation }}.{{ $f.StructField }}(); ok { b, err := {{ $mutation }}.blobOpeners.{{ $.Name }}(ctx, {{ $.Package }}.{{ $f.Constant }}) if err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: opening blob bucket for {{ $f.Name }}: %w", err), _blobCleanup(ctx)) + {{- else }} return nil, fmt.Errorf("{{ $pkg }}: opening blob bucket for {{ $f.Name }}: %w", err) + {{- end }} } key, err := _node.{{ $f.StructField }}Key(ctx) if err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: blob key for {{ $f.Name }}: %w", err), b.Close(), _blobCleanup(ctx)) + {{- else }} return nil, errors.Join(fmt.Errorf("{{ $pkg }}: blob key for {{ $f.Name }}: %w", err), b.Close()) + {{- end }} } w, err := b.NewWriter(ctx, key) if err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: creating writer for {{ $f.Name }}: %w", err), b.Close(), _blobCleanup(ctx)) + {{- else }} return nil, errors.Join(fmt.Errorf("{{ $pkg }}: creating writer for {{ $f.Name }}: %w", err), b.Close()) + {{- end }} } if _, err := io.Copy(w, r); err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: writing blob for {{ $f.Name }}: %w", err), w.Close(), b.Close(), _blobCleanup(ctx)) + {{- else }} return nil, errors.Join(fmt.Errorf("{{ $pkg }}: writing blob for {{ $f.Name }}: %w", err), w.Close(), b.Close()) + {{- end }} } if err := errors.Join(w.Close(), b.Close()); err != nil { + {{- if gt (len $blobFields) 1 }} + return nil, errors.Join(fmt.Errorf("{{ $pkg }}: closing blob for {{ $f.Name }}: %w", err), _blobCleanup(ctx)) + {{- else }} return nil, fmt.Errorf("{{ $pkg }}: closing blob for {{ $f.Name }}: %w", err) + {{- end }} } + {{- if gt (len $blobFields) 1 }} + _blobWritten = append(_blobWritten, blobWritten{field: {{ $.Package }}.{{ $f.Constant }}, key: key}) + {{- end }} } {{- end }} {{- end }} diff --git a/entc/gen/template/ent.tmpl b/entc/gen/template/ent.tmpl index 4f2bc53fd..af6f93aa8 100644 --- a/entc/gen/template/ent.tmpl +++ b/entc/gen/template/ent.tmpl @@ -116,16 +116,20 @@ type {{ $edgesType }} struct { } switch r, err := b.NewReader(ctx, key); { case errors.Is(err, fs.ErrNotExist): - return nil, b.Close() + if err := b.Close(); err != nil { + return nil, fmt.Errorf("{{ $pkg }}: closing blob bucket for {{ $f.Name }}: %w", err) + } + return nil, nil case err != nil: return nil, errors.Join(fmt.Errorf("{{ $pkg }}: creating reader for {{ $f.Name }}: %w", err), b.Close()) default: - return r, nil + return &blobReadCloser{ReadCloser: r, bucket: b}, nil } } // {{ $f.StructField }}Writer opens a writer for the "{{ $f.Name }}" field in blob storage. - // The caller must close the returned writer when done. + // The caller must close the returned writer when done. Closing the writer + // also releases the underlying bucket resources. // Writing via this method does not go through the mutation pipeline. func ({{ $receiver }} *{{ $.Name }}) {{ $f.StructField }}Writer(ctx context.Context) (io.WriteCloser, error) { key, err := {{ $receiver }}.{{ $f.StructField }}Key(ctx) @@ -140,7 +144,7 @@ type {{ $edgesType }} struct { if err != nil { return nil, errors.Join(fmt.Errorf("{{ $pkg }}: creating writer for {{ $f.Name }}: %w", err), b.Close()) } - return w, nil + return &blobWriteCloser{WriteCloser: w, bucket: b}, nil } {{- with $tmpls := matchTemplate (printf "blob/key/%s/%s" $.Name $f.Name) (printf "blob/key/%s" $.Name) }} diff --git a/entc/gen/template/mutation.tmpl b/entc/gen/template/mutation.tmpl index 05f9da279..b0f612019 100644 --- a/entc/gen/template/mutation.tmpl +++ b/entc/gen/template/mutation.tmpl @@ -466,6 +466,7 @@ func (m *Mutation) ClearedFields() []string { {{- if $.HasOptional }} var fields []string {{- range $f := $.Fields }} + {{- if $f.IsBlob }}{{ continue }}{{ end }} {{- if $f.Optional }} if m.FieldCleared({{ $f.Constant }}) { fields = append(fields, {{ $f.Constant }}) @@ -491,6 +492,7 @@ func (m *Mutation) ClearField(name string) error { {{- if $.HasOptional }} switch name { {{- range $f := $.Fields }} + {{- if $f.IsBlob }}{{ continue }}{{ end }} {{- if $f.Optional }} case {{ $f.Constant }}: m.Clear{{ $f.StructField }}() diff --git a/entc/gen/type.go b/entc/gen/type.go index e5aa7064f..3c5ec4ab4 100644 --- a/entc/gen/type.go +++ b/entc/gen/type.go @@ -551,7 +551,7 @@ func (t Type) NumConstraint() int { func (t Type) MutableFields() []*Field { fields := make([]*Field, 0, len(t.Fields)) for _, f := range t.Fields { - if f.Immutable { + if f.Immutable || f.IsBlob() { continue } if e, err := f.Edge(); err == nil && e.Immutable { @@ -566,7 +566,7 @@ func (t Type) MutableFields() []*Field { func (t Type) ImmutableFields() []*Field { fields := make([]*Field, 0, len(t.Fields)) for _, f := range t.Fields { - if f.Immutable { + if f.Immutable && !f.IsBlob() { fields = append(fields, f) } } diff --git a/entc/integration/blob/blob.go b/entc/integration/blob/blob.go index 130fb8ee2..253a87c7d 100644 --- a/entc/integration/blob/blob.go +++ b/entc/integration/blob/blob.go @@ -70,6 +70,16 @@ func (b *GoBucket) NewWriter(ctx context.Context, key string) (io.WriteCloser, e return b.b.NewWriter(ctx, key, b.writerOpts) } +// Delete removes the blob stored at key. +// It returns nil if the key does not exist. +func (b *GoBucket) Delete(ctx context.Context, key string) error { + err := b.b.Delete(ctx, key) + if gcerrors.Code(err) == gcerrors.NotFound { + return nil + } + return err +} + // Close releases the underlying gocloud bucket resources. func (b *GoBucket) Close() error { return b.b.Close() diff --git a/entc/integration/ent/client.go b/entc/integration/ent/client.go index 25a75f7c7..5d64f7c84 100644 --- a/entc/integration/ent/client.go +++ b/entc/integration/ent/client.go @@ -184,11 +184,18 @@ func Driver(driver dialect.Driver) Option { // Blob defines the interface for blob storage operations. // Implementations should return [io/fs.ErrNotExist] (or an error wrapping it) // from NewReader when the requested key does not exist. +// +// Blob writes are not transactional with the database. If a bulk operation +// fails partway through, already-written blobs will be cleaned up on a +// best-effort basis using the Delete method. type Blob interface { // NewReader opens a reader for the given key. NewReader(ctx context.Context, key string) (io.ReadCloser, error) // NewWriter opens a writer for the given key. NewWriter(ctx context.Context, key string) (io.WriteCloser, error) + // Delete removes the blob stored at key. It should return nil + // if the key does not exist. + Delete(ctx context.Context, key string) error // Close releases any resources held by the bucket. Close() error } @@ -206,6 +213,26 @@ func WithBlobOpeners(openers BlobOpeners) Option { } } +// blobReadCloser wraps an io.ReadCloser to also close the parent bucket on Close. +type blobReadCloser struct { + io.ReadCloser + bucket Blob +} + +func (r *blobReadCloser) Close() error { + return errors.Join(r.ReadCloser.Close(), r.bucket.Close()) +} + +// blobWriteCloser wraps an io.WriteCloser to also close the parent bucket on Close. +type blobWriteCloser struct { + io.WriteCloser + bucket Blob +} + +func (w *blobWriteCloser) Close() error { + return errors.Join(w.WriteCloser.Close(), w.bucket.Close()) +} + // Open opens a database/sql.DB specified by the driver name and // the data source name, and returns a new client attached to it. // Optional parameters can be added for configuring the client. diff --git a/entc/integration/ent/document.go b/entc/integration/ent/document.go index 5e2f13525..d84aae436 100644 --- a/entc/integration/ent/document.go +++ b/entc/integration/ent/document.go @@ -94,16 +94,20 @@ func (_m *Document) Content(ctx context.Context) (io.ReadCloser, error) { } switch r, err := b.NewReader(ctx, key); { case errors.Is(err, fs.ErrNotExist): - return nil, b.Close() + if err := b.Close(); err != nil { + return nil, fmt.Errorf("ent: closing blob bucket for content: %w", err) + } + return nil, nil case err != nil: return nil, errors.Join(fmt.Errorf("ent: creating reader for content: %w", err), b.Close()) default: - return r, nil + return &blobReadCloser{ReadCloser: r, bucket: b}, nil } } // ContentWriter opens a writer for the "content" field in blob storage. -// The caller must close the returned writer when done. +// The caller must close the returned writer when done. Closing the writer +// also releases the underlying bucket resources. // Writing via this method does not go through the mutation pipeline. func (_m *Document) ContentWriter(ctx context.Context) (io.WriteCloser, error) { key, err := _m.ContentKey(ctx) @@ -118,7 +122,7 @@ func (_m *Document) ContentWriter(ctx context.Context) (io.WriteCloser, error) { if err != nil { return nil, errors.Join(fmt.Errorf("ent: creating writer for content: %w", err), b.Close()) } - return w, nil + return &blobWriteCloser{WriteCloser: w, bucket: b}, nil } // ContentKey returns the blob storage key for the "content" field. @@ -140,16 +144,20 @@ func (_m *Document) Thumbnail(ctx context.Context) (io.ReadCloser, error) { } switch r, err := b.NewReader(ctx, key); { case errors.Is(err, fs.ErrNotExist): - return nil, b.Close() + if err := b.Close(); err != nil { + return nil, fmt.Errorf("ent: closing blob bucket for thumbnail: %w", err) + } + return nil, nil case err != nil: return nil, errors.Join(fmt.Errorf("ent: creating reader for thumbnail: %w", err), b.Close()) default: - return r, nil + return &blobReadCloser{ReadCloser: r, bucket: b}, nil } } // ThumbnailWriter opens a writer for the "thumbnail" field in blob storage. -// The caller must close the returned writer when done. +// The caller must close the returned writer when done. Closing the writer +// also releases the underlying bucket resources. // Writing via this method does not go through the mutation pipeline. func (_m *Document) ThumbnailWriter(ctx context.Context) (io.WriteCloser, error) { key, err := _m.ThumbnailKey(ctx) @@ -164,7 +172,7 @@ func (_m *Document) ThumbnailWriter(ctx context.Context) (io.WriteCloser, error) if err != nil { return nil, errors.Join(fmt.Errorf("ent: creating writer for thumbnail: %w", err), b.Close()) } - return w, nil + return &blobWriteCloser{WriteCloser: w, bucket: b}, nil } // ThumbnailKey returns a hash-based blob storage key for the "thumbnail" field. diff --git a/entc/integration/ent/document_create.go b/entc/integration/ent/document_create.go index ae21aeea0..8e6ed8328 100644 --- a/entc/integration/ent/document_create.go +++ b/entc/integration/ent/document_create.go @@ -105,45 +105,64 @@ func (_c *DocumentCreate) sqlSave(ctx context.Context) (*Document, error) { _node.ID = int(id) _c.mutation.id = &_node.ID _c.mutation.done = true + type blobWritten struct { + field string + key string + } + var _blobWritten []blobWritten + _blobCleanup := func(ctx context.Context) error { + var errs []error + for _, bw := range _blobWritten { + b, err := _c.mutation.blobOpeners.Document(ctx, bw.field) + if err != nil { + errs = append(errs, err) + continue + } + errs = append(errs, b.Delete(ctx, bw.key), b.Close()) + } + return errors.Join(errs...) + } if r, ok := _c.mutation.Content(); ok { b, err := _c.mutation.blobOpeners.Document(ctx, document.FieldContent) if err != nil { - return nil, fmt.Errorf("ent: opening blob bucket for content: %w", err) + return nil, errors.Join(fmt.Errorf("ent: opening blob bucket for content: %w", err), _blobCleanup(ctx)) } key, err := _node.ContentKey(ctx) if err != nil { - return nil, errors.Join(fmt.Errorf("ent: blob key for content: %w", err), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: blob key for content: %w", err), b.Close(), _blobCleanup(ctx)) } w, err := b.NewWriter(ctx, key) if err != nil { - return nil, errors.Join(fmt.Errorf("ent: creating writer for content: %w", err), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: creating writer for content: %w", err), b.Close(), _blobCleanup(ctx)) } if _, err := io.Copy(w, r); err != nil { - return nil, errors.Join(fmt.Errorf("ent: writing blob for content: %w", err), w.Close(), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: writing blob for content: %w", err), w.Close(), b.Close(), _blobCleanup(ctx)) } if err := errors.Join(w.Close(), b.Close()); err != nil { - return nil, fmt.Errorf("ent: closing blob for content: %w", err) + return nil, errors.Join(fmt.Errorf("ent: closing blob for content: %w", err), _blobCleanup(ctx)) } + _blobWritten = append(_blobWritten, blobWritten{field: document.FieldContent, key: key}) } if r, ok := _c.mutation.Thumbnail(); ok { b, err := _c.mutation.blobOpeners.Document(ctx, document.FieldThumbnail) if err != nil { - return nil, fmt.Errorf("ent: opening blob bucket for thumbnail: %w", err) + return nil, errors.Join(fmt.Errorf("ent: opening blob bucket for thumbnail: %w", err), _blobCleanup(ctx)) } key, err := _node.ThumbnailKey(ctx) if err != nil { - return nil, errors.Join(fmt.Errorf("ent: blob key for thumbnail: %w", err), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: blob key for thumbnail: %w", err), b.Close(), _blobCleanup(ctx)) } w, err := b.NewWriter(ctx, key) if err != nil { - return nil, errors.Join(fmt.Errorf("ent: creating writer for thumbnail: %w", err), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: creating writer for thumbnail: %w", err), b.Close(), _blobCleanup(ctx)) } if _, err := io.Copy(w, r); err != nil { - return nil, errors.Join(fmt.Errorf("ent: writing blob for thumbnail: %w", err), w.Close(), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: writing blob for thumbnail: %w", err), w.Close(), b.Close(), _blobCleanup(ctx)) } if err := errors.Join(w.Close(), b.Close()); err != nil { - return nil, fmt.Errorf("ent: closing blob for thumbnail: %w", err) + return nil, errors.Join(fmt.Errorf("ent: closing blob for thumbnail: %w", err), _blobCleanup(ctx)) } + _blobWritten = append(_blobWritten, blobWritten{field: document.FieldThumbnail, key: key}) } return _node, nil } @@ -222,30 +241,6 @@ func (u *DocumentUpsert) UpdateName() *DocumentUpsert { return u } -// SetContent sets the "content" field. -func (u *DocumentUpsert) SetContent(v io.Reader) *DocumentUpsert { - u.Set(document.FieldContent, v) - return u -} - -// UpdateContent sets the "content" field to the value that was provided on create. -func (u *DocumentUpsert) UpdateContent() *DocumentUpsert { - u.SetExcluded(document.FieldContent) - return u -} - -// SetThumbnail sets the "thumbnail" field. -func (u *DocumentUpsert) SetThumbnail(v io.Reader) *DocumentUpsert { - u.Set(document.FieldThumbnail, v) - return u -} - -// UpdateThumbnail sets the "thumbnail" field to the value that was provided on create. -func (u *DocumentUpsert) UpdateThumbnail() *DocumentUpsert { - u.SetExcluded(document.FieldThumbnail) - return u -} - // UpdateNewValues updates the mutable fields using the new values that were set on create. // Using this option is equivalent to using: // @@ -300,34 +295,6 @@ func (u *DocumentUpsertOne) UpdateName() *DocumentUpsertOne { }) } -// SetContent sets the "content" field. -func (u *DocumentUpsertOne) SetContent(v io.Reader) *DocumentUpsertOne { - return u.Update(func(s *DocumentUpsert) { - s.SetContent(v) - }) -} - -// UpdateContent sets the "content" field to the value that was provided on create. -func (u *DocumentUpsertOne) UpdateContent() *DocumentUpsertOne { - return u.Update(func(s *DocumentUpsert) { - s.UpdateContent() - }) -} - -// SetThumbnail sets the "thumbnail" field. -func (u *DocumentUpsertOne) SetThumbnail(v io.Reader) *DocumentUpsertOne { - return u.Update(func(s *DocumentUpsert) { - s.SetThumbnail(v) - }) -} - -// UpdateThumbnail sets the "thumbnail" field to the value that was provided on create. -func (u *DocumentUpsertOne) UpdateThumbnail() *DocumentUpsertOne { - return u.Update(func(s *DocumentUpsert) { - s.UpdateThumbnail() - }) -} - // Exec executes the query. func (u *DocumentUpsertOne) Exec(ctx context.Context) error { if len(u.create.conflict) == 0 { @@ -378,7 +345,9 @@ func (_c *DocumentCreateBulk) Save(ctx context.Context) ([]*Document, error) { nodes := make([]*Document, len(_c.builders)) mutators := make([]Mutator, len(_c.builders)) var _blobContent Blob + var _blobKeysContent []string var _blobThumbnail Blob + var _blobKeysThumbnail []string closeBlobs := func() error { var errs []error if _blobContent != nil { @@ -389,6 +358,21 @@ func (_c *DocumentCreateBulk) Save(ctx context.Context) ([]*Document, error) { } return errors.Join(errs...) } + cleanupBlobs := func(ctx context.Context) error { + var errs []error + if _blobContent != nil { + for _, key := range _blobKeysContent { + errs = append(errs, _blobContent.Delete(ctx, key)) + } + } + if _blobThumbnail != nil { + for _, key := range _blobKeysThumbnail { + errs = append(errs, _blobThumbnail.Delete(ctx, key)) + } + } + errs = append(errs, closeBlobs()) + return errors.Join(errs...) + } for i := range _c.builders { func(i int, root context.Context) { builder := _c.builders[i] @@ -444,6 +428,7 @@ func (_c *DocumentCreateBulk) Save(ctx context.Context) ([]*Document, error) { if err := w.Close(); err != nil { return nil, fmt.Errorf("ent: closing writer for content: %w", err) } + _blobKeysContent = append(_blobKeysContent, key) } if r, ok := mutation.Thumbnail(); ok { if _blobThumbnail == nil { @@ -465,6 +450,7 @@ func (_c *DocumentCreateBulk) Save(ctx context.Context) ([]*Document, error) { if err := w.Close(); err != nil { return nil, fmt.Errorf("ent: closing writer for thumbnail: %w", err) } + _blobKeysThumbnail = append(_blobKeysThumbnail, key) } return nodes[i], nil }) @@ -476,7 +462,7 @@ func (_c *DocumentCreateBulk) Save(ctx context.Context) ([]*Document, error) { } if len(mutators) > 0 { if _, err := mutators[0].Mutate(ctx, _c.builders[0].mutation); err != nil { - return nil, errors.Join(err, closeBlobs()) + return nil, errors.Join(err, cleanupBlobs(ctx)) } } return nodes, closeBlobs() @@ -599,34 +585,6 @@ func (u *DocumentUpsertBulk) UpdateName() *DocumentUpsertBulk { }) } -// SetContent sets the "content" field. -func (u *DocumentUpsertBulk) SetContent(v io.Reader) *DocumentUpsertBulk { - return u.Update(func(s *DocumentUpsert) { - s.SetContent(v) - }) -} - -// UpdateContent sets the "content" field to the value that was provided on create. -func (u *DocumentUpsertBulk) UpdateContent() *DocumentUpsertBulk { - return u.Update(func(s *DocumentUpsert) { - s.UpdateContent() - }) -} - -// SetThumbnail sets the "thumbnail" field. -func (u *DocumentUpsertBulk) SetThumbnail(v io.Reader) *DocumentUpsertBulk { - return u.Update(func(s *DocumentUpsert) { - s.SetThumbnail(v) - }) -} - -// UpdateThumbnail sets the "thumbnail" field to the value that was provided on create. -func (u *DocumentUpsertBulk) UpdateThumbnail() *DocumentUpsertBulk { - return u.Update(func(s *DocumentUpsert) { - s.UpdateThumbnail() - }) -} - // Exec executes the query. func (u *DocumentUpsertBulk) Exec(ctx context.Context) error { if u.create.err != nil { diff --git a/entc/integration/ent/document_update.go b/entc/integration/ent/document_update.go index 18122c31d..082ae048d 100644 --- a/entc/integration/ent/document_update.go +++ b/entc/integration/ent/document_update.go @@ -250,45 +250,64 @@ func (_u *DocumentUpdateOne) sqlSave(ctx context.Context) (_node *Document, err return nil, err } _u.mutation.done = true + type blobWritten struct { + field string + key string + } + var _blobWritten []blobWritten + _blobCleanup := func(ctx context.Context) error { + var errs []error + for _, bw := range _blobWritten { + b, err := _u.mutation.blobOpeners.Document(ctx, bw.field) + if err != nil { + errs = append(errs, err) + continue + } + errs = append(errs, b.Delete(ctx, bw.key), b.Close()) + } + return errors.Join(errs...) + } if r, ok := _u.mutation.Content(); ok { b, err := _u.mutation.blobOpeners.Document(ctx, document.FieldContent) if err != nil { - return nil, fmt.Errorf("ent: opening blob bucket for content: %w", err) + return nil, errors.Join(fmt.Errorf("ent: opening blob bucket for content: %w", err), _blobCleanup(ctx)) } key, err := _node.ContentKey(ctx) if err != nil { - return nil, errors.Join(fmt.Errorf("ent: blob key for content: %w", err), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: blob key for content: %w", err), b.Close(), _blobCleanup(ctx)) } w, err := b.NewWriter(ctx, key) if err != nil { - return nil, errors.Join(fmt.Errorf("ent: creating writer for content: %w", err), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: creating writer for content: %w", err), b.Close(), _blobCleanup(ctx)) } if _, err := io.Copy(w, r); err != nil { - return nil, errors.Join(fmt.Errorf("ent: writing blob for content: %w", err), w.Close(), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: writing blob for content: %w", err), w.Close(), b.Close(), _blobCleanup(ctx)) } if err := errors.Join(w.Close(), b.Close()); err != nil { - return nil, fmt.Errorf("ent: closing blob for content: %w", err) + return nil, errors.Join(fmt.Errorf("ent: closing blob for content: %w", err), _blobCleanup(ctx)) } + _blobWritten = append(_blobWritten, blobWritten{field: document.FieldContent, key: key}) } if r, ok := _u.mutation.Thumbnail(); ok { b, err := _u.mutation.blobOpeners.Document(ctx, document.FieldThumbnail) if err != nil { - return nil, fmt.Errorf("ent: opening blob bucket for thumbnail: %w", err) + return nil, errors.Join(fmt.Errorf("ent: opening blob bucket for thumbnail: %w", err), _blobCleanup(ctx)) } key, err := _node.ThumbnailKey(ctx) if err != nil { - return nil, errors.Join(fmt.Errorf("ent: blob key for thumbnail: %w", err), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: blob key for thumbnail: %w", err), b.Close(), _blobCleanup(ctx)) } w, err := b.NewWriter(ctx, key) if err != nil { - return nil, errors.Join(fmt.Errorf("ent: creating writer for thumbnail: %w", err), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: creating writer for thumbnail: %w", err), b.Close(), _blobCleanup(ctx)) } if _, err := io.Copy(w, r); err != nil { - return nil, errors.Join(fmt.Errorf("ent: writing blob for thumbnail: %w", err), w.Close(), b.Close()) + return nil, errors.Join(fmt.Errorf("ent: writing blob for thumbnail: %w", err), w.Close(), b.Close(), _blobCleanup(ctx)) } if err := errors.Join(w.Close(), b.Close()); err != nil { - return nil, fmt.Errorf("ent: closing blob for thumbnail: %w", err) + return nil, errors.Join(fmt.Errorf("ent: closing blob for thumbnail: %w", err), _blobCleanup(ctx)) } + _blobWritten = append(_blobWritten, blobWritten{field: document.FieldThumbnail, key: key}) } return _node, nil } diff --git a/entc/integration/gremlin/ent/client.go b/entc/integration/gremlin/ent/client.go index 72dd36787..846da8259 100644 --- a/entc/integration/gremlin/ent/client.go +++ b/entc/integration/gremlin/ent/client.go @@ -179,11 +179,18 @@ func Driver(driver dialect.Driver) Option { // Blob defines the interface for blob storage operations. // Implementations should return [io/fs.ErrNotExist] (or an error wrapping it) // from NewReader when the requested key does not exist. +// +// Blob writes are not transactional with the database. If a bulk operation +// fails partway through, already-written blobs will be cleaned up on a +// best-effort basis using the Delete method. type Blob interface { // NewReader opens a reader for the given key. NewReader(ctx context.Context, key string) (io.ReadCloser, error) // NewWriter opens a writer for the given key. NewWriter(ctx context.Context, key string) (io.WriteCloser, error) + // Delete removes the blob stored at key. It should return nil + // if the key does not exist. + Delete(ctx context.Context, key string) error // Close releases any resources held by the bucket. Close() error } @@ -201,6 +208,26 @@ func WithBlobOpeners(openers BlobOpeners) Option { } } +// blobReadCloser wraps an io.ReadCloser to also close the parent bucket on Close. +type blobReadCloser struct { + io.ReadCloser + bucket Blob +} + +func (r *blobReadCloser) Close() error { + return errors.Join(r.ReadCloser.Close(), r.bucket.Close()) +} + +// blobWriteCloser wraps an io.WriteCloser to also close the parent bucket on Close. +type blobWriteCloser struct { + io.WriteCloser + bucket Blob +} + +func (w *blobWriteCloser) Close() error { + return errors.Join(w.WriteCloser.Close(), w.bucket.Close()) +} + // Open opens a database/sql.DB specified by the driver name and // the data source name, and returns a new client attached to it. // Optional parameters can be added for configuring the client. diff --git a/entc/integration/gremlin/ent/document.go b/entc/integration/gremlin/ent/document.go index 3b90b9b29..3cc273b47 100644 --- a/entc/integration/gremlin/ent/document.go +++ b/entc/integration/gremlin/ent/document.go @@ -36,18 +36,14 @@ func (_m *Document) FromResponse(res *gremlin.Response) error { return err } var scan_m struct { - ID string `json:"id,omitempty"` - Name string `json:"name,omitempty"` - Content io.Reader `json:"content,omitempty"` - Thumbnail io.Reader `json:"thumbnail,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` } if err := vmap.Decode(&scan_m); err != nil { return err } _m.ID = scan_m.ID _m.Name = scan_m.Name - _m.Content = scan_m.Content - _m.Thumbnail = scan_m.Thumbnail return nil } @@ -65,16 +61,20 @@ func (_m *Document) Content(ctx context.Context) (io.ReadCloser, error) { } switch r, err := b.NewReader(ctx, key); { case errors.Is(err, fs.ErrNotExist): - return nil, b.Close() + if err := b.Close(); err != nil { + return nil, fmt.Errorf("ent: closing blob bucket for content: %w", err) + } + return nil, nil case err != nil: return nil, errors.Join(fmt.Errorf("ent: creating reader for content: %w", err), b.Close()) default: - return r, nil + return &blobReadCloser{ReadCloser: r, bucket: b}, nil } } // ContentWriter opens a writer for the "content" field in blob storage. -// The caller must close the returned writer when done. +// The caller must close the returned writer when done. Closing the writer +// also releases the underlying bucket resources. // Writing via this method does not go through the mutation pipeline. func (_m *Document) ContentWriter(ctx context.Context) (io.WriteCloser, error) { key, err := _m.ContentKey(ctx) @@ -89,7 +89,7 @@ func (_m *Document) ContentWriter(ctx context.Context) (io.WriteCloser, error) { if err != nil { return nil, errors.Join(fmt.Errorf("ent: creating writer for content: %w", err), b.Close()) } - return w, nil + return &blobWriteCloser{WriteCloser: w, bucket: b}, nil } // ContentKey returns the blob storage key for the "content" field. @@ -111,16 +111,20 @@ func (_m *Document) Thumbnail(ctx context.Context) (io.ReadCloser, error) { } switch r, err := b.NewReader(ctx, key); { case errors.Is(err, fs.ErrNotExist): - return nil, b.Close() + if err := b.Close(); err != nil { + return nil, fmt.Errorf("ent: closing blob bucket for thumbnail: %w", err) + } + return nil, nil case err != nil: return nil, errors.Join(fmt.Errorf("ent: creating reader for thumbnail: %w", err), b.Close()) default: - return r, nil + return &blobReadCloser{ReadCloser: r, bucket: b}, nil } } // ThumbnailWriter opens a writer for the "thumbnail" field in blob storage. -// The caller must close the returned writer when done. +// The caller must close the returned writer when done. Closing the writer +// also releases the underlying bucket resources. // Writing via this method does not go through the mutation pipeline. func (_m *Document) ThumbnailWriter(ctx context.Context) (io.WriteCloser, error) { key, err := _m.ThumbnailKey(ctx) @@ -135,7 +139,7 @@ func (_m *Document) ThumbnailWriter(ctx context.Context) (io.WriteCloser, error) if err != nil { return nil, errors.Join(fmt.Errorf("ent: creating writer for thumbnail: %w", err), b.Close()) } - return w, nil + return &blobWriteCloser{WriteCloser: w, bucket: b}, nil } // ThumbnailKey returns a hash-based blob storage key for the "thumbnail" field. @@ -183,10 +187,8 @@ func (_m *Documents) FromResponse(res *gremlin.Response) error { return err } var scan_m []struct { - ID string `json:"id,omitempty"` - Name string `json:"name,omitempty"` - Content io.Reader `json:"content,omitempty"` - Thumbnail io.Reader `json:"thumbnail,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` } if err := vmap.Decode(&scan_m); err != nil { return err @@ -194,8 +196,6 @@ func (_m *Documents) FromResponse(res *gremlin.Response) error { for _, v := range scan_m { node := &Document{ID: v.ID} node.Name = v.Name - node.Content = v.Content - node.Thumbnail = v.Thumbnail *_m = append(*_m, node) } return nil diff --git a/schema/field/type.go b/schema/field/type.go index cdac06276..b10a4ac5f 100644 --- a/schema/field/type.go +++ b/schema/field/type.go @@ -167,7 +167,7 @@ var ( TypeEnum: "string", TypeString: "string", TypeOther: "other", - TypeBlob: "io.Reader", + TypeBlob: "blob", TypeInt: "int", TypeInt8: "int8", TypeInt16: "int16",