Files
artifactapi/tests/test_docker_auth.py
T
unkinben 8da43e610e tests: resolve all peer-review issues across test suite
Address every substantive critique from the peer review:

test_cache: replace tautological same-inputs key test with hardcoded
hash assertion; assert setex call + TTL in mark_index_cached test;
assert client is None for unavailable no-op; rename Packages.gz test
to document intentional behaviour; add alpine sig/tmp negatives; add
hyphenated and date-tag docker positive cases; add key hash-length
assertion.

test_config: replace live-constant comparisons with literal string
assertions for alpine/rpm/docker; add unknown package type test;
add dict-keyed repositories branch coverage (per-repo override and
fallback); fix cache config to full equality check; add explicit empty
index_patterns test.

test_docker_auth: fix case-insensitive test to verify realm value;
add field-order (scope before service) limitation test; add pipe-char
collision documentation test; add missing fetch_token edge cases
(no token field, HTTPStatusError, missing expires_in default 300);
replace rubber-stamp delegate test with end-to-end parse→fetch test.

test_storage: replace split prefix/suffix assertions with structural
3-part check + pinned sha256 assertion; fix Docker blob digests to
64-char hex; add secure=True URL test; add upload return value test;
add download_object 404-on-ClientError test; remove redundant subset
test.

test_routes: add metrics.record_cache_hit/miss assertions; add
mark_index_cached assertion after cache miss on index (docker + generic);
add Content-Disposition, X-Artifact-Size header checks; add rpm/xml
content-type tests; add flush test that verifies Redis keys are deleted
when cache is available; add smoke coverage for upload (PUT), HEAD, DELETE,
/metrics, and /config routes.
2026-04-25 19:58:33 +10:00

274 lines
12 KiB
Python

"""Tests for docker_auth: WWW-Authenticate parsing and token caching."""
import time
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from artifactapi import docker_auth
from artifactapi.docker_auth import (
_cache_key,
_get_cached_token,
_store_token,
fetch_token,
get_docker_token_for_response,
parse_www_authenticate,
)
@pytest.fixture(autouse=True)
def clear_token_cache():
    """Give every test a pristine module-level token cache.

    ``docker_auth._token_cache`` is emptied once before the test body runs
    and once more after it finishes, so entries never leak between tests.
    """

    def _wipe():
        # Resolve the attribute at call time so that whatever object is
        # currently bound to ``_token_cache`` is the one that gets cleared.
        docker_auth._token_cache.clear()

    _wipe()
    yield
    _wipe()
# ---------------------------------------------------------------------------
# parse_www_authenticate
# ---------------------------------------------------------------------------
class TestParseWwwAuthenticate:
    """Checks for ``parse_www_authenticate`` on complete, partial and rejected headers."""

    def test_full_bearer_header(self):
        """A challenge carrying realm, service and scope yields all three values."""
        challenge = 'Bearer realm="https://auth.docker.io/token",service="registry.docker.io",scope="repository:library/nginx:pull"'
        parsed = parse_www_authenticate(challenge)
        assert parsed is not None
        realm, service, scope = parsed
        assert (realm, service, scope) == (
            "https://auth.docker.io/token",
            "registry.docker.io",
            "repository:library/nginx:pull",
        )

    def test_realm_only(self):
        """With only a realm present, service and scope come back as empty strings."""
        challenge = 'Bearer realm="https://auth.example.com/token"'
        parsed = parse_www_authenticate(challenge)
        assert parsed is not None
        realm, service, scope = parsed
        assert (realm, service, scope) == ("https://auth.example.com/token", "", "")

    def test_realm_and_service_only(self):
        """A missing scope is reported as an empty string; the service is kept."""
        challenge = 'Bearer realm="https://auth.example.com",service="registry.example.com"'
        parsed = parse_www_authenticate(challenge)
        assert parsed is not None
        _, service, scope = parsed
        assert (service, scope) == ("registry.example.com", "")

    def test_invalid_scheme_returns_none(self):
        """A non-Bearer scheme is rejected with ``None``."""
        outcome = parse_www_authenticate('Basic realm="example"')
        assert outcome is None

    def test_empty_header_returns_none(self):
        """An empty header string is rejected with ``None``."""
        outcome = parse_www_authenticate("")
        assert outcome is None

    def test_case_insensitive_bearer_parses_realm(self):
        """A lower-case ``bearer`` scheme is accepted and the realm is still extracted."""
        challenge = 'bearer realm="https://auth.example.com/token"'
        parsed = parse_www_authenticate(challenge)
        assert parsed is not None
        realm, _service, _scope = parsed
        assert realm == "https://auth.example.com/token"

    def test_field_order_scope_before_service_drops_service(self):
        """Pin the known limitation: ``scope`` ahead of ``service`` loses the service."""
        # The parser's regex expects the fields in realm, service, scope
        # order. When scope precedes service, the service value is silently
        # discarded. This test records that behaviour rather than endorsing it.
        challenge = 'Bearer realm="https://auth.example.com",scope="repo:pull",service="svc"'
        parsed = parse_www_authenticate(challenge)
        assert parsed is not None
        realm, service, scope = parsed
        assert (realm, scope) == ("https://auth.example.com", "repo:pull")
        assert service == ""  # silently dropped when out of order
# ---------------------------------------------------------------------------
# _cache_key
# ---------------------------------------------------------------------------
class TestCacheKey:
    """Checks for how ``_cache_key`` combines realm, service, scope and username."""

    def test_key_contains_all_components(self):
        """Every input component appears verbatim somewhere in the key."""
        components = ("https://realm.com", "svc", "scope", "user")
        key = _cache_key(*components)
        for component in components:
            assert component in key

    def test_none_username_uses_empty_string(self):
        """A ``None`` username leaves the key ending on the bare separator."""
        anonymous_key = _cache_key("https://realm.com", "svc", "scope", None)
        assert anonymous_key.endswith("|")

    def test_different_services_give_different_keys(self):
        """Changing only the service changes the key."""
        assert _cache_key("realm", "svc1", "scope", None) != _cache_key("realm", "svc2", "scope", None)

    def test_different_scopes_give_different_keys(self):
        """Changing only the scope changes the key."""
        assert _cache_key("realm", "svc", "scope:read", None) != _cache_key("realm", "svc", "scope:write", None)

    def test_pipe_in_field_value_can_collide_with_adjacent_fields(self):
        """Pin the known collision caused by the unescaped ``|`` separator."""
        # Because "|" is not escaped, a pipe inside one field is
        # indistinguishable from a separator boundary:
        # _cache_key("a|b", "c", "d", None) == _cache_key("a", "b|c", "d", None).
        # This documents the current behaviour; it does not claim it is correct.
        pipe_in_realm = _cache_key("a|b", "c", "d", None)
        pipe_in_service = _cache_key("a", "b|c", "d", None)
        assert pipe_in_realm == pipe_in_service
# ---------------------------------------------------------------------------
# _get_cached_token / _store_token
# ---------------------------------------------------------------------------
class TestTokenCaching:
    """Checks for ``_get_cached_token`` / ``_store_token`` against the module cache.

    Cache entries are inspected directly through ``docker_auth._token_cache``,
    whose values are unpacked here as ``(token, expires_at)`` pairs.
    """

    def test_get_returns_none_when_not_cached(self):
        """Looking up a key that was never stored yields ``None``."""
        assert _get_cached_token("no-such-key") is None

    def test_get_returns_token_when_valid(self):
        """A freshly stored token is returned by the getter."""
        _store_token("mykey", "tok-abc", 300)
        assert _get_cached_token("mykey") == "tok-abc"

    def test_get_returns_none_when_expired(self):
        """An entry whose expiry lies in the past is treated as absent."""
        docker_auth._token_cache["mykey"] = ("old-token", time.time() - 1)
        assert _get_cached_token("mykey") is None

    def test_expired_entry_is_removed_from_cache(self):
        """Reading an expired entry also evicts it from the cache."""
        docker_auth._token_cache["mykey"] = ("old-token", time.time() - 1)
        _get_cached_token("mykey")
        assert "mykey" not in docker_auth._token_cache

    def test_store_expires_30s_before_stated_time(self):
        """The stored expiry is 30 s earlier than the stated ``expires_in``."""
        before = time.time()
        _store_token("mykey", "tok", 100)
        _, expires_at = docker_auth._token_cache["mykey"]
        # expires_in - 30 = 70; allow ±2 s clock wiggle
        assert before + 68 <= expires_at <= before + 72

    def test_store_enforces_minimum_10s_expiry(self):
        """A very short ``expires_in`` is clamped to a 10 s lifetime."""
        before = time.time()
        _store_token("mykey", "tok", 5)  # expires_in - 30 would be negative
        _, expires_at = docker_auth._token_cache["mykey"]
        # Bound BOTH sides. A lower bound alone would also pass if the floor
        # were much larger than 10 s, so it could not pin the value. The upper
        # limit reuses the 2 s allowance from the 30 s-margin test above.
        assert before + 10 <= expires_at <= before + 12
