diff --git a/internal/api/v1/docker.go b/internal/api/v1/docker.go index 13090c3..a3bb0cb 100644 --- a/internal/api/v1/docker.go +++ b/internal/api/v1/docker.go @@ -1,6 +1,8 @@ package v1 import ( + "bytes" + "context" "crypto/sha256" "encoding/hex" "errors" @@ -11,7 +13,6 @@ import ( "os" "sort" "strings" - "sync" "github.com/go-chi/chi/v5" "github.com/google/uuid" @@ -36,52 +37,54 @@ import ( const dockerAPIVersionHeader = "registry/2.0" -// uploadSession is an in-progress chunked blob upload, buffered to a temp file -// on disk. Sessions are held in-memory keyed by upload UUID, so a single push's -// PATCH/PUT chunks must be served by the same replica — true for the -// homelab single-instance deployment. Monolithic uploads avoid this entirely. -type uploadSession struct { - file *os.File - size int64 -} +// Chunked blob uploads are staged in object storage under uploads/ rather +// than in process memory, so the POST / PATCH / PUT of a single push can each be +// served by a different replica (the API runs with minReplicas>1 and no session +// affinity). The upload UUID travels in the Location URL handed back to the +// client, so any replica reconstructs the staging key with no shared in-process +// state. Abandoned stages are dropped by the GC's uploads sweep. +func uploadKey(id string) string { return "uploads/" + id } -type uploadStore struct { - mu sync.Mutex - sessions map[string]*uploadSession -} +var errUploadUnknown = errors.New("unknown upload") -func newUploadStore() *uploadStore { - return &uploadStore{sessions: make(map[string]*uploadSession)} -} - -func (s *uploadStore) create() (string, *uploadSession, error) { - f, err := os.CreateTemp("", "docker-upload-*") +// appendUpload appends a chunk to the staged upload object and returns the new +// total size. The staged bytes live entirely in object storage (download, +// append to a per-request temp file, re-upload), which keeps the session state +// replica-independent. Docker sends the whole layer in one PATCH, so this is a +// single append in the common case. +func (h *ProxyHandler) appendUpload(ctx context.Context, id string, chunk io.Reader) (int64, error) { + key := uploadKey(id) + reader, info, err := h.store.Download(ctx, key) if err != nil { - return "", nil, err + return 0, errUploadUnknown } - id := uuid.NewString() - sess := &uploadSession{file: f} - s.mu.Lock() - s.sessions[id] = sess - s.mu.Unlock() - return id, sess, nil -} -func (s *uploadStore) get(id string) *uploadSession { - s.mu.Lock() - defer s.mu.Unlock() - return s.sessions[id] -} - -func (s *uploadStore) remove(id string) { - s.mu.Lock() - sess := s.sessions[id] - delete(s.sessions, id) - s.mu.Unlock() - if sess != nil { - sess.file.Close() - os.Remove(sess.file.Name()) + tmp, err := os.CreateTemp("", "docker-upload-*") + if err != nil { + reader.Close() + return 0, err } + defer os.Remove(tmp.Name()) + defer tmp.Close() + + if _, err := io.Copy(tmp, reader); err != nil { + reader.Close() + return 0, err + } + reader.Close() + + n, err := io.Copy(tmp, chunk) + if err != nil { + return 0, err + } + size := info.Size + n + if _, err := tmp.Seek(0, io.SeekStart); err != nil { + return 0, err + } + if err := h.store.Upload(ctx, key, tmp, size, "application/octet-stream"); err != nil { + return 0, err + } + return size, nil } // dockerReq is a parsed /v2///... request. kind is one of @@ -205,7 +208,18 @@ func (h *ProxyHandler) dockerDelete(w http.ResponseWriter, r *http.Request) { return } req, ok := parseDockerPath(chi.URLParam(r, "*")) - if !ok || (req.kind != "manifest" && req.kind != "blob") { + if !ok { + dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path") + return + } + // Cancel an in-progress upload: drop its staging object. + if req.kind == "upload" && req.ref != "" { + _ = h.store.Delete(r.Context(), uploadKey(req.ref)) + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.WriteHeader(http.StatusNoContent) + return + } + if req.kind != "manifest" && req.kind != "blob" { dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path") return } @@ -333,8 +347,9 @@ func (h *ProxyHandler) dockerStartUpload(w http.ResponseWriter, r *http.Request, return } - id, _, err := h.uploads.create() - if err != nil { + // Stage an empty object keyed by the upload UUID; PATCH/PUT append to it. + id := uuid.NewString() + if err := h.store.Upload(r.Context(), uploadKey(id), bytes.NewReader(nil), 0, "application/octet-stream"); err != nil { dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) return } @@ -352,21 +367,19 @@ func (h *ProxyHandler) dockerPatchUpload(w http.ResponseWriter, r *http.Request, dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") return } - sess := h.uploads.get(req.ref) - if sess == nil { - dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") - return - } - n, err := io.Copy(sess.file, r.Body) + size, err := h.appendUpload(r.Context(), req.ref, r.Body) if err != nil { + if errors.Is(err, errUploadUnknown) { + dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") + return + } dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) return } - sess.size += n loc := fmt.Sprintf("/v2/%s/%s/blobs/uploads/%s", remote.Name, req.image, req.ref) w.Header().Set("Location", loc) w.Header().Set("Docker-Upload-UUID", req.ref) - w.Header().Set("Range", fmt.Sprintf("0-%d", sess.size-1)) + w.Header().Set("Range", fmt.Sprintf("0-%d", size-1)) w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) w.WriteHeader(http.StatusAccepted) } @@ -384,22 +397,22 @@ func (h *ProxyHandler) dockerFinishUpload(w http.ResponseWriter, r *http.Request h.dockerCommitBlob(w, r, remote, req.image, digest, r.Body) return } - sess := h.uploads.get(req.ref) - if sess == nil { + + key := uploadKey(req.ref) + reader, _, err := h.store.Download(r.Context(), key) + if err != nil { dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") return } - defer h.uploads.remove(req.ref) + defer reader.Close() + // Drop the staging object once we're done, regardless of outcome; a fresh + // context so cleanup still runs if the client disconnects. + defer h.store.Delete(context.Background(), key) - if _, err := io.Copy(sess.file, r.Body); err != nil { - dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) - return - } - if _, err := sess.file.Seek(0, io.SeekStart); err != nil { - dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) - return - } - h.dockerCommitBlob(w, r, remote, req.image, digest, sess.file) + // Stream the staged bytes plus any trailing PUT body through the CAS in one + // pass — no extra round trip to re-assemble. + combined := io.MultiReader(reader, r.Body) + h.dockerCommitBlob(w, r, remote, req.image, digest, combined) } // dockerCommitBlob stores blob bytes through the CAS, verifies the client's diff --git a/internal/api/v1/proxy.go b/internal/api/v1/proxy.go index 9f24ea0..377b784 100644 --- a/internal/api/v1/proxy.go +++ b/internal/api/v1/proxy.go @@ -24,7 +24,6 @@ type ProxyHandler struct { store *storage.S3 local *v2.LocalHandler cas *storage.CAS - uploads *uploadStore } func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *database.DB, store *storage.S3, local *v2.LocalHandler) *ProxyHandler { @@ -35,7 +34,6 @@ func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *da store: store, local: local, cas: storage.NewCAS(store), - uploads: newUploadStore(), } } diff --git a/internal/gc/gc.go b/internal/gc/gc.go index fbf0e36..38aa3f1 100644 --- a/internal/gc/gc.go +++ b/internal/gc/gc.go @@ -14,6 +14,11 @@ import ( // before the referencing artifact/local_files row exists. const blobGracePeriod = 1 * time.Hour +// uploadGracePeriod is how long a docker blob-upload staging object +// (uploads/) may sit idle before GC treats it as an abandoned push and +// reaps it. Generous so a slow but live push is never cut off mid-flight. +const uploadGracePeriod = 24 * time.Hour + type Collector struct { db *database.DB store *storage.S3 @@ -43,6 +48,8 @@ func (c *Collector) Run(ctx context.Context) { func (c *Collector) sweep(ctx context.Context) { start := time.Now() + c.sweepUploads(ctx) + orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod) if err != nil { slog.Error("gc: find orphaned blobs", "error", err) @@ -70,3 +77,24 @@ func (c *Collector) sweep(ctx context.Context) { ) } } + +// sweepUploads reaps docker blob-upload staging objects abandoned longer than +// uploadGracePeriod (cancelled or interrupted pushes that never finalised). +func (c *Collector) sweepUploads(ctx context.Context) { + stale, err := c.store.ListStaleObjects(ctx, "uploads/", time.Now().Add(-uploadGracePeriod)) + if err != nil { + slog.Error("gc: list stale uploads", "error", err) + return + } + reaped := 0 + for _, key := range stale { + if err := c.store.Delete(ctx, key); err != nil { + slog.Warn("gc: delete stale upload", "key", key, "error", err) + continue + } + reaped++ + } + if reaped > 0 { + slog.Info("gc: reaped stale docker uploads", "count", reaped) + } +} diff --git a/internal/storage/s3.go b/internal/storage/s3.go index 72276a2..97e7da2 100644 --- a/internal/storage/s3.go +++ b/internal/storage/s3.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log/slog" + "time" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" @@ -97,3 +98,18 @@ func (s *S3) Stat(ctx context.Context, key string) (*minio.ObjectInfo, error) { } return &info, nil } + +// ListStaleObjects returns keys under prefix last modified before cutoff. Used +// by the GC to reap abandoned staging objects (e.g. cancelled docker pushes). +func (s *S3) ListStaleObjects(ctx context.Context, prefix string, cutoff time.Time) ([]string, error) { + var keys []string + for obj := range s.client.ListObjects(ctx, s.bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) { + if obj.Err != nil { + return nil, obj.Err + } + if obj.LastModified.Before(cutoff) { + keys = append(keys, obj.Key) + } + } + return keys, nil +}