Merge pull request 'feat: add Docker registry proxy support with proper cache classification' (#8) from benvin/docker-caching into master

Reviewed-on: #8
This commit was merged in pull request #8.
This commit is contained in:
2026-04-25 16:37:38 +10:00
4 changed files with 221 additions and 2 deletions
+7
View File
@@ -30,6 +30,13 @@ class RedisCache:
".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst", ".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst",
".asc", ".txt" ".asc", ".txt"
))) )))
# Docker tag-based manifests are mutable (index); digest-pinned are immutable (file)
or (
"/manifests/" in file_path
and not file_path.split("/manifests/", 1)[1].startswith("sha256:")
)
or "/tags/list" in file_path
or file_path.endswith("/tags/list")
) )
def get_index_cache_key(self, remote_name: str, path: str) -> str: def get_index_cache_key(self, remote_name: str, path: str) -> str:
+96
View File
@@ -0,0 +1,96 @@
import time
import logging
import re
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
# In-memory token cache: key -> (token, expires_at)
_token_cache: dict[str, tuple[str, float]] = {}
_WWW_AUTH_RE = re.compile(
r'Bearer\s+realm="(?P<realm>[^"]+)"'
r'(?:,service="(?P<service>[^"]*)")?'
r'(?:,scope="(?P<scope>[^"]*)")?',
re.IGNORECASE,
)
def _cache_key(realm: str, service: str, scope: str, username: Optional[str]) -> str:
return f"{realm}|{service}|{scope}|{username or ''}"
def _get_cached_token(key: str) -> Optional[str]:
entry = _token_cache.get(key)
if entry and entry[1] > time.time():
return entry[0]
_token_cache.pop(key, None)
return None
def _store_token(key: str, token: str, expires_in: int) -> None:
# Expire 30s early to avoid using a token right as it expires
_token_cache[key] = (token, time.time() + max(expires_in - 30, 10))
async def fetch_token(
realm: str,
service: str,
scope: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Optional[str]:
"""Fetch a Bearer token from a Docker registry auth server."""
key = _cache_key(realm, service, scope, username)
cached = _get_cached_token(key)
if cached:
return cached
params: dict[str, str] = {}
if service:
params["service"] = service
if scope:
params["scope"] = scope
auth = (username, password) if username and password else None
try:
async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(realm, params=params, auth=auth)
response.raise_for_status()
data = response.json()
except Exception as e:
logger.warning(f"Docker token fetch failed ({realm}): {e}")
return None
token = data.get("token") or data.get("access_token")
if not token:
logger.warning(f"Docker token response missing token field: {data}")
return None
expires_in = int(data.get("expires_in", 300))
_store_token(key, token, expires_in)
logger.debug(f"Docker token obtained (realm={realm}, service={service}, scope={scope}, expires_in={expires_in}s)")
return token
def parse_www_authenticate(header: str) -> Optional[tuple[str, str, str]]:
"""Parse WWW-Authenticate: Bearer header. Returns (realm, service, scope) or None."""
m = _WWW_AUTH_RE.search(header)
if not m:
return None
return m.group("realm"), m.group("service") or "", m.group("scope") or ""
async def get_docker_token_for_response(
www_authenticate: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Optional[str]:
"""Given a WWW-Authenticate header value, fetch and return a Bearer token."""
parsed = parse_www_authenticate(www_authenticate)
if not parsed:
return None
realm, service, scope = parsed
return await fetch_token(realm, service, scope, username, password)
+110 -2
View File
@@ -1,10 +1,11 @@
import os import os
import re import re
import json
import hashlib import hashlib
import logging import logging
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
import httpx import httpx
from fastapi import FastAPI, HTTPException, Response, Query, File, UploadFile from fastapi import FastAPI, HTTPException, Response, Request, Query, File, UploadFile
from fastapi.responses import PlainTextResponse, JSONResponse from fastapi.responses import PlainTextResponse, JSONResponse
from pydantic import BaseModel from pydantic import BaseModel
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
@@ -21,6 +22,7 @@ from .database import DatabaseManager
from .storage import S3Storage from .storage import S3Storage
from .cache import RedisCache from .cache import RedisCache
from .metrics import MetricsManager from .metrics import MetricsManager
from .docker_auth import get_docker_token_for_response
class ArtifactRequest(BaseModel): class ArtifactRequest(BaseModel):
@@ -164,6 +166,12 @@ async def construct_remote_url(remote_name: str, path: str) -> str:
status_code=500, detail=f"No base_url configured for remote '{remote_name}'" status_code=500, detail=f"No base_url configured for remote '{remote_name}'"
) )
# Handle Docker registry URLs
if remote_config.get("type") == "docker":
# Convert Docker paths to v2 API format
# e.g., library/nginx/manifests/latest -> v2/library/nginx/manifests/latest
return f"{base_url}/v2/{path}"
return f"{base_url}/{path}" return f"{base_url}/{path}"
@@ -206,8 +214,35 @@ async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
} }
try: try:
remote_config = config.get_remote_config(remote_name) or {}
is_docker = remote_config.get("type") == "docker" or "/v2/" in url
# Prepare headers for Docker registry requests
headers = {}
if is_docker:
if "/manifests/" in url:
headers["Accept"] = (
"application/vnd.docker.distribution.manifest.v2+json,"
"application/vnd.oci.image.manifest.v1+json,"
"application/vnd.oci.image.index.v1+json,"
"application/vnd.docker.distribution.manifest.list.v2+json"
)
elif "/blobs/" in url:
headers["Accept"] = "application/octet-stream"
async with httpx.AsyncClient(follow_redirects=True) as client: async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(url) response = await client.get(url, headers=headers)
# Handle Docker Bearer token challenge
if response.status_code == 401 and is_docker:
www_auth = response.headers.get("WWW-Authenticate", "")
username = remote_config.get("username")
password = remote_config.get("password")
token = await get_docker_token_for_response(www_auth, username, password)
if token:
headers["Authorization"] = f"Bearer {token}"
response = await client.get(url, headers=headers)
response.raise_for_status() response.raise_for_status()
storage_path = storage.upload(key, response.content) storage_path = storage.upload(key, response.content)
@@ -400,6 +435,79 @@ async def get_artifact(remote_name: str, path: str):
raise HTTPException(status_code=500, detail=f"Error serving artifact: {str(e)}") raise HTTPException(status_code=500, detail=f"Error serving artifact: {str(e)}")
@app.get("/v2/")
async def docker_v2_ping():
return Response(
content="{}",
media_type="application/json",
headers={"Docker-Distribution-Api-Version": "registry/2.0"},
)
@app.api_route("/v2/{remote_name}/{path:path}", methods=["GET", "HEAD"])
async def docker_v2_proxy(request: Request, remote_name: str, path: str):
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
if remote_config.get("type") != "docker":
raise HTTPException(status_code=400, detail=f"Remote '{remote_name}' is not a docker remote")
remote_url = await construct_remote_url(remote_name, path)
cached_key = storage.get_object_key(remote_name, path)
if not storage.exists(cached_key):
cached_key = None
is_index = cache.is_index_file(path)
if cached_key and is_index:
if not cache.is_index_valid(remote_name, path):
logger.info(f"Index EXPIRED: {remote_name}/{path} - removing from cache")
cache.cleanup_expired_index(storage, remote_name, path)
cached_key = None
if not cached_key:
logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
result = await cache_single_artifact(remote_url, remote_name, path)
if result["status"] == "error":
raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}")
if result["status"] == "cached" and is_index:
cache_config = config.get_cache_config(remote_name)
index_ttl = cache_config.get("index_ttl", 300)
cache.mark_index_cached(remote_name, path, index_ttl)
logger.info(f"Index file cached with TTL: {remote_name}/{path} (ttl: {index_ttl}s)")
artifact_data = storage.download_object(storage.get_object_key(remote_name, path))
is_blob = "/blobs/" in path
if is_blob:
content_type = "application/octet-stream"
else:
try:
manifest_json = json.loads(artifact_data)
content_type = manifest_json.get("mediaType")
if not content_type:
if "manifests" in manifest_json:
content_type = "application/vnd.oci.image.index.v1+json"
else:
content_type = "application/vnd.oci.image.manifest.v1+json"
except Exception:
content_type = "application/vnd.oci.image.manifest.v1+json"
digest = f"sha256:{hashlib.sha256(artifact_data).hexdigest()}"
headers = {
"Docker-Distribution-Api-Version": "registry/2.0",
"Docker-Content-Digest": digest,
"Content-Length": str(len(artifact_data)),
}
if request.method == "HEAD":
return Response(status_code=200, headers=headers, media_type=content_type)
metrics.record_cache_hit(remote_name, len(artifact_data))
return Response(content=artifact_data, media_type=content_type, headers=headers)
async def discover_artifacts(remote: str, include_pattern: str) -> list[str]: async def discover_artifacts(remote: str, include_pattern: str) -> list[str]:
if "github.com" in remote: if "github.com" in remote:
return await discover_github_releases(remote, include_pattern) return await discover_github_releases(remote, include_pattern)
+8
View File
@@ -60,6 +60,14 @@ class S3Storage:
filename = os.path.basename(clean_path) filename = os.path.basename(clean_path)
directory_path = os.path.dirname(clean_path) directory_path = os.path.dirname(clean_path)
# Special handling for Docker registry blobs (use digest as key for deduplication)
if "/blobs/sha256:" in clean_path:
# Extract the SHA256 digest for Docker blobs
parts = clean_path.split("/blobs/sha256:")
if len(parts) == 2:
digest = parts[1]
return f"{remote_name}/blobs/sha256/{digest}"
# Hash the directory path to keep keys manageable while preserving remote structure # Hash the directory path to keep keys manageable while preserving remote structure
if directory_path: if directory_path:
path_hash = hashlib.sha256(directory_path.encode()).hexdigest()[:16] path_hash = hashlib.sha256(directory_path.encode()).hexdigest()[:16]