# ---------------------------------------------------------------------------
# fetch_token (async, mocks httpx)
# ---------------------------------------------------------------------------
def _make_mock_http_client(
    token_payload: dict,
    *,
    get_side_effect=None,
    raise_for_status_side_effect=None,
):
    """Build a mock that stands in for ``httpx.AsyncClient(...)``.

    The returned context object is meant to be handed to
    ``patch("httpx.AsyncClient", return_value=ctx)``; entering it with
    ``async with`` produces the mock client.

    Args:
        token_payload: Value returned by the mock response's ``json()``.
        get_side_effect: Optional ``side_effect`` for the client's ``get``
            coroutine (for example an exception instance to simulate a
            transport failure). ``None`` keeps the normal behaviour of
            returning the mock response.
        raise_for_status_side_effect: Optional ``side_effect`` for the
            response's ``raise_for_status`` (for example an HTTP status
            error). ``None`` makes ``raise_for_status`` a no-op.

    Returns:
        A ``(ctx, mock_client)`` pair: ``ctx`` is the async-context-manager
        mock and ``mock_client`` is what ``ctx.__aenter__`` resolves to, so
        callers can inspect ``mock_client.get``.
    """
    response = MagicMock()
    # side_effect=None is the mock default, so the no-argument call path is
    # unchanged from a plain MagicMock().
    response.raise_for_status = MagicMock(side_effect=raise_for_status_side_effect)
    response.json.return_value = token_payload

    client = AsyncMock()
    client.get = AsyncMock(return_value=response, side_effect=get_side_effect)

    ctx = MagicMock()
    ctx.__aenter__ = AsyncMock(return_value=client)
    # Returning False from __aexit__ lets exceptions raised inside the
    # ``async with`` body propagate instead of being swallowed.
    ctx.__aexit__ = AsyncMock(return_value=False)
    return ctx, client
