Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 649f89f58b |
+73
-60
@@ -1,6 +1,8 @@
|
|||||||
package v1
|
package v1
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
@@ -11,7 +13,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@@ -36,52 +37,54 @@ import (
|
|||||||
|
|
||||||
const dockerAPIVersionHeader = "registry/2.0"
|
const dockerAPIVersionHeader = "registry/2.0"
|
||||||
|
|
||||||
// uploadSession is an in-progress chunked blob upload, buffered to a temp file
|
// Chunked blob uploads are staged in object storage under uploads/<uuid> rather
|
||||||
// on disk. Sessions are held in-memory keyed by upload UUID, so a single push's
|
// than in process memory, so the POST / PATCH / PUT of a single push can each be
|
||||||
// PATCH/PUT chunks must be served by the same replica — true for the
|
// served by a different replica (the API runs with minReplicas>1 and no session
|
||||||
// homelab single-instance deployment. Monolithic uploads avoid this entirely.
|
// affinity). The upload UUID travels in the Location URL handed back to the
|
||||||
type uploadSession struct {
|
// client, so any replica reconstructs the staging key with no shared in-process
|
||||||
file *os.File
|
// state. Abandoned stages are dropped by the GC's uploads sweep.
|
||||||
size int64
|
func uploadKey(id string) string { return "uploads/" + id }
|
||||||
}
|
|
||||||
|
|
||||||
type uploadStore struct {
|
var errUploadUnknown = errors.New("unknown upload")
|
||||||
mu sync.Mutex
|
|
||||||
sessions map[string]*uploadSession
|
|
||||||
}
|
|
||||||
|
|
||||||
func newUploadStore() *uploadStore {
|
// appendUpload appends a chunk to the staged upload object and returns the new
|
||||||
return &uploadStore{sessions: make(map[string]*uploadSession)}
|
// total size. The staged bytes live entirely in object storage (download,
|
||||||
}
|
// append to a per-request temp file, re-upload), which keeps the session state
|
||||||
|
// replica-independent. Docker sends the whole layer in one PATCH, so this is a
|
||||||
func (s *uploadStore) create() (string, *uploadSession, error) {
|
// single append in the common case.
|
||||||
f, err := os.CreateTemp("", "docker-upload-*")
|
func (h *ProxyHandler) appendUpload(ctx context.Context, id string, chunk io.Reader) (int64, error) {
|
||||||
|
key := uploadKey(id)
|
||||||
|
reader, info, err := h.store.Download(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, err
|
return 0, errUploadUnknown
|
||||||
}
|
|
||||||
id := uuid.NewString()
|
|
||||||
sess := &uploadSession{file: f}
|
|
||||||
s.mu.Lock()
|
|
||||||
s.sessions[id] = sess
|
|
||||||
s.mu.Unlock()
|
|
||||||
return id, sess, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *uploadStore) get(id string) *uploadSession {
|
tmp, err := os.CreateTemp("", "docker-upload-*")
|
||||||
s.mu.Lock()
|
if err != nil {
|
||||||
defer s.mu.Unlock()
|
reader.Close()
|
||||||
return s.sessions[id]
|
return 0, err
|
||||||
}
|
}
|
||||||
|
defer os.Remove(tmp.Name())
|
||||||
|
defer tmp.Close()
|
||||||
|
|
||||||
func (s *uploadStore) remove(id string) {
|
if _, err := io.Copy(tmp, reader); err != nil {
|
||||||
s.mu.Lock()
|
reader.Close()
|
||||||
sess := s.sessions[id]
|
return 0, err
|
||||||
delete(s.sessions, id)
|
|
||||||
s.mu.Unlock()
|
|
||||||
if sess != nil {
|
|
||||||
sess.file.Close()
|
|
||||||
os.Remove(sess.file.Name())
|
|
||||||
}
|
}
|
||||||
|
reader.Close()
|
||||||
|
|
||||||
|
n, err := io.Copy(tmp, chunk)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
size := info.Size + n
|
||||||
|
if _, err := tmp.Seek(0, io.SeekStart); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if err := h.store.Upload(ctx, key, tmp, size, "application/octet-stream"); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return size, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// dockerReq is a parsed /v2/<remote>/<image>/... request. kind is one of
|
// dockerReq is a parsed /v2/<remote>/<image>/... request. kind is one of
|
||||||
@@ -205,7 +208,18 @@ func (h *ProxyHandler) dockerDelete(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
req, ok := parseDockerPath(chi.URLParam(r, "*"))
|
req, ok := parseDockerPath(chi.URLParam(r, "*"))
|
||||||
if !ok || (req.kind != "manifest" && req.kind != "blob") {
|
if !ok {
|
||||||
|
dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Cancel an in-progress upload: drop its staging object.
|
||||||
|
if req.kind == "upload" && req.ref != "" {
|
||||||
|
_ = h.store.Delete(r.Context(), uploadKey(req.ref))
|
||||||
|
w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader)
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if req.kind != "manifest" && req.kind != "blob" {
|
||||||
dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path")
|
dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -333,8 +347,9 @@ func (h *ProxyHandler) dockerStartUpload(w http.ResponseWriter, r *http.Request,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
id, _, err := h.uploads.create()
|
// Stage an empty object keyed by the upload UUID; PATCH/PUT append to it.
|
||||||
if err != nil {
|
id := uuid.NewString()
|
||||||
|
if err := h.store.Upload(r.Context(), uploadKey(id), bytes.NewReader(nil), 0, "application/octet-stream"); err != nil {
|
||||||
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
|
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -352,21 +367,19 @@ func (h *ProxyHandler) dockerPatchUpload(w http.ResponseWriter, r *http.Request,
|
|||||||
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
|
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sess := h.uploads.get(req.ref)
|
size, err := h.appendUpload(r.Context(), req.ref, r.Body)
|
||||||
if sess == nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, errUploadUnknown) {
|
||||||
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
|
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
n, err := io.Copy(sess.file, r.Body)
|
|
||||||
if err != nil {
|
|
||||||
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
|
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sess.size += n
|
|
||||||
loc := fmt.Sprintf("/v2/%s/%s/blobs/uploads/%s", remote.Name, req.image, req.ref)
|
loc := fmt.Sprintf("/v2/%s/%s/blobs/uploads/%s", remote.Name, req.image, req.ref)
|
||||||
w.Header().Set("Location", loc)
|
w.Header().Set("Location", loc)
|
||||||
w.Header().Set("Docker-Upload-UUID", req.ref)
|
w.Header().Set("Docker-Upload-UUID", req.ref)
|
||||||
w.Header().Set("Range", fmt.Sprintf("0-%d", sess.size-1))
|
w.Header().Set("Range", fmt.Sprintf("0-%d", size-1))
|
||||||
w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader)
|
w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader)
|
||||||
w.WriteHeader(http.StatusAccepted)
|
w.WriteHeader(http.StatusAccepted)
|
||||||
}
|
}
|
||||||
@@ -384,22 +397,22 @@ func (h *ProxyHandler) dockerFinishUpload(w http.ResponseWriter, r *http.Request
|
|||||||
h.dockerCommitBlob(w, r, remote, req.image, digest, r.Body)
|
h.dockerCommitBlob(w, r, remote, req.image, digest, r.Body)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sess := h.uploads.get(req.ref)
|
|
||||||
if sess == nil {
|
key := uploadKey(req.ref)
|
||||||
|
reader, _, err := h.store.Download(r.Context(), key)
|
||||||
|
if err != nil {
|
||||||
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
|
dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer h.uploads.remove(req.ref)
|
defer reader.Close()
|
||||||
|
// Drop the staging object once we're done, regardless of outcome; a fresh
|
||||||
|
// context so cleanup still runs if the client disconnects.
|
||||||
|
defer h.store.Delete(context.Background(), key)
|
||||||
|
|
||||||
if _, err := io.Copy(sess.file, r.Body); err != nil {
|
// Stream the staged bytes plus any trailing PUT body through the CAS in one
|
||||||
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
|
// pass — no extra round trip to re-assemble.
|
||||||
return
|
combined := io.MultiReader(reader, r.Body)
|
||||||
}
|
h.dockerCommitBlob(w, r, remote, req.image, digest, combined)
|
||||||
if _, err := sess.file.Seek(0, io.SeekStart); err != nil {
|
|
||||||
dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.dockerCommitBlob(w, r, remote, req.image, digest, sess.file)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// dockerCommitBlob stores blob bytes through the CAS, verifies the client's
|
// dockerCommitBlob stores blob bytes through the CAS, verifies the client's
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ type ProxyHandler struct {
|
|||||||
store *storage.S3
|
store *storage.S3
|
||||||
local *v2.LocalHandler
|
local *v2.LocalHandler
|
||||||
cas *storage.CAS
|
cas *storage.CAS
|
||||||
uploads *uploadStore
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *database.DB, store *storage.S3, local *v2.LocalHandler) *ProxyHandler {
|
func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *database.DB, store *storage.S3, local *v2.LocalHandler) *ProxyHandler {
|
||||||
@@ -35,7 +34,6 @@ func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *da
|
|||||||
store: store,
|
store: store,
|
||||||
local: local,
|
local: local,
|
||||||
cas: storage.NewCAS(store),
|
cas: storage.NewCAS(store),
|
||||||
uploads: newUploadStore(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,11 @@ import (
|
|||||||
// before the referencing artifact/local_files row exists.
|
// before the referencing artifact/local_files row exists.
|
||||||
const blobGracePeriod = 1 * time.Hour
|
const blobGracePeriod = 1 * time.Hour
|
||||||
|
|
||||||
|
// uploadGracePeriod is how long a docker blob-upload staging object
|
||||||
|
// (uploads/<uuid>) may sit idle before GC treats it as an abandoned push and
|
||||||
|
// reaps it. Generous so a slow but live push is never cut off mid-flight.
|
||||||
|
const uploadGracePeriod = 24 * time.Hour
|
||||||
|
|
||||||
type Collector struct {
|
type Collector struct {
|
||||||
db *database.DB
|
db *database.DB
|
||||||
store *storage.S3
|
store *storage.S3
|
||||||
@@ -43,6 +48,8 @@ func (c *Collector) Run(ctx context.Context) {
|
|||||||
func (c *Collector) sweep(ctx context.Context) {
|
func (c *Collector) sweep(ctx context.Context) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
c.sweepUploads(ctx)
|
||||||
|
|
||||||
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
|
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("gc: find orphaned blobs", "error", err)
|
slog.Error("gc: find orphaned blobs", "error", err)
|
||||||
@@ -70,3 +77,24 @@ func (c *Collector) sweep(ctx context.Context) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sweepUploads reaps docker blob-upload staging objects abandoned longer than
|
||||||
|
// uploadGracePeriod (cancelled or interrupted pushes that never finalised).
|
||||||
|
func (c *Collector) sweepUploads(ctx context.Context) {
|
||||||
|
stale, err := c.store.ListStaleObjects(ctx, "uploads/", time.Now().Add(-uploadGracePeriod))
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("gc: list stale uploads", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
reaped := 0
|
||||||
|
for _, key := range stale {
|
||||||
|
if err := c.store.Delete(ctx, key); err != nil {
|
||||||
|
slog.Warn("gc: delete stale upload", "key", key, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
reaped++
|
||||||
|
}
|
||||||
|
if reaped > 0 {
|
||||||
|
slog.Info("gc: reaped stale docker uploads", "count", reaped)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio-go/v7"
|
"github.com/minio/minio-go/v7"
|
||||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||||
@@ -97,3 +98,18 @@ func (s *S3) Stat(ctx context.Context, key string) (*minio.ObjectInfo, error) {
|
|||||||
}
|
}
|
||||||
return &info, nil
|
return &info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListStaleObjects returns keys under prefix last modified before cutoff. Used
|
||||||
|
// by the GC to reap abandoned staging objects (e.g. cancelled docker pushes).
|
||||||
|
func (s *S3) ListStaleObjects(ctx context.Context, prefix string, cutoff time.Time) ([]string, error) {
|
||||||
|
var keys []string
|
||||||
|
for obj := range s.client.ListObjects(ctx, s.bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
|
||||||
|
if obj.Err != nil {
|
||||||
|
return nil, obj.Err
|
||||||
|
}
|
||||||
|
if obj.LastModified.Before(cutoff) {
|
||||||
|
keys = append(keys, obj.Key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keys, nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user