diff --git a/README.md b/README.md index e662976..a9bbf2c 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,24 @@ there's nothing to provision. To bring your own key instead, point `TF_SIGNING_KEY_PASSPHRASE`), which takes precedence over the generated one. `TF_PROVIDER_PROTOCOLS` (default `5.0,6.0`) sets the advertised plugin protocols. +### Local docker registry + +A local `docker` repo is a real container registry, not a mirror: it serves the +Docker Registry HTTP API V2 for both push and pull, so any client (`docker`, +`podman`, `skopeo`, `buildah`) can use it directly. + +```sh +docker tag myapp:latest artifactapi.k8s.syd1.au.unkin.net/docker-internal/myapp:latest +docker push artifactapi.k8s.syd1.au.unkin.net/docker-internal/myapp:latest +docker pull artifactapi.k8s.syd1.au.unkin.net/docker-internal/myapp:latest +``` + +The first path segment after `/v2/` is the artifactapi repo name; the remainder +is the image name. Blobs and manifests are stored through the shared +content-addressable store (deduplicated by digest, reaped by GC once +unreferenced); tags are mutable references and re-pushing a tag moves it. Blob +uploads support both the monolithic and chunked (`POST`/`PATCH`/`PUT`) flows. + ## Access Control | Field | Default | Behaviour | diff --git a/e2e-docker/docker_local_test.go b/e2e-docker/docker_local_test.go new file mode 100644 index 0000000..e35e633 --- /dev/null +++ b/e2e-docker/docker_local_test.go @@ -0,0 +1,177 @@ +//go:build dockere2e + +package e2edocker + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "fmt" + "net/http" + "strings" + "testing" +) + +func digestOf(b []byte) string { + sum := sha256.Sum256(b) + return "sha256:" + hex.EncodeToString(sum[:]) +} + +// pushBlobMonolithic uploads a blob with POST (open session) then PUT?digest +// (whole body) — the monolithic-after-POST flow. +func pushBlobMonolithic(t *testing.T, repo, image string, blob []byte) { + t.Helper() + dgst := digestOf(blob) + + resp, body := doRequest(t, http.MethodPost, api("/v2/"+repo+"/"+image+"/blobs/uploads/"), nil, "") + if resp.StatusCode != http.StatusAccepted { + t.Fatalf("start upload: status %d: %s", resp.StatusCode, body) + } + loc := resp.Header.Get("Location") + if loc == "" { + t.Fatalf("start upload: no Location header") + } + + resp, body = doRequest(t, http.MethodPut, baseURL()+loc+"?digest="+dgst, blob, "application/octet-stream") + if resp.StatusCode != http.StatusCreated { + t.Fatalf("finish upload: status %d: %s", resp.StatusCode, body) + } + if got := resp.Header.Get("Docker-Content-Digest"); got != dgst { + t.Fatalf("finish upload: digest mismatch: got %q want %q", got, dgst) + } +} + +// pushBlobChunked uploads a blob with POST then PATCH (body) then PUT?digest +// (empty) — the chunked flow a real docker daemon uses. +func pushBlobChunked(t *testing.T, repo, image string, blob []byte) { + t.Helper() + dgst := digestOf(blob) + + resp, body := doRequest(t, http.MethodPost, api("/v2/"+repo+"/"+image+"/blobs/uploads/"), nil, "") + if resp.StatusCode != http.StatusAccepted { + t.Fatalf("start upload: status %d: %s", resp.StatusCode, body) + } + loc := resp.Header.Get("Location") + + resp, body = doRequest(t, http.MethodPatch, baseURL()+loc, blob, "application/octet-stream") + if resp.StatusCode != http.StatusAccepted { + t.Fatalf("patch upload: status %d: %s", resp.StatusCode, body) + } + if got := resp.Header.Get("Range"); got != fmt.Sprintf("0-%d", len(blob)-1) { + t.Fatalf("patch upload: unexpected Range %q", got) + } + loc = resp.Header.Get("Location") + + resp, body = doRequest(t, http.MethodPut, baseURL()+loc+"?digest="+dgst, nil, "") + if resp.StatusCode != http.StatusCreated { + t.Fatalf("finish upload: status %d: %s", resp.StatusCode, body) + } +} + +// TestLocalDockerPushPull exercises a full container push and pull against a +// local docker repo using the Docker Registry HTTP API V2, the way a docker +// client would: upload the config and layer blobs, push the manifest under a +// tag, then pull the manifest and blobs back byte-identically. +func TestLocalDockerPushPull(t *testing.T) { + createRepo(t, `{"name":"docker-internal","package_type":"docker","repo_type":"local"}`) + defer deleteRepo(t, "docker-internal") + + const image = "team/app" + const tag = "v1.0.0" + + // /v2/ version check. + resp, _ := doRequest(t, http.MethodGet, api("/v2/"), nil, "") + if resp.StatusCode != http.StatusOK { + t.Fatalf("/v2/ ping: status %d", resp.StatusCode) + } + + config := []byte(`{"architecture":"amd64","os":"linux","config":{},"rootfs":{"type":"layers","diff_ids":["sha256:0000000000000000000000000000000000000000000000000000000000000000"]}}`) + layer := bytes.Repeat([]byte("artifactapi-layer-data-"), 4096) // ~90 KB opaque layer + + configDigest := digestOf(config) + layerDigest := digestOf(layer) + + // A brand-new blob should be absent (this is the client's mount check). + resp, _ = doRequest(t, http.MethodHead, api("/v2/"+"docker-internal/"+image+"/blobs/"+configDigest), nil, "") + if resp.StatusCode != http.StatusNotFound { + t.Fatalf("pre-push blob HEAD: expected 404, got %d", resp.StatusCode) + } + + pushBlobMonolithic(t, "docker-internal", image, config) + pushBlobChunked(t, "docker-internal", image, layer) + + manifest := []byte(fmt.Sprintf(`{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json","config":{"mediaType":"application/vnd.docker.container.image.v1+json","size":%d,"digest":%q},"layers":[{"mediaType":"application/vnd.docker.image.rootfs.diff.tar.gzip","size":%d,"digest":%q}]}`, + len(config), configDigest, len(layer), layerDigest)) + manifestDigest := digestOf(manifest) + manifestType := "application/vnd.docker.distribution.manifest.v2+json" + + resp, body := doRequest(t, http.MethodPut, api("/v2/docker-internal/"+image+"/manifests/"+tag), manifest, manifestType) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("push manifest: status %d: %s", resp.StatusCode, body) + } + if got := resp.Header.Get("Docker-Content-Digest"); got != manifestDigest { + t.Fatalf("push manifest: digest %q want %q", got, manifestDigest) + } + + // --- pull back --- + + // Manifest by tag. + resp, body = doRequest(t, http.MethodGet, api("/v2/docker-internal/"+image+"/manifests/"+tag), nil, "") + if resp.StatusCode != http.StatusOK { + t.Fatalf("pull manifest by tag: status %d: %s", resp.StatusCode, body) + } + if !bytes.Equal(body, manifest) { + t.Fatalf("pulled manifest bytes differ from pushed") + } + if ct := resp.Header.Get("Content-Type"); ct != manifestType { + t.Fatalf("pulled manifest content-type %q want %q", ct, manifestType) + } + if got := resp.Header.Get("Docker-Content-Digest"); got != manifestDigest { + t.Fatalf("pulled manifest digest %q want %q", got, manifestDigest) + } + + // Manifest by digest. + resp, body = doRequest(t, http.MethodGet, api("/v2/docker-internal/"+image+"/manifests/"+manifestDigest), nil, "") + if resp.StatusCode != http.StatusOK || !bytes.Equal(body, manifest) { + t.Fatalf("pull manifest by digest: status %d, equal=%v", resp.StatusCode, bytes.Equal(body, manifest)) + } + + // Blobs by digest. + for _, tc := range []struct { + name string + digest string + want []byte + }{ + {"config", configDigest, config}, + {"layer", layerDigest, layer}, + } { + resp, body = doRequest(t, http.MethodGet, api("/v2/docker-internal/"+image+"/blobs/"+tc.digest), nil, "") + if resp.StatusCode != http.StatusOK { + t.Fatalf("pull %s blob: status %d", tc.name, resp.StatusCode) + } + if !bytes.Equal(body, tc.want) { + t.Fatalf("pulled %s blob bytes differ", tc.name) + } + if got := resp.Header.Get("Docker-Content-Digest"); got != tc.digest { + t.Fatalf("pulled %s blob digest %q want %q", tc.name, got, tc.digest) + } + } + + // tags/list reflects the pushed tag. + resp, body = doRequest(t, http.MethodGet, api("/v2/docker-internal/"+image+"/tags/list"), nil, "") + if resp.StatusCode != http.StatusOK { + t.Fatalf("tags/list: status %d: %s", resp.StatusCode, body) + } + if !strings.Contains(string(body), `"`+tag+`"`) { + t.Fatalf("tags/list missing tag %q: %s", tag, body) + } + if !strings.Contains(string(body), `"docker-internal/`+image+`"`) { + t.Fatalf("tags/list wrong repository name: %s", body) + } + + // A now-present blob HEAD should succeed (client would skip re-upload). + resp, _ = doRequest(t, http.MethodHead, api("/v2/docker-internal/"+image+"/blobs/"+layerDigest), nil, "") + if resp.StatusCode != http.StatusOK { + t.Fatalf("post-push blob HEAD: expected 200, got %d", resp.StatusCode) + } +} diff --git a/go.mod b/go.mod index b84cfa4..daba034 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/charmbracelet/bubbletea v1.3.10 github.com/charmbracelet/lipgloss v1.1.0 github.com/go-chi/chi/v5 v5.3.0 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.10.0 github.com/minio/minio-go/v7 v7.2.0 github.com/redis/go-redis/v9 v9.20.0 @@ -46,7 +47,6 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect diff --git a/internal/api/v1/docker.go b/internal/api/v1/docker.go new file mode 100644 index 0000000..13090c3 --- /dev/null +++ b/internal/api/v1/docker.go @@ -0,0 +1,473 @@ +package v1 + +import ( + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "sort" + "strings" + "sync" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + + "git.unkin.net/unkin/artifactapi/internal/database" + "git.unkin.net/unkin/artifactapi/internal/storage" + "git.unkin.net/unkin/artifactapi/pkg/models" +) + +// This file implements the write half of the Docker Registry HTTP API V2 for +// *local* docker repositories, so a `docker push` / `docker pull` against +// artifactapi treats a local docker repo as a genuine registry (matching the +// project's "local repos are the real thing" principle) rather than a mirror. +// +// Storage reuses the existing content-addressable primitives: +// - blob and manifest bytes are stored via the CAS (deduplicated by sha256) +// - a local_files row per (repo, "/blobs/") and +// (repo, "/manifests/") keeps the blob referenced so the GC +// does not reap it, and lets pulls resolve a reference back to a blob. +// Tags are mutable references (UpsertLocalFile); digests and blobs are +// immutable (CreateLocalFile, tolerating an already-exists on re-push). + +const dockerAPIVersionHeader = "registry/2.0" + +// uploadSession is an in-progress chunked blob upload, buffered to a temp file +// on disk. Sessions are held in-memory keyed by upload UUID, so a single push's +// PATCH/PUT chunks must be served by the same replica — true for the +// homelab single-instance deployment. Monolithic uploads avoid this entirely. +type uploadSession struct { + file *os.File + size int64 +} + +type uploadStore struct { + mu sync.Mutex + sessions map[string]*uploadSession +} + +func newUploadStore() *uploadStore { + return &uploadStore{sessions: make(map[string]*uploadSession)} +} + +func (s *uploadStore) create() (string, *uploadSession, error) { + f, err := os.CreateTemp("", "docker-upload-*") + if err != nil { + return "", nil, err + } + id := uuid.NewString() + sess := &uploadSession{file: f} + s.mu.Lock() + s.sessions[id] = sess + s.mu.Unlock() + return id, sess, nil +} + +func (s *uploadStore) get(id string) *uploadSession { + s.mu.Lock() + defer s.mu.Unlock() + return s.sessions[id] +} + +func (s *uploadStore) remove(id string) { + s.mu.Lock() + sess := s.sessions[id] + delete(s.sessions, id) + s.mu.Unlock() + if sess != nil { + sess.file.Close() + os.Remove(sess.file.Name()) + } +} + +// dockerReq is a parsed /v2///... request. kind is one of +// "manifest", "blob", "upload", "tags". +type dockerReq struct { + image string + kind string + ref string // tag, digest, or upload uuid depending on kind +} + +// parseDockerPath splits the chi "*" remainder (everything after the repo name) +// into the image name and the registry operation. The image name may itself +// contain slashes, so operations are located by their well-known infixes. +func parseDockerPath(rest string) (dockerReq, bool) { + rest = strings.TrimPrefix(rest, "/") + switch { + case strings.HasSuffix(rest, "/tags/list"): + return dockerReq{image: strings.TrimSuffix(rest, "/tags/list"), kind: "tags"}, true + case rest == "tags/list": + return dockerReq{}, false // no image + } + if i := strings.Index(rest, "/blobs/uploads"); i >= 0 { + image := rest[:i] + ref := strings.TrimPrefix(rest[i+len("/blobs/uploads"):], "/") + return dockerReq{image: image, kind: "upload", ref: ref}, image != "" + } + if i := strings.LastIndex(rest, "/manifests/"); i >= 0 { + return dockerReq{image: rest[:i], kind: "manifest", ref: rest[i+len("/manifests/"):]}, true + } + if i := strings.LastIndex(rest, "/blobs/"); i >= 0 { + return dockerReq{image: rest[:i], kind: "blob", ref: rest[i+len("/blobs/"):]}, true + } + return dockerReq{}, false +} + +func isDigest(ref string) bool { return strings.HasPrefix(ref, "sha256:") } + +// localDockerRemote returns the repo if name is a local docker repository. +func (h *ProxyHandler) localDockerRemote(r *http.Request, name string) (*models.Remote, bool) { + remote, err := h.db.GetRemote(r.Context(), name) + if err != nil { + return nil, false + } + return remote, remote.RepoType == models.RepoTypeLocal && remote.PackageType == models.PackageDocker +} + +func dockerError(w http.ResponseWriter, status int, code, msg string) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.WriteHeader(status) + fmt.Fprintf(w, `{"errors":[{"code":%q,"message":%q}]}`, code, msg) +} + +// dockerGet dispatches a registry GET to the local handler for local docker +// repos and falls through to the upstream proxy for everything else. +func (h *ProxyHandler) dockerGet(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "remoteName") + if remote, ok := h.localDockerRemote(r, name); ok { + h.dockerLocalGet(w, r, remote, false) + return + } + h.handleProxy(w, r) +} + +func (h *ProxyHandler) dockerHead(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "remoteName") + if remote, ok := h.localDockerRemote(r, name); ok { + h.dockerLocalGet(w, r, remote, true) + return + } + h.handleProxyHead(w, r) +} + +func (h *ProxyHandler) dockerPost(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "remoteName") + remote, ok := h.localDockerRemote(r, name) + if !ok { + dockerError(w, http.StatusMethodNotAllowed, "UNSUPPORTED", "push is only supported for local docker repositories") + return + } + h.dockerStartUpload(w, r, remote) +} + +func (h *ProxyHandler) dockerPatch(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "remoteName") + remote, ok := h.localDockerRemote(r, name) + if !ok { + dockerError(w, http.StatusMethodNotAllowed, "UNSUPPORTED", "push is only supported for local docker repositories") + return + } + h.dockerPatchUpload(w, r, remote) +} + +func (h *ProxyHandler) dockerPut(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "remoteName") + remote, ok := h.localDockerRemote(r, name) + if !ok { + dockerError(w, http.StatusMethodNotAllowed, "UNSUPPORTED", "push is only supported for local docker repositories") + return + } + req, ok := parseDockerPath(chi.URLParam(r, "*")) + if !ok { + dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path") + return + } + switch req.kind { + case "upload": + h.dockerFinishUpload(w, r, remote, req) + case "manifest": + h.dockerPutManifest(w, r, remote, req) + default: + dockerError(w, http.StatusMethodNotAllowed, "UNSUPPORTED", "PUT not supported for this path") + } +} + +func (h *ProxyHandler) dockerDelete(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "remoteName") + remote, ok := h.localDockerRemote(r, name) + if !ok { + dockerError(w, http.StatusMethodNotAllowed, "UNSUPPORTED", "delete is only supported for local docker repositories") + return + } + req, ok := parseDockerPath(chi.URLParam(r, "*")) + if !ok || (req.kind != "manifest" && req.kind != "blob") { + dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path") + return + } + filePath := req.image + "/" + req.kind + "s/" + req.ref + if err := h.db.DeleteLocalFile(r.Context(), remote.Name, filePath); err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.WriteHeader(http.StatusAccepted) +} + +// dockerLocalGet serves manifest / blob / tags-list reads for a local repo. +func (h *ProxyHandler) dockerLocalGet(w http.ResponseWriter, r *http.Request, remote *models.Remote, head bool) { + req, ok := parseDockerPath(chi.URLParam(r, "*")) + if !ok { + dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path") + return + } + switch req.kind { + case "tags": + h.dockerTagsList(w, r, remote, req.image) + case "manifest": + h.dockerServeRef(w, r, remote, req.image+"/manifests/"+req.ref, head, true) + case "blob": + h.dockerServeRef(w, r, remote, req.image+"/blobs/"+req.ref, head, false) + default: + dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path") + } +} + +// dockerServeRef streams the blob backing a local_files path. isManifest +// controls only the default content type; the stored blob content type wins. +func (h *ProxyHandler) dockerServeRef(w http.ResponseWriter, r *http.Request, remote *models.Remote, filePath string, head, isManifest bool) { + file, err := h.db.GetLocalFile(r.Context(), remote.Name, filePath) + if err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + if file == nil { + code := "BLOB_UNKNOWN" + if isManifest { + code = "MANIFEST_UNKNOWN" + } + dockerError(w, http.StatusNotFound, code, "not found") + return + } + + s3Key := storage.BlobKey(file.ContentHash[len("sha256:"):]) + reader, info, err := h.store.Download(r.Context(), s3Key) + if err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + defer reader.Close() + + contentType := info.ContentType + if contentType == "" { + if isManifest { + contentType = "application/vnd.docker.distribution.manifest.v2+json" + } else { + contentType = "application/octet-stream" + } + } + w.Header().Set("Content-Type", contentType) + w.Header().Set("Content-Length", fmt.Sprintf("%d", info.Size)) + w.Header().Set("Docker-Content-Digest", file.ContentHash) + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.Header().Set("X-Artifact-Source", "local") + if head { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusOK) + io.Copy(w, reader) +} + +func (h *ProxyHandler) dockerTagsList(w http.ResponseWriter, r *http.Request, remote *models.Remote, image string) { + prefix := image + "/manifests/" + files, err := h.db.ListLocalFilesByPrefix(r.Context(), remote.Name, prefix) + if err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + tags := []string{} + for _, f := range files { + ref := strings.TrimPrefix(f.FilePath, prefix) + if ref == "" || isDigest(ref) { + continue + } + tags = append(tags, ref) + } + sort.Strings(tags) + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, `{"name":%q,"tags":`, remote.Name+"/"+image) + writeJSONStringList(w, tags) + fmt.Fprint(w, "}") +} + +func writeJSONStringList(w io.Writer, items []string) { + fmt.Fprint(w, "[") + for i, s := range items { + if i > 0 { + fmt.Fprint(w, ",") + } + fmt.Fprintf(w, "%q", s) + } + fmt.Fprint(w, "]") +} + +// dockerStartUpload begins a blob upload. It honours a monolithic +// POST?digest=... (blob in the POST body) and otherwise opens a chunked +// session, returning its Location for the client's PATCH/PUT. +func (h *ProxyHandler) dockerStartUpload(w http.ResponseWriter, r *http.Request, remote *models.Remote) { + req, ok := parseDockerPath(chi.URLParam(r, "*")) + if !ok || req.kind != "upload" { + dockerError(w, http.StatusNotFound, "NAME_UNKNOWN", "unrecognised registry path") + return + } + + if digest := r.URL.Query().Get("digest"); digest != "" { + h.dockerCommitBlob(w, r, remote, req.image, digest, r.Body) + return + } + + id, _, err := h.uploads.create() + if err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + loc := fmt.Sprintf("/v2/%s/%s/blobs/uploads/%s", remote.Name, req.image, id) + w.Header().Set("Location", loc) + w.Header().Set("Docker-Upload-UUID", id) + w.Header().Set("Range", "0-0") + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.WriteHeader(http.StatusAccepted) +} + +func (h *ProxyHandler) dockerPatchUpload(w http.ResponseWriter, r *http.Request, remote *models.Remote) { + req, ok := parseDockerPath(chi.URLParam(r, "*")) + if !ok || req.kind != "upload" || req.ref == "" { + dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") + return + } + sess := h.uploads.get(req.ref) + if sess == nil { + dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") + return + } + n, err := io.Copy(sess.file, r.Body) + if err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + sess.size += n + loc := fmt.Sprintf("/v2/%s/%s/blobs/uploads/%s", remote.Name, req.image, req.ref) + w.Header().Set("Location", loc) + w.Header().Set("Docker-Upload-UUID", req.ref) + w.Header().Set("Range", fmt.Sprintf("0-%d", sess.size-1)) + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.WriteHeader(http.StatusAccepted) +} + +// dockerFinishUpload completes a chunked upload: appends any final PUT body, +// stores the assembled blob, and verifies its digest. +func (h *ProxyHandler) dockerFinishUpload(w http.ResponseWriter, r *http.Request, remote *models.Remote, req dockerReq) { + digest := r.URL.Query().Get("digest") + if digest == "" { + dockerError(w, http.StatusBadRequest, "DIGEST_INVALID", "digest query parameter required") + return + } + if req.ref == "" { + // Monolithic PUT with no prior session: body is the whole blob. + h.dockerCommitBlob(w, r, remote, req.image, digest, r.Body) + return + } + sess := h.uploads.get(req.ref) + if sess == nil { + dockerError(w, http.StatusNotFound, "BLOB_UPLOAD_UNKNOWN", "unknown upload") + return + } + defer h.uploads.remove(req.ref) + + if _, err := io.Copy(sess.file, r.Body); err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + if _, err := sess.file.Seek(0, io.SeekStart); err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + h.dockerCommitBlob(w, r, remote, req.image, digest, sess.file) +} + +// dockerCommitBlob stores blob bytes through the CAS, verifies the client's +// declared digest, and records the per-image local_files reference. +func (h *ProxyHandler) dockerCommitBlob(w http.ResponseWriter, r *http.Request, remote *models.Remote, image, digest string, body io.Reader) { + result, err := h.cas.Store(r.Context(), body, "application/octet-stream") + if err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", fmt.Sprintf("store failed: %v", err)) + return + } + if result.ContentHash != digest { + dockerError(w, http.StatusBadRequest, "DIGEST_INVALID", fmt.Sprintf("digest mismatch: got %s, declared %s", result.ContentHash, digest)) + return + } + if err := h.db.UpsertBlob(r.Context(), result.ContentHash, result.S3Key, result.SizeBytes, "application/octet-stream"); err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + if err := h.db.CreateLocalFile(r.Context(), remote.Name, image+"/blobs/"+digest, result.ContentHash); err != nil && !errors.Is(err, database.ErrAlreadyExists) { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + w.Header().Set("Location", fmt.Sprintf("/v2/%s/%s/blobs/%s", remote.Name, image, digest)) + w.Header().Set("Docker-Content-Digest", digest) + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.WriteHeader(http.StatusCreated) +} + +// dockerPutManifest stores a manifest and points its reference (tag or digest) +// at it. Tags are mutable so a re-push moves the tag; digests are immutable. +func (h *ProxyHandler) dockerPutManifest(w http.ResponseWriter, r *http.Request, remote *models.Remote, req dockerReq) { + body, err := io.ReadAll(r.Body) + if err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + contentType := r.Header.Get("Content-Type") + if contentType == "" { + contentType = "application/vnd.docker.distribution.manifest.v2+json" + } + sum := sha256.Sum256(body) + digest := "sha256:" + hex.EncodeToString(sum[:]) + + result, err := h.cas.Store(r.Context(), strings.NewReader(string(body)), contentType) + if err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", fmt.Sprintf("store failed: %v", err)) + return + } + if err := h.db.UpsertBlob(r.Context(), result.ContentHash, result.S3Key, result.SizeBytes, contentType); err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + // Always addressable by digest (immutable). + if err := h.db.CreateLocalFile(r.Context(), remote.Name, req.image+"/manifests/"+digest, result.ContentHash); err != nil && !errors.Is(err, database.ErrAlreadyExists) { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + // If pushed under a tag, (re)point the tag at this manifest. + if !isDigest(req.ref) { + if err := h.db.UpsertLocalFile(r.Context(), remote.Name, req.image+"/manifests/"+req.ref, result.ContentHash); err != nil { + dockerError(w, http.StatusInternalServerError, "UNKNOWN", err.Error()) + return + } + } + + slog.Info("local docker manifest pushed", "repo", remote.Name, "image", req.image, "ref", req.ref, "digest", digest) + w.Header().Set("Location", fmt.Sprintf("/v2/%s/%s/manifests/%s", remote.Name, req.image, req.ref)) + w.Header().Set("Docker-Content-Digest", digest) + w.Header().Set("Docker-Distribution-Api-Version", dockerAPIVersionHeader) + w.WriteHeader(http.StatusCreated) +} diff --git a/internal/api/v1/docker_test.go b/internal/api/v1/docker_test.go new file mode 100644 index 0000000..0c25e57 --- /dev/null +++ b/internal/api/v1/docker_test.go @@ -0,0 +1,50 @@ +package v1 + +import "testing" + +func TestParseDockerPath(t *testing.T) { + tests := []struct { + name string + rest string + wantOK bool + wantImage string + wantKind string + wantRef string + }{ + {"start upload trailing slash", "team/app/blobs/uploads/", true, "team/app", "upload", ""}, + {"start upload no slash", "team/app/blobs/uploads", true, "team/app", "upload", ""}, + {"patch upload with uuid", "team/app/blobs/uploads/abc-123", true, "team/app", "upload", "abc-123"}, + {"single-segment image upload", "app/blobs/uploads/", true, "app", "upload", ""}, + {"blob by digest", "team/app/blobs/sha256:deadbeef", true, "team/app", "blob", "sha256:deadbeef"}, + {"manifest by tag", "team/app/manifests/v1.0.0", true, "team/app", "manifest", "v1.0.0"}, + {"manifest by digest", "team/app/manifests/sha256:cafe", true, "team/app", "manifest", "sha256:cafe"}, + {"tags list", "team/app/tags/list", true, "team/app", "tags", ""}, + {"leading slash tolerated", "/team/app/manifests/latest", true, "team/app", "manifest", "latest"}, + {"deep image name", "a/b/c/manifests/latest", true, "a/b/c", "manifest", "latest"}, + {"unrecognised", "team/app/whatever", false, "", "", ""}, + {"tags list without image", "tags/list", false, "", "", ""}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got, ok := parseDockerPath(tc.rest) + if ok != tc.wantOK { + t.Fatalf("ok = %v, want %v", ok, tc.wantOK) + } + if !tc.wantOK { + return + } + if got.image != tc.wantImage || got.kind != tc.wantKind || got.ref != tc.wantRef { + t.Fatalf("got %+v, want image=%q kind=%q ref=%q", got, tc.wantImage, tc.wantKind, tc.wantRef) + } + }) + } +} + +func TestIsDigest(t *testing.T) { + if !isDigest("sha256:abc") { + t.Fatal("sha256: prefix should be a digest") + } + if isDigest("v1.0.0") { + t.Fatal("a tag is not a digest") + } +} diff --git a/internal/api/v1/proxy.go b/internal/api/v1/proxy.go index bea9c4d..9f24ea0 100644 --- a/internal/api/v1/proxy.go +++ b/internal/api/v1/proxy.go @@ -23,10 +23,20 @@ type ProxyHandler struct { db *database.DB store *storage.S3 local *v2.LocalHandler + cas *storage.CAS + uploads *uploadStore } func NewProxyHandler(engine *proxy.Engine, virtualEngine *virtual.Engine, db *database.DB, store *storage.S3, local *v2.LocalHandler) *ProxyHandler { - return &ProxyHandler{engine: engine, virtualEngine: virtualEngine, db: db, store: store, local: local} + return &ProxyHandler{ + engine: engine, + virtualEngine: virtualEngine, + db: db, + store: store, + local: local, + cas: storage.NewCAS(store), + uploads: newUploadStore(), + } } func (h *ProxyHandler) Routes() chi.Router { @@ -37,12 +47,20 @@ func (h *ProxyHandler) Routes() chi.Router { return r } +// DockerV2Routes mounts the Docker Registry HTTP API V2. Reads (GET/HEAD) +// dispatch to a local registry implementation for local docker repos and fall +// through to the upstream proxy otherwise; writes (POST/PATCH/PUT/DELETE) are +// only valid for local docker repos and drive push. func (h *ProxyHandler) DockerV2Routes() chi.Router { r := chi.NewRouter() r.Get("/", h.handleDockerPing) r.Head("/", h.handleDockerPing) - r.Get("/{remoteName}/*", h.handleProxy) - r.Head("/{remoteName}/*", h.handleProxyHead) + r.Get("/{remoteName}/*", h.dockerGet) + r.Head("/{remoteName}/*", h.dockerHead) + r.Post("/{remoteName}/*", h.dockerPost) + r.Patch("/{remoteName}/*", h.dockerPatch) + r.Put("/{remoteName}/*", h.dockerPut) + r.Delete("/{remoteName}/*", h.dockerDelete) return r } diff --git a/internal/database/local_files.go b/internal/database/local_files.go index 9f93bdd..8ffa0c9 100644 --- a/internal/database/local_files.go +++ b/internal/database/local_files.go @@ -38,6 +38,20 @@ func (db *DB) CreateLocalFile(ctx context.Context, repoName, filePath, contentHa return nil } +// UpsertLocalFile inserts a local file or repoints an existing path at a new +// blob. Unlike CreateLocalFile it never errors on a duplicate path — it is for +// mutable references such as Docker tags, where re-pushing a tag must move it to +// the newly-pushed manifest rather than being rejected as an overwrite. +func (db *DB) UpsertLocalFile(ctx context.Context, repoName, filePath, contentHash string) error { + _, err := db.Pool.Exec(ctx, ` + INSERT INTO local_files (repo_name, file_path, content_hash) + VALUES ($1, $2, $3) + ON CONFLICT (repo_name, file_path) + DO UPDATE SET content_hash = EXCLUDED.content_hash, created_at = NOW() + `, repoName, filePath, contentHash) + return err +} + func (db *DB) GetLocalFile(ctx context.Context, repoName, filePath string) (*LocalFile, error) { row := db.Pool.QueryRow(ctx, ` SELECT id, repo_name, file_path, content_hash, created_at