Compare commits

..

1 Commits

Author SHA1 Message Date
unkinben 1cca9fef00 feat: testing develop build
- using the makerun image in k8s
2026-01-10 16:17:56 +11:00
10 changed files with 133 additions and 560 deletions
-4
View File
@@ -45,7 +45,3 @@ uv.lock
# Docker volumes # Docker volumes
minio_data/ minio_data/
# Local configuration overrides
docker-compose.yml
ca-bundle.pem
+4 -4
View File
@@ -23,6 +23,10 @@ RUN wget -O /app/uv-x86_64-unknown-linux-musl.tar.gz https://github.com/astral-s
chmod +x /usr/local/bin/uv && \ chmod +x /usr/local/bin/uv && \
uv --version uv --version
# Copy CA bundle from host
COPY ca-bundle.pem /app/ca-bundle.pem
RUN chmod 644 /app/ca-bundle.pem
# Create non-root user first # Create non-root user first
RUN adduser -D -s /bin/sh appuser && \ RUN adduser -D -s /bin/sh appuser && \
chown -R appuser:appuser /app chown -R appuser:appuser /app
@@ -32,15 +36,11 @@ COPY --chown=appuser:appuser pyproject.toml uv.lock README.md ./
# Switch to appuser and install Python dependencies # Switch to appuser and install Python dependencies
USER appuser USER appuser
ARG VERSION=dev
ENV HATCH_VCS_PRETEND_VERSION=${VERSION} \
SETUPTOOLS_SCM_PRETEND_VERSION=${VERSION}
RUN uv sync --frozen RUN uv sync --frozen
# Copy application source # Copy application source
COPY --chown=appuser:appuser src/ ./src/ COPY --chown=appuser:appuser src/ ./src/
COPY --chown=appuser:appuser remotes.yaml ./ COPY --chown=appuser:appuser remotes.yaml ./
COPY --chown=appuser:appuser ca-bundle.pem ./
# Expose port # Expose port
EXPOSE 8000 EXPOSE 8000
+2 -24
View File
@@ -26,6 +26,8 @@ format:
uv run ruff format . uv run ruff format .
run: run:
uv venv --python 3.11 && \
source .venv/bin/activate && \
uv run python -m src.artifactapi.main uv run python -m src.artifactapi.main
docker-up: docker-up:
@@ -45,27 +47,3 @@ docker-clean:
docker system prune -f docker system prune -f
docker-restart: docker-down docker-up docker-restart: docker-down docker-up
# Bump helpers — reads the latest semver tag and creates the next one.
# If no tag exists yet, starts from v0.0.0.
_LATEST := $(shell git tag --sort=-v:refname | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$$' | head -1)
_BASE := $(if $(_LATEST),$(_LATEST),v0.0.0)
_MAJ := $(shell echo $(_BASE) | sed 's/^v//' | cut -d. -f1)
_MIN := $(shell echo $(_BASE) | sed 's/^v//' | cut -d. -f2)
_PAT := $(shell echo $(_BASE) | sed 's/^v//' | cut -d. -f3)
patch:
@NEW=v$(_MAJ).$(_MIN).$(shell expr $(_PAT) + 1); \
git tag $$NEW && echo "Tagged $$NEW" && $(MAKE) _tag TAG=$$NEW
minor:
@NEW=v$(_MAJ).$(shell expr $(_MIN) + 1).0; \
git tag $$NEW && echo "Tagged $$NEW" && $(MAKE) _tag TAG=$$NEW
major:
@NEW=v$(shell expr $(_MAJ) + 1).0.0; \
git tag $$NEW && echo "Tagged $$NEW" && $(MAKE) _tag TAG=$$NEW
_tag:
git push origin $(TAG)
docker-compose build --no-cache --build-arg VERSION=$(TAG:v%=%)
-137
View File
@@ -1,137 +0,0 @@
# ArtifactAPI Specification
## Repository model
Every repository entry in `remotes.yaml` has two orthogonal fields:
| field | values | meaning |
|---|---|---|
| `type` | `local`, `remote`, `virtual` | repository kind — how the repo is served |
| `package` | `docker`, `rpm`, `alpine`, `generic` | package format — what protocol and caching rules to apply |
**type**
- `local` — files are uploaded directly to the API and stored in S3; no upstream.
- `remote` — proxies and caches content from an upstream URL (`base_url`).
- `virtual` — aggregates multiple repositories (not yet implemented).
**package**
- `docker` — upstream speaks the OCI Distribution API (Bearer auth, manifest/blob paths).
- `rpm` — upstream is an RPM repository; repodata files are index files.
- `alpine` — upstream is an Alpine APK repository; `APKINDEX.tar.gz` is an index file.
- `generic` — plain HTTP file download; no format-specific logic.
---
## Caching
Two cache classes determine retention:
| class | stored | TTL |
|---|---|---|
| **file** | S3 object, no Redis entry | `file_ttl``0` means indefinite |
| **index** | S3 object + Redis TTL key | `index_ttl` — when the Redis key expires the S3 object is deleted and re-fetched |
Index files are mutable metadata that must expire. File-class objects are treated as immutable and cached indefinitely (unless `file_ttl` is set).
---
## Docker package rules
### URL construction
Remote URLs are prefixed with `/v2/` for `package: docker` remotes:
```
{base_url}/v2/{path}
```
e.g. `library/nginx/manifests/latest``https://registry-1.docker.io/v2/library/nginx/manifests/latest`
### Authentication
Docker registries use Bearer token challenges. On a `401 Unauthorized` response, the API:
1. Parses the `WWW-Authenticate: Bearer` header for `realm`, `service`, and `scope`.
2. Fetches a token from the auth realm, supplying `username`/`password` from the remote config if present.
3. Retries the request with `Authorization: Bearer <token>`.
Tokens are cached in-memory keyed by `(realm, service, scope, username)` and expire 30 seconds before their stated `expires_in`.
### Cache classification
| path pattern | mutable | class | TTL source |
|---|---|---|---|
| `/manifests/<tag>` | yes | index | `index_ttl` |
| `/tags/list` | yes | index | `index_ttl` |
| `/manifests/sha256:<digest>` | no | file | `file_ttl` |
| `/blobs/sha256:<digest>` | no | file | `file_ttl` |
Tag-based manifests and tag lists are mutable and cached as index. Digest-pinned manifests and blobs are content-addressed and cached indefinitely as files.
### Blob deduplication
Blobs are stored under a digest-keyed path shared across all images on the same remote:
```
{remote_name}/blobs/sha256/{digest}
```
The same layer pulled by different images is stored once.
### Accept headers
| path | `Accept` header sent upstream |
|---|---|
| `/manifests/…` | `application/vnd.docker.distribution.manifest.v2+json`, `application/vnd.oci.image.manifest.v1+json`, `application/vnd.oci.image.index.v1+json`, `application/vnd.docker.distribution.manifest.list.v2+json` |
| `/blobs/…` | `application/octet-stream` |
---
## OCI Distribution API endpoint
The API exposes a native Docker registry interface so clients can use `docker pull` directly:
```
GET /v2/ — version ping
GET /v2/{remote}/{image}/manifests/{ref} — fetch manifest
HEAD /v2/{remote}/{image}/manifests/{ref} — manifest metadata
GET /v2/{remote}/{image}/blobs/{digest} — fetch blob
HEAD /v2/{remote}/{image}/blobs/{digest} — blob metadata
```
Responses include `Docker-Distribution-Api-Version`, `Docker-Content-Digest`, and the correct OCI `Content-Type` (detected from the manifest `mediaType` field).
Only remotes with `package: docker` are accessible via this endpoint. All other remotes return `400`.
---
## include_patterns
`include_patterns` is a list of Python regexes applied to every request before any upstream fetch or cache lookup.
**Generic remotes (`/api/v1/remote/…`):**
- Patterns match against the file path and the full path.
- Index files (mutable metadata) bypass pattern checks and are always allowed.
**Docker remotes (`/v2/…`):**
- Patterns match against the image name (first two path segments, e.g. `library/nginx`) and the full path.
- The index-file exemption does **not** apply — patterns restrict whole images, including their manifests and tag lists.
- No patterns configured → all images allowed.
Returns `403` when a request is blocked.
---
## Versioning
The package version is derived from git tags via `hatch-vcs`. Tags follow the format `v{MAJOR}.{MINOR}.{PATCH}`.
Docker images are built with the version injected at build time:
```
SETUPTOOLS_SCM_PRETEND_VERSION=<version> uv sync --frozen
```
The `Makefile` provides `patch`, `minor`, and `major` targets that tag the current commit and rebuild the container image.
+86
View File
@@ -0,0 +1,86 @@
version: '3.8'
services:
artifactapi:
build:
context: .
dockerfile: Dockerfile
no_cache: true
ports:
- "8000:8000"
environment:
- CONFIG_PATH=/app/remotes.yaml
- DBHOST=postgres
- DBPORT=5432
- DBUSER=artifacts
- DBPASS=artifacts123
- DBNAME=artifacts
- REDIS_URL=redis://redis:6379
- MINIO_ENDPOINT=minio:9000
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
- MINIO_BUCKET=artifacts
- MINIO_SECURE=false
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
minio:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
minio:
image: minio/minio:latest
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
volumes:
- minio_data:/data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --save 20 1
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
timeout: 10s
retries: 3
postgres:
image: postgres:15-alpine
ports:
- "5432:5432"
environment:
POSTGRES_DB: artifacts
POSTGRES_USER: artifacts
POSTGRES_PASSWORD: artifacts123
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U artifacts -d artifacts"]
interval: 30s
timeout: 10s
retries: 3
volumes:
minio_data:
redis_data:
postgres_data:
+2 -5
View File
@@ -1,6 +1,6 @@
[project] [project]
name = "artifactapi" name = "artifactapi"
dynamic = ["version"] version = "2.0.0"
description = "Generic artifact caching system with support for various package managers" description = "Generic artifact caching system with support for various package managers"
dependencies = [ dependencies = [
@@ -23,12 +23,9 @@ license = {text = "MIT"}
artifactapi = "artifactapi.main:main" artifactapi = "artifactapi.main:main"
[build-system] [build-system]
requires = ["hatchling", "hatch-vcs"] requires = ["hatchling"]
build-backend = "hatchling.build" build-backend = "hatchling.build"
[tool.hatch.version]
source = "vcs"
[tool.hatch.metadata] [tool.hatch.metadata]
allow-direct-references = true allow-direct-references = true
+6 -28
View File
@@ -23,20 +23,8 @@ class RedisCache:
file_path.endswith("APKINDEX.tar.gz") file_path.endswith("APKINDEX.tar.gz")
or file_path.endswith("Packages.gz") or file_path.endswith("Packages.gz")
or file_path.endswith("repomd.xml") or file_path.endswith("repomd.xml")
or ("repodata/" in file_path or "repodata/" in file_path
and file_path.endswith(( and file_path.endswith((".xml", ".xml.gz", ".xml.bz2", ".xml.xz"))
".xml", ".xml.gz", ".xml.bz2", ".xml.xz", ".xml.zck", ".xml.zst",
".sqlite", ".sqlite.gz", ".sqlite.bz2", ".sqlite.xz", ".sqlite.zck", ".sqlite.zst",
".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst",
".asc", ".txt"
)))
# Docker tag-based manifests are mutable (index); digest-pinned are immutable (file)
or (
"/manifests/" in file_path
and not file_path.split("/manifests/", 1)[1].startswith("sha256:")
)
or "/tags/list" in file_path
or file_path.endswith("/tags/list")
) )
def get_index_cache_key(self, remote_name: str, path: str) -> str: def get_index_cache_key(self, remote_name: str, path: str) -> str:
@@ -73,19 +61,9 @@ class RedisCache:
return return
try: try:
# Construct the URL the same way as in the main flow # Get the S3 key and remove it
from .config import ConfigManager s3_key = storage.get_object_key_from_path(remote_name, path)
import os if storage.exists(s3_key):
config_path = os.environ.get("CONFIG_PATH") storage.client.delete_object(Bucket=storage.bucket, Key=s3_key)
if config_path:
config = ConfigManager(config_path)
remote_config = config.get_remote_config(remote_name)
if remote_config:
base_url = remote_config.get("base_url")
if base_url:
# Use hierarchical path-based key (same as cache_single_artifact)
s3_key = storage.get_object_key(remote_name, path)
if storage.exists(s3_key):
storage.client.delete_object(Bucket=storage.bucket, Key=s3_key)
except Exception: except Exception:
pass pass
-96
View File
@@ -1,96 +0,0 @@
import time
import logging
import re
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
# In-memory token cache: key -> (token, expires_at)
_token_cache: dict[str, tuple[str, float]] = {}
_WWW_AUTH_RE = re.compile(
r'Bearer\s+realm="(?P<realm>[^"]+)"'
r'(?:,service="(?P<service>[^"]*)")?'
r'(?:,scope="(?P<scope>[^"]*)")?',
re.IGNORECASE,
)
def _cache_key(realm: str, service: str, scope: str, username: Optional[str]) -> str:
return f"{realm}|{service}|{scope}|{username or ''}"
def _get_cached_token(key: str) -> Optional[str]:
entry = _token_cache.get(key)
if entry and entry[1] > time.time():
return entry[0]
_token_cache.pop(key, None)
return None
def _store_token(key: str, token: str, expires_in: int) -> None:
# Expire 30s early to avoid using a token right as it expires
_token_cache[key] = (token, time.time() + max(expires_in - 30, 10))
async def fetch_token(
realm: str,
service: str,
scope: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Optional[str]:
"""Fetch a Bearer token from a Docker registry auth server."""
key = _cache_key(realm, service, scope, username)
cached = _get_cached_token(key)
if cached:
return cached
params: dict[str, str] = {}
if service:
params["service"] = service
if scope:
params["scope"] = scope
auth = (username, password) if username and password else None
try:
async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(realm, params=params, auth=auth)
response.raise_for_status()
data = response.json()
except Exception as e:
logger.warning(f"Docker token fetch failed ({realm}): {e}")
return None
token = data.get("token") or data.get("access_token")
if not token:
logger.warning(f"Docker token response missing token field: {data}")
return None
expires_in = int(data.get("expires_in", 300))
_store_token(key, token, expires_in)
logger.debug(f"Docker token obtained (realm={realm}, service={service}, scope={scope}, expires_in={expires_in}s)")
return token
def parse_www_authenticate(header: str) -> Optional[tuple[str, str, str]]:
"""Parse WWW-Authenticate: Bearer header. Returns (realm, service, scope) or None."""
m = _WWW_AUTH_RE.search(header)
if not m:
return None
return m.group("realm"), m.group("service") or "", m.group("scope") or ""
async def get_docker_token_for_response(
www_authenticate: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Optional[str]:
"""Given a WWW-Authenticate header value, fetch and return a Bearer token."""
parsed = parse_www_authenticate(www_authenticate)
if not parsed:
return None
realm, service, scope = parsed
return await fetch_token(realm, service, scope, username, password)
+22 -242
View File
@@ -1,28 +1,18 @@
import os import os
import re import re
import json
import hashlib import hashlib
import logging
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
import httpx import httpx
from fastapi import FastAPI, HTTPException, Response, Request, Query, File, UploadFile from fastapi import FastAPI, HTTPException, Response, Query, File, UploadFile
from fastapi.responses import PlainTextResponse, JSONResponse from fastapi.responses import PlainTextResponse, JSONResponse
from pydantic import BaseModel from pydantic import BaseModel
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
try:
from importlib.metadata import version
__version__ = version("artifactapi")
except ImportError:
# Fallback for development when package isn't installed
__version__ = "dev"
from .config import ConfigManager from .config import ConfigManager
from .database import DatabaseManager from .database import DatabaseManager
from .storage import S3Storage from .storage import S3Storage
from .cache import RedisCache from .cache import RedisCache
from .metrics import MetricsManager from .metrics import MetricsManager
from .docker_auth import get_docker_token_for_response
class ArtifactRequest(BaseModel): class ArtifactRequest(BaseModel):
@@ -30,14 +20,7 @@ class ArtifactRequest(BaseModel):
include_pattern: str include_pattern: str
# Configure logging app = FastAPI(title="Artifact Storage API", version="2.0.0")
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI(title="Artifact Storage API", version=__version__)
# Initialize components using config # Initialize components using config
config_path = os.environ.get("CONFIG_PATH") config_path = os.environ.get("CONFIG_PATH")
@@ -62,7 +45,7 @@ def read_root():
config._check_reload() config._check_reload()
return { return {
"message": "Artifact Storage API", "message": "Artifact Storage API",
"version": app.version, "version": "2.0.0",
"remotes": list(config.config.get("remotes", {}).keys()), "remotes": list(config.config.get("remotes", {}).keys()),
} }
@@ -72,87 +55,6 @@ def health_check():
return {"status": "healthy"} return {"status": "healthy"}
@app.put("/cache/flush")
def flush_cache(
remote: str = Query(default=None, description="Specific remote to flush (optional)"),
cache_type: str = Query(default="all", description="Type to flush: 'all', 'index', 'files', 'metrics'")
):
"""Flush cache entries for specified remote or all remotes"""
try:
result = {
"remote": remote,
"cache_type": cache_type,
"flushed": {
"redis_keys": 0,
"s3_objects": 0,
"operations": []
}
}
# Flush Redis entries based on cache_type
if cache_type in ["all", "index", "metrics"] and cache.available and cache.client:
patterns = []
if cache_type in ["all", "index"]:
if remote:
patterns.append(f"index:{remote}:*")
else:
patterns.append("index:*")
if cache_type in ["all", "metrics"]:
if remote:
patterns.append(f"metrics:*:{remote}")
else:
patterns.append("metrics:*")
for pattern in patterns:
keys = cache.client.keys(pattern)
if keys:
cache.client.delete(*keys)
result["flushed"]["redis_keys"] += len(keys)
logger.info(f"Cache flush: Deleted {len(keys)} Redis keys matching '{pattern}'")
if result["flushed"]["redis_keys"] > 0:
result["flushed"]["operations"].append(f"Deleted {result['flushed']['redis_keys']} Redis keys")
# Flush S3 objects if requested
if cache_type in ["all", "files"]:
try:
# Use prefix filtering for remote-specific deletion
list_params = {"Bucket": storage.bucket}
if remote:
list_params["Prefix"] = f"{remote}/"
response = storage.client.list_objects_v2(**list_params)
if 'Contents' in response:
objects_to_delete = [obj['Key'] for obj in response['Contents']]
for key in objects_to_delete:
try:
storage.client.delete_object(Bucket=storage.bucket, Key=key)
result["flushed"]["s3_objects"] += 1
except Exception as e:
logger.warning(f"Failed to delete S3 object {key}: {e}")
if objects_to_delete:
scope = f" for remote '{remote}'" if remote else ""
result["flushed"]["operations"].append(f"Deleted {len(objects_to_delete)} S3 objects{scope}")
logger.info(f"Cache flush: Deleted {len(objects_to_delete)} S3 objects{scope}")
except Exception as e:
result["flushed"]["operations"].append(f"S3 flush failed: {str(e)}")
logger.error(f"Cache flush S3 error: {e}")
if not result["flushed"]["operations"]:
result["flushed"]["operations"].append("No cache entries found to flush")
return result
except Exception as e:
logger.error(f"Cache flush error: {e}")
raise HTTPException(status_code=500, detail=f"Cache flush failed: {str(e)}")
async def construct_remote_url(remote_name: str, path: str) -> str: async def construct_remote_url(remote_name: str, path: str) -> str:
remote_config = config.get_remote_config(remote_name) remote_config = config.get_remote_config(remote_name)
if not remote_config: if not remote_config:
@@ -166,12 +68,6 @@ async def construct_remote_url(remote_name: str, path: str) -> str:
status_code=500, detail=f"No base_url configured for remote '{remote_name}'" status_code=500, detail=f"No base_url configured for remote '{remote_name}'"
) )
# Handle Docker registry URLs
if remote_config.get("package") == "docker":
# Convert Docker paths to v2 API format
# e.g., library/nginx/manifests/latest -> v2/library/nginx/manifests/latest
return f"{base_url}/v2/{path}"
return f"{base_url}/{path}" return f"{base_url}/{path}"
@@ -202,11 +98,13 @@ async def check_artifact_patterns(
async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict: async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
# Use hierarchical path-based key # Check if using URL-based key or path-based key
key = storage.get_object_key(remote_name, path) if url.startswith("http"):
key = storage.get_object_key(url)
else:
key = storage.get_object_key_from_path(remote_name, path)
if storage.exists(key): if storage.exists(key):
logger.info(f"Cache ALREADY EXISTS: {url} (key: {key})")
return { return {
"url": url, "url": url,
"cached_url": storage.get_url(key), "cached_url": storage.get_url(key),
@@ -214,41 +112,12 @@ async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
} }
try: try:
remote_config = config.get_remote_config(remote_name) or {}
is_docker = remote_config.get("package") == "docker" or "/v2/" in url
# Prepare headers for Docker registry requests
headers = {}
if is_docker:
if "/manifests/" in url:
headers["Accept"] = (
"application/vnd.docker.distribution.manifest.v2+json,"
"application/vnd.oci.image.manifest.v1+json,"
"application/vnd.oci.image.index.v1+json,"
"application/vnd.docker.distribution.manifest.list.v2+json"
)
elif "/blobs/" in url:
headers["Accept"] = "application/octet-stream"
async with httpx.AsyncClient(follow_redirects=True) as client: async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(url, headers=headers) response = await client.get(url)
# Handle Docker Bearer token challenge
if response.status_code == 401 and is_docker:
www_auth = response.headers.get("WWW-Authenticate", "")
username = remote_config.get("username")
password = remote_config.get("password")
token = await get_docker_token_for_response(www_auth, username, password)
if token:
headers["Authorization"] = f"Bearer {token}"
response = await client.get(url, headers=headers)
response.raise_for_status() response.raise_for_status()
storage_path = storage.upload(key, response.content) storage_path = storage.upload(key, response.content)
logger.info(f"Cache ADD SUCCESS: {url} (size: {len(response.content)} bytes, key: {key})")
return { return {
"url": url, "url": url,
"cached_url": storage.get_url(key), "cached_url": storage.get_url(key),
@@ -304,7 +173,6 @@ async def get_artifact(remote_name: str, path: str):
# Check if artifact matches configured patterns # Check if artifact matches configured patterns
if not await check_artifact_patterns(remote_name, repo_path, file_path, path): if not await check_artifact_patterns(remote_name, repo_path, file_path, path):
logger.info(f"PATTERN BLOCKED: {remote_name}/{path} - not matching include patterns")
raise HTTPException( raise HTTPException(
status_code=403, detail="Artifact not allowed by configuration patterns" status_code=403, detail="Artifact not allowed by configuration patterns"
) )
@@ -312,20 +180,24 @@ async def get_artifact(remote_name: str, path: str):
# Construct the remote URL # Construct the remote URL
remote_url = await construct_remote_url(remote_name, path) remote_url = await construct_remote_url(remote_name, path)
# Check if artifact is already cached # Check if artifact is already cached (try both URL and path-based keys)
cached_key = storage.get_object_key(remote_name, path) url_key = storage.get_object_key(remote_url)
if not storage.exists(cached_key): path_key = storage.get_object_key_from_path(remote_name, path)
cached_key = None
cached_key = None
if storage.exists(url_key):
cached_key = url_key
elif storage.exists(path_key):
cached_key = path_key
# For index files, check Redis TTL validity # For index files, check Redis TTL validity
filename = os.path.basename(path) filename = os.path.basename(path)
is_index = cache.is_index_file(path) # Check full path, not just filename is_index = cache.is_index_file(filename)
if cached_key and is_index: if cached_key and is_index:
# Index file exists, but check if it's still valid # Index file exists, but check if it's still valid
if not cache.is_index_valid(remote_name, path): if not cache.is_index_valid(remote_name, path):
# Index has expired, remove it from S3 # Index has expired, remove it from S3
logger.info(f"Index EXPIRED: {remote_name}/{path} - removing from cache")
cache.cleanup_expired_index(storage, remote_name, path) cache.cleanup_expired_index(storage, remote_name, path)
cached_key = None # Force re-download cached_key = None # Force re-download
@@ -335,9 +207,6 @@ async def get_artifact(remote_name: str, path: str):
artifact_data = storage.download_object(cached_key) artifact_data = storage.download_object(cached_key)
filename = os.path.basename(path) filename = os.path.basename(path)
# Log cache hit
logger.info(f"Cache HIT: {remote_name}/{path} (size: {len(artifact_data)} bytes, key: {cached_key})")
# Determine content type based on file extension # Determine content type based on file extension
content_type = "application/octet-stream" content_type = "application/octet-stream"
if filename.endswith(".tar.gz"): if filename.endswith(".tar.gz"):
@@ -376,11 +245,9 @@ async def get_artifact(remote_name: str, path: str):
) )
# Artifact not cached, cache it first # Artifact not cached, cache it first
logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
result = await cache_single_artifact(remote_url, remote_name, path) result = await cache_single_artifact(remote_url, remote_name, path)
if result["status"] == "error": if result["status"] == "error":
logger.error(f"Cache ADD FAILED: {remote_name}/{path} - {result['error']}")
raise HTTPException( raise HTTPException(
status_code=502, detail=f"Failed to fetch artifact: {result['error']}" status_code=502, detail=f"Failed to fetch artifact: {result['error']}"
) )
@@ -391,11 +258,10 @@ async def get_artifact(remote_name: str, path: str):
cache_config = config.get_cache_config(remote_name) cache_config = config.get_cache_config(remote_name)
index_ttl = cache_config.get("index_ttl", 300) # Default 5 minutes index_ttl = cache_config.get("index_ttl", 300) # Default 5 minutes
cache.mark_index_cached(remote_name, path, index_ttl) cache.mark_index_cached(remote_name, path, index_ttl)
logger.info(f"Index file cached with TTL: {remote_name}/{path} (ttl: {index_ttl}s)")
# Now return the cached artifact # Now return the cached artifact
try: try:
cache_key = storage.get_object_key(remote_name, path) cache_key = storage.get_object_key(remote_url)
artifact_data = storage.download_object(cache_key) artifact_data = storage.download_object(cache_key)
filename = os.path.basename(path) filename = os.path.basename(path)
@@ -417,7 +283,7 @@ async def get_artifact(remote_name: str, path: str):
metrics.record_cache_miss(remote_name, len(artifact_data)) metrics.record_cache_miss(remote_name, len(artifact_data))
# Record artifact mapping in database # Record artifact mapping in database
cache_key = storage.get_object_key(remote_name, path) cache_key = storage.get_object_key(remote_url)
database.record_artifact_mapping( database.record_artifact_mapping(
cache_key, remote_name, path, len(artifact_data) cache_key, remote_name, path, len(artifact_data)
) )
@@ -435,88 +301,6 @@ async def get_artifact(remote_name: str, path: str):
raise HTTPException(status_code=500, detail=f"Error serving artifact: {str(e)}") raise HTTPException(status_code=500, detail=f"Error serving artifact: {str(e)}")
@app.get("/v2/")
async def docker_v2_ping():
return Response(
content="{}",
media_type="application/json",
headers={"Docker-Distribution-Api-Version": "registry/2.0"},
)
@app.api_route("/v2/{remote_name}/{path:path}", methods=["GET", "HEAD"])
async def docker_v2_proxy(request: Request, remote_name: str, path: str):
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
if remote_config.get("package") != "docker":
raise HTTPException(status_code=400, detail=f"Remote '{remote_name}' is not a docker remote")
# Check include_patterns against the image name (e.g. "library/nginx")
patterns = config.get_repository_patterns(remote_name, "")
if patterns:
path_parts = path.split("/")
image_name = "/".join(path_parts[:2]) if len(path_parts) >= 2 else path
if not any(re.search(p, path) or re.search(p, image_name) for p in patterns):
logger.info(f"PATTERN BLOCKED: {remote_name}/{path}")
raise HTTPException(status_code=403, detail="Image not allowed by configuration patterns")
remote_url = await construct_remote_url(remote_name, path)
cached_key = storage.get_object_key(remote_name, path)
if not storage.exists(cached_key):
cached_key = None
is_index = cache.is_index_file(path)
if cached_key and is_index:
if not cache.is_index_valid(remote_name, path):
logger.info(f"Index EXPIRED: {remote_name}/{path} - removing from cache")
cache.cleanup_expired_index(storage, remote_name, path)
cached_key = None
if not cached_key:
logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
result = await cache_single_artifact(remote_url, remote_name, path)
if result["status"] == "error":
raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}")
if result["status"] == "cached" and is_index:
cache_config = config.get_cache_config(remote_name)
index_ttl = cache_config.get("index_ttl", 300)
cache.mark_index_cached(remote_name, path, index_ttl)
logger.info(f"Index file cached with TTL: {remote_name}/{path} (ttl: {index_ttl}s)")
artifact_data = storage.download_object(storage.get_object_key(remote_name, path))
is_blob = "/blobs/" in path
if is_blob:
content_type = "application/octet-stream"
else:
try:
manifest_json = json.loads(artifact_data)
content_type = manifest_json.get("mediaType")
if not content_type:
if "manifests" in manifest_json:
content_type = "application/vnd.oci.image.index.v1+json"
else:
content_type = "application/vnd.oci.image.manifest.v1+json"
except Exception:
content_type = "application/vnd.oci.image.manifest.v1+json"
digest = f"sha256:{hashlib.sha256(artifact_data).hexdigest()}"
headers = {
"Docker-Distribution-Api-Version": "registry/2.0",
"Docker-Content-Digest": digest,
"Content-Length": str(len(artifact_data)),
}
if request.method == "HEAD":
return Response(status_code=200, headers=headers, media_type=content_type)
metrics.record_cache_hit(remote_name, len(artifact_data))
return Response(content=artifact_data, media_type=content_type, headers=headers)
async def discover_artifacts(remote: str, include_pattern: str) -> list[str]: async def discover_artifacts(remote: str, include_pattern: str) -> list[str]:
if "github.com" in remote: if "github.com" in remote:
return await discover_github_releases(remote, include_pattern) return await discover_github_releases(remote, include_pattern)
@@ -747,11 +531,7 @@ async def list_cached_artifacts(
cached_artifacts = [] cached_artifacts = []
for url in matching_urls: for url in matching_urls:
# Extract path from URL for hierarchical key generation key = storage.get_object_key(url)
from urllib.parse import urlparse
parsed = urlparse(url)
path = parsed.path
key = storage.get_object_key(remote, path)
if storage.exists(key): if storage.exists(key):
cached_artifacts.append( cached_artifacts.append(
{"url": url, "cached_url": storage.get_url(key), "key": key} {"url": url, "cached_url": storage.get_url(key), "key": key}
+11 -20
View File
@@ -1,5 +1,6 @@
import os import os
import hashlib import hashlib
from urllib.parse import urlparse
import boto3 import boto3
from botocore.config import Config from botocore.config import Config
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
@@ -54,27 +55,17 @@ class S3Storage:
except ClientError: except ClientError:
self.client.create_bucket(Bucket=self.bucket) self.client.create_bucket(Bucket=self.bucket)
def get_object_key(self, remote_name: str, path: str) -> str: def get_object_key(self, url: str) -> str:
# Extract directory path and filename url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
clean_path = path.lstrip('/') parsed = urlparse(url)
filename = os.path.basename(clean_path) filename = os.path.basename(parsed.path)
directory_path = os.path.dirname(clean_path) return f"{parsed.netloc}/{url_hash}/{filename}"
# Special handling for Docker registry blobs (use digest as key for deduplication) def get_object_key_from_path(self, remote_name: str, path: str) -> str:
if "/blobs/sha256:" in clean_path: # Create a key based on the API path for direct access
# Extract the SHA256 digest for Docker blobs path_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
parts = clean_path.split("/blobs/sha256:") filename = os.path.basename(path)
if len(parts) == 2: return f"{remote_name}/{path_hash}/{filename}"
digest = parts[1]
return f"{remote_name}/blobs/sha256/{digest}"
# Hash the directory path to keep keys manageable while preserving remote structure
if directory_path:
path_hash = hashlib.sha256(directory_path.encode()).hexdigest()[:16]
return f"{remote_name}/{path_hash}/{filename}"
else:
# If no directory, just use remote and filename
return f"{remote_name}/{filename}"
def exists(self, key: str) -> bool: def exists(self, key: str) -> bool:
try: try: