fix: make local docker uploads replica-independent (#104)
ci/woodpecker/tag/docker Pipeline was successful

## Why

Chunked blob uploads kept the in-progress session in **process memory** keyed by upload UUID, so the `POST`/`PATCH`/`PUT` of a single `docker push` had to land on the same replica. The API runs at `minReplicas: 2` with no session affinity (see argocd-apps `api-hpa.yaml`), so a real push — which streams the layer via `PATCH` then finalises with `PUT` — intermittently 404s with `BLOB_UPLOAD_UNKNOWN` when a chunk hits a replica that never saw the `POST`. This was flagged when the local docker registry landed (#103).

## Changes

- Stage chunked uploads in object storage under `uploads/<uuid>` instead of an in-memory temp file. The UUID travels in the `Location` URL handed to the client, so any replica reconstructs the staging key with no shared in-process state. Finalise streams the staged bytes plus any trailing `PUT` body through the CAS in one pass; monolithic uploads are unchanged.
- Support `DELETE` of an in-progress upload (cancel) by dropping its staging object.
- Reap abandoned staging objects in the GC (`uploads/` older than 24h) via a new `S3.ListStaleObjects`, so cancelled/interrupted pushes don't leak.

## Verification

- Split a single push across **two instances sharing one Postgres+MinIO**: `POST`→A, `PATCH`→B, `PUT`→A finalises with the correct digest, and the blob pulls back **byte-identical from both** replicas. Config-blob and manifest pushes split the same way succeed; `tags/list` is correct. (Pre-fix, the cross-replica `PATCH` 404s.)
- `scripts/docker-e2e.sh` still passes (incl. `TestLocalDockerPushPull`); unit tests + `go vet` clean.

Reviewed-on: #104
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
This commit was merged in pull request #104.
This commit is contained in:
2026-07-05 17:39:49 +10:00
committed by BenVincent
parent a92ede23f6
commit 649f89f58b
4 changed files with 121 additions and 66 deletions
+73 -60
View File
@@ -1,6 +1,8 @@
package v1 package v1
import ( import (
"bytes"
"context"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"errors" "errors"
@@ -11,7 +13,6 @@ import (
"os" "os"
"sort" "sort"
"strings" "strings"
"sync"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/google/uuid" "github.com/google/uuid"
@@ -36,52 +37,54 @@ import (
const dockerAPIVersionHeader = "registry/2.0" const dockerAPIVersionHeader = "registry/2.0"
// uploadSession is an in-progress chunked blob upload, buffered to a temp file // Chunked blob uploads are staged in object storage under uploads/<uuid> rather
// on disk. Sessions are held in-memory keyed by upload UUID, so a single push's // than in process memory, so the POST / PATCH / PUT of a single push can each be
// PATCH/PUT chunks must be served by the same replica — true for the // served by a different replica (the API runs with minReplicas>1 and no session
// homelab single-instance deployment. Monolithic uploads avoid this entirely. // affinity). The upload UUID travels in the Location URL handed back to the
type uploadSession struct { // client, so any replica reconstructs the staging key with no shared in-process
file *os.File // state. Abandoned stages are dropped by the GC's uploads sweep.
size int64 func uploadKey(id string) string { return "uploads/" + id }
}
type uploadStore struct { var errUploadUnknown = errors.New("unknown upload")
mu sync.Mutex
sessions map[string]*uploadSession
}
func newUploadStore() *uploadStore { // appendUpload appends a chunk to the staged upload object and returns the new
return &uploadStore{sessions: make(map[string]*uploadSession)} // total size. The staged bytes live entirely in object storage (download,
} // append to a per-request temp file, re-upload), which keeps the session state
// replica-independent. Docker sends the whole layer in one PATCH, so this is a
func (s *uploadStore) create() (string, *uploadSession, error) { // single append in the common case.
f, err := os.CreateTemp("", "docker-upload-*") func (h *ProxyHandler) appendUpload(ctx context.Context, id string, chunk io.Reader) (int64, error) {
key := uploadKey(id)
reader, info, err := h.store.Download(ctx, key)
if err != nil { if err != nil {
return "", nil, err return 0, errUploadUnknown
}
id := uuid.NewString()
sess := &uploadSession{file: f}
s.mu.Lock()
s.sessions[id] = sess
s.mu.Unlock()
return id, sess, nil
} }
func (s *uploadStore) get(id string) *uploadSession { tmp, err := os.CreateTemp("", "docker-upload-*")
s.mu.Lock() if err != nil {
defer s.mu.Unlock() reader.Close()
return s.sessions[id] return 0, err
} }
defer os.Remove(tmp.Name())
defer tmp.Close()
func (s *uploadStore) remove(id string) { if _, err := io.Copy(tmp, reader); err != nil {
s.mu.Lock() reader.Close()
sess := s.sessions[id] return 0, err
delete(s.sessions, id)
s.mu.Unlock()
if sess != nil {
sess.file.Close()
os.Remove(sess.file.Name())
} }
reader.Close()
n, err := io.Copy(tmp, chunk)
if err != nil {
return 0, err
}
size := info.Size + n
if _, err := tmp.Seek(0, io.SeekStart); err != nil {
return 0, err
}
if err := h.store.Upload(ctx, key, tmp, size, "application/octet-stream"); err != nil {
return 0, err
}
return size, nil
} }
// dockerReq is a parsed /v2/<remote>/<image>/... request. kind is one of // dockerReq is a parsed /v2/<remote>/<image>/... request. kind is one of
@@ -205,7 +208,18 @@ func (h *ProxyHandler) dockerDelete(w http.ResponseWriter, r *http.Request) {
return return
} }
req, ok := parseDockerPath(chi.URLParam(r, "*")) req, ok := parseDockerPath(chi.URLParam(r, "*"))
if !ok || (req.kind != "manifest" && req.kind != "blob") { if !ok {
dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path")
return
}
// Cancel an in-progress upload: drop its staging object.
if req.kind == "upload" && req.ref != "" {
_ = h.store.Delete(r.Context(), uploadKey(req.ref))
w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader)
w.WriteHeader(http.StatusNoContent)
return
}
if req.kind != "manifest" && req.kind != "blob" {
dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path") dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path")
return return
} }
@@ -333,8 +347,9 @@ func (h *ProxyHandler) dockerStartUpload(w http.ResponseWriter, r *http.Request,
return return
} }
id, _, err := h.uploads.create() // Stage an empty object keyed by the upload UUID; PATCH/PUT append to it.
if err != nil { id := uuid.NewString()
if err := h.store.Upload(r.Context(), uploadKey(id), bytes.NewReader(nil), 0, "application/octet-stream"); err != nil {
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
return return
} }
@@ -352,21 +367,19 @@ func (h *ProxyHandler) dockerPatchUpload(w http.ResponseWriter, r *http.Request,
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
return return
} }
sess := h.uploads.get(req.ref) size, err := h.appendUpload(r.Context(), req.ref, r.Body)
if sess == nil { if err != nil {
if errors.Is(err, errUploadUnknown) {
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
return return
} }
n, err := io.Copy(sess.file, r.Body)
if err != nil {
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
return return
} }
sess.size += n
loc := fmt.Sprintf("/v2/%s/%s/blobs/uploads/%s", remote.Name, req.image, req.ref) loc := fmt.Sprintf("/v2/%s/%s/blobs/uploads/%s", remote.Name, req.image, req.ref)
w.Header().Set("Location", loc) w.Header().Set("Location", loc)
w.Header().Set("Docker-Upload-UUID", req.ref) w.Header().Set("Docker-Upload-UUID", req.ref)
w.Header().Set("Range", fmt.Sprintf("0-%d", sess.size-1)) w.Header().Set("Range", fmt.Sprintf("0-%d", size-1))
w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader)
w.WriteHeader(http.StatusAccepted) w.WriteHeader(http.StatusAccepted)
} }
@@ -384,22 +397,22 @@ func (h *ProxyHandler) dockerFinishUpload(w http.ResponseWriter, r *http.Request
h.dockerCommitBlob(w, r, remote, req.image, digest, r.Body) h.dockerCommitBlob(w, r, remote, req.image, digest, r.Body)
return return
} }
sess := h.uploads.get(req.ref)
if sess == nil { key := uploadKey(req.ref)
reader, _, err := h.store.Download(r.Context(), key)
if err != nil {
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
return return
} }
defer h.uploads.remove(req.ref) defer reader.Close()
// Drop the staging object once we're done, regardless of outcome; a fresh
// context so cleanup still runs if the client disconnects.
defer h.store.Delete(context.Background(), key)
if _, err := io.Copy(sess.file, r.Body); err != nil { // Stream the staged bytes plus any trailing PUT body through the CAS in one
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) // pass — no extra round trip to re-assemble.
return combined := io.MultiReader(reader, r.Body)
} h.dockerCommitBlob(w, r, remote, req.image, digest, combined)
if _, err := sess.file.Seek(0, io.SeekStart); err != nil {
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
return
}
h.dockerCommitBlob(w, r, remote, req.image, digest, sess.file)
} }
// dockerCommitBlob stores blob bytes through the CAS, verifies the client's // dockerCommitBlob stores blob bytes through the CAS, verifies the client's
-2
View File
@@ -24,7 +24,6 @@ type ProxyHandler struct {
store *storage.S3 store *storage.S3
local *v2.LocalHandler local *v2.LocalHandler
cas *storage.CAS cas *storage.CAS
uploads *uploadStore
} }
func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *database.DB, store *storage.S3, local *v2.LocalHandler) *ProxyHandler { func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *database.DB, store *storage.S3, local *v2.LocalHandler) *ProxyHandler {
@@ -35,7 +34,6 @@ func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *da
store: store, store: store,
local: local, local: local,
cas: storage.NewCAS(store), cas: storage.NewCAS(store),
uploads: newUploadStore(),
} }
} }
+28
View File
@@ -14,6 +14,11 @@ import (
// before the referencing artifact/local_files row exists. // before the referencing artifact/local_files row exists.
const blobGracePeriod = 1 * time.Hour const blobGracePeriod = 1 * time.Hour
// uploadGracePeriod is how long a docker blob-upload staging object
// (uploads/<uuid>) may sit idle before GC treats it as an abandoned push and
// reaps it. Generous so a slow but live push is never cut off mid-flight.
const uploadGracePeriod = 24 * time.Hour
type Collector struct { type Collector struct {
db *database.DB db *database.DB
store *storage.S3 store *storage.S3
@@ -43,6 +48,8 @@ func (c *Collector) Run(ctx context.Context) {
func (c *Collector) sweep(ctx context.Context) { func (c *Collector) sweep(ctx context.Context) {
start := time.Now() start := time.Now()
c.sweepUploads(ctx)
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod) orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
if err != nil { if err != nil {
slog.Error("gc: find orphaned blobs", "error", err) slog.Error("gc: find orphaned blobs", "error", err)
@@ -70,3 +77,24 @@ func (c *Collector) sweep(ctx context.Context) {
) )
} }
} }
// sweepUploads reaps docker blob-upload staging objects abandoned longer than
// uploadGracePeriod (cancelled or interrupted pushes that never finalised).
func (c *Collector) sweepUploads(ctx context.Context) {
stale, err := c.store.ListStaleObjects(ctx, "uploads/", time.Now().Add(-uploadGracePeriod))
if err != nil {
slog.Error("gc: list stale uploads", "error", err)
return
}
reaped := 0
for _, key := range stale {
if err := c.store.Delete(ctx, key); err != nil {
slog.Warn("gc: delete stale upload", "key", key, "error", err)
continue
}
reaped++
}
if reaped > 0 {
slog.Info("gc: reaped stale docker uploads", "count", reaped)
}
}
+16
View File
@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"time"
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
@@ -97,3 +98,18 @@ func (s *S3) Stat(ctx context.Context, key string) (*minio.ObjectInfo, error) {
} }
return &info, nil return &info, nil
} }
// ListStaleObjects returns keys under prefix last modified before cutoff. Used
// by the GC to reap abandoned staging objects (e.g. cancelled docker pushes).
func (s *S3) ListStaleObjects(ctx context.Context, prefix string, cutoff time.Time) ([]string, error) {
var keys []string
for obj := range s.client.ListObjects(ctx, s.bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
if obj.Err != nil {
return nil, obj.Err
}
if obj.LastModified.Before(cutoff) {
keys = append(keys, obj.Key)
}
}
return keys, nil
}