Compare commits

..

13 Commits

Author SHA1 Message Date
unkinben 2887ce4476 Merge pull request 'build: align Dockerfile with packer build and add docker-compose dev mounts' (#12) from benvin/packer-aligned-dockerfile into master
ci/woodpecker/tag/docker Pipeline was successful
Reviewed-on: #12
2026-04-25 22:23:59 +10:00
unkinben 9e52929d73 build: align Dockerfile with packer build and add docker-compose dev mounts
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful
- Rebase Dockerfile onto almalinux9-base, install via uv tool install
- Remove dev artifacts (remotes.yaml, ca-bundle.pem) from image
- Mount gitignored dev files via docker-compose volumes instead
- Add .dockerignore to keep secrets out of build context
- Track docker-compose.yml in git (no secrets; dev files mounted as volumes)
2026-04-25 22:17:36 +10:00
unkinben 788d469063 Merge pull request 'benvin/configurable-index-patterns' (#11) from benvin/configurable-index-patterns into master
ci/woodpecker/tag/docker Pipeline failed
Reviewed-on: #11
2026-04-25 21:04:25 +10:00
unkinben 1cbe836f1b ci: add Woodpecker pipelines for pre-commit, tests, and Docker build
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/pr/test Pipeline was successful
2026-04-25 21:02:39 +10:00
unkinben f3394b9ca6 docs: add RKE2 image rewriting guide and expand pattern examples
Add a new "Docker Image Rewriting with RKE2" section covering:
- How the /v2/ proxy integrates with registries.yaml mirror rewrites
- Per-registry examples (docker.io, ghcr.io, registry.k8s.io, quay.io)
- include_patterns for restricting which images are cached
- TLS CA configuration for private certificate authorities
- Apply and verification commands

Expand the Configuration section with:
- Richer include_patterns examples (anchored, extension, architecture,
  Docker image name patterns, repodata directories)
- New index_patterns section explaining built-in defaults per package
  type and how to add custom patterns (Helm index.yaml, APT InRelease/
  Packages.gz, extra RPM comps.xml)
2026-04-25 20:20:42 +10:00
unkinben 8da43e610e tests: resolve all peer-review issues across test suite
Address every substantive critique from the peer review:

test_cache: replace tautological same-inputs key test with hardcoded
hash assertion; assert setex call + TTL in mark_index_cached test;
assert client is None for unavailable no-op; rename Packages.gz test
to document intentional behaviour; add alpine sig/tmp negatives; add
hyphenated and date-tag docker positive cases; add key hash-length
assertion.

test_config: replace live-constant comparisons with literal string
assertions for alpine/rpm/docker; add unknown package type test;
add dict-keyed repositories branch coverage (per-repo override and
fallback); fix cache config to full equality check; add explicit empty
index_patterns test.

test_docker_auth: fix case-insensitive test to verify realm value;
add field-order (scope before service) limitation test; add pipe-char
collision documentation test; add missing fetch_token edge cases
(no token field, HTTPStatusError, missing expires_in default 300);
replace rubber-stamp delegate test with end-to-end parse→fetch test.

test_storage: replace split prefix/suffix assertions with structural
3-part check + pinned sha256 assertion; fix Docker blob digests to
64-char hex; add secure=True URL test; add upload return value test;
add download_object 404-on-ClientError test; remove redundant subset
test.

test_routes: add metrics.record_cache_hit/miss assertions; add
mark_index_cached assertion after cache miss on index (docker + generic);
add Content-Disposition, X-Artifact-Size header checks; add rpm/xml
content-type tests; add flush test that verifies Redis keys are deleted
when cache is available; add smoke coverage for upload (PUT), HEAD, DELETE,
/metrics, and /config routes.
2026-04-25 19:58:33 +10:00
unkinben 3a13d76f7e chore: add .tox, .pytest_cache, .pre-commit-cache, .ruff_cache to .gitignore 2026-04-25 19:21:43 +10:00
unkinben 2d0e2c64e6 feat: add test suite, tox, pre-commit, and ruff formatting
- tests/: 107 unit tests across config, cache, docker_auth, storage,
  and FastAPI routes; all passing under pytest-asyncio auto mode
- tox.ini: runs pytest via uvx --with tox-uv tox (py311)
- .pre-commit-config.yaml: ruff lint + ruff-format at v0.15.12
- pyproject.toml: pytest config (asyncio_mode=auto), ruff config
  (line-length=140), tox/pre-commit added to dev extras
- Makefile: test/tox/pre-commit targets via uvx --python 3.11
- Source files reformatted by ruff-format (no logic changes)
2026-04-25 19:21:05 +10:00
unkinben 2414ddfdd3 feat: make index file patterns configurable per remote
Replace hardcoded is_index_file logic with regex patterns driven by
remotes.yaml. Package-level defaults (alpine/rpm/docker) are merged with
any extra patterns listed under index_patterns in the remote config.
2026-04-25 18:40:45 +10:00
unkinben b3d12f4962 docs: add SPEC.md with repository model and caching requirements 2026-04-25 18:31:27 +10:00
unkinben 92b9f9a03e refactor: use package: docker instead of type: docker
Align with intended type=local|remote|virtual / package=docker|rpm|alpine|generic
model. All docker-specific logic now keyed on package field; type field
correctly reflects the repository kind (remote vs local).
2026-04-25 18:27:31 +10:00
unkinben 7930023de8 Merge pull request 'feat: enforce include_patterns on docker /v2/ proxy route' (#10) from benvin/docker-include-patterns into master
Reviewed-on: #10
2026-04-25 18:14:50 +10:00
unkinben 869a1f8c02 feat: enforce include_patterns on docker /v2/ proxy route
Adds pattern checking to docker_v2_proxy before any upstream fetch.
Patterns match against the full path and the image name (first two
path segments), bypassing the index-file exemption that check_artifact_patterns
applies — so restrictions apply equally to manifests, blobs, and tag lists.
Returns 403 when no pattern matches, consistent with the non-docker route.
2026-04-25 18:09:12 +10:00
28 changed files with 2384 additions and 296 deletions
+15
View File
@@ -0,0 +1,15 @@
.git/
.venv/
dist/
tests/
remotes.yaml
ca-bundle.pem
.env
*.log
docker-compose.yml
.woodpecker/
.tox/
.ruff_cache/
.pytest_cache/
.pre-commit-cache/
minio_data/
+12 -1
View File
@@ -43,9 +43,20 @@ remotes.yaml
# uv
uv.lock
# tox
.tox/
# pytest
.pytest_cache/
# pre-commit
.pre-commit-cache/
# ruff
.ruff_cache/
# Docker volumes
minio_data/
# Local configuration overrides
docker-compose.yml
ca-bundle.pem
+7
View File
@@ -0,0 +1,7 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.12
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- id: ruff-format
+9
View File
@@ -0,0 +1,9 @@
when:
- event: pull_request
steps:
- name: docker-build
image: woodpeckerci/plugin-docker-buildx
settings:
repo: git.unkin.net/unkin/artifactapi
dry_run: true
+18
View File
@@ -0,0 +1,18 @@
when:
- event: tag
ref: refs/tags/v*
steps:
- name: docker
image: woodpeckerci/plugin-docker-buildx
settings:
registry: git.unkin.net
repo: git.unkin.net/unkin/artifactapi
username: droneci
password:
from_secret: DRONECI_PASSWORD
tags:
- ${CI_COMMIT_TAG}
- latest
build_args:
- VERSION=${CI_COMMIT_TAG##v}
+9
View File
@@ -0,0 +1,9 @@
when:
- event: pull_request
steps:
- name: pre-commit
image: git.unkin.net/unkin/almalinux9-base:20260308
commands:
- uvx pre-commit run --all-files
+8
View File
@@ -0,0 +1,8 @@
when:
- event: pull_request
steps:
- name: test
image: git.unkin.net/unkin/almalinux9-base:20260308
commands:
- uvx --python 3.11 --with tox-uv tox
+15 -45
View File
@@ -1,53 +1,23 @@
# Use Alpine Linux as base image
FROM python:3.11-alpine
FROM git.unkin.net/unkin/almalinux9-base:latest
# Set working directory
WORKDIR /app
ARG VERSION=0.0.0.dev0
# Install system dependencies
RUN apk add --no-cache \
gcc \
musl-dev \
libffi-dev \
postgresql-dev \
curl \
wget \
tar
COPY . /build
# Install uv
ARG PACKAGE_VERSION=0.9.21
RUN wget -O /app/uv-x86_64-unknown-linux-musl.tar.gz https://github.com/astral-sh/uv/releases/download/${PACKAGE_VERSION}/uv-x86_64-unknown-linux-musl.tar.gz && \
tar xf /app/uv-x86_64-unknown-linux-musl.tar.gz -C /app && \
mv /app/uv-x86_64-unknown-linux-musl/uv /usr/local/bin/uv && \
rm -rf /app/uv-x86_64-unknown-linux-musl* && \
chmod +x /usr/local/bin/uv && \
uv --version
RUN HATCH_VCS_PRETEND_VERSION=${VERSION} \
SETUPTOOLS_SCM_PRETEND_VERSION=${VERSION} \
uv build --wheel --directory /build && \
useradd -m -r -s /bin/sh appuser
# Create non-root user first
RUN adduser -D -s /bin/sh appuser && \
chown -R appuser:appuser /app
# Copy dependency files and change ownership
COPY --chown=appuser:appuser pyproject.toml uv.lock README.md ./
# Switch to appuser and install Python dependencies
USER appuser
ARG VERSION=dev
ENV HATCH_VCS_PRETEND_VERSION=${VERSION} \
SETUPTOOLS_SCM_PRETEND_VERSION=${VERSION}
RUN uv sync --frozen
RUN uv tool install --from /build/dist/*.whl artifactapi
# Copy application source
COPY --chown=appuser:appuser src/ ./src/
COPY --chown=appuser:appuser remotes.yaml ./
COPY --chown=appuser:appuser ca-bundle.pem ./
USER root
RUN rm -rf /build
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run the application
CMD ["uv", "run", "python", "-m", "src.artifactapi.main"]
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 CMD curl -f http://localhost:8000/health || exit 1
USER appuser
ENV PATH="/home/appuser/.local/bin:$PATH"
WORKDIR /app
CMD ["artifactapi"]
+9 -3
View File
@@ -1,7 +1,7 @@
.PHONY: build install dev clean test lint format docker-build docker-up docker-down docker-logs docker-rebuild docker-clean docker-restart
.PHONY: build install dev clean test lint format pre-commit tox docker-build docker-up docker-down docker-logs docker-rebuild docker-clean docker-restart
build:
docker build --no-cache -t artifactapi:latest .
docker build -t artifactapi:dev .
install: build
@@ -17,7 +17,13 @@ clean:
rm -rf *.egg-info/
test:
uv run pytest
uvx --python 3.11 --with tox-uv tox
tox:
uvx --python 3.11 --with tox-uv tox
pre-commit:
uvx --python 3.11 pre-commit run --all-files
lint:
uv run ruff check --fix .
+275 -10
View File
@@ -161,27 +161,101 @@ remotes:
### Include Patterns
Include patterns are regular expressions that control which files can be accessed:
Include patterns are regular expressions that control which files can be accessed. Patterns use Python `re.search`, so they match anywhere in the path unless anchored with `^` or `$`. Only files matching at least one pattern are served; all others return HTTP 403.
```yaml
include_patterns:
# Specific project patterns
# Exact project + architecture — most restrictive
- "^gruntwork-io/terragrunt/releases/download/.*/terragrunt_linux_amd64$"
# Any release asset for a project, any version
- "gruntwork-io/terragrunt/.*terragrunt_linux_amd64.*"
# File extension patterns
# File extension only — allow all files of a given type from any path
- ".*\\.tar\\.gz$"
- ".*\\.zip$"
- ".*\\.rpm$"
- ".*\\.zip$"
# Architecture-specific patterns
# Architecture subtree — allow everything under x86_64/
- ".*/x86_64/.*"
- ".*/linux-amd64/.*"
# Version-specific patterns
- "prometheus/node_exporter/.*/node_exporter-.*\\.linux-amd64\\.tar\\.gz$"
# Combined: architecture + extension
- ".*/x86_64/.*\\.rpm$"
- ".*/noarch/.*\\.rpm$"
# Docker image names (used with package: docker remotes)
- "^library/nginx" # nginx official images only
- "^rancher/" # all rancher/* images
- "^rancher/rke2-runtime" # specific image
# Repodata directories — allow all metadata for an RPM repo
- ".*/repodata/.*$"
```
**Security Note**: Only files matching at least one include pattern are accessible. Files not matching any pattern return HTTP 403.
**Security note**: Omitting `include_patterns` entirely allows all files from that remote. Index files (e.g. `APKINDEX.tar.gz`, `repomd.xml`, tag manifests) always bypass pattern enforcement — they are served unconditionally so clients can discover available packages.
### Index Patterns
Index patterns identify repository metadata files. Index files get special treatment:
- **Always served** regardless of `include_patterns`
- **Cached with `index_ttl`** instead of `file_ttl`
- **Automatically refreshed** when the TTL expires — the cached copy is evicted and re-fetched on next request
Built-in defaults per package type:
| Package type | Built-in index patterns |
|---|---|
| `alpine` | `APKINDEX\.tar\.gz$` |
| `rpm` | `repomd\.xml$`, `repodata/` metadata (xml, sqlite, yaml, asc, txt variants), `Packages\.gz$` |
| `docker` | Tag manifests (non-digest refs), `/tags/list` |
| `generic` | *(none)* |
Use `index_patterns` to add extra patterns on top of the defaults. Duplicates are ignored automatically.
```yaml
remotes:
helm-charts:
base_url: "https://charts.example.com"
type: "remote"
package: "generic"
include_patterns:
- ".*\\.tgz$" # chart archives
index_patterns:
- "index\\.yaml$" # Helm repo index — re-fetched on every TTL expiry
cache:
file_ttl: 0
index_ttl: 600 # re-check the index every 10 minutes
apt-mirror:
base_url: "https://apt.example.com"
type: "remote"
package: "generic"
include_patterns:
- ".*\\.deb$"
index_patterns:
- "InRelease$" # signed APT release file
- "Release$" # unsigned APT release file
- "Packages\\.gz$" # compressed package list
- "Packages\\.xz$"
cache:
file_ttl: 0
index_ttl: 3600 # hourly index refresh
almalinux-with-extras:
base_url: "https://mirror.example.com/almalinux"
type: "remote"
package: "rpm" # inherits repomd.xml + repodata/* defaults
include_patterns:
- ".*/x86_64/.*\\.rpm$"
- ".*/noarch/.*\\.rpm$"
index_patterns:
- "comps\\.xml$" # optional group metadata (adds to rpm defaults)
cache:
file_ttl: 0
index_ttl: 7200
```
Pattern matching uses `re.search`, so `"index\\.yaml$"` matches `/stable/index.yaml` and `/index.yaml`. Anchor with `^` to restrict to the path root.
### Cache Configuration
@@ -661,4 +735,195 @@ curl "http://localhost:8000/api/github/gruntwork-io/terragrunt/releases/download
- Use external managed databases for production workloads
- Configure backup strategies for persistent volumes
- Set up proper TLS certificates for ingress
- Consider using StatefulSets for databases with persistent storage
- Consider using StatefulSets for databases with persistent storage
## Docker Image Rewriting with RKE2
RKE2 can route container image pulls through registry mirrors using `/etc/rancher/rke2/registries.yaml`. The artifact API implements the Docker Registry HTTP API v2 at `/v2/`, so it acts as a transparent caching mirror for any upstream registry.
### How it works
1. A pod requests `docker.io/library/nginx:latest`
2. RKE2 intercepts the pull and rewrites the image path using the `rewrite` rules
3. The rewritten request hits the artifact API (`/v2/dockerhub/library/nginx/manifests/latest`)
4. On first access the API fetches the manifest and layers from Docker Hub and caches them in S3
5. Subsequent pulls are served directly from cache, with no upstream traffic
### registries.yaml
Place this file on every RKE2 node at `/etc/rancher/rke2/registries.yaml`. The `rewrite` field maps the original image path (as the upstream registry sees it) to the path the artifact API expects under `/v2/{remote_name}/...`.
#### Docker Hub
Docker Hub resolves unqualified image names like `nginx` as `library/nginx`. The rewrite prepends the remote name so the request lands on the correct remote.
```yaml
# /etc/rancher/rke2/registries.yaml
mirrors:
docker.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "dockerhub/$1"
```
Corresponding `remotes.yaml` entry:
```yaml
remotes:
dockerhub:
base_url: "https://registry-1.docker.io"
type: "remote"
package: "docker"
username: "your-dockerhub-username"
password: "your-dockerhub-token" # PAT with read scope
cache:
file_ttl: 0
index_ttl: 300
```
A pull of `nginx:latest` becomes `/v2/dockerhub/library/nginx/manifests/latest` on the artifact API.
#### GitHub Container Registry (ghcr.io)
```yaml
mirrors:
ghcr.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "ghcr/$1"
```
```yaml
remotes:
ghcr:
base_url: "https://ghcr.io"
type: "remote"
package: "docker"
username: "your-github-username"
password: "ghp_your_github_pat" # read:packages scope required
cache:
file_ttl: 0
index_ttl: 300
```
A pull of `ghcr.io/rancher/rke2-runtime:v1.30.0-rke2r1` becomes `/v2/ghcr/rancher/rke2-runtime/manifests/v1.30.0-rke2r1`.
#### Multiple registries
```yaml
# /etc/rancher/rke2/registries.yaml
mirrors:
docker.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "dockerhub/$1"
ghcr.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "ghcr/$1"
registry.k8s.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "k8s-registry/$1"
quay.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "quay/$1"
```
Each entry needs a matching remote in `remotes.yaml` using the name from the rewrite target (e.g. `k8s-registry`, `quay`).
#### Restricting which images are cached
Use `include_patterns` on the remote to allow only specific images through the proxy. Requests for images not matching any pattern return HTTP 403 to the node.
```yaml
remotes:
dockerhub:
base_url: "https://registry-1.docker.io"
type: "remote"
package: "docker"
include_patterns:
- "^library/nginx" # official nginx only
- "^library/redis" # official redis only
- "^rancher/" # all rancher images
- "^grafana/grafana" # specific image
cache:
file_ttl: 0
index_ttl: 300
```
Omit `include_patterns` to allow all images from that registry.
#### TLS configuration
If the artifact API uses a private CA certificate, tell containerd about it in `registries.yaml`:
```yaml
mirrors:
docker.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "dockerhub/$1"
configs:
"artifacts.example.com":
tls:
ca_file: /etc/ssl/certs/internal-ca.crt
```
### Applying the configuration
```bash
# Write registries.yaml on each node (server and agent)
sudo mkdir -p /etc/rancher/rke2
sudo tee /etc/rancher/rke2/registries.yaml <<'EOF'
mirrors:
docker.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "dockerhub/$1"
ghcr.io:
endpoint:
- "https://artifacts.example.com"
rewrite:
"^(.*)$": "ghcr/$1"
EOF
# Restart the RKE2 service (server nodes)
sudo systemctl restart rke2-server
# Or on agent nodes
sudo systemctl restart rke2-agent
# Confirm containerd picked up the mirror config
sudo /var/lib/rancher/rke2/bin/crictl info | jq '.config.registry.mirrors'
```
### Verifying pulls go through the cache
```bash
# Pull an image on a node
sudo /var/lib/rancher/rke2/bin/crictl pull nginx:latest
# Check the artifact API received the request
kubectl logs deployment/artifactapi -n artifact-storage | grep "nginx"
# Expect: Cache MISS on first pull, Cache HIT on subsequent pulls
# Query the manifest endpoint directly — 200 means it's cached
curl -I https://artifacts.example.com/v2/dockerhub/library/nginx/manifests/latest
# Check what's stored in the cache
curl https://artifacts.example.com/ | jq '.remotes'
```
+137
View File
@@ -0,0 +1,137 @@
# ArtifactAPI Specification
## Repository model
Every repository entry in `remotes.yaml` has two orthogonal fields:
| field | values | meaning |
|---|---|---|
| `type` | `local`, `remote`, `virtual` | repository kind — how the repo is served |
| `package` | `docker`, `rpm`, `alpine`, `generic` | package format — what protocol and caching rules to apply |
**type**
- `local` — files are uploaded directly to the API and stored in S3; no upstream.
- `remote` — proxies and caches content from an upstream URL (`base_url`).
- `virtual` — aggregates multiple repositories (not yet implemented).
**package**
- `docker` — upstream speaks the OCI Distribution API (Bearer auth, manifest/blob paths).
- `rpm` — upstream is an RPM repository; repodata files are index files.
- `alpine` — upstream is an Alpine APK repository; `APKINDEX.tar.gz` is an index file.
- `generic` — plain HTTP file download; no format-specific logic.
---
## Caching
Two cache classes determine retention:
| class | stored | TTL |
|---|---|---|
| **file** | S3 object, no Redis entry | `file_ttl``0` means indefinite |
| **index** | S3 object + Redis TTL key | `index_ttl` — when the Redis key expires the S3 object is deleted and re-fetched |
Index files are mutable metadata that must expire. File-class objects are treated as immutable and cached indefinitely (unless `file_ttl` is set).
---
## Docker package rules
### URL construction
Remote URLs are prefixed with `/v2/` for `package: docker` remotes:
```
{base_url}/v2/{path}
```
e.g. `library/nginx/manifests/latest``https://registry-1.docker.io/v2/library/nginx/manifests/latest`
### Authentication
Docker registries use Bearer token challenges. On a `401 Unauthorized` response, the API:
1. Parses the `WWW-Authenticate: Bearer` header for `realm`, `service`, and `scope`.
2. Fetches a token from the auth realm, supplying `username`/`password` from the remote config if present.
3. Retries the request with `Authorization: Bearer <token>`.
Tokens are cached in-memory keyed by `(realm, service, scope, username)` and expire 30 seconds before their stated `expires_in`.
### Cache classification
| path pattern | mutable | class | TTL source |
|---|---|---|---|
| `/manifests/<tag>` | yes | index | `index_ttl` |
| `/tags/list` | yes | index | `index_ttl` |
| `/manifests/sha256:<digest>` | no | file | `file_ttl` |
| `/blobs/sha256:<digest>` | no | file | `file_ttl` |
Tag-based manifests and tag lists are mutable and cached as index. Digest-pinned manifests and blobs are content-addressed and cached indefinitely as files.
### Blob deduplication
Blobs are stored under a digest-keyed path shared across all images on the same remote:
```
{remote_name}/blobs/sha256/{digest}
```
The same layer pulled by different images is stored once.
### Accept headers
| path | `Accept` header sent upstream |
|---|---|
| `/manifests/…` | `application/vnd.docker.distribution.manifest.v2+json`, `application/vnd.oci.image.manifest.v1+json`, `application/vnd.oci.image.index.v1+json`, `application/vnd.docker.distribution.manifest.list.v2+json` |
| `/blobs/…` | `application/octet-stream` |
---
## OCI Distribution API endpoint
The API exposes a native Docker registry interface so clients can use `docker pull` directly:
```
GET /v2/ — version ping
GET /v2/{remote}/{image}/manifests/{ref} — fetch manifest
HEAD /v2/{remote}/{image}/manifests/{ref} — manifest metadata
GET /v2/{remote}/{image}/blobs/{digest} — fetch blob
HEAD /v2/{remote}/{image}/blobs/{digest} — blob metadata
```
Responses include `Docker-Distribution-Api-Version`, `Docker-Content-Digest`, and the correct OCI `Content-Type` (detected from the manifest `mediaType` field).
Only remotes with `package: docker` are accessible via this endpoint. All other remotes return `400`.
---
## include_patterns
`include_patterns` is a list of Python regexes applied to every request before any upstream fetch or cache lookup.
**Generic remotes (`/api/v1/remote/…`):**
- Patterns match against the file path and the full path.
- Index files (mutable metadata) bypass pattern checks and are always allowed.
**Docker remotes (`/v2/…`):**
- Patterns match against the image name (first two path segments, e.g. `library/nginx`) and the full path.
- The index-file exemption does **not** apply — patterns restrict whole images, including their manifests and tag lists.
- No patterns configured → all images allowed.
Returns `403` when a request is blocked.
---
## Versioning
The package version is derived from git tags via `hatch-vcs`. Tags follow the format `v{MAJOR}.{MINOR}.{PATCH}`.
Docker images are built with the version injected at build time:
```
SETUPTOOLS_SCM_PRETEND_VERSION=<version> uv sync --frozen
```
The `Makefile` provides `patch`, `minor`, and `major` targets that tag the current commit and rebuild the container image.
+91
View File
@@ -0,0 +1,91 @@
version: '3.8'
services:
artifactapi:
build:
context: .
dockerfile: Dockerfile
args:
- VERSION=dev
ports:
- "8000:8000"
volumes:
- ./remotes.yaml:/app/remotes.yaml:ro
- ./ca-bundle.pem:/app/ca-bundle.pem:ro
environment:
- CONFIG_PATH=/app/remotes.yaml
- DBHOST=postgres
- DBPORT=5432
- DBUSER=artifacts
- DBPASS=artifacts123
- DBNAME=artifacts
- REDIS_URL=redis://redis:6379
- MINIO_ENDPOINT=minio:9000
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
- MINIO_BUCKET=artifacts
- MINIO_SECURE=false
- REQUESTS_CA_BUNDLE=/app/ca-bundle.pem
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
minio:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
minio:
image: minio/minio:latest
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
volumes:
- minio_data:/data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --save 20 1
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
timeout: 10s
retries: 3
postgres:
image: postgres:15-alpine
ports:
- "5432:5432"
environment:
POSTGRES_DB: artifacts
POSTGRES_USER: artifacts
POSTGRES_PASSWORD: artifacts123
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U artifacts -d artifacts"]
interval: 30s
timeout: 10s
retries: 3
volumes:
minio_data:
redis_data:
postgres_data:
+14 -1
View File
@@ -42,5 +42,18 @@ dev = [
"black>=23.9.0",
"isort>=5.12.0",
"mypy>=1.6.0",
"ruff>=0.1.0",
"ruff>=0.4.0",
"tox>=4.0.0",
"pre-commit>=3.0.0",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
[tool.ruff]
line-length = 140
[tool.ruff.lint]
select = ["E", "F", "I", "UP"]
ignore = ["E501"]
+12 -26
View File
@@ -1,5 +1,7 @@
import time
import hashlib
import re
import time
import redis
@@ -17,35 +19,17 @@ class RedisCache:
self.client = None
self.available = False
def is_index_file(self, file_path: str) -> bool:
"""Check if the file is an index file that should have TTL"""
return (
file_path.endswith("APKINDEX.tar.gz")
or file_path.endswith("Packages.gz")
or file_path.endswith("repomd.xml")
or ("repodata/" in file_path
and file_path.endswith((
".xml", ".xml.gz", ".xml.bz2", ".xml.xz", ".xml.zck", ".xml.zst",
".sqlite", ".sqlite.gz", ".sqlite.bz2", ".sqlite.xz", ".sqlite.zck", ".sqlite.zst",
".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst",
".asc", ".txt"
)))
# Docker tag-based manifests are mutable (index); digest-pinned are immutable (file)
or (
"/manifests/" in file_path
and not file_path.split("/manifests/", 1)[1].startswith("sha256:")
)
or "/tags/list" in file_path
or file_path.endswith("/tags/list")
)
def is_index_file(self, file_path: str, patterns: list[str] | None = None) -> bool:
"""Return True if file_path matches any of the index patterns."""
if patterns is None:
patterns = []
return any(re.search(p, file_path) for p in patterns)
def get_index_cache_key(self, remote_name: str, path: str) -> str:
"""Generate cache key for index files"""
return f"index:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}"
def is_index_valid(
self, remote_name: str, path: str, ttl_override: int = None
) -> bool:
def is_index_valid(self, remote_name: str, path: str, ttl_override: int = None) -> bool:
"""Check if index file is still valid (not expired)"""
if not self.available:
return False
@@ -74,8 +58,10 @@ class RedisCache:
try:
# Construct the URL the same way as in the main flow
from .config import ConfigManager
import os
from .config import ConfigManager
config_path = os.environ.get("CONFIG_PATH")
if config_path:
config = ConfigManager(config_path)
+43 -11
View File
@@ -1,7 +1,25 @@
import os
import json
import os
import yaml
from typing import Optional
_PACKAGE_INDEX_PATTERNS: dict[str, list[str]] = {
"alpine": [
r"APKINDEX\.tar\.gz$",
],
"rpm": [
r"repomd\.xml$",
r"repodata/.*\.(xml|xml\.gz|xml\.bz2|xml\.xz|xml\.zck|xml\.zst"
r"|sqlite|sqlite\.gz|sqlite\.bz2|sqlite\.xz|sqlite\.zck|sqlite\.zst"
r"|yaml\.xz|yaml\.gz|yaml\.bz2|yaml\.zst|asc|txt)$",
r"Packages\.gz$",
],
"docker": [
r"/manifests/(?!sha256:)[^/]+$",
r"/tags/list$",
],
"generic": [],
}
class ConfigManager:
@@ -12,10 +30,8 @@ class ConfigManager:
def _load_config(self) -> dict:
try:
with open(self.config_file, "r") as f:
if self.config_file.endswith(".yaml") or self.config_file.endswith(
".yml"
):
with open(self.config_file) as f:
if self.config_file.endswith(".yaml") or self.config_file.endswith(".yml"):
return yaml.safe_load(f)
else:
return json.load(f)
@@ -35,7 +51,7 @@ class ConfigManager:
except OSError:
pass
def get_remote_config(self, remote_name: str) -> Optional[dict]:
def get_remote_config(self, remote_name: str) -> dict | None:
self._check_reload()
return self.config.get("remotes", {}).get(remote_name)
@@ -92,9 +108,7 @@ class ConfigManager:
if not redis_url:
raise ValueError("REDIS_URL environment variable is required")
return {
"url": redis_url
}
return {"url": redis_url}
def get_database_config(self) -> dict:
"""Get database configuration from environment variables"""
@@ -105,12 +119,30 @@ class ConfigManager:
db_name = os.getenv("DBNAME")
if not all([db_host, db_port, db_user, db_pass, db_name]):
missing = [var for var, val in [("DBHOST", db_host), ("DBPORT", db_port), ("DBUSER", db_user), ("DBPASS", db_pass), ("DBNAME", db_name)] if not val]
missing = [
var
for var, val in [("DBHOST", db_host), ("DBPORT", db_port), ("DBUSER", db_user), ("DBPASS", db_pass), ("DBNAME", db_name)]
if not val
]
raise ValueError(f"All database environment variables are required: {', '.join(missing)}")
db_url = f"postgresql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
return {"url": db_url}
def get_index_patterns(self, remote_name: str) -> list[str]:
"""Return index-file patterns for a remote.
Merges the package-level defaults with any extra patterns listed under
``index_patterns`` in the remote's config.
"""
remote_config = self.get_remote_config(remote_name)
if not remote_config:
return []
package = remote_config.get("package", "generic")
defaults = _PACKAGE_INDEX_PATTERNS.get(package, [])
extra = remote_config.get("index_patterns", [])
return defaults + [p for p in extra if p not in defaults]
def get_cache_config(self, remote_name: str) -> dict:
"""Get cache configuration for a specific remote"""
remote_config = self.get_remote_config(remote_name)
+6 -18
View File
@@ -1,5 +1,3 @@
import os
from typing import Optional
import psycopg2
from psycopg2.extras import RealDictCursor
@@ -54,25 +52,15 @@ class DatabaseManager:
""")
# Create indexes separately
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_s3_key ON artifact_mappings (s3_key)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_remote_name ON artifact_mappings (remote_name)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_local_repo_path ON local_files (repository_name, file_path)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_local_s3_key ON local_files (s3_key)"
)
cursor.execute("CREATE INDEX IF NOT EXISTS idx_s3_key ON artifact_mappings (s3_key)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_remote_name ON artifact_mappings (remote_name)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_local_repo_path ON local_files (repository_name, file_path)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_local_s3_key ON local_files (s3_key)")
print("Database schema initialized")
except Exception as e:
print(f"Error creating schema: {e}")
def record_artifact_mapping(
self, s3_key: str, remote_name: str, file_path: str, size_bytes: int
):
def record_artifact_mapping(self, s3_key: str, remote_name: str, file_path: str, size_bytes: int):
"""Record mapping between S3 key and remote"""
if not self.available:
return
@@ -112,7 +100,7 @@ class DatabaseManager:
print(f"Error getting storage by remote: {e}")
return {}
def get_remote_for_s3_key(self, s3_key: str) -> Optional[str]:
def get_remote_for_s3_key(self, s3_key: str) -> str | None:
"""Get remote name for given S3 key"""
if not self.available:
return None
+11 -11
View File
@@ -1,7 +1,7 @@
import time
import logging
import re
from typing import Optional
import time
import httpx
logger = logging.getLogger(__name__)
@@ -17,11 +17,11 @@ _WWW_AUTH_RE = re.compile(
)
def _cache_key(realm: str, service: str, scope: str, username: Optional[str]) -> str:
def _cache_key(realm: str, service: str, scope: str, username: str | None) -> str:
return f"{realm}|{service}|{scope}|{username or ''}"
def _get_cached_token(key: str) -> Optional[str]:
def _get_cached_token(key: str) -> str | None:
entry = _token_cache.get(key)
if entry and entry[1] > time.time():
return entry[0]
@@ -38,9 +38,9 @@ async def fetch_token(
realm: str,
service: str,
scope: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Optional[str]:
username: str | None = None,
password: str | None = None,
) -> str | None:
"""Fetch a Bearer token from a Docker registry auth server."""
key = _cache_key(realm, service, scope, username)
cached = _get_cached_token(key)
@@ -75,7 +75,7 @@ async def fetch_token(
return token
def parse_www_authenticate(header: str) -> Optional[tuple[str, str, str]]:
def parse_www_authenticate(header: str) -> tuple[str, str, str] | None:
"""Parse WWW-Authenticate: Bearer header. Returns (realm, service, scope) or None."""
m = _WWW_AUTH_RE.search(header)
if not m:
@@ -85,9 +85,9 @@ def parse_www_authenticate(header: str) -> Optional[tuple[str, str, str]]:
async def get_docker_token_for_response(
www_authenticate: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> Optional[str]:
username: str | None = None,
password: str | None = None,
) -> str | None:
"""Given a WWW-Authenticate header value, fetch and return a Bearer token."""
parsed = parse_www_authenticate(www_authenticate)
if not parsed:
+60 -110
View File
@@ -1,28 +1,30 @@
import hashlib
import json
import logging
import os
import re
import json
import hashlib
import logging
from typing import Dict, Any, Optional
from typing import Any
import httpx
from fastapi import FastAPI, HTTPException, Response, Request, Query, File, UploadFile
from fastapi.responses import PlainTextResponse, JSONResponse
from fastapi import FastAPI, File, HTTPException, Query, Request, Response, UploadFile
from fastapi.responses import JSONResponse, PlainTextResponse
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from pydantic import BaseModel
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
try:
from importlib.metadata import version
__version__ = version("artifactapi")
except ImportError:
# Fallback for development when package isn't installed
__version__ = "dev"
from .cache import RedisCache
from .config import ConfigManager
from .database import DatabaseManager
from .storage import S3Storage
from .cache import RedisCache
from .metrics import MetricsManager
from .docker_auth import get_docker_token_for_response
from .metrics import MetricsManager
from .storage import S3Storage
class ArtifactRequest(BaseModel):
@@ -31,10 +33,7 @@ class ArtifactRequest(BaseModel):
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
app = FastAPI(title="Artifact Storage API", version=__version__)
@@ -75,19 +74,11 @@ def health_check():
@app.put("/cache/flush")
def flush_cache(
remote: str = Query(default=None, description="Specific remote to flush (optional)"),
cache_type: str = Query(default="all", description="Type to flush: 'all', 'index', 'files', 'metrics'")
cache_type: str = Query(default="all", description="Type to flush: 'all', 'index', 'files', 'metrics'"),
):
"""Flush cache entries for specified remote or all remotes"""
try:
result = {
"remote": remote,
"cache_type": cache_type,
"flushed": {
"redis_keys": 0,
"s3_objects": 0,
"operations": []
}
}
result = {"remote": remote, "cache_type": cache_type, "flushed": {"redis_keys": 0, "s3_objects": 0, "operations": []}}
# Flush Redis entries based on cache_type
if cache_type in ["all", "index", "metrics"] and cache.available and cache.client:
@@ -124,8 +115,8 @@ def flush_cache(
list_params["Prefix"] = f"{remote}/"
response = storage.client.list_objects_v2(**list_params)
if 'Contents' in response:
objects_to_delete = [obj['Key'] for obj in response['Contents']]
if "Contents" in response:
objects_to_delete = [obj["Key"] for obj in response["Contents"]]
for key in objects_to_delete:
try:
@@ -156,18 +147,14 @@ def flush_cache(
async def construct_remote_url(remote_name: str, path: str) -> str:
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
base_url = remote_config.get("base_url")
if not base_url:
raise HTTPException(
status_code=500, detail=f"No base_url configured for remote '{remote_name}'"
)
raise HTTPException(status_code=500, detail=f"No base_url configured for remote '{remote_name}'")
# Handle Docker registry URLs
if remote_config.get("type") == "docker":
if remote_config.get("package") == "docker":
# Convert Docker paths to v2 API format
# e.g., library/nginx/manifests/latest -> v2/library/nginx/manifests/latest
return f"{base_url}/v2/{path}"
@@ -175,11 +162,10 @@ async def construct_remote_url(remote_name: str, path: str) -> str:
return f"{base_url}/{path}"
async def check_artifact_patterns(
remote_name: str, repo_path: str, file_path: str, full_path: str
) -> bool:
async def check_artifact_patterns(remote_name: str, repo_path: str, file_path: str, full_path: str) -> bool:
# First check if this is an index file - always allow index files
if cache.is_index_file(file_path) or cache.is_index_file(full_path):
index_patterns = config.get_index_patterns(remote_name)
if cache.is_index_file(file_path, index_patterns) or cache.is_index_file(full_path, index_patterns):
return True
# Then check basic include patterns
@@ -215,7 +201,7 @@ async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
try:
remote_config = config.get_remote_config(remote_name) or {}
is_docker = remote_config.get("type") == "docker" or "/v2/" in url
is_docker = remote_config.get("package") == "docker" or "/v2/" in url
# Prepare headers for Docker registry requests
headers = {}
@@ -266,9 +252,7 @@ async def get_artifact(remote_name: str, path: str):
# Check if remote is configured
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
# Check if this is a local repository
if remote_config.get("type") == "local":
@@ -288,9 +272,7 @@ async def get_artifact(remote_name: str, path: str):
return Response(
content=content,
media_type=content_type,
headers={
"Content-Disposition": f"attachment; filename={os.path.basename(path)}"
},
headers={"Content-Disposition": f"attachment; filename={os.path.basename(path)}"},
)
# Extract repository path for pattern checking
@@ -305,9 +287,7 @@ async def get_artifact(remote_name: str, path: str):
# Check if artifact matches configured patterns
if not await check_artifact_patterns(remote_name, repo_path, file_path, path):
logger.info(f"PATTERN BLOCKED: {remote_name}/{path} - not matching include patterns")
raise HTTPException(
status_code=403, detail="Artifact not allowed by configuration patterns"
)
raise HTTPException(status_code=403, detail="Artifact not allowed by configuration patterns")
# Construct the remote URL
remote_url = await construct_remote_url(remote_name, path)
@@ -319,7 +299,7 @@ async def get_artifact(remote_name: str, path: str):
# For index files, check Redis TTL validity
filename = os.path.basename(path)
is_index = cache.is_index_file(path) # Check full path, not just filename
is_index = cache.is_index_file(path, config.get_index_patterns(remote_name))
if cached_key and is_index:
# Index file exists, but check if it's still valid
@@ -357,9 +337,7 @@ async def get_artifact(remote_name: str, path: str):
metrics.record_cache_hit(remote_name, len(artifact_data))
# Record artifact mapping in database if not already recorded
database.record_artifact_mapping(
cached_key, remote_name, path, len(artifact_data)
)
database.record_artifact_mapping(cached_key, remote_name, path, len(artifact_data))
return Response(
content=artifact_data,
@@ -371,9 +349,7 @@ async def get_artifact(remote_name: str, path: str):
},
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error retrieving cached artifact: {str(e)}"
)
raise HTTPException(status_code=500, detail=f"Error retrieving cached artifact: {str(e)}")
# Artifact not cached, cache it first
logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
@@ -381,9 +357,7 @@ async def get_artifact(remote_name: str, path: str):
if result["status"] == "error":
logger.error(f"Cache ADD FAILED: {remote_name}/{path} - {result['error']}")
raise HTTPException(
status_code=502, detail=f"Failed to fetch artifact: {result['error']}"
)
raise HTTPException(status_code=502, detail=f"Failed to fetch artifact: {result['error']}")
# Mark index files as cached in Redis if this was a new download
if result["status"] == "cached" and is_index:
@@ -418,9 +392,7 @@ async def get_artifact(remote_name: str, path: str):
# Record artifact mapping in database
cache_key = storage.get_object_key(remote_name, path)
database.record_artifact_mapping(
cache_key, remote_name, path, len(artifact_data)
)
database.record_artifact_mapping(cache_key, remote_name, path, len(artifact_data))
return Response(
content=artifact_data,
@@ -449,16 +421,25 @@ async def docker_v2_proxy(request: Request, remote_name: str, path: str):
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
if remote_config.get("type") != "docker":
if remote_config.get("package") != "docker":
raise HTTPException(status_code=400, detail=f"Remote '{remote_name}' is not a docker remote")
# Check include_patterns against the image name (e.g. "library/nginx")
patterns = config.get_repository_patterns(remote_name, "")
if patterns:
path_parts = path.split("/")
image_name = "/".join(path_parts[:2]) if len(path_parts) >= 2 else path
if not any(re.search(p, path) or re.search(p, image_name) for p in patterns):
logger.info(f"PATTERN BLOCKED: {remote_name}/{path}")
raise HTTPException(status_code=403, detail="Image not allowed by configuration patterns")
remote_url = await construct_remote_url(remote_name, path)
cached_key = storage.get_object_key(remote_name, path)
if not storage.exists(cached_key):
cached_key = None
is_index = cache.is_index_file(path)
is_index = cache.is_index_file(path, config.get_index_patterns(remote_name))
if cached_key and is_index:
if not cache.is_index_valid(remote_name, path):
@@ -523,9 +504,7 @@ async def discover_github_releases(remote: str, include_pattern: str) -> list[st
owner, repo = match.groups()
async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(
f"https://api.github.com/repos/{owner}/{repo}/releases"
)
response = await client.get(f"https://api.github.com/repos/{owner}/{repo}/releases")
if response.status_code != 200:
raise HTTPException(
@@ -554,14 +533,10 @@ async def upload_file(remote_name: str, path: str, file: UploadFile = File(...))
# Check if remote is configured and is local
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
if remote_config.get("type") != "local":
raise HTTPException(
status_code=400, detail="Upload only supported for local repositories"
)
raise HTTPException(status_code=400, detail="Upload only supported for local repositories")
try:
# Read file content
@@ -623,9 +598,7 @@ def check_file_exists(remote_name: str, path: str):
# Check if remote is configured
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
# Handle local repository
if remote_config.get("type") == "local":
@@ -637,16 +610,10 @@ def check_file_exists(remote_name: str, path: str):
return Response(
headers={
"Content-Length": str(metadata["size_bytes"]),
"Content-Type": metadata.get(
"content_type", "application/octet-stream"
),
"Content-Type": metadata.get("content_type", "application/octet-stream"),
"X-SHA256": metadata["sha256_sum"],
"X-Created-At": metadata["created_at"].isoformat()
if metadata["created_at"]
else "",
"X-Uploaded-At": metadata["uploaded_at"].isoformat()
if metadata["uploaded_at"]
else "",
"X-Created-At": metadata["created_at"].isoformat() if metadata["created_at"] else "",
"X-Uploaded-At": metadata["uploaded_at"].isoformat() if metadata["uploaded_at"] else "",
}
)
except HTTPException:
@@ -655,9 +622,7 @@ def check_file_exists(remote_name: str, path: str):
raise HTTPException(status_code=500, detail=f"Check failed: {str(e)}")
else:
# For remote repositories, just return 405 Method Not Allowed
raise HTTPException(
status_code=405, detail="HEAD method only supported for local repositories"
)
raise HTTPException(status_code=405, detail="HEAD method only supported for local repositories")
@app.delete("/api/v1/remote/{remote_name}/{path:path}")
@@ -666,14 +631,10 @@ def delete_file(remote_name: str, path: str):
# Check if remote is configured and is local
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
raise HTTPException(status_code=404, detail=f"Remote '{remote_name}' not configured")
if remote_config.get("type") != "local":
raise HTTPException(
status_code=400, detail="Delete only supported for local repositories"
)
raise HTTPException(status_code=400, detail="Delete only supported for local repositories")
try:
# Get S3 key before deleting from database
@@ -694,11 +655,9 @@ def delete_file(remote_name: str, path: str):
@app.post("/api/v1/artifacts/cache")
async def cache_artifact(request: ArtifactRequest) -> Dict[str, Any]:
async def cache_artifact(request: ArtifactRequest) -> dict[str, Any]:
try:
matching_urls = await discover_artifacts(
request.remote, request.include_pattern
)
matching_urls = await discover_artifacts(request.remote, request.include_pattern)
if not matching_urls:
return {
@@ -713,11 +672,7 @@ async def cache_artifact(request: ArtifactRequest) -> Dict[str, Any]:
result = await cache_single_artifact(url, "", "")
cached_artifacts.append(result)
cached_count = sum(
1
for artifact in cached_artifacts
if artifact["status"] in ["cached", "already_cached"]
)
cached_count = sum(1 for artifact in cached_artifacts if artifact["status"] in ["cached", "already_cached"])
return {
"message": f"Processed {len(matching_urls)} artifacts, {cached_count} successfully cached",
@@ -730,9 +685,7 @@ async def cache_artifact(request: ArtifactRequest) -> Dict[str, Any]:
@app.get("/api/v1/artifacts/{remote:path}")
async def list_cached_artifacts(
remote: str, include_pattern: str = ".*"
) -> Dict[str, Any]:
async def list_cached_artifacts(remote: str, include_pattern: str = ".*") -> dict[str, Any]:
try:
matching_urls = await discover_artifacts(remote, include_pattern)
@@ -740,13 +693,12 @@ async def list_cached_artifacts(
for url in matching_urls:
# Extract path from URL for hierarchical key generation
from urllib.parse import urlparse
parsed = urlparse(url)
path = parsed.path
key = storage.get_object_key(remote, path)
if storage.exists(key):
cached_artifacts.append(
{"url": url, "cached_url": storage.get_url(key), "key": key}
)
cached_artifacts.append({"url": url, "cached_url": storage.get_url(key), "key": key})
return {
"remote": remote,
@@ -762,9 +714,7 @@ async def list_cached_artifacts(
@app.get("/metrics")
def get_metrics(
json: Optional[bool] = Query(
False, description="Return JSON format instead of Prometheus"
),
json: bool | None = Query(False, description="Return JSON format instead of Prometheus"),
):
"""Get comprehensive metrics about the artifact storage system"""
config._check_reload()
+18 -51
View File
@@ -1,22 +1,14 @@
from datetime import datetime
from typing import Dict, Any
from typing import Any
from prometheus_client import Counter, Gauge
# Prometheus metrics
request_counter = Counter(
"artifact_requests_total", "Total artifact requests", ["remote", "status"]
)
request_counter = Counter("artifact_requests_total", "Total artifact requests", ["remote", "status"])
cache_hit_counter = Counter("artifact_cache_hits_total", "Total cache hits", ["remote"])
cache_miss_counter = Counter(
"artifact_cache_misses_total", "Total cache misses", ["remote"]
)
bandwidth_saved_counter = Counter(
"artifact_bandwidth_saved_bytes_total", "Total bandwidth saved", ["remote"]
)
storage_size_gauge = Gauge(
"artifact_storage_size_bytes", "Storage size by remote", ["remote"]
)
cache_miss_counter = Counter("artifact_cache_misses_total", "Total cache misses", ["remote"])
bandwidth_saved_counter = Counter("artifact_bandwidth_saved_bytes_total", "Total bandwidth saved", ["remote"])
storage_size_gauge = Gauge("artifact_storage_size_bytes", "Storage size by remote", ["remote"])
redis_keys_gauge = Gauge("artifact_redis_keys_total", "Total Redis keys")
@@ -44,9 +36,7 @@ class MetricsManager:
# Increment per-remote counters
self.redis_client.client.incr(f"metrics:cache_hits:{remote_name}")
self.redis_client.client.incr(f"metrics:total_requests:{remote_name}")
self.redis_client.client.incrby(
f"metrics:bandwidth_saved:{remote_name}", size_bytes
)
self.redis_client.client.incrby(f"metrics:bandwidth_saved:{remote_name}", size_bytes)
except Exception:
pass
@@ -91,7 +81,7 @@ class MetricsManager:
except Exception:
return 0
def get_s3_size_by_remote(self, storage, config_manager) -> Dict[str, int]:
def get_s3_size_by_remote(self, storage, config_manager) -> dict[str, int]:
"""Get size of stored data per remote using database mappings"""
if self.database_manager and self.database_manager.available:
# Get from database if available
@@ -146,7 +136,7 @@ class MetricsManager:
except Exception:
return {}
def get_metrics(self, storage, config_manager) -> Dict[str, Any]:
def get_metrics(self, storage, config_manager) -> dict[str, Any]:
"""Get comprehensive metrics"""
# Update Redis keys gauge
redis_key_count = self.get_redis_key_count()
@@ -173,54 +163,31 @@ class MetricsManager:
if self.redis_client and self.redis_client.available:
try:
# Get global metrics
cache_hits = int(
self.redis_client.client.get("metrics:cache_hits") or 0
)
cache_misses = int(
self.redis_client.client.get("metrics:cache_misses") or 0
)
cache_hits = int(self.redis_client.client.get("metrics:cache_hits") or 0)
cache_misses = int(self.redis_client.client.get("metrics:cache_misses") or 0)
total_requests = cache_hits + cache_misses
bandwidth_saved = int(
self.redis_client.client.get("metrics:bandwidth_saved") or 0
)
bandwidth_saved = int(self.redis_client.client.get("metrics:bandwidth_saved") or 0)
metrics["requests"]["cache_hits"] = cache_hits
metrics["requests"]["cache_misses"] = cache_misses
metrics["requests"]["total_requests"] = total_requests
metrics["requests"]["cache_hit_ratio"] = (
cache_hits / total_requests if total_requests > 0 else 0.0
)
metrics["requests"]["cache_hit_ratio"] = cache_hits / total_requests if total_requests > 0 else 0.0
metrics["bandwidth"]["saved_bytes"] = bandwidth_saved
# Get per-remote metrics
for remote in config_manager.config.get("remotes", {}).keys():
remote_cache_hits = int(
self.redis_client.client.get(f"metrics:cache_hits:{remote}")
or 0
)
remote_cache_misses = int(
self.redis_client.client.get(f"metrics:cache_misses:{remote}")
or 0
)
remote_cache_hits = int(self.redis_client.client.get(f"metrics:cache_hits:{remote}") or 0)
remote_cache_misses = int(self.redis_client.client.get(f"metrics:cache_misses:{remote}") or 0)
remote_total = remote_cache_hits + remote_cache_misses
remote_bandwidth_saved = int(
self.redis_client.client.get(
f"metrics:bandwidth_saved:{remote}"
)
or 0
)
remote_bandwidth_saved = int(self.redis_client.client.get(f"metrics:bandwidth_saved:{remote}") or 0)
metrics["per_remote"][remote] = {
"cache_hits": remote_cache_hits,
"cache_misses": remote_cache_misses,
"total_requests": remote_total,
"cache_hit_ratio": remote_cache_hits / remote_total
if remote_total > 0
else 0.0,
"cache_hit_ratio": remote_cache_hits / remote_total if remote_total > 0 else 0.0,
"bandwidth_saved_bytes": remote_bandwidth_saved,
"storage_size_bytes": metrics["storage"]["size_by_remote"].get(
remote, 0
),
"storage_size_bytes": metrics["storage"]["size_by_remote"].get(remote, 0),
}
except Exception:
+9 -9
View File
@@ -1,5 +1,6 @@
import os
import hashlib
import os
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
@@ -21,23 +22,22 @@ class S3Storage:
self.bucket = bucket
self.secure = secure
ca_bundle = os.environ.get('REQUESTS_CA_BUNDLE') or os.environ.get('SSL_CERT_FILE')
config_kwargs = {
"request_checksum_calculation": "when_required",
"response_checksum_validation": "when_required"
}
ca_bundle = os.environ.get("REQUESTS_CA_BUNDLE") or os.environ.get("SSL_CERT_FILE")
config_kwargs = {"request_checksum_calculation": "when_required", "response_checksum_validation": "when_required"}
client_kwargs = {
"endpoint_url": f"http{'s' if self.secure else ''}://{self.endpoint}",
"aws_access_key_id": self.access_key,
"aws_secret_access_key": self.secret_key,
"config": Config(**config_kwargs)
"config": Config(**config_kwargs),
}
if ca_bundle and os.path.exists(ca_bundle):
client_kwargs["verify"] = ca_bundle
print(f"Debug: Using CA bundle: {ca_bundle}")
else:
print(f"Debug: No CA bundle found. REQUESTS_CA_BUNDLE={os.environ.get('REQUESTS_CA_BUNDLE')}, SSL_CERT_FILE={os.environ.get('SSL_CERT_FILE')}")
print(
f"Debug: No CA bundle found. REQUESTS_CA_BUNDLE={os.environ.get('REQUESTS_CA_BUNDLE')}, SSL_CERT_FILE={os.environ.get('SSL_CERT_FILE')}"
)
self.client = boto3.client("s3", **client_kwargs)
@@ -56,7 +56,7 @@ class S3Storage:
def get_object_key(self, remote_name: str, path: str) -> str:
# Extract directory path and filename
clean_path = path.lstrip('/')
clean_path = path.lstrip("/")
filename = os.path.basename(clean_path)
directory_path = os.path.dirname(clean_path)
View File
+131
View File
@@ -0,0 +1,131 @@
"""
Pytest configuration and shared fixtures.
Module-level setup (env vars + connection patches) runs before any test
module is imported, so the FastAPI app initialises against mocks rather
than real S3 / Redis / PostgreSQL services.
"""
import os
import tempfile
from unittest.mock import MagicMock, patch
import yaml
# ---------------------------------------------------------------------------
# Test remote configuration
# ---------------------------------------------------------------------------
TEST_REMOTES = {
"remotes": {
"alpine-test": {
"base_url": "https://dl-cdn.alpinelinux.org",
"type": "remote",
"package": "alpine",
"include_patterns": [".*/x86_64/.*\\.apk$"],
"cache": {"file_ttl": 0, "index_ttl": 3600},
},
"rpm-test": {
"base_url": "https://example.com/rpm",
"type": "remote",
"package": "rpm",
"include_patterns": [".*/x86_64/.*\\.rpm$", ".*/repodata/.*$"],
"cache": {"file_ttl": 0, "index_ttl": 3600},
},
"docker-test": {
"base_url": "https://registry.example.com",
"type": "remote",
"package": "docker",
"cache": {"file_ttl": 0, "index_ttl": 300},
},
"docker-restricted": {
"base_url": "https://registry.example.com",
"type": "remote",
"package": "docker",
"include_patterns": ["^library/nginx"],
"cache": {"file_ttl": 0, "index_ttl": 300},
},
"generic-test": {
"base_url": "https://releases.example.com",
"type": "remote",
"package": "generic",
"include_patterns": [".*\\.tar\\.gz$"],
"cache": {"file_ttl": 0, "index_ttl": 0},
},
"custom-index-test": {
"base_url": "https://example.com",
"type": "remote",
"package": "generic",
"index_patterns": ["metadata\\.json$"],
"cache": {"file_ttl": 0, "index_ttl": 600},
},
"local-test": {
"type": "local",
"package": "generic",
"cache": {"file_ttl": 0, "index_ttl": 0},
},
}
}
# ---------------------------------------------------------------------------
# Write temp config and set env vars BEFORE importing the package
# ---------------------------------------------------------------------------
_tmpdir = tempfile.mkdtemp()
_config_path = os.path.join(_tmpdir, "remotes.yaml")
with open(_config_path, "w") as _f:
yaml.dump(TEST_REMOTES, _f)
os.environ.update(
{
"CONFIG_PATH": _config_path,
"MINIO_ENDPOINT": "localhost:9000",
"MINIO_ACCESS_KEY": "testkey",
"MINIO_SECRET_KEY": "testsecret",
"MINIO_BUCKET": "testbucket",
"REDIS_URL": "redis://localhost:6379/0",
"DBHOST": "localhost",
"DBPORT": "5432",
"DBUSER": "test",
"DBPASS": "test",
"DBNAME": "test",
}
)
# Patch external service connections before the package is imported.
# These stay active for the whole session (process exits after tests finish).
_boto3_patch = patch("boto3.client", return_value=MagicMock())
_redis_patch = patch("redis.from_url", return_value=MagicMock())
_psycopg2_patch = patch("psycopg2.connect", return_value=MagicMock())
_boto3_patch.start()
_redis_patch.start()
_psycopg2_patch.start()
# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------
import pytest # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
@pytest.fixture(scope="session")
def app():
from artifactapi.main import app as fastapi_app
return fastapi_app
@pytest.fixture(scope="session")
def client(app):
return TestClient(app)
@pytest.fixture
def config_path():
return _config_path
@pytest.fixture
def test_remotes():
return TEST_REMOTES
+237
View File
@@ -0,0 +1,237 @@
"""Tests for RedisCache, focusing on is_index_file with configurable patterns."""
import hashlib
from unittest.mock import ANY, MagicMock, patch
import pytest
from artifactapi.cache import RedisCache
from artifactapi.config import _PACKAGE_INDEX_PATTERNS
@pytest.fixture
def bare_cache():
"""RedisCache instance bypassing __init__ (no Redis needed for pure-logic tests)."""
return RedisCache.__new__(RedisCache)
@pytest.fixture
def unavailable_cache():
"""RedisCache where Redis is not reachable."""
with patch("redis.from_url", side_effect=Exception("connection refused")):
return RedisCache("redis://localhost:6379/0")
@pytest.fixture
def mock_redis_client():
return MagicMock()
@pytest.fixture
def cache_with_redis(mock_redis_client):
"""RedisCache backed by a MagicMock Redis client."""
with patch("redis.from_url", return_value=mock_redis_client):
c = RedisCache("redis://localhost:6379/0")
c.client = mock_redis_client
c.available = True
return c
# ---------------------------------------------------------------------------
# is_index_file — alpine patterns
# ---------------------------------------------------------------------------
class TestIsIndexFileAlpine:
def test_apkindex_tarball_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert bare_cache.is_index_file("alpine/v3.18/x86_64/APKINDEX.tar.gz", patterns)
def test_nested_apkindex_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert bare_cache.is_index_file("mirrors/dl-cdn/alpine/v3.19/community/x86_64/APKINDEX.tar.gz", patterns)
def test_apk_package_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert not bare_cache.is_index_file("alpine/v3.18/x86_64/musl-1.2.4-r2.apk", patterns)
def test_random_tarball_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert not bare_cache.is_index_file("some/path/archive.tar.gz", patterns)
def test_apkindex_signature_file_is_not_index(self, bare_cache):
# Signature file adjacent to the index should not be treated as an index
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert not bare_cache.is_index_file("alpine/v3.18/x86_64/APKINDEX.tar.gz.sig", patterns)
def test_apkindex_tmp_file_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert not bare_cache.is_index_file("alpine/v3.18/x86_64/APKINDEX.tar.gz.tmp", patterns)
# ---------------------------------------------------------------------------
# is_index_file — rpm patterns
# ---------------------------------------------------------------------------
class TestIsIndexFileRpm:
def test_repomd_xml_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert bare_cache.is_index_file("almalinux/9/x86_64/repomd.xml", patterns)
def test_repodata_primary_xml_gz_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert bare_cache.is_index_file("repo/repodata/primary.xml.gz", patterns)
def test_repodata_sqlite_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert bare_cache.is_index_file("repo/repodata/primary.sqlite", patterns)
def test_repodata_sqlite_bz2_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert bare_cache.is_index_file("repo/repodata/other.sqlite.bz2", patterns)
def test_repodata_yaml_xz_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert bare_cache.is_index_file("repo/repodata/comps.yaml.xz", patterns)
def test_packages_gz_pattern_matches_any_path(self, bare_cache):
# The Packages.gz$ regex is a carryover from the original hardcoded logic and
# deliberately matches any path ending in Packages.gz — including Debian-style paths.
# This test documents that intentional behaviour.
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert bare_cache.is_index_file("debian/dists/stable/main/binary-amd64/Packages.gz", patterns)
def test_rpm_package_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert not bare_cache.is_index_file("almalinux/9/x86_64/Packages/bash-5.1.8.x86_64.rpm", patterns)
def test_arbitrary_xml_outside_repodata_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert not bare_cache.is_index_file("some/path/config.xml", patterns)
# ---------------------------------------------------------------------------
# is_index_file — docker patterns
# ---------------------------------------------------------------------------
class TestIsIndexFileDocker:
def test_tag_manifest_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert bare_cache.is_index_file("library/nginx/manifests/latest", patterns)
def test_version_tag_manifest_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert bare_cache.is_index_file("library/nginx/manifests/1.25.3", patterns)
def test_hyphenated_tag_manifest_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert bare_cache.is_index_file("library/nginx/manifests/latest-rc", patterns)
def test_numeric_date_tag_manifest_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert bare_cache.is_index_file("library/nginx/manifests/20240101", patterns)
def test_digest_manifest_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
digest = "sha256:" + "a" * 64
assert not bare_cache.is_index_file(f"library/nginx/manifests/{digest}", patterns)
def test_tags_list_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert bare_cache.is_index_file("library/nginx/tags/list", patterns)
def test_blob_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert not bare_cache.is_index_file("library/nginx/blobs/sha256:abc123", patterns)
# ---------------------------------------------------------------------------
# is_index_file — edge cases
# ---------------------------------------------------------------------------
class TestIsIndexFileEdgeCases:
def test_empty_patterns_nothing_is_index(self, bare_cache):
assert not bare_cache.is_index_file("APKINDEX.tar.gz", [])
assert not bare_cache.is_index_file("repomd.xml", [])
assert not bare_cache.is_index_file("library/nginx/manifests/latest", [])
def test_none_patterns_nothing_is_index(self, bare_cache):
assert not bare_cache.is_index_file("APKINDEX.tar.gz", None)
assert not bare_cache.is_index_file("repomd.xml", None)
def test_custom_patterns_match(self, bare_cache):
patterns = [r"metadata\.json$", r"index\.yaml$"]
assert bare_cache.is_index_file("repo/metadata.json", patterns)
assert bare_cache.is_index_file("repo/subdir/index.yaml", patterns)
assert not bare_cache.is_index_file("repo/data.tar.gz", patterns)
def test_custom_pattern_does_not_match_standard_index(self, bare_cache):
patterns = [r"metadata\.json$"]
assert not bare_cache.is_index_file("APKINDEX.tar.gz", patterns)
# ---------------------------------------------------------------------------
# get_index_cache_key
# ---------------------------------------------------------------------------
class TestGetIndexCacheKey:
def test_key_format_is_deterministic(self, bare_cache):
# Assert against a pre-computed value to pin the hash algorithm,
# truncation length, and format string in one assertion.
path = "alpine/v3.18/x86_64/APKINDEX.tar.gz"
expected_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
key = bare_cache.get_index_cache_key("alpine-test", path)
assert key == f"index:alpine-test:{expected_hash}"
def test_different_paths_produce_different_keys(self, bare_cache):
k1 = bare_cache.get_index_cache_key("alpine-test", "alpine/v3.18/x86_64/APKINDEX.tar.gz")
k2 = bare_cache.get_index_cache_key("alpine-test", "alpine/v3.19/x86_64/APKINDEX.tar.gz")
assert k1 != k2
def test_different_remotes_produce_different_keys(self, bare_cache):
k1 = bare_cache.get_index_cache_key("remote-a", "path/to/APKINDEX.tar.gz")
k2 = bare_cache.get_index_cache_key("remote-b", "path/to/APKINDEX.tar.gz")
assert k1 != k2
def test_key_starts_with_index_prefix_and_remote(self, bare_cache):
key = bare_cache.get_index_cache_key("myremote", "some/path")
assert key.startswith("index:myremote:")
def test_key_hash_segment_is_16_chars(self, bare_cache):
key = bare_cache.get_index_cache_key("myremote", "some/path/file.xml")
# Format: index:<remote>:<16-char hash> — the fixed length matters for key-space hygiene
parts = key.split(":")
assert len(parts) == 3
assert len(parts[2]) == 16
# ---------------------------------------------------------------------------
# mark_index_cached / is_index_valid
# ---------------------------------------------------------------------------
class TestIndexValidity:
def test_mark_index_cached_calls_setex_with_correct_ttl(self, cache_with_redis, mock_redis_client):
cache_with_redis.mark_index_cached("remote", "path/APKINDEX.tar.gz", 300)
expected_key = cache_with_redis.get_index_cache_key("remote", "path/APKINDEX.tar.gz")
mock_redis_client.setex.assert_called_once_with(expected_key, 300, ANY)
def test_present_key_is_valid(self, cache_with_redis, mock_redis_client):
mock_redis_client.exists.return_value = 1
assert cache_with_redis.is_index_valid("remote", "path/APKINDEX.tar.gz")
def test_missing_key_is_not_valid(self, cache_with_redis, mock_redis_client):
mock_redis_client.exists.return_value = 0
assert not cache_with_redis.is_index_valid("remote", "path/APKINDEX.tar.gz")
def test_unavailable_redis_is_not_valid(self, unavailable_cache):
assert not unavailable_cache.is_index_valid("remote", "some/path")
def test_mark_cached_no_op_when_unavailable(self, unavailable_cache):
# client is None when Redis is unavailable — setex cannot be called
assert unavailable_cache.client is None
unavailable_cache.mark_index_cached("remote", "some/path", 300) # must not raise
+269
View File
@@ -0,0 +1,269 @@
"""Tests for ConfigManager, focusing on get_index_patterns (new logic)."""
import os
import pytest
import yaml
from artifactapi.config import ConfigManager
@pytest.fixture
def make_config(tmp_path):
"""Factory: write a remotes dict to a temp YAML and return a ConfigManager."""
def _make(remotes_dict):
cfg_file = tmp_path / "remotes.yaml"
cfg_file.write_text(yaml.dump({"remotes": remotes_dict}))
return ConfigManager(str(cfg_file))
return _make
# ---------------------------------------------------------------------------
# get_index_patterns
# ---------------------------------------------------------------------------
class TestGetIndexPatterns:
def test_alpine_returns_package_defaults(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "alpine", "base_url": "https://x.com"}})
patterns = cfg.get_index_patterns("r")
# Assert against literal strings, not the live constant, so a rename doesn't mask a regression
assert r"APKINDEX\.tar\.gz$" in patterns
def test_rpm_returns_package_defaults(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "rpm", "base_url": "https://x.com"}})
patterns = cfg.get_index_patterns("r")
assert r"repomd\.xml$" in patterns
assert any("repodata" in p for p in patterns)
def test_docker_returns_package_defaults(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "docker", "base_url": "https://x.com"}})
patterns = cfg.get_index_patterns("r")
assert any("manifests" in p for p in patterns)
assert any("tags/list" in p for p in patterns)
def test_generic_returns_empty_list(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "generic", "base_url": "https://x.com"}})
assert cfg.get_index_patterns("r") == []
def test_unknown_remote_returns_empty_list(self, make_config):
cfg = make_config({})
assert cfg.get_index_patterns("nonexistent") == []
def test_missing_package_field_defaults_to_generic(self, make_config):
cfg = make_config({"r": {"type": "remote", "base_url": "https://x.com"}})
assert cfg.get_index_patterns("r") == []
def test_unknown_package_type_returns_empty_list(self, make_config):
# A mis-spelled package type silently returns [] — this is a known footgun
cfg = make_config({"r": {"type": "remote", "package": "deb", "base_url": "https://x.com"}})
assert cfg.get_index_patterns("r") == []
def test_extra_patterns_appended_after_defaults(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "alpine",
"base_url": "https://x.com",
"index_patterns": [r"custom\.json$"],
}
}
)
patterns = cfg.get_index_patterns("r")
assert r"APKINDEX\.tar\.gz$" in patterns
assert r"custom\.json$" in patterns
# Defaults come first
assert patterns.index(r"APKINDEX\.tar\.gz$") < patterns.index(r"custom\.json$")
def test_explicit_empty_extra_patterns_returns_defaults(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "alpine",
"base_url": "https://x.com",
"index_patterns": [],
}
}
)
assert r"APKINDEX\.tar\.gz$" in cfg.get_index_patterns("r")
def test_duplicate_extra_pattern_not_added_twice(self, make_config):
existing = r"APKINDEX\.tar\.gz$"
cfg = make_config(
{
"r": {
"type": "remote",
"package": "alpine",
"base_url": "https://x.com",
"index_patterns": [existing],
}
}
)
patterns = cfg.get_index_patterns("r")
assert patterns.count(existing) == 1
def test_generic_with_only_extra_patterns(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"index_patterns": [r"meta\.json$", r"index\.yaml$"],
}
}
)
assert cfg.get_index_patterns("r") == [r"meta\.json$", r"index\.yaml$"]
def test_rpm_extra_patterns_merged(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "rpm",
"base_url": "https://x.com",
"index_patterns": [r"custom-meta\.xml$"],
}
}
)
patterns = cfg.get_index_patterns("r")
assert r"repomd\.xml$" in patterns
assert r"custom-meta\.xml$" in patterns
# ---------------------------------------------------------------------------
# get_repository_patterns
# ---------------------------------------------------------------------------
class TestGetRepositoryPatterns:
def test_returns_include_patterns(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"include_patterns": [r".*\.tar\.gz$"],
}
}
)
assert cfg.get_repository_patterns("r", "") == [r".*\.tar\.gz$"]
def test_returns_empty_for_missing_remote(self, make_config):
cfg = make_config({})
assert cfg.get_repository_patterns("nonexistent", "") == []
def test_returns_empty_when_no_patterns_configured(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "generic", "base_url": "https://x.com"}})
assert cfg.get_repository_patterns("r", "") == []
def test_multiple_patterns_returned(self, make_config):
patterns = [r".*\.rpm$", r".*/repodata/.*$"]
cfg = make_config(
{
"r": {
"type": "remote",
"package": "rpm",
"base_url": "https://x.com",
"include_patterns": patterns,
}
}
)
assert cfg.get_repository_patterns("r", "") == patterns
def test_dict_keyed_repositories_returns_per_repo_patterns(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"include_patterns": [r".*\.tar\.gz$"],
"repositories": {
"/path/to/repo": {"include_patterns": [r".*\.rpm$"]},
},
}
}
)
assert cfg.get_repository_patterns("r", "/path/to/repo") == [r".*\.rpm$"]
def test_dict_keyed_repositories_falls_back_to_remote_patterns(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"include_patterns": [r".*\.tar\.gz$"],
"repositories": {
"/path/to/repo": {"include_patterns": [r".*\.rpm$"]},
},
}
}
)
assert cfg.get_repository_patterns("r", "/unknown/path") == [r".*\.tar\.gz$"]
# ---------------------------------------------------------------------------
# get_cache_config
# ---------------------------------------------------------------------------
class TestGetCacheConfig:
def test_returns_cache_section(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"cache": {"file_ttl": 0, "index_ttl": 7200},
}
}
)
assert cfg.get_cache_config("r") == {"file_ttl": 0, "index_ttl": 7200}
def test_returns_empty_dict_for_missing_remote(self, make_config):
cfg = make_config({})
assert cfg.get_cache_config("nonexistent") == {}
def test_returns_empty_dict_when_no_cache_key(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "generic", "base_url": "https://x.com"}})
assert cfg.get_cache_config("r") == {}
# ---------------------------------------------------------------------------
# Config file reload
# ---------------------------------------------------------------------------
class TestConfigReload:
def test_reloads_when_file_mtime_advances(self, tmp_path):
cfg_file = tmp_path / "remotes.yaml"
cfg_file.write_text(yaml.dump({"remotes": {"repo-a": {"type": "remote", "package": "generic", "base_url": "https://x.com"}}}))
cfg = ConfigManager(str(cfg_file))
assert "repo-a" in cfg.config["remotes"]
cfg_file.write_text(yaml.dump({"remotes": {"repo-b": {"type": "remote", "package": "generic", "base_url": "https://y.com"}}}))
future_mtime = cfg._last_modified + 1
os.utime(str(cfg_file), (future_mtime, future_mtime))
cfg._check_reload()
assert "repo-b" in cfg.config["remotes"]
assert "repo-a" not in cfg.config["remotes"]
def test_no_reload_when_file_unchanged(self, tmp_path):
cfg_file = tmp_path / "remotes.yaml"
cfg_file.write_text(yaml.dump({"remotes": {"repo-a": {"type": "remote", "package": "generic", "base_url": "https://x.com"}}}))
cfg = ConfigManager(str(cfg_file))
# Call check_reload without touching the file — should not reload
cfg._check_reload()
assert "repo-a" in cfg.config["remotes"]
+273
View File
@@ -0,0 +1,273 @@
"""Tests for docker_auth: WWW-Authenticate parsing and token caching."""
import time
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from artifactapi import docker_auth
from artifactapi.docker_auth import (
_cache_key,
_get_cached_token,
_store_token,
fetch_token,
get_docker_token_for_response,
parse_www_authenticate,
)
@pytest.fixture(autouse=True)
def clear_token_cache():
"""Isolate tests: wipe the module-level token cache before and after each test."""
docker_auth._token_cache.clear()
yield
docker_auth._token_cache.clear()
# ---------------------------------------------------------------------------
# parse_www_authenticate
# ---------------------------------------------------------------------------
class TestParseWwwAuthenticate:
def test_full_bearer_header(self):
header = 'Bearer realm="https://auth.docker.io/token",service="registry.docker.io",scope="repository:library/nginx:pull"'
result = parse_www_authenticate(header)
assert result is not None
realm, service, scope = result
assert realm == "https://auth.docker.io/token"
assert service == "registry.docker.io"
assert scope == "repository:library/nginx:pull"
def test_realm_only(self):
header = 'Bearer realm="https://auth.example.com/token"'
result = parse_www_authenticate(header)
assert result is not None
realm, service, scope = result
assert realm == "https://auth.example.com/token"
assert service == ""
assert scope == ""
def test_realm_and_service_only(self):
header = 'Bearer realm="https://auth.example.com",service="registry.example.com"'
result = parse_www_authenticate(header)
assert result is not None
_, service, scope = result
assert service == "registry.example.com"
assert scope == ""
def test_invalid_scheme_returns_none(self):
assert parse_www_authenticate('Basic realm="example"') is None
def test_empty_header_returns_none(self):
assert parse_www_authenticate("") is None
def test_case_insensitive_bearer_parses_realm(self):
header = 'bearer realm="https://auth.example.com/token"'
result = parse_www_authenticate(header)
assert result is not None
realm, _, _ = result
assert realm == "https://auth.example.com/token"
def test_field_order_scope_before_service_drops_service(self):
# The regex requires realm,service,scope order; scope before service
# results in service being silently dropped. This test documents the known limitation.
header = 'Bearer realm="https://auth.example.com",scope="repo:pull",service="svc"'
result = parse_www_authenticate(header)
assert result is not None
realm, service, scope = result
assert realm == "https://auth.example.com"
assert scope == "repo:pull"
assert service == "" # silently dropped when out of order
# ---------------------------------------------------------------------------
# _cache_key
# ---------------------------------------------------------------------------
class TestCacheKey:
def test_key_contains_all_components(self):
key = _cache_key("https://realm.com", "svc", "scope", "user")
assert "https://realm.com" in key
assert "svc" in key
assert "scope" in key
assert "user" in key
def test_none_username_uses_empty_string(self):
key = _cache_key("https://realm.com", "svc", "scope", None)
assert key.endswith("|")
def test_different_services_give_different_keys(self):
k1 = _cache_key("realm", "svc1", "scope", None)
k2 = _cache_key("realm", "svc2", "scope", None)
assert k1 != k2
def test_different_scopes_give_different_keys(self):
k1 = _cache_key("realm", "svc", "scope:read", None)
k2 = _cache_key("realm", "svc", "scope:write", None)
assert k1 != k2
def test_pipe_in_field_value_can_collide_with_adjacent_fields(self):
# The "|" separator is not escaped, so a pipe embedded in one field
# produces the same key as the same pipe appearing as a separator boundary.
# This is a known limitation: _cache_key("a|b","c","d",None) ==
# _cache_key("a","b|c","d",None). Documents the behaviour, not a claim it's correct.
k1 = _cache_key("a|b", "c", "d", None)
k2 = _cache_key("a", "b|c", "d", None)
assert k1 == k2
# ---------------------------------------------------------------------------
# _get_cached_token / _store_token
# ---------------------------------------------------------------------------
class TestTokenCaching:
def test_get_returns_none_when_not_cached(self):
assert _get_cached_token("no-such-key") is None
def test_get_returns_token_when_valid(self):
_store_token("mykey", "tok-abc", 300)
assert _get_cached_token("mykey") == "tok-abc"
def test_get_returns_none_when_expired(self):
docker_auth._token_cache["mykey"] = ("old-token", time.time() - 1)
assert _get_cached_token("mykey") is None
def test_expired_entry_is_removed_from_cache(self):
docker_auth._token_cache["mykey"] = ("old-token", time.time() - 1)
_get_cached_token("mykey")
assert "mykey" not in docker_auth._token_cache
def test_store_expires_30s_before_stated_time(self):
before = time.time()
_store_token("mykey", "tok", 100)
_, expires_at = docker_auth._token_cache["mykey"]
# expires_in - 30 = 70; allow ±2 s clock wiggle
assert before + 68 <= expires_at <= before + 72
def test_store_enforces_minimum_10s_expiry(self):
before = time.time()
_store_token("mykey", "tok", 5) # expires_in - 30 would be negative
_, expires_at = docker_auth._token_cache["mykey"]
assert expires_at >= before + 10
# ---------------------------------------------------------------------------
# fetch_token (async, mocks httpx)
# ---------------------------------------------------------------------------
def _make_mock_http_client(token_payload: dict):
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = token_payload
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=mock_client)
ctx.__aexit__ = AsyncMock(return_value=False)
return ctx, mock_client
class TestFetchToken:
async def test_returns_token_field(self):
ctx, _ = _make_mock_http_client({"token": "bearer-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token == "bearer-tok"
async def test_falls_back_to_access_token_field(self):
ctx, _ = _make_mock_http_client({"access_token": "access-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token == "access-tok"
async def test_returns_none_when_response_missing_token_field(self):
ctx, _ = _make_mock_http_client({"not_token": "value", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token is None
async def test_defaults_expires_in_to_300_when_missing(self):
ctx, _ = _make_mock_http_client({"token": "tok"}) # no expires_in key
before = time.time()
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token == "tok"
key = _cache_key("https://auth.example.com", "svc", "scope", None)
_, expires_at = docker_auth._token_cache[key]
# Default expires_in=300, stored as time.time() + max(300-30, 10) = 270
assert before + 268 <= expires_at <= before + 272
async def test_uses_cache_on_second_call_without_http(self):
ctx, mock_client = _make_mock_http_client({"token": "cached-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
await fetch_token("https://auth.example.com", "svc", "scope")
mock_client.get.reset_mock()
token = await fetch_token("https://auth.example.com", "svc", "scope")
mock_client.get.assert_not_called()
assert token == "cached-tok"
async def test_returns_none_on_network_error(self):
mock_client = AsyncMock()
mock_client.get = AsyncMock(side_effect=Exception("connection refused"))
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=mock_client)
ctx.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token is None
async def test_returns_none_on_http_status_error(self):
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError("401 Unauthorized", request=MagicMock(), response=MagicMock())
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=mock_client)
ctx.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token is None
async def test_passes_credentials_as_auth_tuple(self):
ctx, mock_client = _make_mock_http_client({"token": "authed-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
await fetch_token("https://auth.example.com", "svc", "scope", "user", "pass")
call_kwargs = mock_client.get.call_args.kwargs
assert call_kwargs.get("auth") == ("user", "pass")
async def test_no_auth_when_no_credentials(self):
ctx, mock_client = _make_mock_http_client({"token": "anon-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
await fetch_token("https://auth.example.com", "svc", "scope")
call_kwargs = mock_client.get.call_args.kwargs
assert call_kwargs.get("auth") is None
# ---------------------------------------------------------------------------
# get_docker_token_for_response
# ---------------------------------------------------------------------------
class TestGetDockerTokenForResponse:
async def test_returns_none_for_non_bearer_header(self):
token = await get_docker_token_for_response('Basic realm="example"')
assert token is None
async def test_end_to_end_parse_and_fetch(self):
"""parse_www_authenticate → fetch_token wired together end-to-end."""
header = 'Bearer realm="https://auth.example.com",service="svc",scope="repo:pull"'
ctx, mock_client = _make_mock_http_client({"token": "e2e-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
token = await get_docker_token_for_response(header, "user", "pass")
assert token == "e2e-tok"
call_kwargs = mock_client.get.call_args.kwargs
assert call_kwargs["params"]["service"] == "svc"
assert call_kwargs["params"]["scope"] == "repo:pull"
assert call_kwargs["auth"] == ("user", "pass")
+556
View File
@@ -0,0 +1,556 @@
"""FastAPI route tests using TestClient with mocked service dependencies."""
import hashlib
import json
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Per-test service mocks (replace module-level globals in main.py)
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_storage():
m = MagicMock()
m.get_object_key.return_value = "test-remote/abc123/file.ext"
m.exists.return_value = False
m.download_object.return_value = b"fake content"
m.bucket = "testbucket"
m.client = MagicMock()
return m
@pytest.fixture
def mock_cache():
m = MagicMock()
m.is_index_file.return_value = False
m.is_index_valid.return_value = True
m.available = False
m.client = None
return m
@pytest.fixture
def mock_database():
m = MagicMock()
m.available = False
return m
@pytest.fixture
def mock_metrics():
return MagicMock()
@pytest.fixture
def patched_deps(mock_storage, mock_cache, mock_database, mock_metrics):
"""Swap the module-level service instances in main.py for the duration of a test."""
import artifactapi.main as main_mod
with (
patch.object(main_mod, "storage", mock_storage),
patch.object(main_mod, "cache", mock_cache),
patch.object(main_mod, "database", mock_database),
patch.object(main_mod, "metrics", mock_metrics),
):
yield {
"storage": mock_storage,
"cache": mock_cache,
"database": mock_database,
"metrics": mock_metrics,
}
# ---------------------------------------------------------------------------
# Basic / health endpoints
# ---------------------------------------------------------------------------
class TestBasicEndpoints:
def test_root_returns_remote_list(self, client):
response = client.get("/")
assert response.status_code == 200
data = response.json()
assert "remotes" in data
assert isinstance(data["remotes"], list)
assert len(data["remotes"]) > 0
def test_root_contains_version(self, client):
response = client.get("/")
assert "version" in response.json()
def test_health_check(self, client):
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def test_docker_v2_ping(self, client):
response = client.get("/v2/")
assert response.status_code == 200
assert response.headers.get("Docker-Distribution-Api-Version") == "registry/2.0"
assert response.json() == {}
# ---------------------------------------------------------------------------
# Docker proxy /v2/{remote}/{path}
# ---------------------------------------------------------------------------
class TestDockerProxy:
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.get("/v2/no-such-remote/library/nginx/manifests/latest")
assert response.status_code == 404
def test_non_docker_package_returns_400(self, client, patched_deps):
# alpine-test is package: alpine, not docker
response = client.get("/v2/alpine-test/library/nginx/manifests/latest")
assert response.status_code == 400
def test_pattern_blocked_returns_403(self, client, patched_deps):
# docker-restricted allows only "library/nginx"
response = client.get("/v2/docker-restricted/library/ubuntu/manifests/latest")
assert response.status_code == 403
def test_allowed_pattern_proceeds_to_cache(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
deps["cache"].is_index_valid.return_value = True
response = client.get("/v2/docker-restricted/library/nginx/manifests/latest")
assert response.status_code == 200
def test_cache_hit_manifest_returns_correct_content_type(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps(
{
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"schemaVersion": 2,
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
deps["cache"].is_index_valid.return_value = True
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
assert response.status_code == 200
ct = response.headers["content-type"]
assert ct.startswith("application/vnd.docker.distribution.manifest.v2+json")
def test_cache_hit_sets_docker_content_digest_header(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
deps["cache"].is_index_valid.return_value = True
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
expected = f"sha256:{hashlib.sha256(manifest).hexdigest()}"
assert response.headers["Docker-Content-Digest"] == expected
def test_cache_hit_records_metrics(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = False
client.get("/v2/docker-test/library/nginx/manifests/latest")
deps["metrics"].record_cache_hit.assert_called_once_with("docker-test", ANY)
def test_head_request_returns_no_body(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = False
response = client.head("/v2/docker-test/library/nginx/manifests/latest")
assert response.status_code == 200
assert response.content == b""
def test_cache_miss_calls_upstream_fetch(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
) as mock_fetch:
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
mock_fetch.assert_called_once()
assert response.status_code == 200
def test_cache_miss_on_index_marks_index_cached(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
):
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
assert response.status_code == 200
deps["cache"].mark_index_cached.assert_called_once()
def test_index_expired_triggers_refetch(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True # cached in S3
deps["cache"].is_index_file.return_value = True
deps["cache"].is_index_valid.return_value = False # but TTL expired
deps["storage"].download_object.return_value = manifest
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
) as mock_fetch:
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
mock_fetch.assert_called_once()
assert response.status_code == 200
# ---------------------------------------------------------------------------
# Generic artifact route /api/v1/remote/{remote}/{path}
# ---------------------------------------------------------------------------
class TestGenericArtifactRoute:
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.get("/api/v1/remote/nonexistent/path/to/file.tar.gz")
assert response.status_code == 404
def test_pattern_blocked_returns_403(self, client, patched_deps):
# generic-test only allows .tar.gz
response = client.get("/api/v1/remote/generic-test/some/path/file.rpm")
assert response.status_code == 403
def test_cache_hit_returns_200_with_source_header(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"tar content"
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
assert response.status_code == 200
assert response.headers["X-Artifact-Source"] == "cache"
assert response.content == b"tar content"
def test_cache_hit_sets_content_disposition(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
disposition = response.headers["content-disposition"]
assert "attachment" in disposition
assert "archive.tar.gz" in disposition
def test_cache_hit_sets_artifact_size_header(self, client, patched_deps):
deps = patched_deps
content = b"some artifact content bytes"
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = content
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
assert response.headers["X-Artifact-Size"] == str(len(content))
def test_cache_hit_records_metrics(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_index_file.return_value = False
client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
deps["metrics"].record_cache_hit.assert_called_once_with("generic-test", ANY)
def test_cache_hit_records_artifact_mapping(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_index_file.return_value = False
client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
deps["database"].record_artifact_mapping.assert_called_once()
def test_cache_hit_rpm_returns_correct_content_type(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"rpm bytes"
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/rpm-test/almalinux/9/x86_64/bash-5.1.8.x86_64.rpm")
assert response.status_code == 200
assert "application/x-rpm" in response.headers["content-type"]
def test_cache_hit_xml_returns_correct_content_type(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"<?xml version='1.0'?>"
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/rpm-test/repo/repodata/primary.xml")
assert response.status_code == 200
assert "application/xml" in response.headers["content-type"]
def test_cache_miss_fetches_upstream_and_returns_200(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"fresh content"
deps["cache"].is_index_file.return_value = False
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
) as mock_fetch:
response = client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
mock_fetch.assert_called_once()
assert response.status_code == 200
assert response.headers["X-Artifact-Source"] == "remote"
def test_cache_miss_records_metrics(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"fresh content"
deps["cache"].is_index_file.return_value = False
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
):
client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
deps["metrics"].record_cache_miss.assert_called_once_with("generic-test", ANY)
def test_cache_miss_on_index_marks_index_cached(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"APKINDEX content"
deps["cache"].is_index_file.return_value = True
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
):
response = client.get("/api/v1/remote/alpine-test/alpine/v3.18/x86_64/APKINDEX.tar.gz")
assert response.status_code == 200
deps["cache"].mark_index_cached.assert_called_once()
def test_upstream_error_returns_502(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
deps["cache"].is_index_file.return_value = False
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "error", "error": "upstream unreachable"},
):
response = client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
assert response.status_code == 502
def test_index_file_bypasses_include_patterns(self, client, patched_deps):
"""Index files must be served even when they don't match include_patterns."""
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"APKINDEX content"
deps["cache"].is_index_file.return_value = True
deps["cache"].is_index_valid.return_value = True
# APKINDEX.tar.gz does not match alpine-test's include_patterns (.*.apk$),
# but since is_index_file returns True it must be allowed through.
response = client.get("/api/v1/remote/alpine-test/alpine/v3.18/x86_64/APKINDEX.tar.gz")
assert response.status_code == 200
def test_local_repo_file_not_found_returns_404(self, client, patched_deps):
deps = patched_deps
deps["database"].get_local_file_metadata.return_value = None
deps["database"].available = True
response = client.get("/api/v1/remote/local-test/path/to/nonexistent.bin")
assert response.status_code == 404
# ---------------------------------------------------------------------------
# Upload route PUT /api/v1/remote/{remote}/{path}
# ---------------------------------------------------------------------------
class TestUploadRoute:
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.put(
"/api/v1/remote/nonexistent/path/to/file.tar.gz",
files={"file": ("file.tar.gz", b"content", "application/octet-stream")},
)
assert response.status_code == 404
def test_non_local_remote_returns_400(self, client, patched_deps):
response = client.put(
"/api/v1/remote/generic-test/path/to/file.tar.gz",
files={"file": ("file.tar.gz", b"content", "application/octet-stream")},
)
assert response.status_code == 400
# ---------------------------------------------------------------------------
# HEAD route HEAD /api/v1/remote/{remote}/{path}
# ---------------------------------------------------------------------------
class TestHeadRoute:
def test_non_local_remote_returns_405(self, client, patched_deps):
response = client.head("/api/v1/remote/generic-test/path/to/file.tar.gz")
assert response.status_code == 405
def test_local_repo_file_not_found_returns_404(self, client, patched_deps):
deps = patched_deps
deps["database"].get_local_file_metadata.return_value = None
deps["database"].available = True
response = client.head("/api/v1/remote/local-test/path/to/nonexistent.bin")
assert response.status_code == 404
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.head("/api/v1/remote/nonexistent/path/to/file.bin")
assert response.status_code == 404
# ---------------------------------------------------------------------------
# DELETE route DELETE /api/v1/remote/{remote}/{path}
# ---------------------------------------------------------------------------
class TestDeleteRoute:
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.delete("/api/v1/remote/nonexistent/path/to/file.tar.gz")
assert response.status_code == 404
def test_non_local_remote_returns_400(self, client, patched_deps):
response = client.delete("/api/v1/remote/generic-test/path/to/file.tar.gz")
assert response.status_code == 400
# ---------------------------------------------------------------------------
# Cache flush PUT /cache/flush
# ---------------------------------------------------------------------------
class TestCacheFlushEndpoint:
def test_flush_all_returns_flushed_structure(self, client, patched_deps):
deps = patched_deps
deps["cache"].available = False
deps["storage"].client.list_objects_v2.return_value = {}
response = client.put("/cache/flush")
assert response.status_code == 200
data = response.json()
assert "flushed" in data
assert "redis_keys" in data["flushed"]
assert "s3_objects" in data["flushed"]
def test_flush_specific_remote_echoes_remote(self, client, patched_deps):
deps = patched_deps
deps["cache"].available = False
deps["storage"].client.list_objects_v2.return_value = {}
response = client.put("/cache/flush?remote=alpine-test")
assert response.status_code == 200
assert response.json()["remote"] == "alpine-test"
def test_flush_all_deletes_redis_keys_when_cache_available(self, client, patched_deps):
deps = patched_deps
deps["cache"].available = True
redis_mock = MagicMock()
deps["cache"].client = redis_mock
# First pattern (index:*) returns keys; subsequent pattern returns nothing
redis_mock.keys.side_effect = [["index:test:abc", "index:test:def"], []]
deps["storage"].client.list_objects_v2.return_value = {}
response = client.put("/cache/flush")
assert response.status_code == 200
data = response.json()
assert data["flushed"]["redis_keys"] == 2
redis_mock.delete.assert_called_once_with("index:test:abc", "index:test:def")
# ---------------------------------------------------------------------------
# Metrics endpoint GET /metrics
# ---------------------------------------------------------------------------
class TestMetricsEndpoint:
def test_returns_prometheus_text_by_default(self, client, patched_deps):
response = client.get("/metrics")
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/plain")
# ---------------------------------------------------------------------------
# Config endpoint GET /config
# ---------------------------------------------------------------------------
class TestConfigEndpoint:
def test_returns_config_with_remotes(self, client):
response = client.get("/config")
assert response.status_code == 200
data = response.json()
assert "remotes" in data
assert "alpine-test" in data["remotes"]
+132
View File
@@ -0,0 +1,132 @@
"""Tests for S3Storage: get_object_key (pure logic) and I/O methods."""
import hashlib
from unittest.mock import MagicMock, patch
import pytest
from botocore.exceptions import ClientError
from fastapi import HTTPException
from artifactapi.storage import S3Storage
@pytest.fixture
def storage():
"""S3Storage with a mocked boto3 client."""
with patch("boto3.client", return_value=MagicMock()):
s = S3Storage(
endpoint="localhost:9000",
access_key="testkey",
secret_key="testsecret",
bucket="testbucket",
secure=False,
)
s.client = MagicMock()
return s
# ---------------------------------------------------------------------------
# get_object_key
# ---------------------------------------------------------------------------
class TestGetObjectKey:
def test_key_has_three_part_structure(self, storage):
# remote / hash-segment / filename
key = storage.get_object_key("myremote", "some/path/to/file.rpm")
parts = key.split("/")
assert len(parts) == 3
assert parts[0] == "myremote"
assert parts[2] == "file.rpm"
assert len(parts[1]) == 16 # SHA-256 hex truncated to 16 chars
def test_key_uses_sha256_of_directory_path(self, storage):
# Pin the hash algorithm, truncation length, and format in one assertion
key = storage.get_object_key("myremote", "some/path/to/file.rpm")
expected_hash = hashlib.sha256(b"some/path/to").hexdigest()[:16]
assert key == f"myremote/{expected_hash}/file.rpm"
def test_different_remotes_give_different_keys(self, storage):
k1 = storage.get_object_key("remote-a", "path/to/file.rpm")
k2 = storage.get_object_key("remote-b", "path/to/file.rpm")
assert k1 != k2
def test_different_directories_give_different_keys(self, storage):
k1 = storage.get_object_key("myremote", "path/version-1/file.rpm")
k2 = storage.get_object_key("myremote", "path/version-2/file.rpm")
assert k1 != k2
assert k1.split("/")[-1] == k2.split("/")[-1] == "file.rpm"
def test_leading_slash_stripped(self, storage):
k1 = storage.get_object_key("myremote", "/path/to/file.rpm")
k2 = storage.get_object_key("myremote", "path/to/file.rpm")
assert k1 == k2
def test_file_with_no_directory(self, storage):
key = storage.get_object_key("myremote", "file.rpm")
assert key == "myremote/file.rpm"
def test_docker_blob_uses_digest_path(self, storage):
digest = "a" * 64 # realistic 64-char SHA-256 hex string
path = f"library/nginx/blobs/sha256:{digest}"
key = storage.get_object_key("dockerhub", path)
assert key == f"dockerhub/blobs/sha256/{digest}"
def test_docker_blob_deduplication_across_images(self, storage):
"""Same blob digest pulled from different images maps to the same S3 key."""
digest = "deadbeef" * 8 # 64-char hex
k1 = storage.get_object_key("dockerhub", f"library/nginx/blobs/sha256:{digest}")
k2 = storage.get_object_key("dockerhub", f"library/ubuntu/blobs/sha256:{digest}")
assert k1 == k2
def test_docker_blob_different_digests_different_keys(self, storage):
k1 = storage.get_object_key("dockerhub", "library/nginx/blobs/sha256:" + "a" * 64)
k2 = storage.get_object_key("dockerhub", "library/nginx/blobs/sha256:" + "b" * 64)
assert k1 != k2
def test_docker_blob_different_remotes_different_keys(self, storage):
digest = "abc" * 21 + "d" # 64-char hex
k1 = storage.get_object_key("remote-a", f"library/nginx/blobs/sha256:{digest}")
k2 = storage.get_object_key("remote-b", f"library/nginx/blobs/sha256:{digest}")
assert k1 != k2
# ---------------------------------------------------------------------------
# get_url
# ---------------------------------------------------------------------------
class TestGetUrl:
def test_returns_http_url_for_insecure_endpoint(self, storage):
url = storage.get_url("myremote/abc123/file.rpm")
assert url == "http://localhost:9000/testbucket/myremote/abc123/file.rpm"
def test_returns_http_url_for_secure_storage(self):
with patch("boto3.client", return_value=MagicMock()):
s = S3Storage(endpoint="s3.example.com", access_key="k", secret_key="s", bucket="b", secure=True)
s.client = MagicMock()
# get_url uses http:// always (direct internal access address, not the S3 protocol)
assert s.get_url("path/to/file.rpm") == "http://s3.example.com/b/path/to/file.rpm"
# ---------------------------------------------------------------------------
# upload / download_object
# ---------------------------------------------------------------------------
class TestUpload:
def test_upload_returns_s3_uri(self, storage):
storage.client.put_object.return_value = {}
result = storage.upload("myremote/abc123/file.rpm", b"content")
assert result == "s3://testbucket/myremote/abc123/file.rpm"
class TestDownloadObject:
def test_download_object_raises_404_on_client_error(self, storage):
storage.client.get_object.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey", "Message": "The specified key does not exist"}},
"GetObject",
)
with pytest.raises(HTTPException) as exc_info:
storage.download_object("nonexistent/key")
assert exc_info.value.status_code == 404
+8
View File
@@ -0,0 +1,8 @@
[tox]
envlist = py311
isolated_build = true
[testenv]
extras = dev
commands =
pytest {posargs:tests}