diff --git a/README.md b/README.md index bf27022..f9beae2 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,42 @@ client → /api/v1/remote/{remote}/{path} Docker Registry traffic uses the `/v2/{remote}/{path}` endpoint implementing the Docker Registry HTTP API v2. +### Code layout + +``` +src/artifactapi/ +├── main.py — FastAPI app + thin route declarations only +├── config.py — ConfigManager (loads remotes.yaml) +├── metrics.py — Prometheus + Redis metrics +├── docker_auth.py — backwards-compat shim → auth/docker.py +├── artifact/ — route handler implementations +│ ├── proxy.py — GET /api/v1/remote (remote proxy, cache, revalidation) +│ ├── local.py — PUT/HEAD/DELETE /api/v1/remote (local repos) +│ ├── docker.py — /v2/ Docker Registry v2 proxy +│ ├── discovery.py — /api/v1/artifacts discovery + bulk cache +│ └── flush.py — PUT /cache/flush +├── auth/ +│ ├── __init__.py — re-exports Docker auth helpers +│ └── docker.py — Bearer token fetching + in-memory cache +├── cache/ +│ ├── __init__.py — re-exports RedisCache +│ └── redis.py — RedisCache (TTL keys, ETag metadata) +├── database/ +│ ├── __init__.py — re-exports DatabaseManager +│ └── postgres.py — DatabaseManager (artifact + local-file tables) +├── storage/ +│ ├── __init__.py — re-exports S3Storage +│ └── s3.py — S3Storage (MinIO/S3 abstraction) +└── remote/ + ├── __init__.py + ├── base.py — content-type detection + ├── generic.py — generic HTTP remotes + ├── helm.py — Helm index.yaml URL rewriting + ├── npm.py — npm metadata URL rewriting + ├── python.py — PyPI URL construction + HTML rewriting + └── rpm.py — RPM remotes +``` + ## API Endpoints | Method | Path | Description | diff --git a/src/artifactapi/artifact/__init__.py b/src/artifactapi/artifact/__init__.py new file mode 100644 index 0000000..9a52d4e --- /dev/null +++ b/src/artifactapi/artifact/__init__.py @@ -0,0 +1,3 @@ +from . 
logger = logging.getLogger(__name__)


async def _discover_github_releases(remote: str, include_pattern: str) -> list[str]:
    """Return download URLs of GitHub release assets matching *include_pattern*.

    ``remote`` must start with ``github.com/<owner>/<repo>``. Every ``*`` in
    ``include_pattern`` is expanded to ``.*`` and the result is searched as a
    regular expression against each asset's ``browser_download_url``.

    Raises:
        HTTPException: 400 when the remote is not in the expected form or the
            pattern is not a valid regular expression; the upstream status code
            when the GitHub releases request does not return 200.
    """
    match = re.match(r"github\.com/([^/]+)/([^/]+)", remote)
    if not match:
        raise HTTPException(status_code=400, detail="Invalid GitHub remote format")
    owner, repo = match.groups()

    # Compile before any network traffic so a malformed pattern is rejected
    # cheaply and reported as a client error instead of a generic 500.
    try:
        regex = re.compile(include_pattern.replace("*", ".*"))
    except re.error as exc:
        raise HTTPException(status_code=400, detail=f"Invalid include pattern: {exc}") from exc

    async with httpx.AsyncClient(follow_redirects=True) as client:
        response = await client.get(f"https://api.github.com/repos/{owner}/{repo}/releases")

    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail=f"Failed to fetch releases: {response.text}")

    # NOTE(review): only the first page returned by the releases endpoint is
    # inspected here - confirm whether pagination is needed for large repos.
    return [
        asset["browser_download_url"]
        for release in response.json()
        for asset in release.get("assets", [])
        if regex.search(asset["browser_download_url"])
    ]


async def _discover(remote: str, include_pattern: str) -> list[str]:
    """Dispatch discovery to the backend that understands *remote*.

    Only remotes containing ``github.com`` are supported; anything else raises
    ``HTTPException`` with status 400.
    """
    if "github.com" in remote:
        return await _discover_github_releases(remote, include_pattern)
    raise HTTPException(status_code=400, detail=f"Unsupported remote: {remote}")


async def cache_artifacts(remote: str, include_pattern: str, storage) -> dict[str, Any]:
    """Discover artifacts on *remote* and cache every match in *storage*.

    Returns a summary dict with ``message``, ``cached_count`` and the
    per-artifact results produced by ``cache_single_artifact``.

    Raises:
        HTTPException: propagated unchanged from discovery (e.g. 400 for an
            unsupported remote); 500 for any other failure.
    """
    try:
        matching_urls = await _discover(remote, include_pattern)

        if not matching_urls:
            return {"message": "No matching artifacts found", "cached_count": 0, "artifacts": []}

        cached_artifacts = []
        for url in matching_urls:
            # Key each artifact by (remote, URL path) - the same key that
            # list_artifacts() looks up. Passing empty strings here made every
            # URL resolve to one shared storage key.
            result = await cache_single_artifact(url, remote, urlparse(url).path, storage, {})
            cached_artifacts.append(result)

        cached_count = sum(1 for a in cached_artifacts if a["status"] in ["cached", "already_cached"])
        return {
            "message": f"Processed {len(matching_urls)} artifacts, {cached_count} successfully cached",
            "cached_count": cached_count,
            "artifacts": cached_artifacts,
        }
    except HTTPException:
        # Keep the deliberate status code instead of flattening it to 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


async def list_artifacts(remote: str, include_pattern: str, storage) -> dict[str, Any]:
    """Report which discovered artifacts of *remote* are already in *storage*.

    Returns a dict with the remote, the pattern, the number of URLs found, and
    one entry (``url``, ``cached_url``, ``key``) per artifact present in storage.

    Raises:
        HTTPException: propagated unchanged from discovery; 500 for any other
            failure.
    """
    try:
        matching_urls = await _discover(remote, include_pattern)
        cached_artifacts = []
        for url in matching_urls:
            key = storage.get_object_key(remote, urlparse(url).path)
            if storage.exists(key):
                cached_artifacts.append({"url": url, "cached_url": storage.get_url(key), "key": key})

        return {
            "remote": remote,
            "pattern": include_pattern,
            "total_found": len(matching_urls),
            "cached_count": len(cached_artifacts),
            "artifacts": cached_artifacts,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
import proxy as _proxy + +logger = logging.getLogger(__name__) + + +def ping() -> Response: + return Response( + content="{}", + media_type="application/json", + headers={"Docker-Distribution-Api-Version": "registry/2.0"}, + ) + + +async def proxy(request: Request, remote_name: str, path: str, storage, cache, config, metrics) -> Response: + remote_config = config.get_remote_config(remote_name) + if not remote_config: + raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured") + if remote_config.get("package") != "docker": + raise HTTPException(status_code=400, detail=f"Remote '{remote_name}' is not a docker remote") + + patterns = config.get_immutable_patterns(remote_name, "") + if patterns: + path_parts = path.split("/") + image_name = "/".join(path_parts[:2]) if len(path_parts) >= 2 else path + if not any(re.search(p, path) or re.search(p, image_name) for p in patterns): + logger.info(f"PATTERN BLOCKED: {remote_name}/{path}") + raise HTTPException(status_code=403, detail="Image not allowed by configuration patterns") + + base_url = remote_config.get("base_url", "").rstrip("/") + remote_url = f"{base_url}/v2/{path}" + + cached_key = storage.get_object_key(remote_name, path) + if not storage.exists(cached_key): + cached_key = None + + is_mutable = cache.is_mutable_file(path, config.get_mutable_patterns(remote_name)) + + if cached_key and is_mutable: + if not cache.is_index_valid(remote_name, path): + if not await _proxy.handle_expired_mutable(remote_name, path, remote_url, config, cache, storage): + cached_key = None + + if not cached_key: + logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}") + result = await _proxy.cache_single_artifact(remote_url, remote_name, path, storage, remote_config) + if result["status"] == "error": + raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}") + if result["status"] == "cached" and is_mutable: + cache_config = 
config.get_cache_config(remote_name) + mutable_ttl = cache_config.get("mutable_ttl", 3600) + cache.mark_index_cached(remote_name, path, mutable_ttl) + logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)") + if result.get("etag") or result.get("last_modified"): + cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified")) + + artifact_data = storage.download_object(storage.get_object_key(remote_name, path)) + + is_blob = "/blobs/" in path + if is_blob: + content_type = "application/octet-stream" + else: + try: + manifest_json = json.loads(artifact_data) + content_type = manifest_json.get("mediaType") + if not content_type: + if "manifests" in manifest_json: + content_type = "application/vnd.oci.image.index.v1+json" + else: + content_type = "application/vnd.oci.image.manifest.v1+json" + except Exception: + content_type = "application/vnd.oci.image.manifest.v1+json" + + digest = f"sha256:{hashlib.sha256(artifact_data).hexdigest()}" + headers = { + "Docker-Distribution-Api-Version": "registry/2.0", + "Docker-Content-Digest": digest, + "Content-Length": str(len(artifact_data)), + } + + if request.method == "HEAD": + return Response(status_code=200, headers=headers, media_type=content_type) + + metrics.record_cache_hit(remote_name, len(artifact_data)) + return Response(content=artifact_data, media_type=content_type, headers=headers) diff --git a/src/artifactapi/artifact/flush.py b/src/artifactapi/artifact/flush.py new file mode 100644 index 0000000..c446066 --- /dev/null +++ b/src/artifactapi/artifact/flush.py @@ -0,0 +1,66 @@ +import logging + +from fastapi import HTTPException + +logger = logging.getLogger(__name__) + + +def handle(remote: str | None, cache_type: str, cache, storage) -> dict: + try: + result = {"remote": remote, "cache_type": cache_type, "flushed": {"redis_keys": 0, "s3_objects": 0, "operations": []}} + + if cache_type in ["all", "index", "metrics"] and cache.available and 
cache.client: + patterns = [] + + if cache_type in ["all", "index"]: + if remote: + patterns += [f"index:{remote}:*", f"mutable:meta:{remote}:*"] + else: + patterns += ["index:*", "mutable:meta:*"] + + if cache_type in ["all", "metrics"]: + patterns.append(f"metrics:*:{remote}" if remote else "metrics:*") + + for pattern in patterns: + keys = cache.client.keys(pattern) + if keys: + cache.client.delete(*keys) + result["flushed"]["redis_keys"] += len(keys) + logger.info(f"Cache flush: deleted {len(keys)} Redis keys matching '{pattern}'") + + if result["flushed"]["redis_keys"] > 0: + result["flushed"]["operations"].append(f"Deleted {result['flushed']['redis_keys']} Redis keys") + + if cache_type in ["all", "files"]: + try: + list_params = {"Bucket": storage.bucket} + if remote: + list_params["Prefix"] = f"{remote}/" + + response = storage.client.list_objects_v2(**list_params) + if "Contents" in response: + objects_to_delete = [obj["Key"] for obj in response["Contents"]] + for key in objects_to_delete: + try: + storage.client.delete_object(Bucket=storage.bucket, Key=key) + result["flushed"]["s3_objects"] += 1 + except Exception as e: + logger.warning(f"Failed to delete S3 object {key}: {e}") + + if objects_to_delete: + scope = f" for remote '{remote}'" if remote else "" + result["flushed"]["operations"].append(f"Deleted {len(objects_to_delete)} S3 objects{scope}") + logger.info(f"Cache flush: deleted {len(objects_to_delete)} S3 objects{scope}") + + except Exception as e: + result["flushed"]["operations"].append(f"S3 flush failed: {str(e)}") + logger.error(f"Cache flush S3 error: {e}") + + if not result["flushed"]["operations"]: + result["flushed"]["operations"].append("No cache entries found to flush") + + return result + + except Exception as e: + logger.error(f"Cache flush error: {e}") + raise HTTPException(status_code=500, detail=f"Cache flush failed: {str(e)}") diff --git a/src/artifactapi/artifact/local.py b/src/artifactapi/artifact/local.py new file mode 100644 
logger = logging.getLogger(__name__)


def _discard_uploaded_object(storage, s3_key: str) -> None:
    """Best-effort removal of an object whose metadata could not be saved."""
    try:
        storage.delete_object(s3_key)
    except Exception as e:
        # Never let the cleanup failure hide the error that triggered it.
        logger.warning(f"Failed to remove S3 object {s3_key} during upload rollback: {e}")


async def upload(remote_name: str, path: str, file: UploadFile, storage, database, config) -> JSONResponse:
    """Store an uploaded file in a local repository and record its metadata.

    The object is written under ``local/<remote_name>/<path>`` and then
    registered through ``database.add_local_file``. If registration fails or
    raises, the stored object is removed again.

    Raises:
        HTTPException: 404 for an unknown remote, 400 when the remote is not of
            type ``local``, 409 when the file is already registered, 500 when
            the upload or the metadata write fails.
    """
    remote_config = config.get_remote_config(remote_name)
    if not remote_config:
        raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
    if remote_config.get("type") != "local":
        raise HTTPException(status_code=400, detail="Upload only supported for local repositories")

    try:
        # Cheap duplicate check first: no need to read and hash a rejected body.
        if database.file_exists(remote_name, path):
            raise HTTPException(status_code=409, detail="File already exists")

        content = await file.read()
        sha256_sum = hashlib.sha256(content).hexdigest()
        s3_key = f"local/{remote_name}/{path}"
        content_type = file.content_type or "application/octet-stream"

        try:
            storage.upload(s3_key, content)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Upload failed: {e}")

        try:
            success = database.add_local_file(
                repository_name=remote_name,
                file_path=path,
                s3_key=s3_key,
                size_bytes=len(content),
                sha256_sum=sha256_sum,
                content_type=content_type,
            )
        except Exception:
            # The object is already stored; do not leave it orphaned.
            _discard_uploaded_object(storage, s3_key)
            raise

        if not success:
            _discard_uploaded_object(storage, s3_key)
            raise HTTPException(status_code=500, detail="Failed to save file metadata")

        return JSONResponse(
            {
                "message": "File uploaded successfully",
                "file_path": path,
                "size_bytes": len(content),
                "sha256_sum": sha256_sum,
                "content_type": content_type,
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")


def check_exists(remote_name: str, path: str, database, config) -> Response:
    """Answer a HEAD request for a local file with its metadata as headers.

    Raises:
        HTTPException: 404 for an unknown remote or missing file, 405 when the
            remote is not of type ``local``, 500 when the lookup fails.
    """
    remote_config = config.get_remote_config(remote_name)
    if not remote_config:
        raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")

    if remote_config.get("type") != "local":
        raise HTTPException(status_code=405, detail="HEAD method only supported for local repositories")

    try:
        metadata = database.get_local_file_metadata(remote_name, path)
        if not metadata:
            raise HTTPException(status_code=404, detail="File not found")

        created_at = metadata["created_at"]
        uploaded_at = metadata["uploaded_at"]
        return Response(
            headers={
                "Content-Length": str(metadata["size_bytes"]),
                # "or" also covers a stored NULL, which .get()'s default does not.
                "Content-Type": metadata.get("content_type") or "application/octet-stream",
                "X-SHA256": metadata["sha256_sum"],
                "X-Created-At": created_at.isoformat() if created_at else "",
                "X-Uploaded-At": uploaded_at.isoformat() if uploaded_at else "",
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Check failed: {str(e)}")


def delete(remote_name: str, path: str, storage, database, config) -> JSONResponse:
    """Remove a local file's metadata and then its stored object.

    A failure to delete the stored object after the metadata is gone is only
    logged; the request still succeeds.

    Raises:
        HTTPException: 404 for an unknown remote or missing file, 400 when the
            remote is not of type ``local``, 500 when the deletion fails.
    """
    remote_config = config.get_remote_config(remote_name)
    if not remote_config:
        raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
    if remote_config.get("type") != "local":
        raise HTTPException(status_code=400, detail="Delete only supported for local repositories")

    try:
        s3_key = database.delete_local_file(remote_name, path)
        if not s3_key:
            raise HTTPException(status_code=404, detail="File not found")

        if not storage.delete_object(s3_key):
            logger.warning(f"Failed to delete S3 object {s3_key} after database removal")

        return JSONResponse({"message": "File deleted successfully"})
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}")
fastapi import HTTPException, Request, Response + +from ..auth import get_docker_token_for_response +from ..remote import helm as _helm +from ..remote import npm as _npm +from ..remote import python as _pypi +from ..remote.base import get_content_type + +logger = logging.getLogger(__name__) + + +class UpstreamUnreachable(Exception): + """Raised when the upstream backend cannot be contacted (network or timeout error).""" + + +def _basic_auth_header(remote_cfg: dict) -> dict[str, str]: + username = remote_cfg.get("username") + password = remote_cfg.get("password") + if username and password: + token = base64.b64encode(f"{username}:{password}".encode()).decode() + return {"Authorization": f"Basic {token}"} + return {} + + +def _resolve_content( + data: bytes, + path: str, + filename: str, + remote_config: dict, + request: Request, + remote_name: str = "", +) -> tuple[bytes, str]: + package = remote_config.get("package") + proxy_base = str(request.base_url).rstrip("/") + base_url = remote_config.get("base_url", "").rstrip("/") + + if package == "pypi": + return _pypi.resolve_content(data, path, filename, remote_config.get("immutable_patterns", []), base_url, proxy_base, remote_name) + if package == "npm": + return _npm.resolve_content(data, path, filename, remote_config.get("immutable_patterns", []), base_url, proxy_base, remote_name) + if package == "helm": + return _helm.resolve_content(data, path, filename, base_url, proxy_base, remote_name) + return data, get_content_type(filename) + + +def construct_url(remote_config: dict, path: str) -> str: + base_url = remote_config.get("base_url", "").rstrip("/") + if remote_config.get("package") == "docker": + return f"{base_url}/v2/{path}" + if remote_config.get("package") == "pypi": + return _pypi.construct_url(base_url, path) + return f"{base_url}/{path}" + + +async def cache_single_artifact(url: str, remote_name: str, path: str, storage, remote_config: dict) -> dict: + key = storage.get_object_key(remote_name, path) + + 
if storage.exists(key): + logger.info(f"Cache ALREADY EXISTS: {url} (key: {key})") + return {"url": url, "cached_url": storage.get_url(key), "status": "already_cached"} + + try: + is_docker = remote_config.get("package") == "docker" or "/v2/" in url + headers = {} + username = remote_config.get("username") + password = remote_config.get("password") + + if is_docker: + if "/manifests/" in url: + headers["Accept"] = ( + "application/vnd.docker.distribution.manifest.v2+json," + "application/vnd.oci.image.manifest.v1+json," + "application/vnd.oci.image.index.v1+json," + "application/vnd.docker.distribution.manifest.list.v2+json" + ) + elif "/blobs/" in url: + headers["Accept"] = "application/octet-stream" + elif username and password: + headers["Authorization"] = "Basic " + base64.b64encode(f"{username}:{password}".encode()).decode() + + async with httpx.AsyncClient(follow_redirects=True) as client: + response = await client.get(url, headers=headers) + + if response.status_code == 401 and is_docker: + www_auth = response.headers.get("WWW-Authenticate", "") + token = await get_docker_token_for_response(www_auth, username, password) + if token: + headers["Authorization"] = f"Bearer {token}" + response = await client.get(url, headers=headers) + + response.raise_for_status() + storage.upload(key, response.content) + logger.info(f"Cache ADD SUCCESS: {url} (size: {len(response.content)} bytes, key: {key})") + + return { + "url": url, + "cached_url": storage.get_url(key), + "storage_path": f"s3://{storage.bucket}/{key}", + "size": len(response.content), + "status": "cached", + "etag": response.headers.get("ETag"), + "last_modified": response.headers.get("Last-Modified"), + } + + except Exception as e: + return {"url": url, "status": "error", "error": str(e)} + + +async def _upstream_reachable(url: str, auth_headers: dict | None = None) -> bool: + try: + async with httpx.AsyncClient(follow_redirects=True) as client: + await client.head(url, headers=auth_headers or {}, 
timeout=10.0) + return True + except (httpx.NetworkError, httpx.TimeoutException): + return False + except Exception: + return True + + +async def check_upstream_changed(remote_url: str, remote_name: str, path: str, cache, auth_headers: dict | None = None) -> bool: + meta = cache.get_mutable_meta(remote_name, path) + if not meta: + return True + + headers = dict(auth_headers or {}) + if meta.get("etag"): + headers["If-None-Match"] = meta["etag"] + if meta.get("last_modified"): + headers["If-Modified-Since"] = meta["last_modified"] + if not (meta.get("etag") or meta.get("last_modified")): + return True + + try: + async with httpx.AsyncClient(follow_redirects=True) as client: + response = await client.head(remote_url, headers=headers) + return response.status_code != 304 + except (httpx.NetworkError, httpx.TimeoutException) as exc: + raise UpstreamUnreachable(str(exc)) from exc + + +async def handle_expired_mutable(remote_name: str, path: str, remote_url: str, config, cache, storage) -> bool: + """Handle an expired mutable file. 
Returns True if the cached copy is still valid.""" + mutable_ttl = config.get_cache_config(remote_name).get("mutable_ttl", 3600) + remote_cfg = config.get_remote_config(remote_name) or {} + auth = _basic_auth_header(remote_cfg) + check_updates = remote_cfg.get("check_mutable_updates", False) + user_mutable = check_updates and cache.is_mutable_file(path, config.get_user_mutable_patterns(remote_name)) + + if user_mutable: + try: + changed = await check_upstream_changed(remote_url, remote_name, path, cache, auth) + except UpstreamUnreachable: + cache.mark_index_cached(remote_name, path, mutable_ttl) + logger.warning(f"Mutable STALE (backend unreachable): {remote_name}/{path} - TTL extended ({mutable_ttl}s)") + return True + if not changed: + cache.mark_index_cached(remote_name, path, mutable_ttl) + logger.info(f"Mutable file UNCHANGED: {remote_name}/{path} - TTL refreshed ({mutable_ttl}s)") + return True + logger.info(f"Mutable file CHANGED: {remote_name}/{path} - re-downloading") + else: + if not await _upstream_reachable(remote_url, auth): + cache.mark_index_cached(remote_name, path, mutable_ttl) + logger.warning(f"Mutable STALE (backend unreachable): {remote_name}/{path} - TTL extended ({mutable_ttl}s)") + return True + logger.info(f"Mutable file EXPIRED: {remote_name}/{path} - removing from cache") + + cache.cleanup_expired_index(storage, remote_name, path) + return False + + +async def handle(request: Request, remote_name: str, path: str, storage, cache, config, database, metrics) -> Response: + remote_config = config.get_remote_config(remote_name) + if not remote_config: + raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured") + + if remote_config.get("type") == "local": + metadata = database.get_local_file_metadata(remote_name, path) + if not metadata: + raise HTTPException(status_code=404, detail="File not found") + content = storage.download_object(metadata["s3_key"]) + if content is None: + raise HTTPException(status_code=500, 
detail="File not accessible") + return Response( + content=content, + media_type=metadata.get("content_type", "application/octet-stream"), + headers={"Content-Disposition": f"attachment; filename={os.path.basename(path)}"}, + ) + + path_parts = path.split("/") + if len(path_parts) >= 2: + repo_path = f"{path_parts[0]}/{path_parts[1]}" + file_path = "/".join(path_parts[2:]) + else: + repo_path = path + file_path = path + + mutable_patterns = config.get_mutable_patterns(remote_name) + if not cache.is_mutable_file(file_path, mutable_patterns) and not cache.is_mutable_file(path, mutable_patterns): + patterns = config.get_immutable_patterns(remote_name, repo_path) + if patterns and not any(re.search(p, file_path) or re.search(p, path) for p in patterns): + logger.info(f"PATTERN BLOCKED: {remote_name}/{path} - not matching include patterns") + raise HTTPException(status_code=403, detail="Artifact not allowed by configuration patterns") + + remote_url = construct_url(remote_config, path) + if not remote_config.get("base_url"): + raise HTTPException(status_code=500, detail=f"No base_url configured for remote '{remote_name}'") + + cached_key = storage.get_object_key(remote_name, path) + if not storage.exists(cached_key): + cached_key = None + + filename = os.path.basename(path) + is_mutable = cache.is_mutable_file(path, mutable_patterns) + + if cached_key and is_mutable: + if not cache.is_index_valid(remote_name, path): + if not await handle_expired_mutable(remote_name, path, remote_url, config, cache, storage): + cached_key = None + + if cached_key: + try: + artifact_data = storage.download_object(cached_key) + artifact_data, content_type = _resolve_content(artifact_data, path, filename, remote_config, request, remote_name) + logger.info(f"Cache HIT: {remote_name}/{path} (size: {len(artifact_data)} bytes, key: {cached_key})") + metrics.record_cache_hit(remote_name, len(artifact_data)) + database.record_artifact_mapping(cached_key, remote_name, path, len(artifact_data)) + 
return Response( + content=artifact_data, + media_type=content_type, + headers={ + "Content-Disposition": f"attachment; filename={filename}", + "X-Artifact-Source": "cache", + "X-Artifact-Size": str(len(artifact_data)), + }, + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error retrieving cached artifact: {str(e)}") + + logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}") + result = await cache_single_artifact(remote_url, remote_name, path, storage, remote_config) + + if result["status"] == "error": + logger.error(f"Cache ADD FAILED: {remote_name}/{path} - {result['error']}") + raise HTTPException(status_code=502, detail=f"Failed to fetch artifact: {result['error']}") + + if result["status"] == "cached" and is_mutable: + cache_config = config.get_cache_config(remote_name) + mutable_ttl = cache_config.get("mutable_ttl", 3600) + cache.mark_index_cached(remote_name, path, mutable_ttl) + logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)") + if result.get("etag") or result.get("last_modified"): + cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified")) + + try: + cache_key = storage.get_object_key(remote_name, path) + artifact_data = storage.download_object(cache_key) + artifact_data, content_type = _resolve_content(artifact_data, path, filename, remote_config, request, remote_name) + metrics.record_cache_miss(remote_name, len(artifact_data)) + database.record_artifact_mapping(cache_key, remote_name, path, len(artifact_data)) + return Response( + content=artifact_data, + media_type=content_type, + headers={ + "Content-Disposition": f"attachment; filename={filename}", + "X-Artifact-Source": "remote", + "X-Artifact-Size": str(len(artifact_data)), + }, + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error serving artifact: {str(e)}") diff --git a/src/artifactapi/auth/__init__.py 
b/src/artifactapi/auth/__init__.py new file mode 100644 index 0000000..faffd6e --- /dev/null +++ b/src/artifactapi/auth/__init__.py @@ -0,0 +1,3 @@ +from .docker import fetch_token, get_docker_token_for_response, parse_www_authenticate + +__all__ = ["fetch_token", "get_docker_token_for_response", "parse_www_authenticate"] diff --git a/src/artifactapi/auth/docker.py b/src/artifactapi/auth/docker.py new file mode 100644 index 0000000..b781a7f --- /dev/null +++ b/src/artifactapi/auth/docker.py @@ -0,0 +1,96 @@ +import logging +import re +import time + +import httpx + +logger = logging.getLogger(__name__) + +# In-memory token cache: key -> (token, expires_at) +_token_cache: dict[str, tuple[str, float]] = {} + +_WWW_AUTH_RE = re.compile( + r'Bearer\s+realm="(?P[^"]+)"' + r'(?:,service="(?P[^"]*)")?' + r'(?:,scope="(?P[^"]*)")?', + re.IGNORECASE, +) + + +def _cache_key(realm: str, service: str, scope: str, username: str | None) -> str: + return f"{realm}|{service}|{scope}|{username or ''}" + + +def _get_cached_token(key: str) -> str | None: + entry = _token_cache.get(key) + if entry and entry[1] > time.time(): + return entry[0] + _token_cache.pop(key, None) + return None + + +def _store_token(key: str, token: str, expires_in: int) -> None: + # Expire 30s early to avoid using a token right as it expires + _token_cache[key] = (token, time.time() + max(expires_in - 30, 10)) + + +async def fetch_token( + realm: str, + service: str, + scope: str, + username: str | None = None, + password: str | None = None, +) -> str | None: + """Fetch a Bearer token from a Docker registry auth server.""" + key = _cache_key(realm, service, scope, username) + cached = _get_cached_token(key) + if cached: + return cached + + params: dict[str, str] = {} + if service: + params["service"] = service + if scope: + params["scope"] = scope + + auth = (username, password) if username and password else None + + try: + async with httpx.AsyncClient(follow_redirects=True) as client: + response = await 
client.get(realm, params=params, auth=auth) + response.raise_for_status() + data = response.json() + except Exception as e: + logger.warning(f"Docker token fetch failed ({realm}): {e}") + return None + + token = data.get("token") or data.get("access_token") + if not token: + logger.warning(f"Docker token response missing token field: {data}") + return None + + expires_in = int(data.get("expires_in", 300)) + _store_token(key, token, expires_in) + logger.debug(f"Docker token obtained (realm={realm}, service={service}, scope={scope}, expires_in={expires_in}s)") + return token + + +def parse_www_authenticate(header: str) -> tuple[str, str, str] | None: + """Parse WWW-Authenticate: Bearer header. Returns (realm, service, scope) or None.""" + m = _WWW_AUTH_RE.search(header) + if not m: + return None + return m.group("realm"), m.group("service") or "", m.group("scope") or "" + + +async def get_docker_token_for_response( + www_authenticate: str, + username: str | None = None, + password: str | None = None, +) -> str | None: + """Given a WWW-Authenticate header value, fetch and return a Bearer token.""" + parsed = parse_www_authenticate(www_authenticate) + if not parsed: + return None + realm, service, scope = parsed + return await fetch_token(realm, service, scope, username, password) diff --git a/src/artifactapi/cache/__init__.py b/src/artifactapi/cache/__init__.py new file mode 100644 index 0000000..7f06ae6 --- /dev/null +++ b/src/artifactapi/cache/__init__.py @@ -0,0 +1,3 @@ +from .redis import RedisCache + +__all__ = ["RedisCache"] diff --git a/src/artifactapi/cache.py b/src/artifactapi/cache/redis.py similarity index 87% rename from src/artifactapi/cache.py rename to src/artifactapi/cache/redis.py index 9e3940b..9d22a3b 100644 --- a/src/artifactapi/cache.py +++ b/src/artifactapi/cache/redis.py @@ -11,7 +11,6 @@ class RedisCache: try: self.client = redis.from_url(self.redis_url, decode_responses=True) - # Test connection self.client.ping() self.available = True except 
Exception as e: @@ -20,7 +19,6 @@ class RedisCache: self.available = False def is_mutable_file(self, file_path: str, patterns: list[str] | None = None) -> bool: - """Return True if file_path matches any of the mutable patterns.""" if patterns is None: patterns = [] return any(re.search(p, file_path) for p in patterns) @@ -32,10 +30,8 @@ class RedisCache: return f"mutable:meta:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}" def is_index_valid(self, remote_name: str, path: str) -> bool: - """Check if mutable file is still within its TTL window.""" if not self.available: return False - try: key = self.get_index_cache_key(remote_name, path) return self.client.exists(key) > 0 @@ -43,10 +39,8 @@ class RedisCache: return False def mark_index_cached(self, remote_name: str, path: str, ttl: int = 300) -> None: - """Set or refresh the TTL key for a mutable file.""" if not self.available: return - try: key = self.get_index_cache_key(remote_name, path) self.client.setex(key, ttl, str(int(time.time()))) @@ -54,7 +48,6 @@ class RedisCache: pass def store_mutable_meta(self, remote_name: str, path: str, etag: str | None, last_modified: str | None) -> None: - """Persist ETag and Last-Modified for future conditional requests.""" if not self.available: return data = {} @@ -70,7 +63,6 @@ class RedisCache: pass def get_mutable_meta(self, remote_name: str, path: str) -> dict: - """Return stored ETag/Last-Modified for a mutable file, or {}.""" if not self.available: return {} try: @@ -87,14 +79,13 @@ class RedisCache: pass def cleanup_expired_index(self, storage, remote_name: str, path: str) -> None: - """Remove an expired mutable file from S3 and clear its Redis meta.""" if not self.available: return try: import os - from .config import ConfigManager + from ..config import ConfigManager config_path = os.environ.get("CONFIG_PATH") if config_path: diff --git a/src/artifactapi/database/__init__.py b/src/artifactapi/database/__init__.py new file mode 100644 index 
0000000..2a4fd1c --- /dev/null +++ b/src/artifactapi/database/__init__.py @@ -0,0 +1,3 @@ +from .postgres import DatabaseManager + +__all__ = ["DatabaseManager"] diff --git a/src/artifactapi/database.py b/src/artifactapi/database/postgres.py similarity index 93% rename from src/artifactapi/database.py rename to src/artifactapi/database/postgres.py index c8e462a..733c131 100644 --- a/src/artifactapi/database.py +++ b/src/artifactapi/database/postgres.py @@ -9,7 +9,6 @@ class DatabaseManager: self._init_database() def _init_database(self): - """Initialize database connection and create schema if needed""" try: self.connection = psycopg2.connect(self.db_url) self.connection.autocommit = True @@ -21,10 +20,8 @@ class DatabaseManager: self.available = False def _create_schema(self): - """Create tables if they don't exist""" try: with self.connection.cursor() as cursor: - # Create table to map S3 keys to remote names cursor.execute(""" CREATE TABLE IF NOT EXISTS artifact_mappings ( id SERIAL PRIMARY KEY, @@ -51,7 +48,6 @@ class DatabaseManager: ) """) - # Create indexes separately cursor.execute("CREATE INDEX IF NOT EXISTS idx_s3_key ON artifact_mappings (s3_key)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_remote_name ON artifact_mappings (remote_name)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_local_repo_path ON local_files (repository_name, file_path)") @@ -61,7 +57,6 @@ class DatabaseManager: print(f"Error creating schema: {e}") def record_artifact_mapping(self, s3_key: str, remote_name: str, file_path: str, size_bytes: int): - """Record mapping between S3 key and remote""" if not self.available: return @@ -83,7 +78,6 @@ class DatabaseManager: print(f"Error recording artifact mapping: {e}") def get_storage_by_remote(self) -> dict[str, int]: - """Get storage size breakdown by remote from database""" if not self.available: return {} @@ -101,7 +95,6 @@ class DatabaseManager: return {} def get_remote_for_s3_key(self, s3_key: str) -> str | None: - """Get remote 
name for given S3 key""" if not self.available: return None @@ -126,7 +119,6 @@ class DatabaseManager: sha256_sum: str, content_type: str = None, ): - """Add a file to local repository""" if not self.available: return False @@ -153,7 +145,6 @@ class DatabaseManager: return False def get_local_file_metadata(self, repository_name: str, file_path: str): - """Get metadata for a local file""" if not self.available: return None @@ -185,7 +176,6 @@ class DatabaseManager: return None def list_local_files(self, repository_name: str, prefix: str = ""): - """List files in local repository with optional path prefix""" if not self.available: return [] @@ -229,7 +219,6 @@ class DatabaseManager: return [] def delete_local_file(self, repository_name: str, file_path: str): - """Delete a file from local repository""" if not self.available: return False @@ -251,7 +240,6 @@ class DatabaseManager: return None def file_exists(self, repository_name: str, file_path: str): - """Check if file exists in local repository""" if not self.available: return False diff --git a/src/artifactapi/docker_auth.py b/src/artifactapi/docker_auth.py index b781a7f..c331c3f 100644 --- a/src/artifactapi/docker_auth.py +++ b/src/artifactapi/docker_auth.py @@ -1,96 +1,19 @@ -import logging -import re -import time - -import httpx - -logger = logging.getLogger(__name__) - -# In-memory token cache: key -> (token, expires_at) -_token_cache: dict[str, tuple[str, float]] = {} - -_WWW_AUTH_RE = re.compile( - r'Bearer\s+realm="(?P<realm>[^"]+)"' - r'(?:,service="(?P<service>[^"]*)")?'
- r'(?:,scope="(?P<scope>[^"]*)")?', - re.IGNORECASE, +from .auth.docker import ( + _cache_key, + _get_cached_token, + _store_token, + _token_cache, + fetch_token, + get_docker_token_for_response, + parse_www_authenticate, ) - -def _cache_key(realm: str, service: str, scope: str, username: str | None) -> str: - return f"{realm}|{service}|{scope}|{username or ''}" - - -def _get_cached_token(key: str) -> str | None: - entry = _token_cache.get(key) - if entry and entry[1] > time.time(): - return entry[0] - _token_cache.pop(key, None) - return None - - -def _store_token(key: str, token: str, expires_in: int) -> None: - # Expire 30s early to avoid using a token right as it expires - _token_cache[key] = (token, time.time() + max(expires_in - 30, 10)) - - -async def fetch_token( - realm: str, - service: str, - scope: str, - username: str | None = None, - password: str | None = None, -) -> str | None: - """Fetch a Bearer token from a Docker registry auth server.""" - key = _cache_key(realm, service, scope, username) - cached = _get_cached_token(key) - if cached: - return cached - - params: dict[str, str] = {} - if service: - params["service"] = service - if scope: - params["scope"] = scope - - auth = (username, password) if username and password else None - - try: - async with httpx.AsyncClient(follow_redirects=True) as client: - response = await client.get(realm, params=params, auth=auth) - response.raise_for_status() - data = response.json() - except Exception as e: - logger.warning(f"Docker token fetch failed ({realm}): {e}") - return None - - token = data.get("token") or data.get("access_token") - if not token: - logger.warning(f"Docker token response missing token field: {data}") - return None - - expires_in = int(data.get("expires_in", 300)) - _store_token(key, token, expires_in) - logger.debug(f"Docker token obtained (realm={realm}, service={service}, scope={scope}, expires_in={expires_in}s)") - return token - - -def parse_www_authenticate(header: str) -> tuple[str, str,
str] | None: - """Parse WWW-Authenticate: Bearer header. Returns (realm, service, scope) or None.""" - m = _WWW_AUTH_RE.search(header) - if not m: - return None - return m.group("realm"), m.group("service") or "", m.group("scope") or "" - - -async def get_docker_token_for_response( - www_authenticate: str, - username: str | None = None, - password: str | None = None, -) -> str | None: - """Given a WWW-Authenticate header value, fetch and return a Bearer token.""" - parsed = parse_www_authenticate(www_authenticate) - if not parsed: - return None - realm, service, scope = parsed - return await fetch_token(realm, service, scope, username, password) +__all__ = [ + "_cache_key", + "_get_cached_token", + "_store_token", + "_token_cache", + "fetch_token", + "get_docker_token_for_response", + "parse_www_authenticate", +] diff --git a/src/artifactapi/main.py b/src/artifactapi/main.py index 8816249..1f99e96 100644 --- a/src/artifactapi/main.py +++ b/src/artifactapi/main.py @@ -1,14 +1,8 @@ -import base64 -import hashlib -import json import logging import os -import re -from typing import Any -import httpx -from fastapi import FastAPI, File, HTTPException, Query, Request, Response, UploadFile -from fastapi.responses import JSONResponse, PlainTextResponse +from fastapi import FastAPI, File, Query, Request, UploadFile +from fastapi.responses import PlainTextResponse from prometheus_client import CONTENT_TYPE_LATEST, generate_latest from pydantic import BaseModel @@ -17,58 +11,45 @@ try: __version__ = version("artifactapi") except ImportError: - # Fallback for development when package isn't installed __version__ = "dev" +from .artifact import discovery, flush, local, proxy +from .artifact import docker as docker_handler from .cache import RedisCache from .config import ConfigManager from .database import DatabaseManager -from .docker_auth import get_docker_token_for_response from .metrics import MetricsManager from .storage import S3Storage 
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + +app = FastAPI(title="Artifact Storage API", version=__version__) + +config_path = os.environ.get("CONFIG_PATH") +if not config_path: + raise ValueError("CONFIG_PATH environment variable is required") +config = ConfigManager(config_path) + +s3_config = config.get_s3_config() +redis_config = config.get_redis_config() +db_config = config.get_database_config() + +storage = S3Storage(**s3_config) +cache = RedisCache(redis_config["url"]) +database = DatabaseManager(db_config["url"]) +metrics = MetricsManager(cache, database) + class ArtifactRequest(BaseModel): remote: str include_pattern: str -class UpstreamUnreachable(Exception): - """Raised when the upstream backend cannot be contacted (network or timeout error).""" - - -# Configure logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") -logger = logging.getLogger(__name__) - -app = FastAPI(title="Artifact Storage API", version=__version__) - -# Initialize components using config -config_path = os.environ.get("CONFIG_PATH") -if not config_path: - raise ValueError("CONFIG_PATH environment variable is required") -config = ConfigManager(config_path) - -# Get configurations -s3_config = config.get_s3_config() -redis_config = config.get_redis_config() -db_config = config.get_database_config() - -# Initialize services -storage = S3Storage(**s3_config) -cache = RedisCache(redis_config["url"]) -database = DatabaseManager(db_config["url"]) -metrics = MetricsManager(cache, database) - - @app.get("/") def read_root(): config._check_reload() - return { - "message": "Artifact Storage API", - "version": app.version, - "remotes": list(config.config.get("remotes", {}).keys()), - } + return {"message": "Artifact Storage API", "version": app.version, "remotes": list(config.config.get("remotes", {}).keys())} @app.get("/health") @@ -76,775 
+57,66 @@ def health_check(): return {"status": "healthy"} +@app.get("/config") +def get_config(): + return config.config + + +@app.get("/metrics") +def get_metrics(json: bool | None = Query(False, description="Return JSON format instead of Prometheus")): + config._check_reload() + if json: + return metrics.get_metrics(storage, config) + metrics.get_metrics(storage, config) + return PlainTextResponse(generate_latest().decode("utf-8"), media_type=CONTENT_TYPE_LATEST) + + @app.put("/cache/flush") def flush_cache( remote: str = Query(default=None, description="Specific remote to flush (optional)"), cache_type: str = Query(default="all", description="Type to flush: 'all', 'index', 'files', 'metrics'"), ): - """Flush cache entries for specified remote or all remotes""" - try: - result = {"remote": remote, "cache_type": cache_type, "flushed": {"redis_keys": 0, "s3_objects": 0, "operations": []}} - - # Flush Redis entries based on cache_type - if cache_type in ["all", "index", "metrics"] and cache.available and cache.client: - patterns = [] - - if cache_type in ["all", "index"]: - if remote: - patterns.append(f"index:{remote}:*") - patterns.append(f"mutable:meta:{remote}:*") - else: - patterns.append("index:*") - patterns.append("mutable:meta:*") - - if cache_type in ["all", "metrics"]: - if remote: - patterns.append(f"metrics:*:{remote}") - else: - patterns.append("metrics:*") - - for pattern in patterns: - keys = cache.client.keys(pattern) - if keys: - cache.client.delete(*keys) - result["flushed"]["redis_keys"] += len(keys) - logger.info(f"Cache flush: Deleted {len(keys)} Redis keys matching '{pattern}'") - - if result["flushed"]["redis_keys"] > 0: - result["flushed"]["operations"].append(f"Deleted {result['flushed']['redis_keys']} Redis keys") - - # Flush S3 objects if requested - if cache_type in ["all", "files"]: - try: - # Use prefix filtering for remote-specific deletion - list_params = {"Bucket": storage.bucket} - if remote: - list_params["Prefix"] = f"{remote}/" 
- - response = storage.client.list_objects_v2(**list_params) - if "Contents" in response: - objects_to_delete = [obj["Key"] for obj in response["Contents"]] - - for key in objects_to_delete: - try: - storage.client.delete_object(Bucket=storage.bucket, Key=key) - result["flushed"]["s3_objects"] += 1 - except Exception as e: - logger.warning(f"Failed to delete S3 object {key}: {e}") - - if objects_to_delete: - scope = f" for remote '{remote}'" if remote else "" - result["flushed"]["operations"].append(f"Deleted {len(objects_to_delete)} S3 objects{scope}") - logger.info(f"Cache flush: Deleted {len(objects_to_delete)} S3 objects{scope}") - - except Exception as e: - result["flushed"]["operations"].append(f"S3 flush failed: {str(e)}") - logger.error(f"Cache flush S3 error: {e}") - - if not result["flushed"]["operations"]: - result["flushed"]["operations"].append("No cache entries found to flush") - - return result - - except Exception as e: - logger.error(f"Cache flush error: {e}") - raise HTTPException(status_code=500, detail=f"Cache flush failed: {str(e)}") - - -async def construct_remote_url(remote_name: str, path: str) -> str: - remote_config = config.get_remote_config(remote_name) - if not remote_config: - raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured") - - base_url = remote_config.get("base_url") - if not base_url: - raise HTTPException(status_code=500, detail=f"No base_url configured for remote '{remote_name}'") - - if remote_config.get("package") == "docker": - return f"{base_url}/v2/{path}" - - # PyPI splits index and files across two hosts; redirect simple/ requests to pypi.org - if remote_config.get("package") == "pypi" and base_url.rstrip("/") == "https://files.pythonhosted.org" and "simple/" in path: - return f"https://pypi.org/{path}" - - return f"{base_url}/{path}" - - -async def check_artifact_patterns(remote_name: str, repo_path: str, file_path: str, full_path: str) -> bool: - # Mutable files (index files) are 
always allowed through - mutable_patterns = config.get_mutable_patterns(remote_name) - if cache.is_mutable_file(file_path, mutable_patterns) or cache.is_mutable_file(full_path, mutable_patterns): - return True - - # Check immutable include patterns - patterns = config.get_immutable_patterns(remote_name, repo_path) - if not patterns: - return True # Allow all if no patterns configured - - pattern_matched = False - for pattern in patterns: - # Check both file_path and full_path to handle different pattern types - if re.search(pattern, file_path) or re.search(pattern, full_path): - pattern_matched = True - break - - if not pattern_matched: - return False - - return True - - -async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict: - # Use hierarchical path-based key - key = storage.get_object_key(remote_name, path) - - if storage.exists(key): - logger.info(f"Cache ALREADY EXISTS: {url} (key: {key})") - return { - "url": url, - "cached_url": storage.get_url(key), - "status": "already_cached", - } - - try: - remote_config = config.get_remote_config(remote_name) or {} - is_docker = remote_config.get("package") == "docker" or "/v2/" in url - - # Prepare headers - headers = {} - username = remote_config.get("username") - password = remote_config.get("password") - - if is_docker: - if "/manifests/" in url: - headers["Accept"] = ( - "application/vnd.docker.distribution.manifest.v2+json," - "application/vnd.oci.image.manifest.v1+json," - "application/vnd.oci.image.index.v1+json," - "application/vnd.docker.distribution.manifest.list.v2+json" - ) - elif "/blobs/" in url: - headers["Accept"] = "application/octet-stream" - elif username and password: - headers["Authorization"] = "Basic " + base64.b64encode(f"{username}:{password}".encode()).decode() - - async with httpx.AsyncClient(follow_redirects=True) as client: - response = await client.get(url, headers=headers) - - # Handle Docker Bearer token challenge - if response.status_code == 401 and is_docker: - 
www_auth = response.headers.get("WWW-Authenticate", "") - username = remote_config.get("username") - password = remote_config.get("password") - token = await get_docker_token_for_response(www_auth, username, password) - if token: - headers["Authorization"] = f"Bearer {token}" - response = await client.get(url, headers=headers) - - response.raise_for_status() - - storage_path = storage.upload(key, response.content) - - logger.info(f"Cache ADD SUCCESS: {url} (size: {len(response.content)} bytes, key: {key})") - - return { - "url": url, - "cached_url": storage.get_url(key), - "storage_path": storage_path, - "size": len(response.content), - "status": "cached", - "etag": response.headers.get("ETag"), - "last_modified": response.headers.get("Last-Modified"), - } - - except Exception as e: - return {"url": url, "status": "error", "error": str(e)} - - -def _basic_auth_header(remote_cfg: dict) -> dict[str, str]: - username = remote_cfg.get("username") - password = remote_cfg.get("password") - if username and password: - token = base64.b64encode(f"{username}:{password}".encode()).decode() - return {"Authorization": f"Basic {token}"} - return {} - - -async def _upstream_reachable(url: str, auth_headers: dict | None = None) -> bool: - """HEAD with a short timeout. Returns False only on network/timeout errors.""" - try: - async with httpx.AsyncClient(follow_redirects=True) as client: - await client.head(url, headers=auth_headers or {}, timeout=10.0) - return True - except (httpx.NetworkError, httpx.TimeoutException): - return False - except Exception: - return True # 4xx/5xx means backend is up - - -async def check_upstream_changed(remote_url: str, remote_name: str, path: str, auth_headers: dict | None = None) -> bool: - """Conditional HEAD against upstream. Returns False only on a definitive 304. 
- Raises UpstreamUnreachable if the backend cannot be contacted.""" - meta = cache.get_mutable_meta(remote_name, path) - if not meta: - return True - - headers = dict(auth_headers or {}) - if meta.get("etag"): - headers["If-None-Match"] = meta["etag"] - if meta.get("last_modified"): - headers["If-Modified-Since"] = meta["last_modified"] - if not (meta.get("etag") or meta.get("last_modified")): - return True - - try: - async with httpx.AsyncClient(follow_redirects=True) as client: - response = await client.head(remote_url, headers=headers) - return response.status_code != 304 - except (httpx.NetworkError, httpx.TimeoutException) as exc: - raise UpstreamUnreachable(str(exc)) from exc - - -async def handle_expired_mutable(remote_name: str, path: str, remote_url: str) -> bool: - """Handle an expired mutable file. Returns True if the cached copy is still valid.""" - mutable_ttl = config.get_cache_config(remote_name).get("mutable_ttl", 3600) - - remote_cfg = config.get_remote_config(remote_name) or {} - auth = _basic_auth_header(remote_cfg) - check_updates = remote_cfg.get("check_mutable_updates", False) - user_mutable = check_updates and cache.is_mutable_file(path, config.get_user_mutable_patterns(remote_name)) - - if user_mutable: - try: - changed = await check_upstream_changed(remote_url, remote_name, path, auth) - except UpstreamUnreachable: - cache.mark_index_cached(remote_name, path, mutable_ttl) - logger.warning(f"Mutable STALE (backend unreachable): {remote_name}/{path} - TTL extended ({mutable_ttl}s)") - return True - if not changed: - cache.mark_index_cached(remote_name, path, mutable_ttl) - logger.info(f"Mutable file UNCHANGED: {remote_name}/{path} - TTL refreshed ({mutable_ttl}s)") - return True - logger.info(f"Mutable file CHANGED: {remote_name}/{path} - re-downloading") - else: - if not await _upstream_reachable(remote_url, auth): - cache.mark_index_cached(remote_name, path, mutable_ttl) - logger.warning(f"Mutable STALE (backend unreachable): 
{remote_name}/{path} - TTL extended ({mutable_ttl}s)") - return True - logger.info(f"Mutable file EXPIRED: {remote_name}/{path} - removing from cache") - - cache.cleanup_expired_index(storage, remote_name, path) - return False - - -def _get_content_type(filename: str) -> str: - if filename.endswith((".tar.gz", ".tgz")): - return "application/gzip" - if filename.endswith(".zip") or filename.endswith(".whl"): - return "application/zip" - if filename.endswith(".exe"): - return "application/x-msdownload" - if filename.endswith(".rpm"): - return "application/x-rpm" - if filename.endswith(".xml"): - return "application/xml" - if filename.endswith((".xml.gz", ".xml.bz2", ".xml.xz")): - return "application/gzip" - if filename.endswith((".yaml", ".yml")): - return "text/yaml" - return "application/octet-stream" - - -def _resolve_content( - data: bytes, - path: str, - filename: str, - remote_config: dict, - request: Request, - remote_name: str = "", -) -> tuple[bytes, str]: - """Return (possibly-rewritten data, content_type) for a cached artifact.""" - if remote_config.get("package") == "pypi": - immutable = remote_config.get("immutable_patterns", []) - if not any(re.search(p, path) for p in immutable): - proxy_base = str(request.base_url).rstrip("/") - base_url = remote_config.get("base_url", "").rstrip("/") - data = data.replace( - base_url.encode(), - f"{proxy_base}/api/v1/remote/{remote_name}".encode(), - ) - return data, "text/html; charset=utf-8" - if remote_config.get("package") == "npm": - immutable = remote_config.get("immutable_patterns", []) - if not any(re.search(p, path) for p in immutable): - proxy_base = str(request.base_url).rstrip("/") - base_url = remote_config.get("base_url", "").rstrip("/") - data = data.replace( - base_url.encode(), - f"{proxy_base}/api/v1/remote/{remote_name}".encode(), - ) - return data, "application/json" - if remote_config.get("package") == "helm" and filename == "index.yaml": - proxy_base = str(request.base_url).rstrip("/") - 
base_url = remote_config.get("base_url", "").rstrip("/") - data = data.replace( - base_url.encode(), - f"{proxy_base}/api/v1/remote/{remote_name}".encode(), - ) - return data, "text/yaml" - return data, _get_content_type(filename) - - -@app.get("/api/v1/remote/{remote_name}/{path:path}") -async def get_artifact(request: Request, remote_name: str, path: str): - # Check if remote is configured - remote_config = config.get_remote_config(remote_name) - if not remote_config: - raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured") - - # Check if this is a local repository - if remote_config.get("type") == "local": - # Handle local repository download - metadata = database.get_local_file_metadata(remote_name, path) - if not metadata: - raise HTTPException(status_code=404, detail="File not found") - - # Get file from S3 - content = storage.download_object(metadata["s3_key"]) - if content is None: - raise HTTPException(status_code=500, detail="File not accessible") - - # Determine content type - content_type = metadata.get("content_type", "application/octet-stream") - - return Response( - content=content, - media_type=content_type, - headers={"Content-Disposition": f"attachment; filename={os.path.basename(path)}"}, - ) - - # Extract repository path for pattern checking - path_parts = path.split("/") - if len(path_parts) >= 2: - repo_path = f"{path_parts[0]}/{path_parts[1]}" - file_path = "/".join(path_parts[2:]) - else: - repo_path = path - file_path = path - - # Check if artifact matches configured patterns - if not await check_artifact_patterns(remote_name, repo_path, file_path, path): - logger.info(f"PATTERN BLOCKED: {remote_name}/{path} - not matching include patterns") - raise HTTPException(status_code=403, detail="Artifact not allowed by configuration patterns") - - # Construct the remote URL - remote_url = await construct_remote_url(remote_name, path) - - # Check if artifact is already cached - cached_key = 
storage.get_object_key(remote_name, path) - if not storage.exists(cached_key): - cached_key = None - - # For mutable files, check Redis TTL validity - filename = os.path.basename(path) - is_mutable = cache.is_mutable_file(path, config.get_mutable_patterns(remote_name)) - - if cached_key and is_mutable: - if not cache.is_index_valid(remote_name, path): - if not await handle_expired_mutable(remote_name, path, remote_url): - cached_key = None - - if cached_key: - # Return cached artifact - try: - artifact_data = storage.download_object(cached_key) - filename = os.path.basename(path) - artifact_data, content_type = _resolve_content(artifact_data, path, filename, remote_config, request, remote_name) - - logger.info(f"Cache HIT: {remote_name}/{path} (size: {len(artifact_data)} bytes, key: {cached_key})") - - metrics.record_cache_hit(remote_name, len(artifact_data)) - database.record_artifact_mapping(cached_key, remote_name, path, len(artifact_data)) - - return Response( - content=artifact_data, - media_type=content_type, - headers={ - "Content-Disposition": f"attachment; filename={filename}", - "X-Artifact-Source": "cache", - "X-Artifact-Size": str(len(artifact_data)), - }, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=f"Error retrieving cached artifact: {str(e)}") - - # Artifact not cached, cache it first - logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}") - result = await cache_single_artifact(remote_url, remote_name, path) - - if result["status"] == "error": - logger.error(f"Cache ADD FAILED: {remote_name}/{path} - {result['error']}") - raise HTTPException(status_code=502, detail=f"Failed to fetch artifact: {result['error']}") - - # Mark mutable files as cached in Redis with TTL - if result["status"] == "cached" and is_mutable: - cache_config = config.get_cache_config(remote_name) - mutable_ttl = cache_config.get("mutable_ttl", 3600) - cache.mark_index_cached(remote_name, path, mutable_ttl) - 
logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)") - if result.get("etag") or result.get("last_modified"): - cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified")) - - # Now return the cached artifact - try: - cache_key = storage.get_object_key(remote_name, path) - artifact_data = storage.download_object(cache_key) - filename = os.path.basename(path) - artifact_data, content_type = _resolve_content(artifact_data, path, filename, remote_config, request, remote_name) - - metrics.record_cache_miss(remote_name, len(artifact_data)) - cache_key = storage.get_object_key(remote_name, path) - database.record_artifact_mapping(cache_key, remote_name, path, len(artifact_data)) - - return Response( - content=artifact_data, - media_type=content_type, - headers={ - "Content-Disposition": f"attachment; filename={filename}", - "X-Artifact-Source": "remote", - "X-Artifact-Size": str(len(artifact_data)), - }, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=f"Error serving artifact: {str(e)}") + return flush.handle(remote, cache_type, cache, storage) @app.get("/v2/") async def docker_v2_ping(): - return Response( - content="{}", - media_type="application/json", - headers={"Docker-Distribution-Api-Version": "registry/2.0"}, - ) + return docker_handler.ping() @app.api_route("/v2/{remote_name}/{path:path}", methods=["GET", "HEAD"]) async def docker_v2_proxy(request: Request, remote_name: str, path: str): - remote_config = config.get_remote_config(remote_name) - if not remote_config: - raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured") - if remote_config.get("package") != "docker": - raise HTTPException(status_code=400, detail=f"Remote '{remote_name}' is not a docker remote") - - # Check immutable_patterns against the image name (e.g. 
"library/nginx") - patterns = config.get_immutable_patterns(remote_name, "") - if patterns: - path_parts = path.split("/") - image_name = "/".join(path_parts[:2]) if len(path_parts) >= 2 else path - if not any(re.search(p, path) or re.search(p, image_name) for p in patterns): - logger.info(f"PATTERN BLOCKED: {remote_name}/{path}") - raise HTTPException(status_code=403, detail="Image not allowed by configuration patterns") - - remote_url = await construct_remote_url(remote_name, path) - - cached_key = storage.get_object_key(remote_name, path) - if not storage.exists(cached_key): - cached_key = None - - is_mutable = cache.is_mutable_file(path, config.get_mutable_patterns(remote_name)) - - if cached_key and is_mutable: - if not cache.is_index_valid(remote_name, path): - if not await handle_expired_mutable(remote_name, path, remote_url): - cached_key = None - - if not cached_key: - logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}") - result = await cache_single_artifact(remote_url, remote_name, path) - if result["status"] == "error": - raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}") - if result["status"] == "cached" and is_mutable: - cache_config = config.get_cache_config(remote_name) - mutable_ttl = cache_config.get("mutable_ttl", 3600) - cache.mark_index_cached(remote_name, path, mutable_ttl) - logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)") - if result.get("etag") or result.get("last_modified"): - cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified")) - - artifact_data = storage.download_object(storage.get_object_key(remote_name, path)) - - is_blob = "/blobs/" in path - if is_blob: - content_type = "application/octet-stream" - else: - try: - manifest_json = json.loads(artifact_data) - content_type = manifest_json.get("mediaType") - if not content_type: - if "manifests" in manifest_json: - content_type = 
"application/vnd.oci.image.index.v1+json" - else: - content_type = "application/vnd.oci.image.manifest.v1+json" - except Exception: - content_type = "application/vnd.oci.image.manifest.v1+json" - - digest = f"sha256:{hashlib.sha256(artifact_data).hexdigest()}" - headers = { - "Docker-Distribution-Api-Version": "registry/2.0", - "Docker-Content-Digest": digest, - "Content-Length": str(len(artifact_data)), - } - - if request.method == "HEAD": - return Response(status_code=200, headers=headers, media_type=content_type) - - metrics.record_cache_hit(remote_name, len(artifact_data)) - return Response(content=artifact_data, media_type=content_type, headers=headers) + return await docker_handler.proxy(request, remote_name, path, storage, cache, config, metrics) -async def discover_artifacts(remote: str, include_pattern: str) -> list[str]: - if "github.com" in remote: - return await discover_github_releases(remote, include_pattern) - else: - raise HTTPException(status_code=400, detail=f"Unsupported remote: {remote}") - - -async def discover_github_releases(remote: str, include_pattern: str) -> list[str]: - match = re.match(r"github\.com/([^/]+)/([^/]+)", remote) - if not match: - raise HTTPException(status_code=400, detail="Invalid GitHub remote format") - - owner, repo = match.groups() - - async with httpx.AsyncClient(follow_redirects=True) as client: - response = await client.get(f"https://api.github.com/repos/{owner}/{repo}/releases") - - if response.status_code != 200: - raise HTTPException( - status_code=response.status_code, - detail=f"Failed to fetch releases: {response.text}", - ) - - releases = response.json() - - matching_urls = [] - pattern = include_pattern.replace("*", ".*") - regex = re.compile(pattern) - - for release in releases: - for asset in release.get("assets", []): - download_url = asset["browser_download_url"] - if regex.search(download_url): - matching_urls.append(download_url) - - return matching_urls 
+@app.get("/api/v1/remote/{remote_name}/{path:path}") +async def get_artifact(request: Request, remote_name: str, path: str): + return await proxy.handle(request, remote_name, path, storage, cache, config, database, metrics) @app.put("/api/v1/remote/{remote_name}/{path:path}") async def upload_file(remote_name: str, path: str, file: UploadFile = File(...)): - """Upload a file to local repository""" - # Check if remote is configured and is local - remote_config = config.get_remote_config(remote_name) - if not remote_config: - raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured") - - if remote_config.get("type") != "local": - raise HTTPException(status_code=400, detail="Upload only supported for local repositories") - - try: - # Read file content - content = await file.read() - - # Calculate SHA256 - sha256_sum = hashlib.sha256(content).hexdigest() - - # Check if file already exists (prevent overwrite) - if database.file_exists(remote_name, path): - raise HTTPException(status_code=409, detail="File already exists") - - # Generate S3 key - s3_key = f"local/{remote_name}/{path}" - - # Determine content type - content_type = file.content_type or "application/octet-stream" - - # Upload to S3 - try: - storage.upload(s3_key, content) - except Exception as e: - raise HTTPException(status_code=500, detail=f"Upload failed: {e}") - - # Add to database - success = database.add_local_file( - repository_name=remote_name, - file_path=path, - s3_key=s3_key, - size_bytes=len(content), - sha256_sum=sha256_sum, - content_type=content_type, - ) - - if not success: - # Clean up S3 if database insert failed - storage.delete_object(s3_key) - raise HTTPException(status_code=500, detail="Failed to save file metadata") - - return JSONResponse( - { - "message": "File uploaded successfully", - "file_path": path, - "size_bytes": len(content), - "sha256_sum": sha256_sum, - "content_type": content_type, - } - ) - - except HTTPException: - raise - except Exception 
as e: - raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") + return await local.upload(remote_name, path, file, storage, database, config) @app.head("/api/v1/remote/{remote_name}/{path:path}") def check_file_exists(remote_name: str, path: str): - """Check if file exists (for CI jobs) - supports local repositories only""" - # Check if remote is configured - remote_config = config.get_remote_config(remote_name) - if not remote_config: - raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured") - - # Handle local repository - if remote_config.get("type") == "local": - try: - metadata = database.get_local_file_metadata(remote_name, path) - if not metadata: - raise HTTPException(status_code=404, detail="File not found") - - return Response( - headers={ - "Content-Length": str(metadata["size_bytes"]), - "Content-Type": metadata.get("content_type", "application/octet-stream"), - "X-SHA256": metadata["sha256_sum"], - "X-Created-At": metadata["created_at"].isoformat() if metadata["created_at"] else "", - "X-Uploaded-At": metadata["uploaded_at"].isoformat() if metadata["uploaded_at"] else "", - } - ) - except HTTPException: - raise - except Exception as e: - raise HTTPException(status_code=500, detail=f"Check failed: {str(e)}") - else: - # For remote repositories, just return 405 Method Not Allowed - raise HTTPException(status_code=405, detail="HEAD method only supported for local repositories") + return local.check_exists(remote_name, path, database, config) @app.delete("/api/v1/remote/{remote_name}/{path:path}") def delete_file(remote_name: str, path: str): - """Delete a file from local repository""" - # Check if remote is configured and is local - remote_config = config.get_remote_config(remote_name) - if not remote_config: - raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured") - - if remote_config.get("type") != "local": - raise HTTPException(status_code=400, detail="Delete only supported 
for local repositories") - - try: - # Get S3 key before deleting from database - s3_key = database.delete_local_file(remote_name, path) - if not s3_key: - raise HTTPException(status_code=404, detail="File not found") - - # Delete from S3 - if not storage.delete_object(s3_key): - # File was deleted from database but not from S3 - log warning but continue - print(f"Warning: Failed to delete S3 object {s3_key}") - - return JSONResponse({"message": "File deleted successfully"}) - except HTTPException: - raise - except Exception as e: - raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}") + return local.delete(remote_name, path, storage, database, config) @app.post("/api/v1/artifacts/cache") -async def cache_artifact(request: ArtifactRequest) -> dict[str, Any]: - try: - matching_urls = await discover_artifacts(request.remote, request.include_pattern) - - if not matching_urls: - return { - "message": "No matching artifacts found", - "cached_count": 0, - "artifacts": [], - } - - cached_artifacts = [] - - for url in matching_urls: - result = await cache_single_artifact(url, "", "") - cached_artifacts.append(result) - - cached_count = sum(1 for artifact in cached_artifacts if artifact["status"] in ["cached", "already_cached"]) - - return { - "message": f"Processed {len(matching_urls)} artifacts, {cached_count} successfully cached", - "cached_count": cached_count, - "artifacts": cached_artifacts, - } - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) +async def cache_artifact(request: ArtifactRequest): + return await discovery.cache_artifacts(request.remote, request.include_pattern, storage) @app.get("/api/v1/artifacts/{remote:path}") -async def list_cached_artifacts(remote: str, include_pattern: str = ".*") -> dict[str, Any]: - try: - matching_urls = await discover_artifacts(remote, include_pattern) - - cached_artifacts = [] - for url in matching_urls: - # Extract path from URL for hierarchical key generation - from 
urllib.parse import urlparse - - parsed = urlparse(url) - path = parsed.path - key = storage.get_object_key(remote, path) - if storage.exists(key): - cached_artifacts.append({"url": url, "cached_url": storage.get_url(key), "key": key}) - - return { - "remote": remote, - "pattern": include_pattern, - "total_found": len(matching_urls), - "cached_count": len(cached_artifacts), - "artifacts": cached_artifacts, - } - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - -@app.get("/metrics") -def get_metrics( - json: bool | None = Query(False, description="Return JSON format instead of Prometheus"), -): - """Get comprehensive metrics about the artifact storage system""" - config._check_reload() - - if json: - # Return JSON format - return metrics.get_metrics(storage, config) - else: - # Return Prometheus format - metrics.get_metrics(storage, config) # Update gauges - prometheus_data = generate_latest().decode("utf-8") - return PlainTextResponse(prometheus_data, media_type=CONTENT_TYPE_LATEST) - - -@app.get("/config") -def get_config(): - return config.config +async def list_cached_artifacts(remote: str, include_pattern: str = ".*"): + return await discovery.list_artifacts(remote, include_pattern, storage) def main(): diff --git a/src/artifactapi/remote/__init__.py b/src/artifactapi/remote/__init__.py new file mode 100644 index 0000000..96efd4d --- /dev/null +++ b/src/artifactapi/remote/__init__.py @@ -0,0 +1,4 @@ +from . 
import generic, helm, npm, python, rpm +from .base import get_content_type + +__all__ = ["generic", "helm", "npm", "python", "rpm", "get_content_type"] diff --git a/src/artifactapi/remote/base.py b/src/artifactapi/remote/base.py new file mode 100644 index 0000000..ce5f523 --- /dev/null +++ b/src/artifactapi/remote/base.py @@ -0,0 +1,16 @@ +def get_content_type(filename: str) -> str: + if filename.endswith((".tar.gz", ".tgz")): + return "application/gzip" + if filename.endswith(".zip") or filename.endswith(".whl"): + return "application/zip" + if filename.endswith(".exe"): + return "application/x-msdownload" + if filename.endswith(".rpm"): + return "application/x-rpm" + if filename.endswith(".xml"): + return "application/xml" + if filename.endswith((".xml.gz", ".xml.bz2", ".xml.xz")): + return "application/gzip" + if filename.endswith((".yaml", ".yml")): + return "text/yaml" + return "application/octet-stream" diff --git a/src/artifactapi/remote/generic.py b/src/artifactapi/remote/generic.py new file mode 100644 index 0000000..3a41962 --- /dev/null +++ b/src/artifactapi/remote/generic.py @@ -0,0 +1,3 @@ +from .base import get_content_type + +__all__ = ["get_content_type"] diff --git a/src/artifactapi/remote/helm.py b/src/artifactapi/remote/helm.py new file mode 100644 index 0000000..dc0aa79 --- /dev/null +++ b/src/artifactapi/remote/helm.py @@ -0,0 +1,18 @@ +from .base import get_content_type + + +def resolve_content( + data: bytes, + path: str, + filename: str, + base_url: str, + proxy_url: str, + remote_name: str, +) -> tuple[bytes, str]: + if filename == "index.yaml": + data = data.replace( + base_url.encode(), + f"{proxy_url}/api/v1/remote/{remote_name}".encode(), + ) + return data, "text/yaml" + return data, get_content_type(filename) diff --git a/src/artifactapi/remote/npm.py b/src/artifactapi/remote/npm.py new file mode 100644 index 0000000..3547b2d --- /dev/null +++ b/src/artifactapi/remote/npm.py @@ -0,0 +1,21 @@ +import re + +from .base import 
get_content_type + + +def resolve_content( + data: bytes, + path: str, + filename: str, + immutable_patterns: list[str], + base_url: str, + proxy_url: str, + remote_name: str, +) -> tuple[bytes, str]: + if not any(re.search(p, path) for p in immutable_patterns): + data = data.replace( + base_url.encode(), + f"{proxy_url}/api/v1/remote/{remote_name}".encode(), + ) + return data, "application/json" + return data, get_content_type(filename) diff --git a/src/artifactapi/remote/python.py b/src/artifactapi/remote/python.py new file mode 100644 index 0000000..bed8d2d --- /dev/null +++ b/src/artifactapi/remote/python.py @@ -0,0 +1,32 @@ +import re + +from .base import get_content_type + + +def construct_url(base_url: str, path: str) -> str: + """Build the upstream URL for a PyPI request. + + PyPI splits simple/ index pages (pypi.org) from file downloads + (files.pythonhosted.org), so simple/ requests are redirected to pypi.org. + """ + if base_url.rstrip("/") == "https://files.pythonhosted.org" and "simple/" in path: + return f"https://pypi.org/{path}" + return f"{base_url}/{path}" + + +def resolve_content( + data: bytes, + path: str, + filename: str, + immutable_patterns: list[str], + base_url: str, + proxy_url: str, + remote_name: str, +) -> tuple[bytes, str]: + if not any(re.search(p, path) for p in immutable_patterns): + data = data.replace( + base_url.encode(), + f"{proxy_url}/api/v1/remote/{remote_name}".encode(), + ) + return data, "text/html; charset=utf-8" + return data, get_content_type(filename) diff --git a/src/artifactapi/remote/rpm.py b/src/artifactapi/remote/rpm.py new file mode 100644 index 0000000..3a41962 --- /dev/null +++ b/src/artifactapi/remote/rpm.py @@ -0,0 +1,3 @@ +from .base import get_content_type + +__all__ = ["get_content_type"] diff --git a/src/artifactapi/storage/__init__.py b/src/artifactapi/storage/__init__.py new file mode 100644 index 0000000..64272bd --- /dev/null +++ b/src/artifactapi/storage/__init__.py @@ -0,0 +1,3 @@ +from .s3 import 
S3Storage + +__all__ = ["S3Storage"] diff --git a/src/artifactapi/storage.py b/src/artifactapi/storage/s3.py similarity index 90% rename from src/artifactapi/storage.py rename to src/artifactapi/storage/s3.py index b58498f..b2bcbc6 100644 --- a/src/artifactapi/storage.py +++ b/src/artifactapi/storage/s3.py @@ -41,7 +41,6 @@ class S3Storage: self.client = boto3.client("s3", **client_kwargs) - # Try to ensure bucket exists, but don't fail if MinIO isn't ready yet try: self._ensure_bucket_exists() except Exception as e: @@ -55,25 +54,21 @@ class S3Storage: self.client.create_bucket(Bucket=self.bucket) def get_object_key(self, remote_name: str, path: str) -> str: - # Extract directory path and filename clean_path = path.lstrip("/") filename = os.path.basename(clean_path) directory_path = os.path.dirname(clean_path) - # Special handling for Docker registry blobs (use digest as key for deduplication) + # Docker blobs are keyed by digest for deduplication across images if "/blobs/sha256:" in clean_path: - # Extract the SHA256 digest for Docker blobs parts = clean_path.split("/blobs/sha256:") if len(parts) == 2: digest = parts[1] return f"{remote_name}/blobs/sha256/{digest}" - # Hash the directory path to keep keys manageable while preserving remote structure if directory_path: path_hash = hashlib.sha256(directory_path.encode()).hexdigest()[:16] return f"{remote_name}/{path_hash}/{filename}" else: - # If no directory, just use remote and filename return f"{remote_name}/{filename}" def exists(self, key: str) -> bool: diff --git a/tests/test_routes.py b/tests/test_routes.py index dbe4815..f755c77 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -204,7 +204,7 @@ class TestDockerProxy: deps["cache"].is_mutable_file.return_value = True with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ) as mock_fetch: @@ -226,7 +226,7 @@ class TestDockerProxy: 
deps["cache"].is_mutable_file.return_value = True with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ): @@ -248,9 +248,9 @@ class TestDockerProxy: deps["cache"].is_index_valid.return_value = False # but TTL expired deps["storage"].download_object.return_value = manifest - with patch("artifactapi.main._upstream_reachable", new_callable=AsyncMock, return_value=True): + with patch("artifactapi.artifact.proxy._upstream_reachable", new_callable=AsyncMock, return_value=True): with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ) as mock_fetch: @@ -352,7 +352,7 @@ class TestGenericArtifactRoute: deps["cache"].is_mutable_file.return_value = False with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ) as mock_fetch: @@ -369,7 +369,7 @@ class TestGenericArtifactRoute: deps["cache"].is_mutable_file.return_value = False with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ): @@ -384,7 +384,7 @@ class TestGenericArtifactRoute: deps["cache"].is_mutable_file.return_value = True with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ): @@ -399,7 +399,7 @@ class TestGenericArtifactRoute: deps["cache"].is_mutable_file.return_value = False with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "error", "error": "upstream unreachable"}, ): @@ -430,7 +430,7 @@ class TestGenericArtifactRoute: 
deps["cache"].is_index_valid.return_value = False deps["cache"].get_mutable_meta.return_value = {"etag": '"abc"'} - with patch("artifactapi.main.check_upstream_changed", new_callable=AsyncMock, return_value=False): + with patch("artifactapi.artifact.proxy.check_upstream_changed", new_callable=AsyncMock, return_value=False): response = client.get("/api/v1/remote/check-mutable-test/metadata.json") assert response.status_code == 200 @@ -446,8 +446,8 @@ class TestGenericArtifactRoute: deps["cache"].is_index_valid.return_value = False deps["cache"].get_mutable_meta.return_value = {"etag": '"abc"'} - with patch("artifactapi.main.check_upstream_changed", new_callable=AsyncMock, return_value=True): - with patch("artifactapi.main.cache_single_artifact", new_callable=AsyncMock) as mock_cache: + with patch("artifactapi.artifact.proxy.check_upstream_changed", new_callable=AsyncMock, return_value=True): + with patch("artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock) as mock_cache: mock_cache.return_value = {"status": "error", "error": "upstream gone"} response = client.get("/api/v1/remote/check-mutable-test/metadata.json") @@ -462,8 +462,8 @@ class TestGenericArtifactRoute: deps["cache"].is_index_valid.return_value = False deps["cache"].get_mutable_meta.return_value = {"etag": '"abc"'} - with patch("artifactapi.main.check_upstream_changed", new_callable=AsyncMock, return_value=True): - with patch("artifactapi.main.cache_single_artifact", new_callable=AsyncMock) as mock_cache: + with patch("artifactapi.artifact.proxy.check_upstream_changed", new_callable=AsyncMock, return_value=True): + with patch("artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock) as mock_cache: mock_cache.return_value = {"status": "cached", "etag": '"def"', "last_modified": None} response = client.get("/api/v1/remote/check-mutable-test/metadata.json") @@ -472,7 +472,7 @@ class TestGenericArtifactRoute: def 
test_mutable_backend_unreachable_on_check_updates_keeps_stale(self, client, patched_deps): """When check_mutable_updates=True and backend is unreachable, stale copy is kept and TTL refreshed.""" - from artifactapi.main import UpstreamUnreachable + from artifactapi.artifact.proxy import UpstreamUnreachable deps = patched_deps deps["storage"].exists.return_value = True @@ -481,7 +481,7 @@ class TestGenericArtifactRoute: deps["cache"].is_index_valid.return_value = False deps["cache"].get_mutable_meta.return_value = {"etag": '"abc"'} - with patch("artifactapi.main.check_upstream_changed", side_effect=UpstreamUnreachable("connection refused")): + with patch("artifactapi.artifact.proxy.check_upstream_changed", side_effect=UpstreamUnreachable("connection refused")): response = client.get("/api/v1/remote/check-mutable-test/metadata.json") assert response.status_code == 200 @@ -496,7 +496,7 @@ class TestGenericArtifactRoute: deps["cache"].is_mutable_file.return_value = True deps["cache"].is_index_valid.return_value = False - with patch("artifactapi.main._upstream_reachable", new_callable=AsyncMock, return_value=False): + with patch("artifactapi.artifact.proxy._upstream_reachable", new_callable=AsyncMock, return_value=False): response = client.get("/api/v1/remote/alpine-test/alpine/v3.18/x86_64/APKINDEX.tar.gz") assert response.status_code == 200 @@ -510,8 +510,8 @@ class TestGenericArtifactRoute: deps["cache"].is_mutable_file.return_value = True deps["cache"].is_index_valid.return_value = False - with patch("artifactapi.main.check_upstream_changed", new_callable=AsyncMock) as mock_check: - with patch("artifactapi.main.cache_single_artifact", new_callable=AsyncMock) as mock_cache: + with patch("artifactapi.artifact.proxy.check_upstream_changed", new_callable=AsyncMock) as mock_check: + with patch("artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock) as mock_cache: mock_cache.return_value = {"status": "error", "error": "upstream gone"} 
client.get("/api/v1/remote/custom-index-test/metadata.json") @@ -706,7 +706,7 @@ class TestPyPIRemote: deps["cache"].is_mutable_file.return_value = True with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ) as mock_fetch: @@ -821,7 +821,7 @@ class TestNpmRemote: deps["cache"].is_mutable_file.return_value = True with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ) as mock_fetch: @@ -907,7 +907,7 @@ class TestHelmRemote: deps["cache"].is_mutable_file.return_value = True with patch( - "artifactapi.main.cache_single_artifact", + "artifactapi.artifact.proxy.cache_single_artifact", new_callable=AsyncMock, return_value={"status": "cached"}, ) as mock_fetch: