feat: cache parsed member indexes as msgpack to skip YAML re-parse on rebuild (#40)
ci/woodpecker/tag/docker Pipeline was successful
ci/woodpecker/tag/docker Pipeline was successful
Closes #36

## Summary

- After fetching a member's `index.yaml` (from upstream or S3), the handler now parses it and stores a compact msgpack file (`index.msgpack`) alongside the raw YAML in S3
- On subsequent virtual rebuilds (member caches valid, virtual TTL expired), the handler loads the msgpack file instead of re-parsing raw YAML — eliminating the costliest phase
- `_entries_to_msgpack_safe()` converts datetime/date objects to ISO strings before packing (msgpack cannot natively serialize Python datetimes)
- `_merge_helm_indexes()` accepts `list[dict | None]` as pre-parsed entries; falls back to raw YAML parse when msgpack is unavailable
- `_VirtualHandler.merge()` protocol updated to pass pre-parsed entries to all future handler implementations
- Broken msgpack is detected and rebuilt from raw YAML automatically

## Performance

Phase breakdown (19-member helm-all virtual, 14 MB total):

| Phase | Time | % |
|---|---|---|
| YAML parse (eliminated) | 6314 ms | 60% |
| URL rewrite + dedup | 33 ms | 0.3% |
| YAML dump | 4124 ms | 39% |

| Scenario | Before (CSafeLoader only, #34) | After |
|---|---|---|
| Cold rebuild (upstream fetch) | ~21s | ~26s (+5s for msgpack build, one-time) |
| **Warm rebuild (S3 hit, virtual expired)** | **~9.6s** | **~5.9s (38% faster)** |
| Virtual cache hit | ~0.03s | ~0.03s |

Log line confirms msgpack hits: `msgpack=19/19`

## Test plan

- 297 tests pass
- `TestEntriesToMsgpackSafe`: datetime/date serialization, empty input, round-trip
- `TestMergeHelmIndexesWithParsed`: pre-parsed path produces identical output to raw-bytes path
- `TestGetMemberIndexMsgpack`: msgpack hit, cold-build, broken msgpack fallback, upstream failure
- Docker warm-rebuild measured at 5.9s vs 9.6s baseline

Reviewed-on: #40
This commit was merged in pull request #40.
This commit is contained in:
@@ -390,7 +390,7 @@ If a member is unreachable and has no cached index, it is skipped and a warning
|
|||||||
|
|
||||||
**Caching:**
|
**Caching:**
|
||||||
|
|
||||||
The merged index is cached using `min(mutable_ttl)` across all members. Each member's raw index is cached in S3 under its own remote key by the normal proxy rules; the virtual handler reuses those copies when available.
|
The merged index is cached using `min(mutable_ttl)` across all members. Each member's raw index is cached in S3 under its own remote key; the virtual handler reuses those copies when available. On rebuild, each member's parsed index is also stored as a compact msgpack file (`index.msgpack`) alongside the raw YAML, eliminating the YAML parse cost on subsequent rebuilds.
|
||||||
|
|
||||||
**Helm example:**
|
**Helm example:**
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ dependencies = [
|
|||||||
"lxml>=4.9.0",
|
"lxml>=4.9.0",
|
||||||
"prometheus-client>=0.19.0",
|
"prometheus-client>=0.19.0",
|
||||||
"python-multipart>=0.0.6",
|
"python-multipart>=0.0.6",
|
||||||
|
"msgpack>=1.0.0",
|
||||||
]
|
]
|
||||||
requires-python = ">=3.11"
|
requires-python = ">=3.11"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from datetime import UTC, date, datetime
|
|||||||
from typing import Protocol, runtime_checkable
|
from typing import Protocol, runtime_checkable
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
import msgpack as _msgpack
|
||||||
import yaml
|
import yaml
|
||||||
from fastapi import HTTPException, Request, Response
|
from fastapi import HTTPException, Request, Response
|
||||||
|
|
||||||
@@ -42,21 +43,43 @@ _HelmDumper.add_representer(datetime, _repr_datetime)
|
|||||||
_HelmDumper.add_representer(date, _repr_date)
|
_HelmDumper.add_representer(date, _repr_date)
|
||||||
|
|
||||||
|
|
||||||
|
def _entries_to_msgpack_safe(entries: dict) -> dict:
|
||||||
|
"""Convert datetime/date values to ISO strings for msgpack serialization."""
|
||||||
|
result = {}
|
||||||
|
for chart, versions in entries.items():
|
||||||
|
safe_versions = []
|
||||||
|
for v in versions:
|
||||||
|
safe_v = {}
|
||||||
|
for k, val in v.items():
|
||||||
|
if isinstance(val, datetime):
|
||||||
|
safe_v[k] = val.isoformat()
|
||||||
|
elif isinstance(val, date):
|
||||||
|
safe_v[k] = val.isoformat()
|
||||||
|
else:
|
||||||
|
safe_v[k] = val
|
||||||
|
safe_versions.append(safe_v)
|
||||||
|
result[chart] = safe_versions
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
async def _get_member_index(
|
async def _get_member_index(
|
||||||
member_name: str,
|
member_name: str,
|
||||||
member_cfg: dict,
|
member_cfg: dict,
|
||||||
path: str,
|
path: str,
|
||||||
storage,
|
storage,
|
||||||
cache,
|
cache,
|
||||||
) -> tuple[str, dict, int, bytes | None]:
|
) -> tuple[str, dict, int, bytes | None, dict | None]:
|
||||||
"""Fetch or retrieve cached index.yaml for one member remote.
|
"""Fetch or retrieve cached index.yaml for one member remote.
|
||||||
|
|
||||||
Returns (member_name, member_cfg, ttl, raw_bytes).
|
Returns (member_name, member_cfg, ttl, raw_bytes, parsed_entries).
|
||||||
raw_bytes is None if the member is unreachable and not in S3.
|
raw_bytes is None if the member is unreachable and not in S3.
|
||||||
|
parsed_entries is the pre-parsed entries dict (from msgpack cache), or None.
|
||||||
"""
|
"""
|
||||||
member_ttl = member_cfg.get("cache", {}).get("mutable_ttl", 3600)
|
member_ttl = member_cfg.get("cache", {}).get("mutable_ttl", 3600)
|
||||||
s3_key = storage.get_object_key(member_name, path)
|
s3_key = storage.get_object_key(member_name, path)
|
||||||
|
msgpack_key = storage.get_object_key(member_name, "index.msgpack")
|
||||||
raw_data: bytes | None = None
|
raw_data: bytes | None = None
|
||||||
|
parsed_entries: dict | None = None
|
||||||
|
|
||||||
if storage.exists(s3_key) and cache.is_index_valid(member_name, path):
|
if storage.exists(s3_key) and cache.is_index_valid(member_name, path):
|
||||||
try:
|
try:
|
||||||
@@ -64,6 +87,13 @@ async def _get_member_index(
|
|||||||
logger.info(f"Virtual: cache hit for member '{member_name}'")
|
logger.info(f"Virtual: cache hit for member '{member_name}'")
|
||||||
except Exception:
|
except Exception:
|
||||||
raw_data = None
|
raw_data = None
|
||||||
|
if raw_data is not None and storage.exists(msgpack_key):
|
||||||
|
try:
|
||||||
|
packed = storage.download_object(msgpack_key)
|
||||||
|
parsed_entries = _msgpack.unpackb(packed, raw=False)
|
||||||
|
logger.debug(f"Virtual: msgpack hit for member '{member_name}'")
|
||||||
|
except Exception:
|
||||||
|
parsed_entries = None
|
||||||
|
|
||||||
if raw_data is None:
|
if raw_data is None:
|
||||||
base_url = member_cfg.get("base_url", "").rstrip("/")
|
base_url = member_cfg.get("base_url", "").rstrip("/")
|
||||||
@@ -81,14 +111,23 @@ async def _get_member_index(
|
|||||||
raw_data = response.content
|
raw_data = response.content
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Virtual: failed to fetch index.yaml from member '{member_name}': {e}")
|
logger.warning(f"Virtual: failed to fetch index.yaml from member '{member_name}': {e}")
|
||||||
return member_name, member_cfg, member_ttl, None
|
return member_name, member_cfg, member_ttl, None, None
|
||||||
try:
|
try:
|
||||||
storage.upload(s3_key, raw_data)
|
storage.upload(s3_key, raw_data)
|
||||||
cache.mark_index_cached(member_name, path, member_ttl)
|
cache.mark_index_cached(member_name, path, member_ttl)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Virtual: failed to cache index.yaml for member '{member_name}': {e}")
|
logger.warning(f"Virtual: failed to cache index.yaml for member '{member_name}': {e}")
|
||||||
|
|
||||||
return member_name, member_cfg, member_ttl, raw_data
|
if parsed_entries is None and raw_data is not None:
|
||||||
|
try:
|
||||||
|
index = yaml.load(raw_data, Loader=_YamlLoader)
|
||||||
|
safe_entries = _entries_to_msgpack_safe(index.get("entries") or {})
|
||||||
|
storage.upload(msgpack_key, _msgpack.packb(safe_entries, use_bin_type=True))
|
||||||
|
parsed_entries = safe_entries
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Virtual: failed to build msgpack cache for '{member_name}': {e}")
|
||||||
|
|
||||||
|
return member_name, member_cfg, member_ttl, raw_data, parsed_entries
|
||||||
|
|
||||||
|
|
||||||
def _rewrite_urls(urls: list, base_url: str, proxy_base: str, member_name: str) -> list:
|
def _rewrite_urls(urls: list, base_url: str, proxy_base: str, member_name: str) -> list:
|
||||||
@@ -104,24 +143,35 @@ def _rewrite_urls(urls: list, base_url: str, proxy_base: str, member_name: str)
|
|||||||
return rewritten
|
return rewritten
|
||||||
|
|
||||||
|
|
||||||
def _merge_helm_indexes(raw_indexes: list[bytes], member_names: list[str], member_configs: list[dict], proxy_base: str) -> bytes:
|
def _merge_helm_indexes(
|
||||||
|
raw_indexes: list[bytes],
|
||||||
|
parsed_entries_list: list[dict | None],
|
||||||
|
member_names: list[str],
|
||||||
|
member_configs: list[dict],
|
||||||
|
proxy_base: str,
|
||||||
|
) -> bytes:
|
||||||
"""Merge helm index.yaml files with per-member URL rewriting.
|
"""Merge helm index.yaml files with per-member URL rewriting.
|
||||||
|
|
||||||
Priority is determined by position in member_names: earlier members win
|
Priority is determined by position in member_names: earlier members win
|
||||||
when the same chart name + version appears in multiple remotes.
|
when the same chart name + version appears in multiple remotes.
|
||||||
|
Uses pre-parsed msgpack entries when available to skip YAML parsing.
|
||||||
"""
|
"""
|
||||||
merged_entries: dict[str, list] = {}
|
merged_entries: dict[str, list] = {}
|
||||||
|
|
||||||
for raw_data, member_name, member_cfg in zip(raw_indexes, member_names, member_configs):
|
for raw_data, pre_parsed, member_name, member_cfg in zip(raw_indexes, parsed_entries_list, member_names, member_configs):
|
||||||
base_url = member_cfg.get("base_url", "").rstrip("/")
|
base_url = member_cfg.get("base_url", "").rstrip("/")
|
||||||
|
|
||||||
try:
|
if pre_parsed is not None:
|
||||||
index = yaml.load(raw_data, Loader=_YamlLoader)
|
entries = pre_parsed
|
||||||
except Exception as e:
|
else:
|
||||||
logger.warning(f"Virtual: failed to parse index.yaml from member '{member_name}': {e}")
|
try:
|
||||||
continue
|
index = yaml.load(raw_data, Loader=_YamlLoader)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Virtual: failed to parse index.yaml from member '{member_name}': {e}")
|
||||||
|
continue
|
||||||
|
entries = index.get("entries") or {}
|
||||||
|
|
||||||
for chart_name, versions in (index.get("entries") or {}).items():
|
for chart_name, versions in entries.items():
|
||||||
for version_entry in versions:
|
for version_entry in versions:
|
||||||
version_entry["urls"] = _rewrite_urls(
|
version_entry["urls"] = _rewrite_urls(
|
||||||
version_entry.get("urls") or [],
|
version_entry.get("urls") or [],
|
||||||
@@ -150,7 +200,14 @@ def _merge_helm_indexes(raw_indexes: list[bytes], member_names: list[str], membe
|
|||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class _VirtualHandler(Protocol):
|
class _VirtualHandler(Protocol):
|
||||||
def accepts_path(self, path: str) -> bool: ...
|
def accepts_path(self, path: str) -> bool: ...
|
||||||
def merge(self, raw_indexes: list[bytes], member_names: list[str], member_configs: list[dict], proxy_base: str) -> bytes: ...
|
def merge(
|
||||||
|
self,
|
||||||
|
raw_indexes: list[bytes],
|
||||||
|
parsed_entries: list[dict | None],
|
||||||
|
member_names: list[str],
|
||||||
|
member_configs: list[dict],
|
||||||
|
proxy_base: str,
|
||||||
|
) -> bytes: ...
|
||||||
def path_error(self) -> str: ...
|
def path_error(self) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
@@ -158,8 +215,15 @@ class _HelmHandler:
|
|||||||
def accepts_path(self, path: str) -> bool:
|
def accepts_path(self, path: str) -> bool:
|
||||||
return path == "index.yaml"
|
return path == "index.yaml"
|
||||||
|
|
||||||
def merge(self, raw_indexes: list[bytes], member_names: list[str], member_configs: list[dict], proxy_base: str) -> bytes:
|
def merge(
|
||||||
return _merge_helm_indexes(raw_indexes, member_names, member_configs, proxy_base)
|
self,
|
||||||
|
raw_indexes: list[bytes],
|
||||||
|
parsed_entries: list[dict | None],
|
||||||
|
member_names: list[str],
|
||||||
|
member_configs: list[dict],
|
||||||
|
proxy_base: str,
|
||||||
|
) -> bytes:
|
||||||
|
return _merge_helm_indexes(raw_indexes, parsed_entries, member_names, member_configs, proxy_base)
|
||||||
|
|
||||||
def path_error(self) -> str:
|
def path_error(self) -> str:
|
||||||
return "Virtual helm repositories only serve index.yaml; chart tarballs are served directly by member remotes"
|
return "Virtual helm repositories only serve index.yaml; chart tarballs are served directly by member remotes"
|
||||||
@@ -210,17 +274,19 @@ async def handle(request: Request, virtual_name: str, path: str, storage, cache,
|
|||||||
fetch_ms = int((time.perf_counter() - t_fetch) * 1000)
|
fetch_ms = int((time.perf_counter() - t_fetch) * 1000)
|
||||||
|
|
||||||
raw_indexes: list[bytes] = []
|
raw_indexes: list[bytes] = []
|
||||||
|
used_parsed: list[dict | None] = []
|
||||||
used_members: list[str] = []
|
used_members: list[str] = []
|
||||||
used_configs: list[dict] = []
|
used_configs: list[dict] = []
|
||||||
min_ttl: int | None = None
|
min_ttl: int | None = None
|
||||||
|
|
||||||
for member_name, member_cfg, member_ttl, raw_data in results:
|
for member_name, member_cfg, member_ttl, raw_data, parsed_entries in results:
|
||||||
if min_ttl is None or member_ttl < min_ttl:
|
if min_ttl is None or member_ttl < min_ttl:
|
||||||
min_ttl = member_ttl
|
min_ttl = member_ttl
|
||||||
if raw_data is None:
|
if raw_data is None:
|
||||||
logger.warning(f"Virtual '{virtual_name}': skipping unreachable member '{member_name}'")
|
logger.warning(f"Virtual '{virtual_name}': skipping unreachable member '{member_name}'")
|
||||||
continue
|
continue
|
||||||
raw_indexes.append(raw_data)
|
raw_indexes.append(raw_data)
|
||||||
|
used_parsed.append(parsed_entries)
|
||||||
used_members.append(member_name)
|
used_members.append(member_name)
|
||||||
used_configs.append(member_cfg)
|
used_configs.append(member_cfg)
|
||||||
|
|
||||||
@@ -231,7 +297,7 @@ async def handle(request: Request, virtual_name: str, path: str, storage, cache,
|
|||||||
min_ttl = 3600
|
min_ttl = 3600
|
||||||
|
|
||||||
t_merge = time.perf_counter()
|
t_merge = time.perf_counter()
|
||||||
merged = await asyncio.to_thread(handler.merge, raw_indexes, used_members, used_configs, proxy_base)
|
merged = await asyncio.to_thread(handler.merge, raw_indexes, used_parsed, used_members, used_configs, proxy_base)
|
||||||
merge_ms = int((time.perf_counter() - t_merge) * 1000)
|
merge_ms = int((time.perf_counter() - t_merge) * 1000)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -239,9 +305,11 @@ async def handle(request: Request, virtual_name: str, path: str, storage, cache,
|
|||||||
storage.upload(virtual_key, merged)
|
storage.upload(virtual_key, merged)
|
||||||
cache.mark_index_cached(virtual_name, path, min_ttl)
|
cache.mark_index_cached(virtual_name, path, min_ttl)
|
||||||
store_ms = int((time.perf_counter() - t_store) * 1000)
|
store_ms = int((time.perf_counter() - t_store) * 1000)
|
||||||
|
msgpack_hits = sum(1 for p in used_parsed if p is not None)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Virtual MISS: {virtual_name}/{path} rebuilt from {used_members} "
|
f"Virtual MISS: {virtual_name}/{path} rebuilt from {used_members} "
|
||||||
f"(fetch={fetch_ms}ms merge={merge_ms}ms store={store_ms}ms ttl={min_ttl}s)"
|
f"(fetch={fetch_ms}ms merge={merge_ms}ms store={store_ms}ms ttl={min_ttl}s "
|
||||||
|
f"msgpack={msgpack_hits}/{len(used_members)})"
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Virtual: failed to store merged index for '{virtual_name}': {e}")
|
logger.warning(f"Virtual: failed to store merged index for '{virtual_name}': {e}")
|
||||||
|
|||||||
+196
-22
@@ -8,6 +8,7 @@ import yaml
|
|||||||
|
|
||||||
from artifactapi.artifact.virtual import (
|
from artifactapi.artifact.virtual import (
|
||||||
_HANDLERS,
|
_HANDLERS,
|
||||||
|
_entries_to_msgpack_safe,
|
||||||
_get_member_index,
|
_get_member_index,
|
||||||
_HelmDumper,
|
_HelmDumper,
|
||||||
_HelmHandler,
|
_HelmHandler,
|
||||||
@@ -173,13 +174,13 @@ class TestHelmHandler:
|
|||||||
assert isinstance(msg, str) and len(msg) > 0
|
assert isinstance(msg, str) and len(msg) > 0
|
||||||
|
|
||||||
def test_merge_returns_bytes(self):
|
def test_merge_returns_bytes(self):
|
||||||
result = self.handler.merge([_INDEX_A], ["member-a"], [_CFG_A], "http://proxy.example.com")
|
result = self.handler.merge([_INDEX_A], [None], ["member-a"], [_CFG_A], "http://proxy.example.com")
|
||||||
assert isinstance(result, bytes)
|
assert isinstance(result, bytes)
|
||||||
|
|
||||||
def test_merge_delegates_to_merge_helm_indexes(self):
|
def test_merge_delegates_to_merge_helm_indexes(self):
|
||||||
with patch("artifactapi.artifact.virtual._merge_helm_indexes", return_value=b"merged") as mock_fn:
|
with patch("artifactapi.artifact.virtual._merge_helm_indexes", return_value=b"merged") as mock_fn:
|
||||||
result = self.handler.merge([b"data"], ["m"], [{}], "http://proxy")
|
result = self.handler.merge([b"data"], [None], ["m"], [{}], "http://proxy")
|
||||||
mock_fn.assert_called_once_with([b"data"], ["m"], [{}], "http://proxy")
|
mock_fn.assert_called_once_with([b"data"], [None], ["m"], [{}], "http://proxy")
|
||||||
assert result == b"merged"
|
assert result == b"merged"
|
||||||
|
|
||||||
|
|
||||||
@@ -239,7 +240,7 @@ class TestRewriteUrls:
|
|||||||
|
|
||||||
class TestMergeHelmIndexes:
|
class TestMergeHelmIndexes:
|
||||||
def _merge(self, raw_indexes, member_names, member_configs, proxy_base="http://proxy.example.com"):
|
def _merge(self, raw_indexes, member_names, member_configs, proxy_base="http://proxy.example.com"):
|
||||||
return _merge_helm_indexes(raw_indexes, member_names, member_configs, proxy_base)
|
return _merge_helm_indexes(raw_indexes, [None] * len(raw_indexes), member_names, member_configs, proxy_base)
|
||||||
|
|
||||||
def _parse(self, raw):
|
def _parse(self, raw):
|
||||||
return yaml.safe_load(raw)
|
return yaml.safe_load(raw)
|
||||||
@@ -342,7 +343,7 @@ class TestGetMemberIndex:
|
|||||||
storage.exists.return_value = True
|
storage.exists.return_value = True
|
||||||
cache.is_index_valid.return_value = True
|
cache.is_index_valid.return_value = True
|
||||||
|
|
||||||
_, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
_, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
||||||
|
|
||||||
assert raw_data == b"cached bytes"
|
assert raw_data == b"cached bytes"
|
||||||
|
|
||||||
@@ -365,7 +366,7 @@ class TestGetMemberIndex:
|
|||||||
mock_cls.return_value.__aenter__.return_value = mock_client
|
mock_cls.return_value.__aenter__.return_value = mock_client
|
||||||
mock_client.get.return_value = self._fake_response(b"fresh bytes")
|
mock_client.get.return_value = self._fake_response(b"fresh bytes")
|
||||||
|
|
||||||
_, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
_, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
||||||
|
|
||||||
assert raw_data == b"fresh bytes"
|
assert raw_data == b"fresh bytes"
|
||||||
|
|
||||||
@@ -375,7 +376,7 @@ class TestGetMemberIndex:
|
|||||||
mock_cls.return_value.__aenter__.return_value = mock_client
|
mock_cls.return_value.__aenter__.return_value = mock_client
|
||||||
mock_client.get.return_value = self._fake_response()
|
mock_client.get.return_value = self._fake_response()
|
||||||
|
|
||||||
_, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
_, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
||||||
|
|
||||||
assert raw_data == b"upstream bytes"
|
assert raw_data == b"upstream bytes"
|
||||||
|
|
||||||
@@ -434,7 +435,7 @@ class TestGetMemberIndex:
|
|||||||
mock_cls.return_value.__aenter__.return_value = mock_client
|
mock_cls.return_value.__aenter__.return_value = mock_client
|
||||||
mock_client.get.side_effect = Exception("connection refused")
|
mock_client.get.side_effect = Exception("connection refused")
|
||||||
|
|
||||||
_, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
_, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
||||||
|
|
||||||
assert raw_data is None
|
assert raw_data is None
|
||||||
|
|
||||||
@@ -446,7 +447,7 @@ class TestGetMemberIndex:
|
|||||||
mock_cls.return_value.__aenter__.return_value = mock_client
|
mock_cls.return_value.__aenter__.return_value = mock_client
|
||||||
mock_client.get.return_value = self._fake_response()
|
mock_client.get.return_value = self._fake_response()
|
||||||
|
|
||||||
_, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
_, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)
|
||||||
|
|
||||||
assert raw_data == b"upstream bytes"
|
assert raw_data == b"upstream bytes"
|
||||||
|
|
||||||
@@ -457,7 +458,7 @@ class TestGetMemberIndex:
|
|||||||
mock_cls.return_value.__aenter__.return_value = mock_client
|
mock_cls.return_value.__aenter__.return_value = mock_client
|
||||||
mock_client.get.return_value = self._fake_response()
|
mock_client.get.return_value = self._fake_response()
|
||||||
|
|
||||||
_, _, ttl, _ = await _get_member_index("m", cfg, "index.yaml", storage, cache)
|
_, _, ttl, _, _ = await _get_member_index("m", cfg, "index.yaml", storage, cache)
|
||||||
|
|
||||||
assert ttl == 900
|
assert ttl == 900
|
||||||
|
|
||||||
@@ -468,7 +469,7 @@ class TestGetMemberIndex:
|
|||||||
mock_cls.return_value.__aenter__.return_value = mock_client
|
mock_cls.return_value.__aenter__.return_value = mock_client
|
||||||
mock_client.get.return_value = self._fake_response()
|
mock_client.get.return_value = self._fake_response()
|
||||||
|
|
||||||
_, _, ttl, _ = await _get_member_index("m", cfg, "index.yaml", storage, cache)
|
_, _, ttl, _, _ = await _get_member_index("m", cfg, "index.yaml", storage, cache)
|
||||||
|
|
||||||
assert ttl == 3600
|
assert ttl == 3600
|
||||||
|
|
||||||
@@ -567,7 +568,7 @@ class TestVirtualRoute:
|
|||||||
|
|
||||||
def test_cache_miss_returns_200_with_yaml_content_type(self, client, patched_virtual_deps):
|
def test_cache_miss_returns_200_with_yaml_content_type(self, client, patched_virtual_deps):
|
||||||
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
||||||
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE)
|
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None)
|
||||||
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
@@ -575,7 +576,7 @@ class TestVirtualRoute:
|
|||||||
|
|
||||||
def test_cache_miss_response_contains_merged_entries(self, client, patched_virtual_deps):
|
def test_cache_miss_response_contains_merged_entries(self, client, patched_virtual_deps):
|
||||||
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
||||||
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE)
|
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None)
|
||||||
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
index = yaml.safe_load(response.content)
|
index = yaml.safe_load(response.content)
|
||||||
@@ -584,7 +585,7 @@ class TestVirtualRoute:
|
|||||||
def test_cache_miss_stores_result_in_s3(self, client, patched_virtual_deps):
|
def test_cache_miss_stores_result_in_s3(self, client, patched_virtual_deps):
|
||||||
deps = patched_virtual_deps
|
deps = patched_virtual_deps
|
||||||
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
||||||
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE)
|
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None)
|
||||||
client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
deps["storage"].upload.assert_called_once()
|
deps["storage"].upload.assert_called_once()
|
||||||
@@ -592,7 +593,7 @@ class TestVirtualRoute:
|
|||||||
def test_cache_miss_marks_index_cached(self, client, patched_virtual_deps):
|
def test_cache_miss_marks_index_cached(self, client, patched_virtual_deps):
|
||||||
deps = patched_virtual_deps
|
deps = patched_virtual_deps
|
||||||
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
||||||
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE)
|
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None)
|
||||||
client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
deps["cache"].mark_index_cached.assert_called_once()
|
deps["cache"].mark_index_cached.assert_called_once()
|
||||||
@@ -601,8 +602,8 @@ class TestVirtualRoute:
|
|||||||
deps = patched_virtual_deps
|
deps = patched_virtual_deps
|
||||||
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
||||||
mock_get.side_effect = [
|
mock_get.side_effect = [
|
||||||
("helm-test", _CFG_A, 3600, _INDEX_SIMPLE),
|
("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None),
|
||||||
("helm-member-2", _CFG_B, 1800, _INDEX_SIMPLE),
|
("helm-member-2", _CFG_B, 1800, _INDEX_SIMPLE, None),
|
||||||
]
|
]
|
||||||
client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
@@ -611,7 +612,7 @@ class TestVirtualRoute:
|
|||||||
|
|
||||||
def test_all_members_unreachable_returns_502(self, client, patched_virtual_deps):
|
def test_all_members_unreachable_returns_502(self, client, patched_virtual_deps):
|
||||||
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
||||||
mock_get.return_value = ("helm-test", _CFG_A, 3600, None)
|
mock_get.return_value = ("helm-test", _CFG_A, 3600, None, None)
|
||||||
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
assert response.status_code == 502
|
assert response.status_code == 502
|
||||||
@@ -619,8 +620,8 @@ class TestVirtualRoute:
|
|||||||
def test_one_member_unreachable_still_returns_200(self, client, patched_virtual_deps):
|
def test_one_member_unreachable_still_returns_200(self, client, patched_virtual_deps):
|
||||||
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
||||||
mock_get.side_effect = [
|
mock_get.side_effect = [
|
||||||
("helm-test", _CFG_A, 3600, _INDEX_SIMPLE),
|
("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None),
|
||||||
("helm-member-2", _CFG_B, 1800, None),
|
("helm-member-2", _CFG_B, 1800, None, None),
|
||||||
]
|
]
|
||||||
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
@@ -638,7 +639,7 @@ class TestVirtualRoute:
|
|||||||
patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get,
|
patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get,
|
||||||
patch.object(main_mod.config, "get_remote_config", side_effect=patched_get),
|
patch.object(main_mod.config, "get_remote_config", side_effect=patched_get),
|
||||||
):
|
):
|
||||||
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE)
|
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None)
|
||||||
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
# only helm-test was available — should succeed
|
# only helm-test was available — should succeed
|
||||||
@@ -650,7 +651,180 @@ class TestVirtualRoute:
|
|||||||
deps["storage"].upload.side_effect = Exception("S3 write error")
|
deps["storage"].upload.side_effect = Exception("S3 write error")
|
||||||
|
|
||||||
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get:
|
||||||
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE)
|
mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None)
|
||||||
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml")
|
||||||
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _entries_to_msgpack_safe
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestEntriesToMsgpackSafe:
    """Checks for the datetime/date -> ISO string conversion done before msgpack packing."""

    def test_plain_string_values_pass_through(self):
        # Values that are already plain strings must come back untouched.
        version = {"name": "chart", "version": "1.0.0", "urls": ["http://x/c.tgz"]}
        converted = _entries_to_msgpack_safe({"chart": [version]})
        assert converted["chart"][0]["version"] == "1.0.0"

    def test_datetime_converted_to_iso_string(self):
        stamp = datetime(2023, 6, 15, 12, 0, 0, tzinfo=UTC)
        converted = _entries_to_msgpack_safe(
            {"chart": [{"name": "chart", "version": "1.0.0", "created": stamp}]}
        )
        created = converted["chart"][0]["created"]
        assert isinstance(created, str)
        assert "2023-06-15" in created

    def test_date_converted_to_iso_string(self):
        version = {"name": "chart", "version": "1.0.0", "created": date(2023, 6, 15)}
        converted = _entries_to_msgpack_safe({"chart": [version]})
        assert converted["chart"][0]["created"] == "2023-06-15"

    def test_empty_entries_returns_empty_dict(self):
        assert _entries_to_msgpack_safe({}) == {}

    def test_multiple_versions_all_converted(self):
        # Every version of a chart must be converted, not just the first one.
        stamp = datetime(2023, 1, 1, tzinfo=UTC)
        versions = [
            {"name": "chart", "version": "1.0.0", "created": stamp},
            {"name": "chart", "version": "2.0.0", "created": stamp},
        ]
        converted = _entries_to_msgpack_safe({"chart": versions})
        assert all(isinstance(item["created"], str) for item in converted["chart"])

    def test_result_is_msgpack_serializable(self):
        import msgpack

        # Pack and unpack the converted entries and compare the timestamp field.
        stamp = datetime(2023, 6, 15, 12, 0, 0, tzinfo=UTC)
        version = {"name": "chart", "version": "1.0.0", "created": stamp, "urls": ["http://x/c.tgz"]}
        safe = _entries_to_msgpack_safe({"chart": [version]})
        round_tripped = msgpack.unpackb(msgpack.packb(safe, use_bin_type=True), raw=False)
        assert round_tripped["chart"][0]["created"] == safe["chart"][0]["created"]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