class TestFetchToken:
    """Coroutine tests for ``fetch_token`` with ``httpx.AsyncClient`` patched out.

    NOTE(review): these ``async def`` tests carry no explicit asyncio marker;
    presumably the project's pytest configuration enables an automatic async
    mode. Confirm against the test configuration.
    """

    async def test_returns_token_field(self):
        """The ``token`` field of the JSON payload is returned."""
        client_ctx, _client = _make_mock_http_client({"token": "bearer-tok", "expires_in": 300})
        with patch("httpx.AsyncClient", return_value=client_ctx):
            result = await fetch_token("https://auth.example.com", "svc", "scope")
        assert result == "bearer-tok"

    async def test_falls_back_to_access_token_field(self):
        """When ``token`` is absent, ``access_token`` is used instead."""
        client_ctx, _client = _make_mock_http_client({"access_token": "access-tok", "expires_in": 300})
        with patch("httpx.AsyncClient", return_value=client_ctx):
            result = await fetch_token("https://auth.example.com", "svc", "scope")
        assert result == "access-tok"

    async def test_returns_none_when_response_missing_token_field(self):
        """A payload with neither token field produces ``None``."""
        client_ctx, _client = _make_mock_http_client({"not_token": "value", "expires_in": 300})
        with patch("httpx.AsyncClient", return_value=client_ctx):
            result = await fetch_token("https://auth.example.com", "svc", "scope")
        assert result is None

    async def test_defaults_expires_in_to_300_when_missing(self):
        """Without ``expires_in`` the cached entry expires about 270 s from now."""
        client_ctx, _client = _make_mock_http_client({"token": "tok"})  # no expires_in key
        started = time.time()
        with patch("httpx.AsyncClient", return_value=client_ctx):
            result = await fetch_token("https://auth.example.com", "svc", "scope")
        assert result == "tok"
        cache_entry = docker_auth._token_cache[
            _cache_key("https://auth.example.com", "svc", "scope", None)
        ]
        _, expires_at = cache_entry
        # Default expires_in=300, stored as time.time() + max(300-30, 10) = 270
        assert started + 268 <= expires_at <= started + 272

    async def test_uses_cache_on_second_call_without_http(self):
        """A repeat call with identical arguments is served without any HTTP GET."""
        client_ctx, client = _make_mock_http_client({"token": "cached-tok", "expires_in": 300})
        with patch("httpx.AsyncClient", return_value=client_ctx):
            await fetch_token("https://auth.example.com", "svc", "scope")
            # Forget the first request so only the second call is observed.
            client.get.reset_mock()
            result = await fetch_token("https://auth.example.com", "svc", "scope")
            client.get.assert_not_called()
        assert result == "cached-tok"

    async def test_returns_none_on_network_error(self):
        """An exception raised by the GET is turned into a ``None`` result."""
        failing_client = AsyncMock()
        failing_client.get = AsyncMock(side_effect=Exception("connection refused"))
        client_ctx = MagicMock()
        client_ctx.__aenter__ = AsyncMock(return_value=failing_client)
        client_ctx.__aexit__ = AsyncMock(return_value=False)
        with patch("httpx.AsyncClient", return_value=client_ctx):
            result = await fetch_token("https://auth.example.com", "svc", "scope")
        assert result is None

    async def test_returns_none_on_http_status_error(self):
        """An ``HTTPStatusError`` from ``raise_for_status`` is turned into ``None``."""
        status_error = httpx.HTTPStatusError(
            "401 Unauthorized", request=MagicMock(), response=MagicMock()
        )
        rejected = MagicMock()
        rejected.raise_for_status.side_effect = status_error
        client = AsyncMock()
        client.get = AsyncMock(return_value=rejected)
        client_ctx = MagicMock()
        client_ctx.__aenter__ = AsyncMock(return_value=client)
        client_ctx.__aexit__ = AsyncMock(return_value=False)
        with patch("httpx.AsyncClient", return_value=client_ctx):
            result = await fetch_token("https://auth.example.com", "svc", "scope")
        assert result is None

    async def test_passes_credentials_as_auth_tuple(self):
        """Username and password reach the GET as an ``auth`` tuple keyword."""
        client_ctx, client = _make_mock_http_client({"token": "authed-tok", "expires_in": 300})
        with patch("httpx.AsyncClient", return_value=client_ctx):
            await fetch_token("https://auth.example.com", "svc", "scope", "user", "pass")
        sent_kwargs = client.get.call_args.kwargs
        assert sent_kwargs.get("auth") == ("user", "pass")

    async def test_no_auth_when_no_credentials(self):
        """Without credentials the GET receives no ``auth`` (absent or ``None``)."""
        client_ctx, client = _make_mock_http_client({"token": "anon-tok", "expires_in": 300})
        with patch("httpx.AsyncClient", return_value=client_ctx):
            await fetch_token("https://auth.example.com", "svc", "scope")
        sent_kwargs = client.get.call_args.kwargs
        assert sent_kwargs.get("auth") is None
# ---------------------------------------------------------------------------
# get_docker_token_for_response
# ---------------------------------------------------------------------------
class TestGetDockerTokenForResponse:
    """Checks for ``get_docker_token_for_response``, from header string to token."""

    async def test_returns_none_for_non_bearer_header(self):
        """A header that is not a Bearer challenge yields ``None``."""
        outcome = await get_docker_token_for_response('Basic realm="example"')
        assert outcome is None

    async def test_end_to_end_parse_and_fetch(self):
        """parse_www_authenticate → fetch_token wired together end-to-end."""
        challenge = 'Bearer realm="https://auth.example.com",service="svc",scope="repo:pull"'
        client_ctx, client = _make_mock_http_client({"token": "e2e-tok", "expires_in": 300})
        with patch("httpx.AsyncClient", return_value=client_ctx):
            result = await get_docker_token_for_response(challenge, "user", "pass")
        assert result == "e2e-tok"
        # The parsed service/scope and the supplied credentials must all have
        # been forwarded on the single GET issued to the token endpoint.
        sent_kwargs = client.get.call_args.kwargs
        query = sent_kwargs["params"]
        assert query["service"] == "svc"
        assert query["scope"] == "repo:pull"
        assert sent_kwargs["auth"] == ("user", "pass")