2d0e2c64e6
- tests/: 107 unit tests across config, cache, docker_auth, storage, and FastAPI routes; all passing under pytest-asyncio auto mode - tox.ini: runs pytest via uvx --with tox-uv tox (py311) - .pre-commit-config.yaml: ruff lint + ruff-format at v0.15.12 - pyproject.toml: pytest config (asyncio_mode=auto), ruff config (line-length=140), tox/pre-commit added to dev extras - Makefile: test/tox/pre-commit targets via uvx --python 3.11 - Source files reformatted by ruff-format (no logic changes)
224 lines
8.7 KiB
Python
224 lines
8.7 KiB
Python
"""Tests for docker_auth: WWW-Authenticate parsing and token caching."""
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from artifactapi import docker_auth
|
|
from artifactapi.docker_auth import (
|
|
_cache_key,
|
|
_get_cached_token,
|
|
_store_token,
|
|
fetch_token,
|
|
get_docker_token_for_response,
|
|
parse_www_authenticate,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def clear_token_cache():
|
|
"""Isolate tests: wipe the module-level token cache before and after each test."""
|
|
docker_auth._token_cache.clear()
|
|
yield
|
|
docker_auth._token_cache.clear()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# parse_www_authenticate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestParseWwwAuthenticate:
|
|
def test_full_bearer_header(self):
|
|
header = (
|
|
'Bearer realm="https://auth.docker.io/token"'
|
|
',service="registry.docker.io"'
|
|
',scope="repository:library/nginx:pull"'
|
|
)
|
|
result = parse_www_authenticate(header)
|
|
assert result is not None
|
|
realm, service, scope = result
|
|
assert realm == "https://auth.docker.io/token"
|
|
assert service == "registry.docker.io"
|
|
assert scope == "repository:library/nginx:pull"
|
|
|
|
def test_realm_only(self):
|
|
header = 'Bearer realm="https://auth.example.com/token"'
|
|
result = parse_www_authenticate(header)
|
|
assert result is not None
|
|
realm, service, scope = result
|
|
assert realm == "https://auth.example.com/token"
|
|
assert service == ""
|
|
assert scope == ""
|
|
|
|
def test_realm_and_service_only(self):
|
|
header = 'Bearer realm="https://auth.example.com",service="registry.example.com"'
|
|
result = parse_www_authenticate(header)
|
|
assert result is not None
|
|
_, service, scope = result
|
|
assert service == "registry.example.com"
|
|
assert scope == ""
|
|
|
|
def test_invalid_scheme_returns_none(self):
|
|
assert parse_www_authenticate('Basic realm="example"') is None
|
|
|
|
def test_empty_header_returns_none(self):
|
|
assert parse_www_authenticate("") is None
|
|
|
|
def test_case_insensitive_bearer(self):
|
|
header = 'bearer realm="https://auth.example.com/token"'
|
|
assert parse_www_authenticate(header) is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _cache_key
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCacheKey:
|
|
def test_key_contains_all_components(self):
|
|
key = _cache_key("https://realm.com", "svc", "scope", "user")
|
|
assert "https://realm.com" in key
|
|
assert "svc" in key
|
|
assert "scope" in key
|
|
assert "user" in key
|
|
|
|
def test_none_username_uses_empty_string(self):
|
|
key = _cache_key("https://realm.com", "svc", "scope", None)
|
|
assert key.endswith("|")
|
|
|
|
def test_different_services_give_different_keys(self):
|
|
k1 = _cache_key("realm", "svc1", "scope", None)
|
|
k2 = _cache_key("realm", "svc2", "scope", None)
|
|
assert k1 != k2
|
|
|
|
def test_different_scopes_give_different_keys(self):
|
|
k1 = _cache_key("realm", "svc", "scope:read", None)
|
|
k2 = _cache_key("realm", "svc", "scope:write", None)
|
|
assert k1 != k2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _get_cached_token / _store_token
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTokenCaching:
|
|
def test_get_returns_none_when_not_cached(self):
|
|
assert _get_cached_token("no-such-key") is None
|
|
|
|
def test_get_returns_token_when_valid(self):
|
|
_store_token("mykey", "tok-abc", 300)
|
|
assert _get_cached_token("mykey") == "tok-abc"
|
|
|
|
def test_get_returns_none_when_expired(self):
|
|
docker_auth._token_cache["mykey"] = ("old-token", time.time() - 1)
|
|
assert _get_cached_token("mykey") is None
|
|
|
|
def test_expired_entry_is_removed_from_cache(self):
|
|
docker_auth._token_cache["mykey"] = ("old-token", time.time() - 1)
|
|
_get_cached_token("mykey")
|
|
assert "mykey" not in docker_auth._token_cache
|
|
|
|
def test_store_expires_30s_before_stated_time(self):
|
|
before = time.time()
|
|
_store_token("mykey", "tok", 100)
|
|
_, expires_at = docker_auth._token_cache["mykey"]
|
|
# expires_in - 30 = 70; allow ±2 s clock wiggle
|
|
assert before + 68 <= expires_at <= before + 72
|
|
|
|
def test_store_enforces_minimum_10s_expiry(self):
|
|
before = time.time()
|
|
_store_token("mykey", "tok", 5) # expires_in - 30 would be negative
|
|
_, expires_at = docker_auth._token_cache["mykey"]
|
|
assert expires_at >= before + 10
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# fetch_token (async, mocks httpx)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_mock_http_client(token_payload: dict):
|
|
mock_response = MagicMock()
|
|
mock_response.raise_for_status = MagicMock()
|
|
mock_response.json.return_value = token_payload
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
|
|
ctx = MagicMock()
|
|
ctx.__aenter__ = AsyncMock(return_value=mock_client)
|
|
ctx.__aexit__ = AsyncMock(return_value=False)
|
|
return ctx, mock_client
|
|
|
|
|
|
class TestFetchToken:
|
|
async def test_returns_token_field(self):
|
|
ctx, _ = _make_mock_http_client({"token": "bearer-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token == "bearer-tok"
|
|
|
|
async def test_falls_back_to_access_token_field(self):
|
|
ctx, _ = _make_mock_http_client({"access_token": "access-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token == "access-tok"
|
|
|
|
async def test_uses_cache_on_second_call_without_http(self):
|
|
ctx, mock_client = _make_mock_http_client({"token": "cached-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
await fetch_token("https://auth.example.com", "svc", "scope")
|
|
mock_client.get.reset_mock()
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
mock_client.get.assert_not_called()
|
|
assert token == "cached-tok"
|
|
|
|
async def test_returns_none_on_http_error(self):
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(side_effect=Exception("connection refused"))
|
|
ctx = MagicMock()
|
|
ctx.__aenter__ = AsyncMock(return_value=mock_client)
|
|
ctx.__aexit__ = AsyncMock(return_value=False)
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
token = await fetch_token("https://auth.example.com", "svc", "scope")
|
|
assert token is None
|
|
|
|
async def test_passes_credentials_as_auth_tuple(self):
|
|
ctx, mock_client = _make_mock_http_client({"token": "authed-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
await fetch_token("https://auth.example.com", "svc", "scope", "user", "pass")
|
|
call_kwargs = mock_client.get.call_args.kwargs
|
|
assert call_kwargs.get("auth") == ("user", "pass")
|
|
|
|
async def test_no_auth_when_no_credentials(self):
|
|
ctx, mock_client = _make_mock_http_client({"token": "anon-tok", "expires_in": 300})
|
|
with patch("httpx.AsyncClient", return_value=ctx):
|
|
await fetch_token("https://auth.example.com", "svc", "scope")
|
|
call_kwargs = mock_client.get.call_args.kwargs
|
|
assert call_kwargs.get("auth") is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_docker_token_for_response
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetDockerTokenForResponse:
|
|
async def test_returns_none_for_non_bearer_header(self):
|
|
token = await get_docker_token_for_response('Basic realm="example"')
|
|
assert token is None
|
|
|
|
async def test_delegates_to_fetch_token(self):
|
|
header = (
|
|
'Bearer realm="https://auth.example.com"'
|
|
',service="svc"'
|
|
',scope="repo:pull"'
|
|
)
|
|
with patch(
|
|
"artifactapi.docker_auth.fetch_token",
|
|
new_callable=AsyncMock,
|
|
return_value="delegated-tok",
|
|
) as mock_fetch:
|
|
token = await get_docker_token_for_response(header, "user", "pass")
|
|
mock_fetch.assert_called_once_with(
|
|
"https://auth.example.com", "svc", "repo:pull", "user", "pass"
|
|
)
|
|
assert token == "delegated-tok"
|