# _merge_helm_indexes — pre-parsed entries path
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestMergeHelmIndexesWithParsed:
    """Exercise ``_merge_helm_indexes`` when pre-parsed entries accompany the raw bytes.

    The second positional argument carries one item per member: either a
    dict of already-parsed entries or ``None``.
    """

    def _parse_entries(self, raw: bytes) -> dict:
        """Load *raw* as YAML and return its ``entries`` mapping, or ``{}`` when absent/empty."""
        document = yaml.safe_load(raw)
        entries = document.get("entries")
        return entries or {}

    def test_parsed_entries_produce_same_charts_as_raw(self):
        """Merging with and without pre-parsed entries yields the same set of chart names."""
        parsed = self._parse_entries(_INDEX_A)
        # The raw-only merge runs first, then the merge that is handed ``parsed``.
        merged_from_raw = _merge_helm_indexes([_INDEX_A], [None], ["member-a"], [_CFG_A], "http://proxy.example.com")
        raw_result = yaml.safe_load(merged_from_raw)
        merged_from_parsed = _merge_helm_indexes([_INDEX_A], [parsed], ["member-a"], [_CFG_A], "http://proxy.example.com")
        parsed_result = yaml.safe_load(merged_from_parsed)
        raw_names = set(raw_result["entries"].keys())
        parsed_names = set(parsed_result["entries"].keys())
        assert raw_names == parsed_names

    def test_parsed_entries_urls_are_rewritten(self):
        """The first ``vault`` URL mentions both the member name and the proxy host."""
        parsed = self._parse_entries(_INDEX_A)
        merged = _merge_helm_indexes([_INDEX_A], [parsed], ["member-a"], [_CFG_A], "http://proxy.example.com")
        result = yaml.safe_load(merged)
        first_url = result["entries"]["vault"][0]["urls"][0]
        assert "member-a" in first_url
        assert "proxy.example.com" in first_url

    def test_none_parsed_falls_back_to_raw_bytes(self):
        """With ``None`` as the parsed item, ``vault`` still appears in the merged entries."""
        merged = _merge_helm_indexes([_INDEX_A], [None], ["member-a"], [_CFG_A], "http://proxy.example.com")
        result = yaml.safe_load(merged)
        assert "vault" in result["entries"]

    def test_mixed_parsed_and_raw_merge_correctly(self):
        """One member with parsed entries plus one with ``None`` yields charts from both."""
        parsed_a = self._parse_entries(_INDEX_A)
        merged = _merge_helm_indexes(
            [_INDEX_A, _INDEX_B],
            [parsed_a, None],
            ["member-a", "member-b"],
            [_CFG_A, _CFG_B],
            "http://proxy.example.com",
        )
        merged_entries = yaml.safe_load(merged)["entries"]
        assert "vault" in merged_entries
        assert "nginx" in merged_entries
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
# _get_member_index — msgpack cache behaviour
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetMemberIndexMsgpack:
    """Tests for ``_get_member_index`` around the ``index.msgpack`` object.

    ``_get_member_index`` is awaited with mocked storage and cache objects and
    returns a 5-tuple; these tests look at the last two items (raw index data
    and parsed entries).
    """

    @pytest.fixture
    def storage(self):
        """Storage double: no object exists and every download yields ``_INDEX_SIMPLE``."""
        store = MagicMock()
        store.get_object_key.side_effect = lambda name, path: f"{name}/{path}"
        store.exists.return_value = False
        store.download_object.return_value = _INDEX_SIMPLE
        return store

    @pytest.fixture
    def cache(self):
        """Cache double that reports the index as not valid by default."""
        cache_double = MagicMock()
        cache_double.is_index_valid.return_value = False
        return cache_double

    @pytest.fixture
    def member_cfg(self):
        """Member configuration dict with a base URL and a mutable TTL."""
        return {
            "base_url": "https://helm.releases.hashicorp.com",
            "cache": {"mutable_ttl": 3600},
        }

    def _fake_response(self, content=_INDEX_SIMPLE):
        """Build a mock HTTP response exposing *content* and a no-op ``raise_for_status``."""
        response = MagicMock()
        response.content = content
        response.raise_for_status = MagicMock()
        return response

    async def test_cache_hit_with_msgpack_returns_parsed_entries(self, storage, cache, member_cfg):
        """With a valid cache and a well-formed msgpack object, the parsed entries are returned."""
        import msgpack

        entries = {"mychart": [{"name": "mychart", "version": "1.0.0", "urls": ["http://x/c.tgz"]}]}
        packed = msgpack.packb(entries, use_bin_type=True)

        def _always_exists(key):
            return True

        def _download(key):
            # Serve the packed entries for the msgpack key, raw YAML for anything else.
            if key.endswith("index.msgpack"):
                return packed
            return _INDEX_SIMPLE

        storage.exists.side_effect = _always_exists
        cache.is_index_valid.return_value = True
        storage.download_object.side_effect = _download

        _, _, _, _, parsed = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)

        assert parsed == entries

    async def test_cache_miss_builds_msgpack_and_returns_parsed(self, storage, cache, member_cfg):
        """On a cache miss the mocked upstream body is returned raw and as parsed entries."""
        with patch("artifactapi.artifact.virtual.httpx.AsyncClient") as client_factory:
            http_client = AsyncMock()
            client_factory.return_value.__aenter__.return_value = http_client
            http_client.get.return_value = self._fake_response()

            _, _, _, raw_data, parsed = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)

        assert raw_data == _INDEX_SIMPLE
        assert isinstance(parsed, dict)
        assert "mychart" in parsed

    async def test_broken_msgpack_rebuilds_from_raw_yaml(self, storage, cache, member_cfg):
        """An undecodable msgpack object still results in parsed entries being returned."""

        def _always_exists(key):
            return True

        def _download(key):
            # The msgpack key yields garbage bytes; any other key yields the raw YAML.
            if key.endswith("index.msgpack"):
                return b"not-valid-msgpack"
            return _INDEX_SIMPLE

        storage.exists.side_effect = _always_exists
        cache.is_index_valid.return_value = True
        storage.download_object.side_effect = _download

        _, _, _, raw_data, parsed = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)

        assert raw_data == _INDEX_SIMPLE
        # Falls back to YAML parse and rebuilds msgpack — entries are returned
        assert isinstance(parsed, dict)
        assert "mychart" in parsed

    async def test_upstream_failure_returns_none_for_both(self, storage, cache, member_cfg):
        """When the mocked upstream GET raises, both raw data and parsed entries are ``None``."""
        with patch("artifactapi.artifact.virtual.httpx.AsyncClient") as client_factory:
            http_client = AsyncMock()
            client_factory.return_value.__aenter__.return_value = http_client
            http_client.get.side_effect = Exception("timeout")

            _, _, _, raw_data, parsed = await _get_member_index("m", member_cfg, "index.yaml", storage, cache)

        assert raw_data is None
        assert parsed is None
|
||||||
|
|||||||
Reference in New Issue
Block a user