From 4651183ed118e51ba11922a5e9fe15a81263bd87 Mon Sep 17 00:00:00 2001
From: Ben Vincent
Date: Sat, 25 Apr 2026 16:35:27 +1000
Subject: [PATCH] feat: add Docker registry proxy support with proper cache classification

- Add /v2/ endpoint implementing OCI Distribution API for native docker pull support
- Add docker_auth.py with Bearer token challenge handling and in-memory token cache
- Classify tag-based manifests (/manifests/<tag>) as index (short TTL, mutable)
- Classify digest-pinned manifests (/manifests/sha256:...) and blobs as file cache (indefinite, immutable)
- Deduplicate blob storage by keying on sha256 digest rather than image path
- Support username/password auth per docker remote in remotes.yaml
---
Reviewer notes (everything between the "---" line above and the first
"diff --git" line is ignored by `git am`):

Changes made in this revision of the patch:
 * docker_auth.py: restore the named groups (realm, service, scope) in
   _WWW_AUTH_RE. The pattern read `(?P[^"]+)`, which re.compile() rejects,
   so the module could not be imported; parse_www_authenticate() looks the
   groups up by exactly these names.
 * docker_auth.py: allow optional whitespace after the commas separating
   the challenge parameters (`,\s*service=`, `,\s*scope=`).
 * Commit message: restore the `<tag>` placeholder in `/manifests/<tag>`,
   which appears to have been stripped together with the group names.
 * Line breaks and indentation were rebuilt from a whitespace-collapsed
   copy. Added/removed line counts match every hunk header and the
   diffstat; the indentation of context lines (and the split of the
   string-tuple context in the cache.py hunk) is inferred, so run
   `git apply --check` against the tree before applying.

Open points for the author (not changed here):
 * docker_v2_proxy calls metrics.record_cache_hit() on the cache-miss path
   as well as on hits.
 * docker_v2_proxy fetches/downloads the whole object before answering a
   HEAD request.
 * cache.py: `file_path.endswith("/tags/list")` is already covered by
   `"/tags/list" in file_path`, so the second test is redundant.
 * The `index` line for docker_auth.py still carries the blob id of the
   originally submitted content.
 * The author e-mail address is missing from the `From:` header.

 src/artifactapi/cache.py       |   7 +++
 src/artifactapi/docker_auth.py |  96 ++++++++++++++++++++++++++++
 src/artifactapi/main.py        | 112 ++++++++++++++++++++++++++++++++-
 src/artifactapi/storage.py     |   8 +++
 4 files changed, 221 insertions(+), 2 deletions(-)
 create mode 100644 src/artifactapi/docker_auth.py

diff --git a/src/artifactapi/cache.py b/src/artifactapi/cache.py
index 42090dc..1c37081 100644
--- a/src/artifactapi/cache.py
+++ b/src/artifactapi/cache.py
@@ -30,6 +30,13 @@ class RedisCache:
                 ".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst",
                 ".asc", ".txt"
             )))
+            # Docker tag-based manifests are mutable (index); digest-pinned are immutable (file)
+            or (
+                "/manifests/" in file_path
+                and not file_path.split("/manifests/", 1)[1].startswith("sha256:")
+            )
+            or "/tags/list" in file_path
+            or file_path.endswith("/tags/list")
         )
 
     def get_index_cache_key(self, remote_name: str, path: str) -> str:
