refactor: add storage/s3 and auth/docker submodules

- storage/s3.py: S3Storage moved from storage.py; storage/__init__.py re-exports it
- auth/docker.py: Docker Bearer token logic moved from docker_auth.py
- docker_auth.py: thin shim re-exporting all public symbols (including _token_cache)
  for backwards compatibility with existing test and import paths
- main.py: now imports get_docker_token_for_response from .auth

All 187 tests pass.
This commit is contained in:
2026-04-28 22:15:04 +10:00
parent 0df726467a
commit 0daca40156
7 changed files with 128 additions and 103 deletions
+7 -2
View File
@@ -33,15 +33,20 @@ Docker Registry traffic uses the `/v2/{remote}/{path}` endpoint implementing the
src/artifactapi/
├── main.py — FastAPI app, route handlers
├── config.py — ConfigManager (loads remotes.yaml)
├── storage.py — S3Storage (MinIO/S3 abstraction)
├── docker_auth.py — Docker Bearer token fetching
├── metrics.py — Prometheus + Redis metrics
├── docker_auth.py — backwards-compat shim → auth/docker.py
├── auth/
│ ├── __init__.py — re-exports Docker auth helpers
│ └── docker.py — Bearer token fetching + in-memory cache
├── cache/
│ ├── __init__.py — re-exports RedisCache
│ └── redis.py — RedisCache (TTL keys, ETag metadata)
├── database/
│ ├── __init__.py — re-exports DatabaseManager
│ └── postgres.py — DatabaseManager (artifact + local-file tables)
├── storage/
│ ├── __init__.py — re-exports S3Storage
│ └── s3.py — S3Storage (MinIO/S3 abstraction)
└── remote/
├── __init__.py
├── base.py — content-type detection
+3
View File
@@ -0,0 +1,3 @@
from .docker import fetch_token, get_docker_token_for_response, parse_www_authenticate
__all__ = ["fetch_token", "get_docker_token_for_response", "parse_www_authenticate"]
+96
View File
@@ -0,0 +1,96 @@
import logging
import re
import time
import httpx
logger = logging.getLogger(__name__)
# In-memory token cache: key -> (token, expires_at)
_token_cache: dict[str, tuple[str, float]] = {}
_WWW_AUTH_RE = re.compile(
r'Bearer\s+realm="(?P<realm>[^"]+)"'
r'(?:,service="(?P<service>[^"]*)")?'
r'(?:,scope="(?P<scope>[^"]*)")?',
re.IGNORECASE,
)
def _cache_key(realm: str, service: str, scope: str, username: str | None) -> str:
return f"{realm}|{service}|{scope}|{username or ''}"
def _get_cached_token(key: str) -> str | None:
entry = _token_cache.get(key)
if entry and entry[1] > time.time():
return entry[0]
_token_cache.pop(key, None)
return None
def _store_token(key: str, token: str, expires_in: int) -> None:
# Expire 30s early to avoid using a token right as it expires
_token_cache[key] = (token, time.time() + max(expires_in - 30, 10))
async def fetch_token(
realm: str,
service: str,
scope: str,
username: str | None = None,
password: str | None = None,
) -> str | None:
"""Fetch a Bearer token from a Docker registry auth server."""
key = _cache_key(realm, service, scope, username)
cached = _get_cached_token(key)
if cached:
return cached
params: dict[str, str] = {}
if service:
params["service"] = service
if scope:
params["scope"] = scope
auth = (username, password) if username and password else None
try:
async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(realm, params=params, auth=auth)
response.raise_for_status()
data = response.json()
except Exception as e:
logger.warning(f"Docker token fetch failed ({realm}): {e}")
return None
token = data.get("token") or data.get("access_token")
if not token:
logger.warning(f"Docker token response missing token field: {data}")
return None
expires_in = int(data.get("expires_in", 300))
_store_token(key, token, expires_in)
logger.debug(f"Docker token obtained (realm={realm}, service={service}, scope={scope}, expires_in={expires_in}s)")
return token
def parse_www_authenticate(header: str) -> tuple[str, str, str] | None:
"""Parse WWW-Authenticate: Bearer header. Returns (realm, service, scope) or None."""
m = _WWW_AUTH_RE.search(header)
if not m:
return None
return m.group("realm"), m.group("service") or "", m.group("scope") or ""
async def get_docker_token_for_response(
www_authenticate: str,
username: str | None = None,
password: str | None = None,
) -> str | None:
"""Given a WWW-Authenticate header value, fetch and return a Bearer token."""
parsed = parse_www_authenticate(www_authenticate)
if not parsed:
return None
realm, service, scope = parsed
return await fetch_token(realm, service, scope, username, password)
+17 -94
View File
@@ -1,96 +1,19 @@
import logging
import re
import time
import httpx
logger = logging.getLogger(__name__)
# In-memory token cache: key -> (token, expires_at)
_token_cache: dict[str, tuple[str, float]] = {}
_WWW_AUTH_RE = re.compile(
r'Bearer\s+realm="(?P<realm>[^"]+)"'
r'(?:,service="(?P<service>[^"]*)")?'
r'(?:,scope="(?P<scope>[^"]*)")?',
re.IGNORECASE,
from .auth.docker import (
_cache_key,
_get_cached_token,
_store_token,
_token_cache,
fetch_token,
get_docker_token_for_response,
parse_www_authenticate,
)
def _cache_key(realm: str, service: str, scope: str, username: str | None) -> str:
return f"{realm}|{service}|{scope}|{username or ''}"
def _get_cached_token(key: str) -> str | None:
entry = _token_cache.get(key)
if entry and entry[1] > time.time():
return entry[0]
_token_cache.pop(key, None)
return None
def _store_token(key: str, token: str, expires_in: int) -> None:
# Expire 30s early to avoid using a token right as it expires
_token_cache[key] = (token, time.time() + max(expires_in - 30, 10))
async def fetch_token(
realm: str,
service: str,
scope: str,
username: str | None = None,
password: str | None = None,
) -> str | None:
"""Fetch a Bearer token from a Docker registry auth server."""
key = _cache_key(realm, service, scope, username)
cached = _get_cached_token(key)
if cached:
return cached
params: dict[str, str] = {}
if service:
params["service"] = service
if scope:
params["scope"] = scope
auth = (username, password) if username and password else None
try:
async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(realm, params=params, auth=auth)
response.raise_for_status()
data = response.json()
except Exception as e:
logger.warning(f"Docker token fetch failed ({realm}): {e}")
return None
token = data.get("token") or data.get("access_token")
if not token:
logger.warning(f"Docker token response missing token field: {data}")
return None
expires_in = int(data.get("expires_in", 300))
_store_token(key, token, expires_in)
logger.debug(f"Docker token obtained (realm={realm}, service={service}, scope={scope}, expires_in={expires_in}s)")
return token
def parse_www_authenticate(header: str) -> tuple[str, str, str] | None:
"""Parse WWW-Authenticate: Bearer header. Returns (realm, service, scope) or None."""
m = _WWW_AUTH_RE.search(header)
if not m:
return None
return m.group("realm"), m.group("service") or "", m.group("scope") or ""
async def get_docker_token_for_response(
www_authenticate: str,
username: str | None = None,
password: str | None = None,
) -> str | None:
"""Given a WWW-Authenticate header value, fetch and return a Bearer token."""
parsed = parse_www_authenticate(www_authenticate)
if not parsed:
return None
realm, service, scope = parsed
return await fetch_token(realm, service, scope, username, password)
__all__ = [
"_cache_key",
"_get_cached_token",
"_store_token",
"_token_cache",
"fetch_token",
"get_docker_token_for_response",
"parse_www_authenticate",
]
+1 -1
View File
@@ -20,10 +20,10 @@ except ImportError:
# Fallback for development when package isn't installed
__version__ = "dev"
from .auth import get_docker_token_for_response
from .cache import RedisCache
from .config import ConfigManager
from .database import DatabaseManager
from .docker_auth import get_docker_token_for_response
from .metrics import MetricsManager
from .remote import helm as _helm
from .remote import npm as _npm
+3
View File
@@ -0,0 +1,3 @@
from .s3 import S3Storage
__all__ = ["S3Storage"]
@@ -41,7 +41,6 @@ class S3Storage:
self.client = boto3.client("s3", **client_kwargs)
# Try to ensure bucket exists, but don't fail if MinIO isn't ready yet
try:
self._ensure_bucket_exists()
except Exception as e:
@@ -55,25 +54,21 @@ class S3Storage:
self.client.create_bucket(Bucket=self.bucket)
def get_object_key(self, remote_name: str, path: str) -> str:
# Extract directory path and filename
clean_path = path.lstrip("/")
filename = os.path.basename(clean_path)
directory_path = os.path.dirname(clean_path)
# Special handling for Docker registry blobs (use digest as key for deduplication)
# Docker blobs are keyed by digest for deduplication across images
if "/blobs/sha256:" in clean_path:
# Extract the SHA256 digest for Docker blobs
parts = clean_path.split("/blobs/sha256:")
if len(parts) == 2:
digest = parts[1]
return f"{remote_name}/blobs/sha256/{digest}"
# Hash the directory path to keep keys manageable while preserving remote structure
if directory_path:
path_hash = hashlib.sha256(directory_path.encode()).hexdigest()[:16]
return f"{remote_name}/{path_hash}/{filename}"
else:
# If no directory, just use remote and filename
return f"{remote_name}/{filename}"
def exists(self, key: str) -> bool: