Files
ent/blob.go
2026-05-21 14:10:53 +00:00

355 lines
9.8 KiB
Go

// Copyright 2019-present Facebook Inc. All rights reserved.
// This source code is licensed under the Apache 2.0 license found
// in the LICENSE file in the root directory of this source tree.
package ent
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
)
// Blob defines the interface for blob storage operations.
// Implementations should return [io/fs.ErrNotExist] (or an error wrapping it)
// from NewReader when the requested key does not exist; [BlobReader] and
// [BlobStore.Read] translate that error into a nil result rather than a failure.
//
// Single-row SQL create builders write blob data to external storage before
// inserting the database row. If the row insertion fails (for example, due to
// a constraint violation), generated code attempts to delete the just-written blobs.
type Blob interface {
// NewReader opens a reader for the given key.
// Callers in this file always close the returned reader.
NewReader(ctx context.Context, key string) (io.ReadCloser, error)
// NewWriter opens a writer for the given key.
// Within this file the writer receives the whole payload in a single Write
// call and is then closed.
NewWriter(ctx context.Context, key string) (io.WriteCloser, error)
// Delete removes the blob at the given key.
// Implementations should return nil (not an error) if the key does not exist.
Delete(ctx context.Context, key string) error
// Close releases any resources held by the bucket.
Close() error
}
// BlobOpener is a function that opens a [Blob] bucket for the given field name.
// [BlobStore] calls it the first time a field is used and caches the returned
// bucket for subsequent operations on that field.
type BlobOpener func(context.Context, string) (Blob, error)
// BlobKey identifies a blob in storage by field name and key.
type BlobKey struct {
// Field is the blob field name; it selects which bucket is opened.
Field string
// Key is the storage key of the blob inside that field's bucket.
Key string
}
// BlobQuerier queries existing blob keys from the database.
// [Blobs.Update] passes the mutated field names; [Blobs.Delete] passes nil
// to indicate all fields should be queried.
type BlobQuerier interface {
// QueryBlobKeys returns the currently stored blob keys for the given
// fields (nil meaning all fields, per the contract above).
QueryBlobKeys(ctx context.Context, fields []string) ([]BlobKey, error)
}
// BlobUpdateResult holds post-update blob operations.
// When returned by [Blobs.Update] without an error, both fields are non-nil;
// an operation with nothing to do is a no-op that returns nil.
type BlobUpdateResult struct {
Rollback BlobOp // Deletes newly-written blobs. Call on SQL failure.
Commit BlobOp // Deletes old replaced blobs. Call after successful SQL commit.
}
// BlobOp is a deferred blob storage operation (e.g. rollback or commit).
// The operations built in this file delete a fixed set of blob keys and
// return the joined errors of every failed deletion.
type BlobOp func(context.Context) error
// BlobKeyFunc generates a storage key for a blob from its content.
// [Blobs] calls it once per written field with the data passed to [Blobs.Set].
// NOTE(review): [Blobs.Update] skips the write when the generated key equals
// the stored key, which presumably relies on equal keys implying equal
// content; confirm against the key functions in use.
type BlobKeyFunc func(context.Context, []byte) (string, error)
// Blobs orchestrates blob storage operations for a single mutation.
// Use [NewBlobs] to create, then call [Blobs.Set] or [Blobs.SetCleared]
// for each blob field, then [Blobs.Create] or [Blobs.Update].
type Blobs struct {
// opener is handed to every BlobStore this orchestrator creates.
opener BlobOpener
// inputs holds the registered field changes in registration order.
inputs []blobInput
}
// blobInput is one pending change to a blob field, recorded by Blobs.Set or
// Blobs.SetCleared and consumed by Blobs.prepare.
type blobInput struct {
// field is the blob field name the change applies to.
field string
// data is the payload to store; left unset for cleared fields.
data []byte
// newKey derives the storage key from data; left unset for cleared fields.
newKey BlobKeyFunc
// apply receives the generated key; prepare skips it when nil.
apply func(string)
// cleared marks the field as cleared rather than written.
cleared bool
// clear is invoked by prepare for cleared fields.
clear func()
}
// NewBlobs returns an empty blob orchestrator whose storage operations open
// their buckets through opener.
func NewBlobs(opener BlobOpener) *Blobs {
	b := new(Blobs)
	b.opener = opener
	return b
}
// Set registers a blob field whose data should be written to storage.
// Once the storage key has been generated with key, apply is invoked with it
// so the caller can set the key on the SQL spec and node.
func (b *Blobs) Set(f string, data []byte, key BlobKeyFunc, apply func(string)) {
	in := blobInput{
		field:  f,
		data:   data,
		newKey: key,
		apply:  apply,
	}
	b.inputs = append(b.inputs, in)
}
// SetCleared registers a blob field as cleared. The clear callback is expected
// to remove the key column from the SQL spec.
func (b *Blobs) SetCleared(f string, clear func()) {
	in := blobInput{
		field:   f,
		cleared: true,
		clear:   clear,
	}
	b.inputs = append(b.inputs, in)
}
// Create resolves the registered inputs into keyed writes, stores them, and
// hands back a [BlobOp] that deletes the stored blobs again (the rollback).
// A failure while preparing the inputs is returned before anything is written.
func (b *Blobs) Create(ctx context.Context) (BlobOp, error) {
	pending, err := b.prepare(ctx)
	if err == nil {
		return b.write(ctx, pending)
	}
	return nil, err
}
// Update prepares inputs, queries old keys, writes new blobs, and returns
// a [BlobUpdateResult] for post-SQL handling.
//
// The steps are:
//  1. Resolve every registered input (generating keys and invoking the
//     apply/clear callbacks).
//  2. Ask q for the stored keys of all mutated fields.
//  3. Write only the blobs whose generated key differs from the stored key.
//  4. Collect the stored keys that become orphaned: those of cleared fields
//     and those of fields whose key changed.
//
// The result's Rollback deletes the blobs written in step 3 and its Commit
// deletes the orphaned keys from step 4. With no registered inputs a shared
// no-op result is returned and q is not called.
func (b *Blobs) Update(ctx context.Context, q BlobQuerier) (*BlobUpdateResult, error) {
	if len(b.inputs) == 0 {
		return noopBlobResult, nil
	}
	writes, err := b.prepare(ctx)
	if err != nil {
		return nil, err
	}
	mutated := make([]string, len(b.inputs))
	cleared := make(map[string]bool)
	for i := range b.inputs {
		mutated[i] = b.inputs[i].field
		if b.inputs[i].cleared {
			cleared[b.inputs[i].field] = true
		}
	}
	keys, err := q.QueryBlobKeys(ctx, mutated)
	if err != nil {
		return nil, fmt.Errorf("querying old blob keys: %w", err)
	}
	// Index the old key of each field to detect unchanged blobs. If the
	// querier returns several keys for one field, the last one wins here.
	oldKeys := make(map[string]string, len(keys))
	for _, k := range keys {
		oldKeys[k.Field] = k.Key
	}
	// Filter out writes where the key is unchanged (same content).
	// The result goes into a fresh slice: filtering in place (writes[:0])
	// would overwrite entries of writes, which is read again below to find
	// orphaned keys.
	filtered := make([]blobWrite, 0, len(writes))
	for _, wr := range writes {
		if oldKeys[wr.Field] == wr.Key {
			continue
		}
		filtered = append(filtered, wr)
	}
	rollback, err := b.write(ctx, filtered)
	if err != nil {
		return nil, err
	}
	// Collect orphaned blobs: old keys for fields that changed or were cleared.
	var orphaned []BlobKey
	for _, k := range keys {
		if cleared[k.Field] {
			orphaned = append(orphaned, k)
			continue
		}
		for _, wr := range writes {
			if wr.Field == k.Field && wr.Key != k.Key {
				orphaned = append(orphaned, k)
				break
			}
		}
	}
	return &BlobUpdateResult{
		Rollback: rollback,
		Commit:   b.deleteOp(orphaned),
	}, nil
}
// Delete queries existing blob keys and returns a [BlobOp] that removes
// them from storage. Use for delete mutations.
//
// q is called with nil fields, so the keys of every blob field are requested;
// the inputs registered on b are not consulted. A query failure is wrapped
// with the same "querying old blob keys" context that [Blobs.Update] uses,
// and the underlying error remains reachable through errors.Is / errors.As.
// When the query yields no keys the returned operation is a no-op.
func (b *Blobs) Delete(ctx context.Context, q BlobQuerier) (BlobOp, error) {
	keys, err := q.QueryBlobKeys(ctx, nil)
	if err != nil {
		return nil, fmt.Errorf("querying old blob keys: %w", err)
	}
	return b.deleteOp(keys), nil
}
// blobWrite is a resolved write produced by Blobs.prepare: the field and
// generated key (embedded BlobKey) together with the payload to store.
type blobWrite struct {
BlobKey
// data is the payload written to storage under the embedded key.
data []byte
}
// prepare resolves the registered inputs in registration order.
//
// For a cleared input it invokes the clear callback (if one was supplied) and
// produces no write. For every other input it generates the storage key with
// the input's key function, passes the key to the apply callback (if one was
// supplied), and records a blobWrite for the field.
//
// It returns the writes to perform, or an error as soon as a key cannot be
// generated. Callbacks of earlier inputs have already run at that point.
// A missing key function is reported as an error rather than panicking.
func (b *Blobs) prepare(ctx context.Context) ([]blobWrite, error) {
	var writes []blobWrite
	for _, inp := range b.inputs {
		if inp.cleared {
			// Callbacks are optional, mirroring the nil check on apply below.
			if inp.clear != nil {
				inp.clear()
			}
			continue
		}
		if inp.newKey == nil {
			return nil, fmt.Errorf("generating blob key for %s: missing key function", inp.field)
		}
		k, err := inp.newKey(ctx, inp.data)
		if err != nil {
			return nil, fmt.Errorf("generating blob key for %s: %w", inp.field, err)
		}
		if inp.apply != nil {
			inp.apply(k)
		}
		writes = append(writes, blobWrite{
			BlobKey: BlobKey{Field: inp.field, Key: k},
			data:    inp.data,
		})
	}
	return writes, nil
}
// write stores the given blobs through a fresh BlobStore and returns a BlobOp
// that deletes everything it wrote (the rollback operation).
//
// With no writes it returns a no-op without opening any bucket. If one write
// fails, the blobs written before it are deleted again on a best-effort
// basis, the store is closed, and all errors are joined into the result.
// If closing the store fails after every write succeeded, the written blobs
// are also deleted on a best-effort basis: the caller receives no rollback
// operation alongside an error, so the blobs would otherwise be left behind.
func (b *Blobs) write(ctx context.Context, writes []blobWrite) (BlobOp, error) {
	if len(writes) == 0 {
		return noOp, nil
	}
	w := NewBlobStore(b.opener)
	written := make([]BlobKey, 0, len(writes))
	for _, wr := range writes {
		if err := w.write(ctx, wr.Field, wr.Key, wr.data); err != nil {
			errs := []error{fmt.Errorf("writing blob for %s: %w", wr.Field, err)}
			for _, k := range written {
				if derr := w.delete(ctx, k.Field, k.Key); derr != nil {
					errs = append(errs, derr)
				}
			}
			// errors.Join drops nil values, so a clean Close adds nothing.
			errs = append(errs, w.Close())
			return nil, errors.Join(errs...)
		}
		written = append(written, wr.BlobKey)
	}
	if err := w.Close(); err != nil {
		// deleteOp works on its own store, so it does not depend on the
		// store that just failed to close.
		if derr := b.deleteOp(written)(ctx); derr != nil {
			return nil, errors.Join(err, derr)
		}
		return nil, err
	}
	return b.deleteOp(written), nil
}
// deleteOp builds a BlobOp that deletes the given keys from storage.
// An empty key list yields the shared no-op. Otherwise the returned operation
// opens a fresh BlobStore on each invocation, attempts every deletion even if
// earlier ones fail, closes the store, and returns all failures joined.
func (b *Blobs) deleteOp(keys []BlobKey) BlobOp {
	if len(keys) == 0 {
		return noOp
	}
	remove := func(ctx context.Context) error {
		store := NewBlobStore(b.opener)
		var failures []error
		for i := range keys {
			err := store.delete(ctx, keys[i].Field, keys[i].Key)
			if err != nil {
				failures = append(failures, err)
			}
		}
		// errors.Join ignores nil, so a clean Close contributes nothing.
		failures = append(failures, store.Close())
		return errors.Join(failures...)
	}
	return remove
}
// Shared no-op values returned when there is no blob work to perform.
var (
// noOp is a BlobOp-compatible function that does nothing and returns nil.
noOp = func(context.Context) error { return nil }
// noopBlobResult is the result Blobs.Update returns when no inputs were
// registered; both of its operations are noOp. It is a single package-level
// pointer, so mutating it would affect every caller that receives it.
noopBlobResult = &BlobUpdateResult{Rollback: noOp, Commit: noOp}
)
// BlobReader opens key in bucket b and returns a reader for it.
// Closing the returned reader closes both the underlying reader and the bucket.
// A missing blob (fs.ErrNotExist) is not an error: the bucket is closed and a
// nil reader is returned together with the result of that close. On any other
// open failure the bucket is closed as well and both errors are joined.
func BlobReader(ctx context.Context, b Blob, key string) (io.ReadCloser, error) {
	r, err := b.NewReader(ctx, key)
	if err == nil {
		return &blobReadCloser{ReadCloser: r, bucket: b}, nil
	}
	if errors.Is(err, fs.ErrNotExist) {
		return nil, b.Close()
	}
	return nil, errors.Join(err, b.Close())
}
// blobReadCloser is the reader returned by BlobReader. It reads from the
// embedded reader and keeps the bucket that reader came from, so that Close
// can release both.
type blobReadCloser struct {
io.ReadCloser
// bucket is the bucket the embedded reader was opened from.
bucket Blob
}
// Close closes the underlying reader first and the bucket second, returning
// the joined errors of both (nil when both succeed).
func (r *blobReadCloser) Close() error {
	readerErr := r.ReadCloser.Close()
	bucketErr := r.bucket.Close()
	return errors.Join(readerErr, bucketErr)
}
// BlobStore manages blob bucket lifecycles for read, write, and delete operations.
// It lazily opens buckets per field and reuses them for subsequent operations.
// The bucket cache is a plain map with no locking.
// NOTE(review): presumably a store is meant to be used from a single
// goroutine; confirm before sharing one.
type BlobStore struct {
// opener opens the bucket for a field the first time the field is used;
// when it is nil, operations that need a bucket fail with an error.
opener BlobOpener
// buckets caches the opened bucket for each field name.
buckets map[string]Blob
}
// NewBlobStore returns a store with an empty bucket cache. Buckets are opened
// through opener on first use of a field.
func NewBlobStore(opener BlobOpener) *BlobStore {
	s := &BlobStore{opener: opener}
	s.buckets = map[string]Blob{}
	return s
}
// Close closes all open buckets and returns the joined errors of every close
// that failed (nil when all succeed).
//
// Each bucket is removed from the cache as it is closed. This makes Close
// safe to call more than once (no bucket is closed twice) and lets a later
// operation on the store open a fresh bucket instead of reusing a closed one.
func (s *BlobStore) Close() error {
	var errs []error
	for field, b := range s.buckets {
		if err := b.Close(); err != nil {
			errs = append(errs, err)
		}
		// Deleting the current entry while ranging over a map is permitted.
		delete(s.buckets, field)
	}
	return errors.Join(errs...)
}
// write stores data under key in the bucket of the given field.
// If writing the payload fails, the writer is closed and the key is deleted
// again; the write, close, and delete errors are joined. Otherwise the result
// of closing the writer is returned.
func (s *BlobStore) write(ctx context.Context, field, key string, data []byte) error {
	bkt, err := s.bucket(ctx, field)
	if err != nil {
		return err
	}
	w, err := bkt.NewWriter(ctx, key)
	if err != nil {
		return err
	}
	_, writeErr := w.Write(data)
	if writeErr == nil {
		return w.Close()
	}
	// Clean up after the failed write: close first, then delete.
	closeErr := w.Close()
	deleteErr := bkt.Delete(ctx, key)
	return errors.Join(writeErr, closeErr, deleteErr)
}
// delete removes key from the bucket of the given field, opening the bucket
// first if it is not cached yet. A failure to obtain the bucket is returned
// as is.
func (s *BlobStore) delete(ctx context.Context, field, key string) error {
	bkt, err := s.bucket(ctx, field)
	if err == nil {
		err = bkt.Delete(ctx, key)
	}
	return err
}
// Read returns the full content of the blob stored under key for the given field.
// A missing blob (fs.ErrNotExist) yields nil, nil. The reader is always closed;
// a close error is reported only when reading itself succeeded.
func (s *BlobStore) Read(ctx context.Context, field, key string) ([]byte, error) {
	bkt, err := s.bucket(ctx, field)
	if err != nil {
		return nil, err
	}
	rc, err := bkt.NewReader(ctx, key)
	switch {
	case errors.Is(err, fs.ErrNotExist):
		return nil, nil
	case err != nil:
		return nil, err
	}
	data, readErr := io.ReadAll(rc)
	closeErr := rc.Close()
	if readErr != nil {
		// The read error takes precedence over any close error.
		return data, readErr
	}
	return data, closeErr
}
// bucket returns the bucket for field, opening and caching it on first use.
// It fails when no opener is configured or when the opener reports an error;
// in both cases nothing is cached.
func (s *BlobStore) bucket(ctx context.Context, field string) (Blob, error) {
	cached, found := s.buckets[field]
	switch {
	case found:
		return cached, nil
	case s.opener == nil:
		return nil, errors.New("ent: blob storage not configured (missing WithBlobOpeners)")
	}
	opened, err := s.opener(ctx, field)
	if err != nil {
		return nil, err
	}
	s.buckets[field] = opened
	return opened, nil
}