diff --git a/src/artifactapi/docker_auth.py b/src/artifactapi/docker_auth.py
new file mode 100644
index 0000000..4bf6cda
--- /dev/null
+++ b/src/artifactapi/docker_auth.py
@@ -0,0 +1,96 @@
+import time
+import logging
+import re
+from typing import Optional
+import httpx
+
+logger = logging.getLogger(__name__)
+
+# In-memory token cache: key -> (token, expires_at)
+_token_cache: dict[str, tuple[str, float]] = {}
+
+_WWW_AUTH_RE = re.compile(
+    r'Bearer\s+realm="(?P<realm>[^"]+)"'
+    r'(?:,\s*service="(?P<service>[^"]*)")?'
+    r'(?:,\s*scope="(?P<scope>[^"]*)")?',
+    re.IGNORECASE,
+)
+
+
+def _cache_key(realm: str, service: str, scope: str, username: Optional[str]) -> str:
+    return f"{realm}|{service}|{scope}|{username or ''}"
+
+
+def _get_cached_token(key: str) -> Optional[str]:
+    entry = _token_cache.get(key)
+    if entry and entry[1] > time.time():
+        return entry[0]
+    _token_cache.pop(key, None)
+    return None
+
+
+def _store_token(key: str, token: str, expires_in: int) -> None:
+    # Expire 30s early to avoid using a token right as it expires
+    _token_cache[key] = (token, time.time() + max(expires_in - 30, 10))
+
+
+async def fetch_token(
+    realm: str,
+    service: str,
+    scope: str,
+    username: Optional[str] = None,
+    password: Optional[str] = None,
+) -> Optional[str]:
+    """Fetch a Bearer token from a Docker registry auth server."""
+    key = _cache_key(realm, service, scope, username)
+    cached = _get_cached_token(key)
+    if cached:
+        return cached
+
+    params: dict[str, str] = {}
+    if service:
+        params["service"] = service
+    if scope:
+        params["scope"] = scope
+
+    auth = (username, password) if username and password else None
+
+    try:
+        async with httpx.AsyncClient(follow_redirects=True) as client:
+            response = await client.get(realm, params=params, auth=auth)
+            response.raise_for_status()
+            data = response.json()
+    except Exception as e:
+        logger.warning(f"Docker token fetch failed ({realm}): {e}")
+        return None
+
+    token = data.get("token") or data.get("access_token")
+    if not token:
+        logger.warning(f"Docker token response missing token field: {data}")
+        return None
+
+    expires_in = int(data.get("expires_in", 300))
+    _store_token(key, token, expires_in)
+    logger.debug(f"Docker token obtained (realm={realm}, service={service}, scope={scope}, expires_in={expires_in}s)")
+    return token
+
+
+def parse_www_authenticate(header: str) -> Optional[tuple[str, str, str]]:
+    """Parse WWW-Authenticate: Bearer header. Returns (realm, service, scope) or None."""
+    m = _WWW_AUTH_RE.search(header)
+    if not m:
+        return None
+    return m.group("realm"), m.group("service") or "", m.group("scope") or ""
+
+
+async def get_docker_token_for_response(
+    www_authenticate: str,
+    username: Optional[str] = None,
+    password: Optional[str] = None,
+) -> Optional[str]:
+    """Given a WWW-Authenticate header value, fetch and return a Bearer token."""
+    parsed = parse_www_authenticate(www_authenticate)
+    if not parsed:
+        return None
+    realm, service, scope = parsed
+    return await fetch_token(realm, service, scope, username, password)
diff --git a/src/artifactapi/main.py b/src/artifactapi/main.py
index cae2ed7..b7917dc 100644
--- a/src/artifactapi/main.py
+++ b/src/artifactapi/main.py
@@ -1,10 +1,11 @@
 import os
 import re
+import json
 import hashlib
 import logging
 from typing import Dict, Any, Optional
 import httpx
-from fastapi import FastAPI, HTTPException, Response, Query, File, UploadFile
+from fastapi import FastAPI, HTTPException, Response, Request, Query, File, UploadFile
 from fastapi.responses import PlainTextResponse, JSONResponse
 from pydantic import BaseModel
 from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
@@ -21,6 +22,7 @@ from .database import DatabaseManager
 from .storage import S3Storage
 from .cache import RedisCache
 from .metrics import MetricsManager
+from .docker_auth import get_docker_token_for_response
 
 
 class ArtifactRequest(BaseModel):
@@ -164,6 +166,12 @@ async def construct_remote_url(remote_name: str, path: str) -> str:
             status_code=500, detail=f"No base_url configured for remote '{remote_name}'"
         )
 
+    # Handle Docker registry URLs
+    if remote_config.get("type") == "docker":
+        # Convert Docker paths to v2 API format
+        # e.g., library/nginx/manifests/latest -> v2/library/nginx/manifests/latest
+        return f"{base_url}/v2/{path}"
+
     return f"{base_url}/{path}"
 
 
@@ -206,8 +214,35 @@ async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
         }
 
     try:
+        remote_config = config.get_remote_config(remote_name) or {}
+        is_docker = remote_config.get("type") == "docker" or "/v2/" in url
+
+        # Prepare headers for Docker registry requests
+        headers = {}
+        if is_docker:
+            if "/manifests/" in url:
+                headers["Accept"] = (
+                    "application/vnd.docker.distribution.manifest.v2+json,"
+                    "application/vnd.oci.image.manifest.v1+json,"
+                    "application/vnd.oci.image.index.v1+json,"
+                    "application/vnd.docker.distribution.manifest.list.v2+json"
+                )
+            elif "/blobs/" in url:
+                headers["Accept"] = "application/octet-stream"
+
         async with httpx.AsyncClient(follow_redirects=True) as client:
