From 8a7f26b19307ad230fd19d755a812fa50a7e66e3 Mon Sep 17 00:00:00 2001 From: Ben Vincent Date: Sat, 2 May 2026 17:15:31 +1000 Subject: [PATCH] feat: cache parsed member indexes as msgpack to skip YAML re-parse on rebuild (#40) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #36 ## Summary - After fetching a member's `index.yaml` (from upstream or S3), the handler now parses it and stores a compact msgpack file (`index.msgpack`) alongside the raw YAML in S3 - On subsequent virtual rebuilds (member caches valid, virtual TTL expired), the handler loads the msgpack file instead of re-parsing raw YAML — eliminating the costliest phase - `_entries_to_msgpack_safe()` converts datetime/date objects to ISO strings before packing (msgpack cannot natively serialize Python datetimes) - `_merge_helm_indexes()` accepts `list[dict | None]` as pre-parsed entries; falls back to raw YAML parse when msgpack is unavailable - `_VirtualHandler.merge()` protocol updated to pass pre-parsed entries to all future handler implementations - Broken msgpack is detected and rebuilt from raw YAML automatically ## Performance Phase breakdown (19-member helm-all virtual, 14 MB total): | Phase | Time | % | |---|---|---| | YAML parse (eliminated) | 6314 ms | 60% | | URL rewrite + dedup | 33 ms | 0.3% | | YAML dump | 4124 ms | 39% | | Scenario | Before (CSafeLoader only, #34) | After | |---|---|---| | Cold rebuild (upstream fetch) | ~21s | ~26s (+5s for msgpack build, one-time) | | **Warm rebuild (S3 hit, virtual expired)** | **~9.6s** | **~5.9s (38% faster)** | | Virtual cache hit | ~0.03s | ~0.03s | Log line confirms msgpack hits: `msgpack=19/19` ## Test plan - 297 tests pass - `TestEntriesToMsgpackSafe`: datetime/date serialization, empty input, round-trip - `TestMergeHelmIndexesWithParsed`: pre-parsed path produces identical output to raw-bytes path - `TestGetMemberIndexMsgpack`: msgpack hit, cold-build, broken msgpack fallback, upstream failure - Docker warm-rebuild measured at 5.9s vs 9.6s baseline Reviewed-on: https://git.unkin.net/unkin/artifactapi/pulls/40 --- README.md | 2 +- pyproject.toml | 1 + src/artifactapi/artifact/virtual.py | 104 ++++++++++--- tests/test_virtual.py | 218 +++++++++++++++++++++++++--- 4 files changed, 284 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 6fecc88..abd4cd1 100644 --- a/README.md +++ b/README.md @@ -390,7 +390,7 @@ If a member is unreachable and has no cached index, it is skipped and a warning **Caching:** -The merged index is cached using `min(mutable_ttl)` across all members. Each member's raw index is cached in S3 under its own remote key by the normal proxy rules; the virtual handler reuses those copies when available. +The merged index is cached using `min(mutable_ttl)` across all members. Each member's raw index is cached in S3 under its own remote key; the virtual handler reuses those copies when available. On rebuild, each member's parsed index is also stored as a compact msgpack file (`index.msgpack`) alongside the raw YAML, eliminating the YAML parse cost on subsequent rebuilds. **Helm example:** diff --git a/pyproject.toml b/pyproject.toml index 9473046..8be2f2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "lxml>=4.9.0", "prometheus-client>=0.19.0", "python-multipart>=0.0.6", + "msgpack>=1.0.0", ] requires-python = ">=3.11" readme = "README.md" diff --git a/src/artifactapi/artifact/virtual.py b/src/artifactapi/artifact/virtual.py index 9c2bb7a..3f4c88e 100644 --- a/src/artifactapi/artifact/virtual.py +++ b/src/artifactapi/artifact/virtual.py @@ -6,6 +6,7 @@ from datetime import UTC, date, datetime from typing import Protocol, runtime_checkable import httpx +import msgpack as _msgpack import yaml from fastapi import HTTPException, Request, Response @@ -42,21 +43,43 @@ _HelmDumper.add_representer(datetime, _repr_datetime) _HelmDumper.add_representer(date, _repr_date) +def _entries_to_msgpack_safe(entries: dict) -> dict: + """Convert datetime/date values to ISO strings for msgpack serialization.""" + result = {} + for chart, versions in entries.items(): + safe_versions = [] + for v in versions: + safe_v = {} + for k, val in v.items(): + if isinstance(val, datetime): + safe_v[k] = val.isoformat() + elif isinstance(val, date): + safe_v[k] = val.isoformat() + else: + safe_v[k] = val + safe_versions.append(safe_v) + result[chart] = safe_versions + return result + + async def _get_member_index( member_name: str, member_cfg: dict, path: str, storage, cache, -) -> tuple[str, dict, int, bytes | None]: +) -> tuple[str, dict, int, bytes | None, dict | None]: """Fetch or retrieve cached index.yaml for one member remote. - Returns (member_name, member_cfg, ttl, raw_bytes). + Returns (member_name, member_cfg, ttl, raw_bytes, parsed_entries). raw_bytes is None if the member is unreachable and not in S3. + parsed_entries is the pre-parsed entries dict (from msgpack cache), or None. """ member_ttl = member_cfg.get("cache", {}).get("mutable_ttl", 3600) s3_key = storage.get_object_key(member_name, path) + msgpack_key = storage.get_object_key(member_name, "index.msgpack") raw_data: bytes | None = None + parsed_entries: dict | None = None if storage.exists(s3_key) and cache.is_index_valid(member_name, path): try: @@ -64,6 +87,13 @@ async def _get_member_index( logger.info(f"Virtual: cache hit for member '{member_name}'") except Exception: raw_data = None + if raw_data is not None and storage.exists(msgpack_key): + try: + packed = storage.download_object(msgpack_key) + parsed_entries = _msgpack.unpackb(packed, raw=False) + logger.debug(f"Virtual: msgpack hit for member '{member_name}'") + except Exception: + parsed_entries = None if raw_data is None: base_url = member_cfg.get("base_url", "").rstrip("/") @@ -81,14 +111,23 @@ async def _get_member_index( raw_data = response.content except Exception as e: logger.warning(f"Virtual: failed to fetch index.yaml from member '{member_name}': {e}") - return member_name, member_cfg, member_ttl, None + return member_name, member_cfg, member_ttl, None, None try: storage.upload(s3_key, raw_data) cache.mark_index_cached(member_name, path, member_ttl) except Exception as e: logger.warning(f"Virtual: failed to cache index.yaml for member '{member_name}': {e}") - return member_name, member_cfg, member_ttl, raw_data + if parsed_entries is None and raw_data is not None: + try: + index = yaml.load(raw_data, Loader=_YamlLoader) + safe_entries = _entries_to_msgpack_safe(index.get("entries") or {}) + storage.upload(msgpack_key, _msgpack.packb(safe_entries, use_bin_type=True)) + parsed_entries = safe_entries + except Exception as e: + logger.warning(f"Virtual: failed to build msgpack cache for '{member_name}': {e}") + + return member_name, member_cfg, member_ttl, raw_data, parsed_entries def _rewrite_urls(urls: list, base_url: str, proxy_base: str, member_name: str) -> list: @@ -104,24 +143,35 @@ def _rewrite_urls(urls: list, base_url: str, proxy_base: str, member_name: str) return rewritten -def _merge_helm_indexes(raw_indexes: list[bytes], member_names: list[str], member_configs: list[dict], proxy_base: str) -> bytes: +def _merge_helm_indexes( + raw_indexes: list[bytes], + parsed_entries_list: list[dict | None], + member_names: list[str], + member_configs: list[dict], + proxy_base: str, +) -> bytes: """Merge helm index.yaml files with per-member URL rewriting. Priority is determined by position in member_names: earlier members win when the same chart name + version appears in multiple remotes. + Uses pre-parsed msgpack entries when available to skip YAML parsing. """ merged_entries: dict[str, list] = {} - for raw_data, member_name, member_cfg in zip(raw_indexes, member_names, member_configs): + for raw_data, pre_parsed, member_name, member_cfg in zip(raw_indexes, parsed_entries_list, member_names, member_configs): base_url = member_cfg.get("base_url", "").rstrip("/") - try: - index = yaml.load(raw_data, Loader=_YamlLoader) - except Exception as e: - logger.warning(f"Virtual: failed to parse index.yaml from member '{member_name}': {e}") - continue + if pre_parsed is not None: + entries = pre_parsed + else: + try: + index = yaml.load(raw_data, Loader=_YamlLoader) + except Exception as e: + logger.warning(f"Virtual: failed to parse index.yaml from member '{member_name}': {e}") + continue + entries = index.get("entries") or {} - for chart_name, versions in (index.get("entries") or {}).items(): + for chart_name, versions in entries.items(): for version_entry in versions: version_entry["urls"] = _rewrite_urls( version_entry.get("urls") or [], @@ -150,7 +200,14 @@ def _merge_helm_indexes(raw_indexes: list[bytes], member_names: list[str], membe @runtime_checkable class _VirtualHandler(Protocol): def accepts_path(self, path: str) -> bool: ... - def merge(self, raw_indexes: list[bytes], member_names: list[str], member_configs: list[dict], proxy_base: str) -> bytes: ... + def merge( + self, + raw_indexes: list[bytes], + parsed_entries: list[dict | None], + member_names: list[str], + member_configs: list[dict], + proxy_base: str, + ) -> bytes: ... def path_error(self) -> str: ... @@ -158,8 +215,15 @@ class _HelmHandler: def accepts_path(self, path: str) -> bool: return path == "index.yaml" - def merge(self, raw_indexes: list[bytes], member_names: list[str], member_configs: list[dict], proxy_base: str) -> bytes: - return _merge_helm_indexes(raw_indexes, member_names, member_configs, proxy_base) + def merge( + self, + raw_indexes: list[bytes], + parsed_entries: list[dict | None], + member_names: list[str], + member_configs: list[dict], + proxy_base: str, + ) -> bytes: + return _merge_helm_indexes(raw_indexes, parsed_entries, member_names, member_configs, proxy_base) def path_error(self) -> str: return "Virtual helm repositories only serve index.yaml; chart tarballs are served directly by member remotes" @@ -210,17 +274,19 @@ async def handle(request: Request, virtual_name: str, path: str, storage, cache, fetch_ms = int((time.perf_counter() - t_fetch) * 1000) raw_indexes: list[bytes] = [] + used_parsed: list[dict | None] = [] used_members: list[str] = [] used_configs: list[dict] = [] min_ttl: int | None = None - for member_name, member_cfg, member_ttl, raw_data in results: + for member_name, member_cfg, member_ttl, raw_data, parsed_entries in results: if min_ttl is None or member_ttl < min_ttl: min_ttl = member_ttl if raw_data is None: logger.warning(f"Virtual '{virtual_name}': skipping unreachable member '{member_name}'") continue raw_indexes.append(raw_data) + used_parsed.append(parsed_entries) used_members.append(member_name) used_configs.append(member_cfg) @@ -231,7 +297,7 @@ async def handle(request: Request, virtual_name: str, path: str, storage, cache, min_ttl = 3600 t_merge = time.perf_counter() - merged = await asyncio.to_thread(handler.merge, raw_indexes, used_members, used_configs, proxy_base) + merged = await asyncio.to_thread(handler.merge, raw_indexes, used_parsed, used_members, used_configs, proxy_base) merge_ms = int((time.perf_counter() - t_merge) * 1000) try: @@ -239,9 +305,11 @@ async def handle(request: Request, virtual_name: str, path: str, storage, cache, storage.upload(virtual_key, merged) cache.mark_index_cached(virtual_name, path, min_ttl) store_ms = int((time.perf_counter() - t_store) * 1000) + msgpack_hits = sum(1 for p in used_parsed if p is not None) logger.info( f"Virtual MISS: {virtual_name}/{path} rebuilt from {used_members} " - f"(fetch={fetch_ms}ms merge={merge_ms}ms store={store_ms}ms ttl={min_ttl}s)" + f"(fetch={fetch_ms}ms merge={merge_ms}ms store={store_ms}ms ttl={min_ttl}s " + f"msgpack={msgpack_hits}/{len(used_members)})" ) except Exception as e: logger.warning(f"Virtual: failed to store merged index for '{virtual_name}': {e}") diff --git a/tests/test_virtual.py b/tests/test_virtual.py index bfbee56..9ad5be9 100644 --- a/tests/test_virtual.py +++ b/tests/test_virtual.py @@ -8,6 +8,7 @@ import yaml from artifactapi.artifact.virtual import ( _HANDLERS, + _entries_to_msgpack_safe, _get_member_index, _HelmDumper, _HelmHandler, @@ -173,13 +174,13 @@ class TestHelmHandler: assert isinstance(msg, str) and len(msg) > 0 def test_merge_returns_bytes(self): - result = self.handler.merge([_INDEX_A], ["member-a"], [_CFG_A], "http://proxy.example.com") + result = self.handler.merge([_INDEX_A], [None], ["member-a"], [_CFG_A], "http://proxy.example.com") assert isinstance(result, bytes) def test_merge_delegates_to_merge_helm_indexes(self): with patch("artifactapi.artifact.virtual._merge_helm_indexes", return_value=b"merged") as mock_fn: - result = self.handler.merge([b"data"], ["m"], [{}], "http://proxy") - mock_fn.assert_called_once_with([b"data"], ["m"], [{}], "http://proxy") + result = self.handler.merge([b"data"], [None], ["m"], [{}], "http://proxy") + mock_fn.assert_called_once_with([b"data"], [None], ["m"], [{}], "http://proxy") assert result == b"merged" @@ -239,7 +240,7 @@ class TestRewriteUrls: class TestMergeHelmIndexes: def _merge(self, raw_indexes, member_names, member_configs, proxy_base="http://proxy.example.com"): - return _merge_helm_indexes(raw_indexes, member_names, member_configs, proxy_base) + return _merge_helm_indexes(raw_indexes, [None] * len(raw_indexes), member_names, member_configs, proxy_base) def _parse(self, raw): return yaml.safe_load(raw) @@ -342,7 +343,7 @@ class TestGetMemberIndex: storage.exists.return_value = True cache.is_index_valid.return_value = True - _, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + _, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) assert raw_data == b"cached bytes" @@ -365,7 +366,7 @@ class TestGetMemberIndex: mock_cls.return_value.__aenter__.return_value = mock_client mock_client.get.return_value = self._fake_response(b"fresh bytes") - _, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + _, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) assert raw_data == b"fresh bytes" @@ -375,7 +376,7 @@ class TestGetMemberIndex: mock_cls.return_value.__aenter__.return_value = mock_client mock_client.get.return_value = self._fake_response() - _, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + _, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) assert raw_data == b"upstream bytes" @@ -434,7 +435,7 @@ class TestGetMemberIndex: mock_cls.return_value.__aenter__.return_value = mock_client mock_client.get.side_effect = Exception("connection refused") - _, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + _, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) assert raw_data is None @@ -446,7 +447,7 @@ class TestGetMemberIndex: mock_cls.return_value.__aenter__.return_value = mock_client mock_client.get.return_value = self._fake_response() - _, _, _, raw_data = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + _, _, _, raw_data, _ = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) assert raw_data == b"upstream bytes" @@ -457,7 +458,7 @@ class TestGetMemberIndex: mock_cls.return_value.__aenter__.return_value = mock_client mock_client.get.return_value = self._fake_response() - _, _, ttl, _ = await _get_member_index("m", cfg, "index.yaml", storage, cache) + _, _, ttl, _, _ = await _get_member_index("m", cfg, "index.yaml", storage, cache) assert ttl == 900 @@ -468,7 +469,7 @@ class TestGetMemberIndex: mock_cls.return_value.__aenter__.return_value = mock_client mock_client.get.return_value = self._fake_response() - _, _, ttl, _ = await _get_member_index("m", cfg, "index.yaml", storage, cache) + _, _, ttl, _, _ = await _get_member_index("m", cfg, "index.yaml", storage, cache) assert ttl == 3600 @@ -567,7 +568,7 @@ class TestVirtualRoute: def test_cache_miss_returns_200_with_yaml_content_type(self, client, patched_virtual_deps): with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get: - mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE) + mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None) response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml") assert response.status_code == 200 @@ -575,7 +576,7 @@ class TestVirtualRoute: def test_cache_miss_response_contains_merged_entries(self, client, patched_virtual_deps): with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get: - mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE) + mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None) response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml") index = yaml.safe_load(response.content) @@ -584,7 +585,7 @@ class TestVirtualRoute: def test_cache_miss_stores_result_in_s3(self, client, patched_virtual_deps): deps = patched_virtual_deps with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get: - mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE) + mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None) client.get("/api/v1/virtual/helm-virtual-test/index.yaml") deps["storage"].upload.assert_called_once() @@ -592,7 +593,7 @@ class TestVirtualRoute: def test_cache_miss_marks_index_cached(self, client, patched_virtual_deps): deps = patched_virtual_deps with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get: - mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE) + mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None) client.get("/api/v1/virtual/helm-virtual-test/index.yaml") deps["cache"].mark_index_cached.assert_called_once() @@ -601,8 +602,8 @@ class TestVirtualRoute: deps = patched_virtual_deps with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get: mock_get.side_effect = [ - ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE), - ("helm-member-2", _CFG_B, 1800, _INDEX_SIMPLE), + ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None), + ("helm-member-2", _CFG_B, 1800, _INDEX_SIMPLE, None), ] client.get("/api/v1/virtual/helm-virtual-test/index.yaml") @@ -611,7 +612,7 @@ class TestVirtualRoute: def test_all_members_unreachable_returns_502(self, client, patched_virtual_deps): with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get: - mock_get.return_value = ("helm-test", _CFG_A, 3600, None) + mock_get.return_value = ("helm-test", _CFG_A, 3600, None, None) response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml") assert response.status_code == 502 @@ -619,8 +620,8 @@ class TestVirtualRoute: def test_one_member_unreachable_still_returns_200(self, client, patched_virtual_deps): with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get: mock_get.side_effect = [ - ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE), - ("helm-member-2", _CFG_B, 1800, None), + ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None), + ("helm-member-2", _CFG_B, 1800, None, None), ] response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml") @@ -638,7 +639,7 @@ class TestVirtualRoute: patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get, patch.object(main_mod.config, "get_remote_config", side_effect=patched_get), ): - mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE) + mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None) response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml") # only helm-test was available — should succeed @@ -650,7 +651,180 @@ class TestVirtualRoute: deps["storage"].upload.side_effect = Exception("S3 write error") with patch("artifactapi.artifact.virtual._get_member_index", new_callable=AsyncMock) as mock_get: - mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE) + mock_get.return_value = ("helm-test", _CFG_A, 3600, _INDEX_SIMPLE, None) response = client.get("/api/v1/virtual/helm-virtual-test/index.yaml") assert response.status_code == 200 + + +# --------------------------------------------------------------------------- +# _entries_to_msgpack_safe +# --------------------------------------------------------------------------- + + +class TestEntriesToMsgpackSafe: + def test_plain_string_values_pass_through(self): + entries = {"chart": [{"name": "chart", "version": "1.0.0", "urls": ["http://x/c.tgz"]}]} + result = _entries_to_msgpack_safe(entries) + assert result["chart"][0]["version"] == "1.0.0" + + def test_datetime_converted_to_iso_string(self): + dt = datetime(2023, 6, 15, 12, 0, 0, tzinfo=UTC) + entries = {"chart": [{"name": "chart", "version": "1.0.0", "created": dt}]} + result = _entries_to_msgpack_safe(entries) + assert isinstance(result["chart"][0]["created"], str) + assert "2023-06-15" in result["chart"][0]["created"] + + def test_date_converted_to_iso_string(self): + entries = {"chart": [{"name": "chart", "version": "1.0.0", "created": date(2023, 6, 15)}]} + result = _entries_to_msgpack_safe(entries) + assert result["chart"][0]["created"] == "2023-06-15" + + def test_empty_entries_returns_empty_dict(self): + assert _entries_to_msgpack_safe({}) == {} + + def test_multiple_versions_all_converted(self): + dt = datetime(2023, 1, 1, tzinfo=UTC) + entries = { + "chart": [ + {"name": "chart", "version": "1.0.0", "created": dt}, + {"name": "chart", "version": "2.0.0", "created": dt}, + ] + } + result = _entries_to_msgpack_safe(entries) + for v in result["chart"]: + assert isinstance(v["created"], str) + + def test_result_is_msgpack_serializable(self): + import msgpack + + dt = datetime(2023, 6, 15, 12, 0, 0, tzinfo=UTC) + entries = {"chart": [{"name": "chart", "version": "1.0.0", "created": dt, "urls": ["http://x/c.tgz"]}]} + safe = _entries_to_msgpack_safe(entries) + packed = msgpack.packb(safe, use_bin_type=True) + unpacked = msgpack.unpackb(packed, raw=False) + assert unpacked["chart"][0]["created"] == safe["chart"][0]["created"] + + +# --------------------------------------------------------------------------- +# _merge_helm_indexes — pre-parsed entries path +# --------------------------------------------------------------------------- + + +class TestMergeHelmIndexesWithParsed: + """Verify that pre-parsed entries (from msgpack) produce the same output as raw YAML.""" + + def _parse_entries(self, raw: bytes) -> dict: + index = yaml.safe_load(raw) + return index.get("entries") or {} + + def test_parsed_entries_produce_same_charts_as_raw(self): + parsed = self._parse_entries(_INDEX_A) + raw_result = yaml.safe_load(_merge_helm_indexes([_INDEX_A], [None], ["member-a"], [_CFG_A], "http://proxy.example.com")) + parsed_result = yaml.safe_load(_merge_helm_indexes([_INDEX_A], [parsed], ["member-a"], [_CFG_A], "http://proxy.example.com")) + assert set(raw_result["entries"].keys()) == set(parsed_result["entries"].keys()) + + def test_parsed_entries_urls_are_rewritten(self): + parsed = self._parse_entries(_INDEX_A) + result = yaml.safe_load(_merge_helm_indexes([_INDEX_A], [parsed], ["member-a"], [_CFG_A], "http://proxy.example.com")) + url = result["entries"]["vault"][0]["urls"][0] + assert "member-a" in url + assert "proxy.example.com" in url + + def test_none_parsed_falls_back_to_raw_bytes(self): + result = yaml.safe_load(_merge_helm_indexes([_INDEX_A], [None], ["member-a"], [_CFG_A], "http://proxy.example.com")) + assert "vault" in result["entries"] + + def test_mixed_parsed_and_raw_merge_correctly(self): + parsed_a = self._parse_entries(_INDEX_A) + result = yaml.safe_load( + _merge_helm_indexes( + [_INDEX_A, _INDEX_B], + [parsed_a, None], + ["member-a", "member-b"], + [_CFG_A, _CFG_B], + "http://proxy.example.com", + ) + ) + assert "vault" in result["entries"] + assert "nginx" in result["entries"] + + +# --------------------------------------------------------------------------- +# _get_member_index — msgpack cache behaviour +# --------------------------------------------------------------------------- + + +class TestGetMemberIndexMsgpack: + @pytest.fixture + def storage(self): + m = MagicMock() + m.get_object_key.side_effect = lambda name, path: f"{name}/{path}" + m.exists.return_value = False + m.download_object.return_value = _INDEX_SIMPLE + return m + + @pytest.fixture + def cache(self): + m = MagicMock() + m.is_index_valid.return_value = False + return m + + @pytest.fixture + def member_cfg(self): + return {"base_url": "https://helm.releases.hashicorp.com", "cache": {"mutable_ttl": 3600}} + + def _fake_response(self, content=_INDEX_SIMPLE): + r = MagicMock() + r.content = content + r.raise_for_status = MagicMock() + return r + + async def test_cache_hit_with_msgpack_returns_parsed_entries(self, storage, cache, member_cfg): + import msgpack + + entries = {"mychart": [{"name": "mychart", "version": "1.0.0", "urls": ["http://x/c.tgz"]}]} + packed = msgpack.packb(entries, use_bin_type=True) + + storage.exists.side_effect = lambda key: True + cache.is_index_valid.return_value = True + storage.download_object.side_effect = lambda key: packed if key.endswith("index.msgpack") else _INDEX_SIMPLE + + _, _, _, raw_data, parsed = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + + assert parsed == entries + + async def test_cache_miss_builds_msgpack_and_returns_parsed(self, storage, cache, member_cfg): + with patch("artifactapi.artifact.virtual.httpx.AsyncClient") as mock_cls: + mock_client = AsyncMock() + mock_cls.return_value.__aenter__.return_value = mock_client + mock_client.get.return_value = self._fake_response() + + _, _, _, raw_data, parsed = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + + assert raw_data == _INDEX_SIMPLE + assert isinstance(parsed, dict) + assert "mychart" in parsed + + async def test_broken_msgpack_rebuilds_from_raw_yaml(self, storage, cache, member_cfg): + storage.exists.side_effect = lambda key: True + cache.is_index_valid.return_value = True + storage.download_object.side_effect = lambda key: b"not-valid-msgpack" if key.endswith("index.msgpack") else _INDEX_SIMPLE + + _, _, _, raw_data, parsed = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + + assert raw_data == _INDEX_SIMPLE + # Falls back to YAML parse and rebuilds msgpack — entries are returned + assert isinstance(parsed, dict) + assert "mychart" in parsed + + async def test_upstream_failure_returns_none_for_both(self, storage, cache, member_cfg): + with patch("artifactapi.artifact.virtual.httpx.AsyncClient") as mock_cls: + mock_client = AsyncMock() + mock_cls.return_value.__aenter__.return_value = mock_client + mock_client.get.side_effect = Exception("timeout") + + _, _, _, raw_data, parsed = await _get_member_index("m", member_cfg, "index.yaml", storage, cache) + + assert raw_data is None + assert parsed is None