tests: resolve all peer-review issues across test suite

Address every substantive critique from the peer review:

test_cache: replace tautological same-inputs key test with hardcoded
hash assertion; assert setex call + TTL in mark_index_cached test;
assert client is None for unavailable no-op; rename Packages.gz test
to document intentional behaviour; add alpine sig/tmp negatives; add
hyphenated and date-tag docker positive cases; add key hash-length
assertion.

test_config: replace live-constant comparisons with literal string
assertions for alpine/rpm/docker; add unknown package type test;
add dict-keyed repositories branch coverage (per-repo override and
fallback); fix cache config to full equality check; add explicit empty
index_patterns test.

test_docker_auth: fix case-insensitive test to verify realm value;
add field-order (scope before service) limitation test; add pipe-char
collision documentation test; add missing fetch_token edge cases
(no token field, HTTPStatusError, missing expires_in default 300);
replace rubber-stamp delegate test with end-to-end parse→fetch test.

test_storage: replace split prefix/suffix assertions with structural
3-part check + pinned sha256 assertion; fix Docker blob digests to
64-char hex; add secure=True URL test; add upload return value test;
add download_object 404-on-ClientError test; remove redundant subset
test.

test_routes: add metrics.record_cache_hit/miss assertions; add
mark_index_cached assertion after cache miss on index (docker + generic);
add Content-Disposition, X-Artifact-Size header checks; add rpm/xml
content-type tests; add flush test that verifies Redis keys are deleted
when cache is available; add smoke coverage for upload (PUT), HEAD, DELETE,
/metrics, and /config routes.
This commit is contained in:
2026-04-25 19:58:33 +10:00
parent 3a13d76f7e
commit 8da43e610e
6 changed files with 608 additions and 164 deletions
+17 -13
View File
@@ -5,6 +5,7 @@ Module-level setup (env vars + connection patches) runs before any test
module is imported, so the FastAPI app initialises against mocks rather
than real S3 / Redis / PostgreSQL services.
"""
import os
import tempfile
from unittest.mock import MagicMock, patch
@@ -75,19 +76,21 @@ _config_path = os.path.join(_tmpdir, "remotes.yaml")
with open(_config_path, "w") as _f:
yaml.dump(TEST_REMOTES, _f)
os.environ.update({
"CONFIG_PATH": _config_path,
"MINIO_ENDPOINT": "localhost:9000",
"MINIO_ACCESS_KEY": "testkey",
"MINIO_SECRET_KEY": "testsecret",
"MINIO_BUCKET": "testbucket",
"REDIS_URL": "redis://localhost:6379/0",
"DBHOST": "localhost",
"DBPORT": "5432",
"DBUSER": "test",
"DBPASS": "test",
"DBNAME": "test",
})
os.environ.update(
{
"CONFIG_PATH": _config_path,
"MINIO_ENDPOINT": "localhost:9000",
"MINIO_ACCESS_KEY": "testkey",
"MINIO_SECRET_KEY": "testsecret",
"MINIO_BUCKET": "testbucket",
"REDIS_URL": "redis://localhost:6379/0",
"DBHOST": "localhost",
"DBPORT": "5432",
"DBUSER": "test",
"DBPASS": "test",
"DBNAME": "test",
}
)
# Patch external service connections before the package is imported.
# These stay active for the whole session (process exits after tests finish).
@@ -109,6 +112,7 @@ from fastapi.testclient import TestClient # noqa: E402
@pytest.fixture(scope="session")
def app():
from artifactapi.main import app as fastapi_app
return fastapi_app
+52 -8
View File
@@ -1,5 +1,7 @@
"""Tests for RedisCache, focusing on is_index_file with configurable patterns."""
from unittest.mock import MagicMock, patch
import hashlib
from unittest.mock import ANY, MagicMock, patch
import pytest
@@ -39,6 +41,7 @@ def cache_with_redis(mock_redis_client):
# is_index_file — alpine patterns
# ---------------------------------------------------------------------------
class TestIsIndexFileAlpine:
def test_apkindex_tarball_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
@@ -56,11 +59,21 @@ class TestIsIndexFileAlpine:
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert not bare_cache.is_index_file("some/path/archive.tar.gz", patterns)
def test_apkindex_signature_file_is_not_index(self, bare_cache):
# Signature file adjacent to the index should not be treated as an index
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert not bare_cache.is_index_file("alpine/v3.18/x86_64/APKINDEX.tar.gz.sig", patterns)
def test_apkindex_tmp_file_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["alpine"]
assert not bare_cache.is_index_file("alpine/v3.18/x86_64/APKINDEX.tar.gz.tmp", patterns)
# ---------------------------------------------------------------------------
# is_index_file — rpm patterns
# ---------------------------------------------------------------------------
class TestIsIndexFileRpm:
def test_repomd_xml_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
@@ -82,7 +95,10 @@ class TestIsIndexFileRpm:
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert bare_cache.is_index_file("repo/repodata/comps.yaml.xz", patterns)
def test_packages_gz_is_index(self, bare_cache):
def test_packages_gz_pattern_matches_any_path(self, bare_cache):
# The Packages.gz$ regex is a carryover from the original hardcoded logic and
# deliberately matches any path ending in Packages.gz — including Debian-style paths.
# This test documents that intentional behaviour.
patterns = _PACKAGE_INDEX_PATTERNS["rpm"]
assert bare_cache.is_index_file("debian/dists/stable/main/binary-amd64/Packages.gz", patterns)
@@ -99,6 +115,7 @@ class TestIsIndexFileRpm:
# is_index_file — docker patterns
# ---------------------------------------------------------------------------
class TestIsIndexFileDocker:
def test_tag_manifest_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
@@ -108,6 +125,14 @@ class TestIsIndexFileDocker:
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert bare_cache.is_index_file("library/nginx/manifests/1.25.3", patterns)
def test_hyphenated_tag_manifest_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert bare_cache.is_index_file("library/nginx/manifests/latest-rc", patterns)
def test_numeric_date_tag_manifest_is_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
assert bare_cache.is_index_file("library/nginx/manifests/20240101", patterns)
def test_digest_manifest_is_not_index(self, bare_cache):
patterns = _PACKAGE_INDEX_PATTERNS["docker"]
digest = "sha256:" + "a" * 64
@@ -126,6 +151,7 @@ class TestIsIndexFileDocker:
# is_index_file — edge cases
# ---------------------------------------------------------------------------
class TestIsIndexFileEdgeCases:
def test_empty_patterns_nothing_is_index(self, bare_cache):
assert not bare_cache.is_index_file("APKINDEX.tar.gz", [])
@@ -151,11 +177,15 @@ class TestIsIndexFileEdgeCases:
# get_index_cache_key
# ---------------------------------------------------------------------------
class TestGetIndexCacheKey:
def test_same_inputs_produce_same_key(self, bare_cache):
k1 = bare_cache.get_index_cache_key("alpine-test", "alpine/v3.18/x86_64/APKINDEX.tar.gz")
k2 = bare_cache.get_index_cache_key("alpine-test", "alpine/v3.18/x86_64/APKINDEX.tar.gz")
assert k1 == k2
def test_key_format_is_deterministic(self, bare_cache):
# Assert against a pre-computed value to pin the hash algorithm,
# truncation length, and format string in one assertion.
path = "alpine/v3.18/x86_64/APKINDEX.tar.gz"
expected_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
key = bare_cache.get_index_cache_key("alpine-test", path)
assert key == f"index:alpine-test:{expected_hash}"
def test_different_paths_produce_different_keys(self, bare_cache):
k1 = bare_cache.get_index_cache_key("alpine-test", "alpine/v3.18/x86_64/APKINDEX.tar.gz")
@@ -171,15 +201,27 @@ class TestGetIndexCacheKey:
key = bare_cache.get_index_cache_key("myremote", "some/path")
assert key.startswith("index:myremote:")
def test_key_hash_segment_is_16_chars(self, bare_cache):
key = bare_cache.get_index_cache_key("myremote", "some/path/file.xml")
# Format: index:<remote>:<16-char hash> — the fixed length matters for key-space hygiene
parts = key.split(":")
assert len(parts) == 3
assert len(parts[2]) == 16
# ---------------------------------------------------------------------------
# mark_index_cached / is_index_valid
# ---------------------------------------------------------------------------
class TestIndexValidity:
def test_marked_then_valid(self, cache_with_redis, mock_redis_client):
mock_redis_client.exists.return_value = 1
def test_mark_index_cached_calls_setex_with_correct_ttl(self, cache_with_redis, mock_redis_client):
cache_with_redis.mark_index_cached("remote", "path/APKINDEX.tar.gz", 300)
expected_key = cache_with_redis.get_index_cache_key("remote", "path/APKINDEX.tar.gz")
mock_redis_client.setex.assert_called_once_with(expected_key, 300, ANY)
def test_present_key_is_valid(self, cache_with_redis, mock_redis_client):
mock_redis_client.exists.return_value = 1
assert cache_with_redis.is_index_valid("remote", "path/APKINDEX.tar.gz")
def test_missing_key_is_not_valid(self, cache_with_redis, mock_redis_client):
@@ -190,4 +232,6 @@ class TestIndexValidity:
assert not unavailable_cache.is_index_valid("remote", "some/path")
def test_mark_cached_no_op_when_unavailable(self, unavailable_cache):
# client is None when Redis is unavailable — setex cannot be called
assert unavailable_cache.client is None
unavailable_cache.mark_index_cached("remote", "some/path", 300) # must not raise
+142 -67
View File
@@ -1,19 +1,22 @@
"""Tests for ConfigManager, focusing on get_index_patterns (new logic)."""
import os
import pytest
import yaml
from artifactapi.config import _PACKAGE_INDEX_PATTERNS, ConfigManager
from artifactapi.config import ConfigManager
@pytest.fixture
def make_config(tmp_path):
"""Factory: write a remotes dict to a temp YAML and return a ConfigManager."""
def _make(remotes_dict):
cfg_file = tmp_path / "remotes.yaml"
cfg_file.write_text(yaml.dump({"remotes": remotes_dict}))
return ConfigManager(str(cfg_file))
return _make
@@ -21,18 +24,25 @@ def make_config(tmp_path):
# get_index_patterns
# ---------------------------------------------------------------------------
class TestGetIndexPatterns:
def test_alpine_returns_package_defaults(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "alpine", "base_url": "https://x.com"}})
assert cfg.get_index_patterns("r") == _PACKAGE_INDEX_PATTERNS["alpine"]
patterns = cfg.get_index_patterns("r")
# Assert against literal strings, not the live constant, so a rename doesn't mask a regression
assert r"APKINDEX\.tar\.gz$" in patterns
def test_rpm_returns_package_defaults(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "rpm", "base_url": "https://x.com"}})
assert cfg.get_index_patterns("r") == _PACKAGE_INDEX_PATTERNS["rpm"]
patterns = cfg.get_index_patterns("r")
assert r"repomd\.xml$" in patterns
assert any("repodata" in p for p in patterns)
def test_docker_returns_package_defaults(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "docker", "base_url": "https://x.com"}})
assert cfg.get_index_patterns("r") == _PACKAGE_INDEX_PATTERNS["docker"]
patterns = cfg.get_index_patterns("r")
assert any("manifests" in p for p in patterns)
assert any("tags/list" in p for p in patterns)
def test_generic_returns_empty_list(self, make_config):
cfg = make_config({"r": {"type": "remote", "package": "generic", "base_url": "https://x.com"}})
@@ -46,74 +56,103 @@ class TestGetIndexPatterns:
cfg = make_config({"r": {"type": "remote", "base_url": "https://x.com"}})
assert cfg.get_index_patterns("r") == []
def test_unknown_package_type_returns_empty_list(self, make_config):
# A mis-spelled package type silently returns [] — this is a known footgun
cfg = make_config({"r": {"type": "remote", "package": "deb", "base_url": "https://x.com"}})
assert cfg.get_index_patterns("r") == []
def test_extra_patterns_appended_after_defaults(self, make_config):
cfg = make_config({
"r": {
"type": "remote",
"package": "alpine",
"base_url": "https://x.com",
"index_patterns": ["custom\\.json$"],
cfg = make_config(
{
"r": {
"type": "remote",
"package": "alpine",
"base_url": "https://x.com",
"index_patterns": [r"custom\.json$"],
}
}
})
)
patterns = cfg.get_index_patterns("r")
defaults = _PACKAGE_INDEX_PATTERNS["alpine"]
assert patterns[: len(defaults)] == defaults
assert "custom\\.json$" in patterns
assert r"APKINDEX\.tar\.gz$" in patterns
assert r"custom\.json$" in patterns
# Defaults come first
assert patterns.index(r"APKINDEX\.tar\.gz$") < patterns.index(r"custom\.json$")
def test_explicit_empty_extra_patterns_returns_defaults(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "alpine",
"base_url": "https://x.com",
"index_patterns": [],
}
}
)
assert r"APKINDEX\.tar\.gz$" in cfg.get_index_patterns("r")
def test_duplicate_extra_pattern_not_added_twice(self, make_config):
existing = _PACKAGE_INDEX_PATTERNS["alpine"][0]
cfg = make_config({
"r": {
"type": "remote",
"package": "alpine",
"base_url": "https://x.com",
"index_patterns": [existing],
existing = r"APKINDEX\.tar\.gz$"
cfg = make_config(
{
"r": {
"type": "remote",
"package": "alpine",
"base_url": "https://x.com",
"index_patterns": [existing],
}
}
})
)
patterns = cfg.get_index_patterns("r")
assert patterns.count(existing) == 1
def test_generic_with_only_extra_patterns(self, make_config):
cfg = make_config({
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"index_patterns": ["meta\\.json$", "index\\.yaml$"],
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"index_patterns": [r"meta\.json$", r"index\.yaml$"],
}
}
})
assert cfg.get_index_patterns("r") == ["meta\\.json$", "index\\.yaml$"]
)
assert cfg.get_index_patterns("r") == [r"meta\.json$", r"index\.yaml$"]
def test_rpm_extra_patterns_merged(self, make_config):
cfg = make_config({
"r": {
"type": "remote",
"package": "rpm",
"base_url": "https://x.com",
"index_patterns": ["custom-meta\\.xml$"],
cfg = make_config(
{
"r": {
"type": "remote",
"package": "rpm",
"base_url": "https://x.com",
"index_patterns": [r"custom-meta\.xml$"],
}
}
})
)
patterns = cfg.get_index_patterns("r")
for default in _PACKAGE_INDEX_PATTERNS["rpm"]:
assert default in patterns
assert "custom-meta\\.xml$" in patterns
assert r"repomd\.xml$" in patterns
assert r"custom-meta\.xml$" in patterns
# ---------------------------------------------------------------------------
# get_repository_patterns
# ---------------------------------------------------------------------------
class TestGetRepositoryPatterns:
def test_returns_include_patterns(self, make_config):
cfg = make_config({
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"include_patterns": [".*\\.tar\\.gz$"],
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"include_patterns": [r".*\.tar\.gz$"],
}
}
})
assert cfg.get_repository_patterns("r", "") == [".*\\.tar\\.gz$"]
)
assert cfg.get_repository_patterns("r", "") == [r".*\.tar\.gz$"]
def test_returns_empty_for_missing_remote(self, make_config):
cfg = make_config({})
@@ -124,35 +163,70 @@ class TestGetRepositoryPatterns:
assert cfg.get_repository_patterns("r", "") == []
def test_multiple_patterns_returned(self, make_config):
patterns = [".*\\.rpm$", ".*/repodata/.*$"]
cfg = make_config({
"r": {
"type": "remote",
"package": "rpm",
"base_url": "https://x.com",
"include_patterns": patterns,
patterns = [r".*\.rpm$", r".*/repodata/.*$"]
cfg = make_config(
{
"r": {
"type": "remote",
"package": "rpm",
"base_url": "https://x.com",
"include_patterns": patterns,
}
}
})
)
assert cfg.get_repository_patterns("r", "") == patterns
def test_dict_keyed_repositories_returns_per_repo_patterns(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"include_patterns": [r".*\.tar\.gz$"],
"repositories": {
"/path/to/repo": {"include_patterns": [r".*\.rpm$"]},
},
}
}
)
assert cfg.get_repository_patterns("r", "/path/to/repo") == [r".*\.rpm$"]
def test_dict_keyed_repositories_falls_back_to_remote_patterns(self, make_config):
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"include_patterns": [r".*\.tar\.gz$"],
"repositories": {
"/path/to/repo": {"include_patterns": [r".*\.rpm$"]},
},
}
}
)
assert cfg.get_repository_patterns("r", "/unknown/path") == [r".*\.tar\.gz$"]
# ---------------------------------------------------------------------------
# get_cache_config
# ---------------------------------------------------------------------------
class TestGetCacheConfig:
def test_returns_cache_section(self, make_config):
cfg = make_config({
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"cache": {"file_ttl": 0, "index_ttl": 7200},
cfg = make_config(
{
"r": {
"type": "remote",
"package": "generic",
"base_url": "https://x.com",
"cache": {"file_ttl": 0, "index_ttl": 7200},
}
}
})
result = cfg.get_cache_config("r")
assert result["index_ttl"] == 7200
assert result["file_ttl"] == 0
)
assert cfg.get_cache_config("r") == {"file_ttl": 0, "index_ttl": 7200}
def test_returns_empty_dict_for_missing_remote(self, make_config):
cfg = make_config({})
@@ -167,6 +241,7 @@ class TestGetCacheConfig:
# Config file reload
# ---------------------------------------------------------------------------
class TestConfigReload:
def test_reloads_when_file_mtime_advances(self, tmp_path):
cfg_file = tmp_path / "remotes.yaml"
@@ -188,7 +263,7 @@ class TestConfigReload:
cfg_file.write_text(yaml.dump({"remotes": {"repo-a": {"type": "remote", "package": "generic", "base_url": "https://x.com"}}}))
cfg = ConfigManager(str(cfg_file))
# Call check_reload without touching the file should not reload
# Call check_reload without touching the file should not reload
cfg._check_reload()
assert "repo-a" in cfg.config["remotes"]
+73 -23
View File
@@ -1,7 +1,9 @@
"""Tests for docker_auth: WWW-Authenticate parsing and token caching."""
import time
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from artifactapi import docker_auth
@@ -27,13 +29,10 @@ def clear_token_cache():
# parse_www_authenticate
# ---------------------------------------------------------------------------
class TestParseWwwAuthenticate:
def test_full_bearer_header(self):
header = (
'Bearer realm="https://auth.docker.io/token"'
',service="registry.docker.io"'
',scope="repository:library/nginx:pull"'
)
header = 'Bearer realm="https://auth.docker.io/token",service="registry.docker.io",scope="repository:library/nginx:pull"'
result = parse_www_authenticate(header)
assert result is not None
realm, service, scope = result
@@ -64,15 +63,30 @@ class TestParseWwwAuthenticate:
def test_empty_header_returns_none(self):
assert parse_www_authenticate("") is None
def test_case_insensitive_bearer(self):
def test_case_insensitive_bearer_parses_realm(self):
header = 'bearer realm="https://auth.example.com/token"'
assert parse_www_authenticate(header) is not None
result = parse_www_authenticate(header)
assert result is not None
realm, _, _ = result
assert realm == "https://auth.example.com/token"
def test_field_order_scope_before_service_drops_service(self):
# The regex requires realm,service,scope order; scope before service
# results in service being silently dropped. This test documents the known limitation.
header = 'Bearer realm="https://auth.example.com",scope="repo:pull",service="svc"'
result = parse_www_authenticate(header)
assert result is not None
realm, service, scope = result
assert realm == "https://auth.example.com"
assert scope == "repo:pull"
assert service == "" # silently dropped when out of order
# ---------------------------------------------------------------------------
# _cache_key
# ---------------------------------------------------------------------------
class TestCacheKey:
def test_key_contains_all_components(self):
key = _cache_key("https://realm.com", "svc", "scope", "user")
@@ -95,11 +109,21 @@ class TestCacheKey:
k2 = _cache_key("realm", "svc", "scope:write", None)
assert k1 != k2
def test_pipe_in_field_value_can_collide_with_adjacent_fields(self):
# The "|" separator is not escaped, so a pipe embedded in one field
# produces the same key as the same pipe appearing as a separator boundary.
# This is a known limitation: _cache_key("a|b","c","d",None) ==
# _cache_key("a","b|c","d",None). Documents the behaviour, not a claim it's correct.
k1 = _cache_key("a|b", "c", "d", None)
k2 = _cache_key("a", "b|c", "d", None)
assert k1 == k2
# ---------------------------------------------------------------------------
# _get_cached_token / _store_token
# ---------------------------------------------------------------------------
class TestTokenCaching:
def test_get_returns_none_when_not_cached(self):
assert _get_cached_token("no-such-key") is None
@@ -135,6 +159,7 @@ class TestTokenCaching:
# fetch_token (async, mocks httpx)
# ---------------------------------------------------------------------------
def _make_mock_http_client(token_payload: dict):
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
@@ -162,6 +187,23 @@ class TestFetchToken:
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token == "access-tok"
async def test_returns_none_when_response_missing_token_field(self):
ctx, _ = _make_mock_http_client({"not_token": "value", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token is None
async def test_defaults_expires_in_to_300_when_missing(self):
ctx, _ = _make_mock_http_client({"token": "tok"}) # no expires_in key
before = time.time()
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token == "tok"
key = _cache_key("https://auth.example.com", "svc", "scope", None)
_, expires_at = docker_auth._token_cache[key]
# Default expires_in=300, stored as time.time() + max(300-30, 10) = 270
assert before + 268 <= expires_at <= before + 272
async def test_uses_cache_on_second_call_without_http(self):
ctx, mock_client = _make_mock_http_client({"token": "cached-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
@@ -171,7 +213,7 @@ class TestFetchToken:
mock_client.get.assert_not_called()
assert token == "cached-tok"
async def test_returns_none_on_http_error(self):
async def test_returns_none_on_network_error(self):
mock_client = AsyncMock()
mock_client.get = AsyncMock(side_effect=Exception("connection refused"))
ctx = MagicMock()
@@ -181,6 +223,18 @@ class TestFetchToken:
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token is None
async def test_returns_none_on_http_status_error(self):
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError("401 Unauthorized", request=MagicMock(), response=MagicMock())
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=mock_client)
ctx.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=ctx):
token = await fetch_token("https://auth.example.com", "svc", "scope")
assert token is None
async def test_passes_credentials_as_auth_tuple(self):
ctx, mock_client = _make_mock_http_client({"token": "authed-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
@@ -200,24 +254,20 @@ class TestFetchToken:
# get_docker_token_for_response
# ---------------------------------------------------------------------------
class TestGetDockerTokenForResponse:
async def test_returns_none_for_non_bearer_header(self):
token = await get_docker_token_for_response('Basic realm="example"')
assert token is None
async def test_delegates_to_fetch_token(self):
header = (
'Bearer realm="https://auth.example.com"'
',service="svc"'
',scope="repo:pull"'
)
with patch(
"artifactapi.docker_auth.fetch_token",
new_callable=AsyncMock,
return_value="delegated-tok",
) as mock_fetch:
async def test_end_to_end_parse_and_fetch(self):
"""parse_www_authenticate → fetch_token wired together end-to-end."""
header = 'Bearer realm="https://auth.example.com",service="svc",scope="repo:pull"'
ctx, mock_client = _make_mock_http_client({"token": "e2e-tok", "expires_in": 300})
with patch("httpx.AsyncClient", return_value=ctx):
token = await get_docker_token_for_response(header, "user", "pass")
mock_fetch.assert_called_once_with(
"https://auth.example.com", "svc", "repo:pull", "user", "pass"
)
assert token == "delegated-tok"
assert token == "e2e-tok"
call_kwargs = mock_client.get.call_args.kwargs
assert call_kwargs["params"]["service"] == "svc"
assert call_kwargs["params"]["scope"] == "repo:pull"
assert call_kwargs["auth"] == ("user", "pass")
+270 -30
View File
@@ -1,7 +1,8 @@
"""FastAPI route tests using TestClient with mocked service dependencies."""
import hashlib
import json
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
@@ -9,6 +10,7 @@ import pytest
# Per-test service mocks (replace module-level globals in main.py)
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_storage():
m = MagicMock()
@@ -46,6 +48,7 @@ def mock_metrics():
def patched_deps(mock_storage, mock_cache, mock_database, mock_metrics):
"""Swap the module-level service instances in main.py for the duration of a test."""
import artifactapi.main as main_mod
with (
patch.object(main_mod, "storage", mock_storage),
patch.object(main_mod, "cache", mock_cache),
@@ -64,6 +67,7 @@ def patched_deps(mock_storage, mock_cache, mock_database, mock_metrics):
# Basic / health endpoints
# ---------------------------------------------------------------------------
class TestBasicEndpoints:
def test_root_returns_remote_list(self, client):
response = client.get("/")
@@ -93,6 +97,7 @@ class TestBasicEndpoints:
# Docker proxy /v2/{remote}/{path}
# ---------------------------------------------------------------------------
class TestDockerProxy:
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.get("/v2/no-such-remote/library/nginx/manifests/latest")
@@ -110,10 +115,12 @@ class TestDockerProxy:
def test_allowed_pattern_proceeds_to_cache(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}).encode()
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
@@ -124,11 +131,13 @@ class TestDockerProxy:
def test_cache_hit_manifest_returns_correct_content_type(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"schemaVersion": 2,
"layers": [],
}).encode()
manifest = json.dumps(
{
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"schemaVersion": 2,
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
@@ -141,10 +150,12 @@ class TestDockerProxy:
def test_cache_hit_sets_docker_content_digest_header(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}).encode()
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
@@ -154,12 +165,24 @@ class TestDockerProxy:
expected = f"sha256:{hashlib.sha256(manifest).hexdigest()}"
assert response.headers["Docker-Content-Digest"] == expected
def test_cache_hit_records_metrics(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = False
client.get("/v2/docker-test/library/nginx/manifests/latest")
deps["metrics"].record_cache_hit.assert_called_once_with("docker-test", ANY)
def test_head_request_returns_no_body(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}).encode()
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = False
@@ -170,10 +193,12 @@ class TestDockerProxy:
def test_cache_miss_calls_upstream_fetch(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}).encode()
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
@@ -188,13 +213,37 @@ class TestDockerProxy:
mock_fetch.assert_called_once()
assert response.status_code == 200
def test_cache_miss_on_index_marks_index_cached(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = manifest
deps["cache"].is_index_file.return_value = True
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
):
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
assert response.status_code == 200
deps["cache"].mark_index_cached.assert_called_once()
def test_index_expired_triggers_refetch(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}).encode()
deps["storage"].exists.return_value = True # cached in S3
manifest = json.dumps(
{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"layers": [],
}
).encode()
deps["storage"].exists.return_value = True # cached in S3
deps["cache"].is_index_file.return_value = True
deps["cache"].is_index_valid.return_value = False # but TTL expired
deps["storage"].download_object.return_value = manifest
@@ -214,6 +263,7 @@ class TestDockerProxy:
# Generic artifact route /api/v1/remote/{remote}/{path}
# ---------------------------------------------------------------------------
class TestGenericArtifactRoute:
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.get("/api/v1/remote/nonexistent/path/to/file.tar.gz")
@@ -224,7 +274,7 @@ class TestGenericArtifactRoute:
response = client.get("/api/v1/remote/generic-test/some/path/file.rpm")
assert response.status_code == 403
def test_cache_hit_returns_200_with_header(self, client, patched_deps):
def test_cache_hit_returns_200_with_source_header(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"tar content"
@@ -235,6 +285,65 @@ class TestGenericArtifactRoute:
assert response.headers["X-Artifact-Source"] == "cache"
assert response.content == b"tar content"
def test_cache_hit_sets_content_disposition(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
disposition = response.headers["content-disposition"]
assert "attachment" in disposition
assert "archive.tar.gz" in disposition
def test_cache_hit_sets_artifact_size_header(self, client, patched_deps):
deps = patched_deps
content = b"some artifact content bytes"
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = content
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
assert response.headers["X-Artifact-Size"] == str(len(content))
def test_cache_hit_records_metrics(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_index_file.return_value = False
client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
deps["metrics"].record_cache_hit.assert_called_once_with("generic-test", ANY)
def test_cache_hit_records_artifact_mapping(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_index_file.return_value = False
client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
deps["database"].record_artifact_mapping.assert_called_once()
def test_cache_hit_rpm_returns_correct_content_type(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"rpm bytes"
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/rpm-test/almalinux/9/x86_64/bash-5.1.8.x86_64.rpm")
assert response.status_code == 200
assert "application/x-rpm" in response.headers["content-type"]
def test_cache_hit_xml_returns_correct_content_type(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"<?xml version='1.0'?>"
deps["cache"].is_index_file.return_value = False
response = client.get("/api/v1/remote/rpm-test/repo/repodata/primary.xml")
assert response.status_code == 200
assert "application/xml" in response.headers["content-type"]
def test_cache_miss_fetches_upstream_and_returns_200(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
@@ -252,6 +361,37 @@ class TestGenericArtifactRoute:
assert response.status_code == 200
assert response.headers["X-Artifact-Source"] == "remote"
def test_cache_miss_records_metrics(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"fresh content"
deps["cache"].is_index_file.return_value = False
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
):
client.get("/api/v1/remote/generic-test/some/path/archive.tar.gz")
deps["metrics"].record_cache_miss.assert_called_once_with("generic-test", ANY)
def test_cache_miss_on_index_marks_index_cached(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"APKINDEX content"
deps["cache"].is_index_file.return_value = True
with patch(
"artifactapi.main.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
):
response = client.get("/api/v1/remote/alpine-test/alpine/v3.18/x86_64/APKINDEX.tar.gz")
assert response.status_code == 200
deps["cache"].mark_index_cached.assert_called_once()
def test_upstream_error_returns_502(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
@@ -271,12 +411,11 @@ class TestGenericArtifactRoute:
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"APKINDEX content"
# Simulate is_index_file returning True (APKINDEX.tar.gz detected as index)
deps["cache"].is_index_file.return_value = True
deps["cache"].is_index_valid.return_value = True
# APKINDEX.tar.gz does not match alpine-test's include_patterns (.*.apk$),
# but since it's an index file it must be allowed through.
# but since is_index_file returns True it must be allowed through.
response = client.get("/api/v1/remote/alpine-test/alpine/v3.18/x86_64/APKINDEX.tar.gz")
assert response.status_code == 200
@@ -289,10 +428,70 @@ class TestGenericArtifactRoute:
assert response.status_code == 404
# ---------------------------------------------------------------------------
# Upload route PUT /api/v1/remote/{remote}/{path}
# ---------------------------------------------------------------------------
class TestUploadRoute:
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.put(
"/api/v1/remote/nonexistent/path/to/file.tar.gz",
files={"file": ("file.tar.gz", b"content", "application/octet-stream")},
)
assert response.status_code == 404
def test_non_local_remote_returns_400(self, client, patched_deps):
response = client.put(
"/api/v1/remote/generic-test/path/to/file.tar.gz",
files={"file": ("file.tar.gz", b"content", "application/octet-stream")},
)
assert response.status_code == 400
# ---------------------------------------------------------------------------
# HEAD route HEAD /api/v1/remote/{remote}/{path}
# ---------------------------------------------------------------------------
class TestHeadRoute:
def test_non_local_remote_returns_405(self, client, patched_deps):
response = client.head("/api/v1/remote/generic-test/path/to/file.tar.gz")
assert response.status_code == 405
def test_local_repo_file_not_found_returns_404(self, client, patched_deps):
deps = patched_deps
deps["database"].get_local_file_metadata.return_value = None
deps["database"].available = True
response = client.head("/api/v1/remote/local-test/path/to/nonexistent.bin")
assert response.status_code == 404
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.head("/api/v1/remote/nonexistent/path/to/file.bin")
assert response.status_code == 404
# ---------------------------------------------------------------------------
# DELETE route DELETE /api/v1/remote/{remote}/{path}
# ---------------------------------------------------------------------------
class TestDeleteRoute:
def test_unknown_remote_returns_404(self, client, patched_deps):
response = client.delete("/api/v1/remote/nonexistent/path/to/file.tar.gz")
assert response.status_code == 404
def test_non_local_remote_returns_400(self, client, patched_deps):
response = client.delete("/api/v1/remote/generic-test/path/to/file.tar.gz")
assert response.status_code == 400
# ---------------------------------------------------------------------------
# Cache flush PUT /cache/flush
# ---------------------------------------------------------------------------
class TestCacheFlushEndpoint:
def test_flush_all_returns_flushed_structure(self, client, patched_deps):
deps = patched_deps
@@ -314,3 +513,44 @@ class TestCacheFlushEndpoint:
response = client.put("/cache/flush?remote=alpine-test")
assert response.status_code == 200
assert response.json()["remote"] == "alpine-test"
def test_flush_all_deletes_redis_keys_when_cache_available(self, client, patched_deps):
deps = patched_deps
deps["cache"].available = True
redis_mock = MagicMock()
deps["cache"].client = redis_mock
# First pattern (index:*) returns keys; subsequent pattern returns nothing
redis_mock.keys.side_effect = [["index:test:abc", "index:test:def"], []]
deps["storage"].client.list_objects_v2.return_value = {}
response = client.put("/cache/flush")
assert response.status_code == 200
data = response.json()
assert data["flushed"]["redis_keys"] == 2
redis_mock.delete.assert_called_once_with("index:test:abc", "index:test:def")
# ---------------------------------------------------------------------------
# Metrics endpoint GET /metrics
# ---------------------------------------------------------------------------
class TestMetricsEndpoint:
def test_returns_prometheus_text_by_default(self, client, patched_deps):
response = client.get("/metrics")
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/plain")
# ---------------------------------------------------------------------------
# Config endpoint GET /config
# ---------------------------------------------------------------------------
class TestConfigEndpoint:
def test_returns_config_with_remotes(self, client):
response = client.get("/config")
assert response.status_code == 200
data = response.json()
assert "remotes" in data
assert "alpine-test" in data["remotes"]
+54 -23
View File
@@ -1,7 +1,11 @@
"""Tests for S3Storage, focusing on get_object_key (pure logic, no S3 calls)."""
"""Tests for S3Storage: get_object_key (pure logic) and I/O methods."""
import hashlib
from unittest.mock import MagicMock, patch
import pytest
from botocore.exceptions import ClientError
from fastapi import HTTPException
from artifactapi.storage import S3Storage
@@ -25,19 +29,22 @@ def storage():
# get_object_key
# ---------------------------------------------------------------------------
class TestGetObjectKey:
def test_includes_remote_name_prefix(self, storage):
key = storage.get_object_key("myremote", "some/path/file.rpm")
assert key.startswith("myremote/")
def test_key_has_three_part_structure(self, storage):
# remote / hash-segment / filename
key = storage.get_object_key("myremote", "some/path/to/file.rpm")
parts = key.split("/")
assert len(parts) == 3
assert parts[0] == "myremote"
assert parts[2] == "file.rpm"
assert len(parts[1]) == 16 # SHA-256 hex truncated to 16 chars
def test_ends_with_filename(self, storage):
key = storage.get_object_key("myremote", "some/path/file.rpm")
assert key.endswith("/file.rpm")
def test_same_path_produces_same_key(self, storage):
k1 = storage.get_object_key("myremote", "some/path/file.rpm")
k2 = storage.get_object_key("myremote", "some/path/file.rpm")
assert k1 == k2
def test_key_uses_sha256_of_directory_path(self, storage):
# Pin the hash algorithm, truncation length, and format in one assertion
key = storage.get_object_key("myremote", "some/path/to/file.rpm")
expected_hash = hashlib.sha256(b"some/path/to").hexdigest()[:16]
assert key == f"myremote/{expected_hash}/file.rpm"
def test_different_remotes_give_different_keys(self, storage):
k1 = storage.get_object_key("remote-a", "path/to/file.rpm")
@@ -48,7 +55,6 @@ class TestGetObjectKey:
k1 = storage.get_object_key("myremote", "path/version-1/file.rpm")
k2 = storage.get_object_key("myremote", "path/version-2/file.rpm")
assert k1 != k2
# Same filename, different directory hashes
assert k1.split("/")[-1] == k2.split("/")[-1] == "file.rpm"
def test_leading_slash_stripped(self, storage):
@@ -61,25 +67,25 @@ class TestGetObjectKey:
assert key == "myremote/file.rpm"
def test_docker_blob_uses_digest_path(self, storage):
digest = "abc123def456" * 4
digest = "a" * 64 # realistic 64-char SHA-256 hex string
path = f"library/nginx/blobs/sha256:{digest}"
key = storage.get_object_key("dockerhub", path)
assert key == f"dockerhub/blobs/sha256/{digest}"
def test_docker_blob_deduplication_across_images(self, storage):
"""Same blob digest pulled from different images maps to the same S3 key."""
digest = "deadbeef" * 8
digest = "deadbeef" * 8 # 64-char hex
k1 = storage.get_object_key("dockerhub", f"library/nginx/blobs/sha256:{digest}")
k2 = storage.get_object_key("dockerhub", f"library/ubuntu/blobs/sha256:{digest}")
assert k1 == k2
def test_docker_blob_different_digests_different_keys(self, storage):
k1 = storage.get_object_key("dockerhub", "library/nginx/blobs/sha256:aaa111")
k2 = storage.get_object_key("dockerhub", "library/nginx/blobs/sha256:bbb222")
k1 = storage.get_object_key("dockerhub", "library/nginx/blobs/sha256:" + "a" * 64)
k2 = storage.get_object_key("dockerhub", "library/nginx/blobs/sha256:" + "b" * 64)
assert k1 != k2
def test_docker_blob_different_remotes_different_keys(self, storage):
digest = "abc" * 20
digest = "abc" * 21 + "d" # 64-char hex
k1 = storage.get_object_key("remote-a", f"library/nginx/blobs/sha256:{digest}")
k2 = storage.get_object_key("remote-b", f"library/nginx/blobs/sha256:{digest}")
assert k1 != k2
@@ -89,13 +95,38 @@ class TestGetObjectKey:
# get_url
# ---------------------------------------------------------------------------
class TestGetUrl:
def test_returns_http_url_for_insecure_endpoint(self, storage):
url = storage.get_url("myremote/abc123/file.rpm")
assert url == "http://localhost:9000/testbucket/myremote/abc123/file.rpm"
def test_url_contains_bucket_and_key(self, storage):
key = "myremote/abc/file.tar.gz"
url = storage.get_url(key)
assert "testbucket" in url
assert key in url
def test_returns_http_url_for_secure_storage(self):
with patch("boto3.client", return_value=MagicMock()):
s = S3Storage(endpoint="s3.example.com", access_key="k", secret_key="s", bucket="b", secure=True)
s.client = MagicMock()
# get_url uses http:// always (direct internal access address, not the S3 protocol)
assert s.get_url("path/to/file.rpm") == "http://s3.example.com/b/path/to/file.rpm"
# ---------------------------------------------------------------------------
# upload / download_object
# ---------------------------------------------------------------------------
class TestUpload:
def test_upload_returns_s3_uri(self, storage):
storage.client.put_object.return_value = {}
result = storage.upload("myremote/abc123/file.rpm", b"content")
assert result == "s3://testbucket/myremote/abc123/file.rpm"
class TestDownloadObject:
def test_download_object_raises_404_on_client_error(self, storage):
storage.client.get_object.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey", "Message": "The specified key does not exist"}},
"GetObject",
)
with pytest.raises(HTTPException) as exc_info:
storage.download_object("nonexistent/key")
assert exc_info.value.status_code == 404