8da43e610e
Address every substantive critique from the peer review: test_cache: replace tautological same-inputs key test with hardcoded hash assertion; assert setex call + TTL in mark_index_cached test; assert client is None for unavailable no-op; rename Packages.gz test to document intentional behaviour; add alpine sig/tmp negatives; add hyphenated and date-tag docker positive cases; add key hash-length assertion. test_config: replace live-constant comparisons with literal string assertions for alpine/rpm/docker; add unknown package type test; add dict-keyed repositories branch coverage (per-repo override and fallback); fix cache config to full equality check; add explicit empty index_patterns test. test_docker_auth: fix case-insensitive test to verify realm value; add field-order (scope before service) limitation test; add pipe-char collision documentation test; add missing fetch_token edge cases (no token field, HTTPStatusError, missing expires_in default 300); replace rubber-stamp delegate test with end-to-end parse→fetch test. test_storage: replace split prefix/suffix assertions with structural 3-part check + pinned sha256 assertion; fix Docker blob digests to 64-char hex; add secure=True URL test; add upload return value test; add download_object 404-on-ClientError test; remove redundant subset test. test_routes: add metrics.record_cache_hit/miss assertions; add mark_index_cached assertion after cache miss on index (docker + generic); add Content-Disposition, X-Artifact-Size header checks; add rpm/xml content-type tests; add flush test that verifies Redis keys are deleted when cache is available; add smoke coverage for upload (PUT), HEAD, DELETE, /metrics, and /config routes.
274 lines
12 KiB
Python
274 lines
12 KiB
Python
"""Tests for docker_auth: WWW-Authenticate parsing and token caching."""
|
|
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from artifactapi import docker_auth
|
|
from artifactapi.docker_auth import (
|
|
_cache_key,
|
|
_get_cached_token,
|
|
_store_token,
|
|
fetch_token,
|
|
get_docker_token_for_response,
|
|
parse_www_authenticate,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def clear_token_cache():
|
|
"""Isolate tests: wipe the module-level token cache before and after each test."""
|
|
docker_auth._token_cache.clear()
|
|
yield
|
|
docker_auth._token_cache.clear()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# parse_www_authenticate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestParseWwwAuthenticate:
|
|
def test_full_bearer_header(self):
|
|
header = 'Bearer realm="https://auth.docker.io/token",service="registry.docker.io",scope="repository:library/nginx:pull"'
|
|
result = parse_www_authenticate(header)
|
|
assert result is not None
|
|
realm, service, scope = result
|
|
assert realm == "https://auth.docker.io/token"
|
|
assert service == "registry.docker.io"
|
|
assert scope == "repository:library/nginx:pull"
|
|
|
|
def test_realm_only(self):
|
|
header = 'Bearer realm="https://auth.example.com/token"'
|
|
result = parse_www_authenticate(header)
|
|
assert result is not None
|
|
realm, service, scope = result
|
|
assert realm == "https://auth.example.com/token"
|
|
assert service == ""
|
|
assert scope == ""
|
|
|
|
def test_realm_and_service_only(self):
|
|
header = 'Bearer realm="https://auth.example.com",service="registry.example.com"'
|
|
result = parse_www_authenticate(header)
|
|
assert result is not None
|
|
_, service, scope = result
|
|
assert service == "registry.example.com"
|
|
assert scope == ""
|
|
|
|
def test_invalid_scheme_returns_none(self):
|
|
assert parse_www_authenticate('Basic realm="example"') is None
|
|
|
|
def test_empty_header_returns_none(self):
|
|
assert parse_www_authenticate("") is None
|
|
|
|
def test_case_insensitive_bearer_parses_realm(self):
|
|
header = 'bearer realm="https://auth.example.com/token"'
|
|
result = parse_www_authenticate(header)
|
|
assert result is not None
|
|
realm, _, _ = result
|
|
assert realm == "https://auth.example.com/token"
|
|
|
|
def test_field_order_scope_before_service_drops_service(self):
|
|
# The regex requires realm,service,scope order; scope before service
|
|
# results in service being silently dropped. This test documents the known limitation.
|
|
header = 'Bearer realm="https://auth.example.com",scope="repo:pull",service="svc"'
|
|
result = parse_www_authenticate(header)
|
|
assert result is not None
|
|
realm, service, scope = result
|
|
assert realm == "https://auth.example.com"
|
|
assert scope == "repo:pull"
|
|
assert service == "" # silently dropped when out of order
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _cache_key
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCacheKey:
|
|
def test_key_contains_all_components(self):
|
|
key = _cache_key("https://realm.com", "svc", "scope", "user")
|
|
assert "https://realm.com" in key
|
|
assert "svc" in key
|
|
assert "scope" in key
|
|
assert "user" in key
|
|
|
|
def test_none_username_uses_empty_string(self):
|
|
key = _cache_key("https://realm.com", "svc", "scope", None)
|
|
assert key.endswith("|")
|
|
|
|
def test_different_services_give_different_keys(self):
|
|
k1 = _cache_key("realm", "svc1", "scope", None)
|
|
k2 = _cache_key("realm", "svc2", "scope", None)
|
|
assert k1 != k2
|
|
|
|
def test_different_scopes_give_different_keys(self):
|
|
k1 = _cache_key("realm", "svc", "scope:read", None)
|
|
k2 = _cache_key("realm", "svc", "scope:write", None)
|
|
assert k1 != k2
|
|
|
|
def test_pipe_in_field_value_can_collide_with_adjacent_fields(self):
|
|
# The "|" separator is not escaped, so a pipe embedded in one field
|
|
# produces the same key as the same pipe appearing as a separator boundary.
|
|
# This is a known limitation: _cache_key("a|b","c","d",None) ==
|
|
# _cache_key("a","b|c","d",None). Documents the behaviour, not a claim it's correct.
|
|
k1 = _cache_key("a|b", "c", "d", None)
|
|
k2 = _cache_key("a", "b|c", "d", None)
|
|
assert k1 == k2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _get_cached_token / _store_token
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestTokenCaching:
|
|
def test_get_returns_none_when_not_cached(self):
|
|
assert _get_cached_token("no-such-key") is None
|
|
|
|
def test_get_returns_token_when_valid(self):
|
|
_store_token("mykey", "tok-abc", 300)
|
|
assert _get_cached_token("mykey") == "tok-abc"
|
|
|
|
def test_get_returns_none_when_expired(self):
|
|
docker_auth._token_cache["mykey"] = ("old-token", time.time() - 1)
|
|
assert _get_cached_token("mykey") is None
|
|
|
|
def test_expired_entry_is_removed_from_cache(self):
|
|
docker_auth._token_cache["mykey"] = ("old-token", time.time() - 1)
|
|
_get_cached_token("mykey")
|
|
assert "mykey" not in docker_auth._token_cache
|
|
|
|
def test_store_expires_30s_before_stated_time(self):
|
|
before = time.time()
|
|
_store_token("mykey", "tok", 100)
|
|
_, expires_at = docker_auth._token_cache["mykey"]
|
|
# expires_in - 30 = 70; allow ±2 s clock wiggle
|
|
assert before + 68 <= expires_at <= before + 72
|
|
|
|
def test_store_enforces_minimum_10s_expiry(self):
|
|
before = time.time()
|
|
_store_token("mykey", "tok", 5) # expires_in - 30 would be negative
|
|
_, expires_at = docker_auth._token_cache["mykey"]
|
|
assert expires_at >= before + 10
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# fetch_token (async, mocks httpx)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_mock_http_client(token_payload: dict):
|
|
mock_response = MagicMock()
|
|
mock_response.raise_for_status = MagicMock()
|
|
mock_response.json.return_value = token_payload
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
|
|
ctx = MagicMock()
|
|
ctx.__aenter__ = AsyncMock(return_value=mock_client)
|
|
ctx.__aexit__ = AsyncMock(return_value=False)
|
|
return ctx, mock_client
|
|
|
|
|
|
class TestFetchToken:
|
|
async def test_returns_token_field(self):
|
|
ctx, _ = _make_mock_http_client({"token": "bearer-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token == "bearer-tok"
|
|
|
|
async def test_falls_back_to_access_token_field(self):
|
|
ctx, _ = _make_mock_http_client({"access_token": "access-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token == "access-tok"
|
|
|
|
async def test_returns_none_when_response_missing_token_field(self):
|
|
ctx, _ = _make_mock_http_client({"not_token": "value", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token is None
|
|
|
|
async def test_defaults_expires_in_to_300_when_missing(self):
|
|
ctx, _ = _make_mock_http_client({"token": "tok"}) # no expires_in key
|
|
before = time.time()
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token == "tok"
|
|
key = _cache_key("https://auth.example.com", "svc", "scope", None)
|
|
_, expires_at = docker_auth._token_cache[key]
|
|
# Default expires_in=300, stored as time.time() + max(300-30, 10) = 270
|
|
assert before + 268 <= expires_at <= before + 272
|
|
|
|
async def test_uses_cache_on_second_call_without_http(self):
|
|
ctx, mock_client = _make_mock_http_client({"token": "cached-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
await fetch_token("https://auth.example.com", "svc", "scope")
|
|
mock_client.get.reset_mock()
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
mock_client.get.assert_not_called()
|
|
assert token == "cached-tok"
|
|
|
|
async def test_returns_none_on_network_error(self):
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(side_effect=Exception("connection refused"))
|
|
ctx = MagicMock()
|
|
ctx.__aenter__ = AsyncMock(return_value=mock_client)
|
|
ctx.__aexit__ = AsyncMock(return_value=False)
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token is None
|
|
|
|
async def test_returns_none_on_http_status_error(self):
|
|
mock_response = MagicMock()
|
|
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError("401 Unauthorized", request=MagicMock(), response=MagicMock())
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
ctx = MagicMock()
|
|
ctx.__aenter__ = AsyncMock(return_value=mock_client)
|
|
ctx.__aexit__ = AsyncMock(return_value=False)
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token is None
|
|
|
|
async def test_passes_credentials_as_auth_tuple(self):
|
|
ctx, mock_client = _make_mock_http_client({"token": "authed-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
await fetch_token("https://auth.example.com", "svc", "scope", "user", "pass")
|
|
call_kwargs = mock_client.get.call_args.kwargs
|
|
assert call_kwargs.get("auth") == ("user", "pass")
|
|
|
|
async def test_no_auth_when_no_credentials(self):
|
|
ctx, mock_client = _make_mock_http_client({"token": "anon-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
await fetch_token("https://auth.example.com", "svc", "scope")
|
|
call_kwargs = mock_client.get.call_args.kwargs
|
|
assert call_kwargs.get("auth") is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_docker_token_for_response
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetDockerTokenForResponse:
|
|
async def test_returns_none_for_non_bearer_header(self):
|
|
token = await get_docker_token_for_response('Basic realm="example"')
|
|
assert token is None
|
|
|
|
async def test_end_to_end_parse_and_fetch(self):
|
|
"""parse_www_authenticate → fetch_token wired together end-to-end."""
|
|
header = 'Bearer realm="https://auth.example.com",service="svc",scope="repo:pull"'
|
|
ctx, mock_client = _make_mock_http_client({"token": "e2e-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await get_docker_token_for_response(header, "user", "pass")
|
|
assert token == "e2e-tok"
|
|
call_kwargs = mock_client.get.call_args.kwargs
|
|
assert call_kwargs["params"]["service"] == "svc"
|
|
assert call_kwargs["params"]["scope"] == "repo:pull"
|
|
assert call_kwargs["auth"] == ("user", "pass")
|