-            response = await client.get(url)
+            response = await client.get(url, headers=headers)
+
+            # Handle Docker Bearer token challenge
+            if response.status_code == 401 and is_docker:
+                www_auth = response.headers.get("WWW-Authenticate", "")
+                username = remote_config.get("username")
+                password = remote_config.get("password")
+                token = await get_docker_token_for_response(www_auth, username, password)
+                if token:
+                    headers["Authorization"] = f"Bearer {token}"
+                    response = await client.get(url, headers=headers)
+
             response.raise_for_status()
 
             storage_path = storage.upload(key, response.content)
@@ -400,6 +435,79 @@ async def get_artifact(remote_name: str, path: str):
         raise HTTPException(status_code=500, detail=f"Error serving artifact: {str(e)}")
 
 
+@app.get("/v2/")
+async def docker_v2_ping():
+    return Response(
+        content="{}",
+        media_type="application/json",
+        headers={"Docker-Distribution-Api-Version": "registry/2.0"},
+    )
+
+
+@app.api_route("/v2/{remote_name}/{path:path}", methods=["GET", "HEAD"])
+async def docker_v2_proxy(request: Request, remote_name: str, path: str):
+    remote_config = config.get_remote_config(remote_name)
+    if not remote_config:
+        raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
+    if remote_config.get("type") != "docker":
+        raise HTTPException(status_code=400, detail=f"Remote '{remote_name}' is not a docker remote")
+
+    remote_url = await construct_remote_url(remote_name, path)
+
+    cached_key = storage.get_object_key(remote_name, path)
+    if not storage.exists(cached_key):
+        cached_key = None
+
+    is_index = cache.is_index_file(path)
+
+    if cached_key and is_index:
+        if not cache.is_index_valid(remote_name, path):
+            logger.info(f"Index EXPIRED: {remote_name}/{path} - removing from cache")
+            cache.cleanup_expired_index(storage, remote_name, path)
+            cached_key = None
+
+    if not cached_key:
+        logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
+        result = await cache_single_artifact(remote_url, remote_name, path)
+        if result["status"] == "error":
+            raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}")
+        if result["status"] == "cached" and is_index:
+            cache_config = config.get_cache_config(remote_name)
+            index_ttl = cache_config.get("index_ttl", 300)
+            cache.mark_index_cached(remote_name, path, index_ttl)
+            logger.info(f"Index file cached with TTL: {remote_name}/{path} (ttl: {index_ttl}s)")
+
+    artifact_data = storage.download_object(storage.get_object_key(remote_name, path))
+
+    is_blob = "/blobs/" in path
+    if is_blob:
+        content_type = "application/octet-stream"
+    else:
+        try:
+            manifest_json = json.loads(artifact_data)
+            content_type = manifest_json.get("mediaType")
+            if not content_type:
+                if "manifests" in manifest_json:
+                    content_type = "application/vnd.oci.image.index.v1+json"
+                else:
+                    content_type = "application/vnd.oci.image.manifest.v1+json"
+        except Exception:
+            content_type = "application/vnd.oci.image.manifest.v1+json"
+
+    digest = f"sha256:{hashlib.sha256(artifact_data).hexdigest()}"
+    headers = {
+        "Docker-Distribution-Api-Version": "registry/2.0",
+        "Docker-Content-Digest": digest,
+        "Content-Length": str(len(artifact_data)),
+    }
+
+    if request.method == "HEAD":
+        return Response(status_code=200, headers=headers, media_type=content_type)
+
+    metrics.record_cache_hit(remote_name, len(artifact_data))
+    return Response(content=artifact_data, media_type=content_type, headers=headers)
+
+
 async def discover_artifacts(remote: str, include_pattern: str) -> list[str]:
     if "github.com" in remote:
         return await discover_github_releases(remote, include_pattern)
diff --git a/src/artifactapi/storage.py b/src/artifactapi/storage.py
index 3e84792..e8c717f 100644
--- a/src/artifactapi/storage.py
+++ b/src/artifactapi/storage.py
@@ -60,6 +60,14 @@ class S3Storage:
         filename = os.path.basename(clean_path)
         directory_path = os.path.dirname(clean_path)
 
+        # Special handling for Docker registry blobs (use digest as key for deduplication)
+        if "/blobs/sha256:" in clean_path:
+            # Extract the SHA256 digest for Docker blobs
+            parts = clean_path.split("/blobs/sha256:")
+            if len(parts) == 2:
+                digest = parts[1]
+                return f"{remote_name}/blobs/sha256/{digest}"
+
         # Hash the directory path to keep keys manageable while preserving remote structure
         if directory_path:
             path_hash = hashlib.sha256(directory_path.encode()).hexdigest()[:16]