Files
artifactapi/src/artifactapi/artifact/virtual.py
T
unkinben 1e0f4dc840
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful
feat: cache parsed member indexes as msgpack to skip YAML re-parse on rebuild
Warm rebuilds of virtual repos (member caches valid, virtual TTL expired)
previously re-parsed all member index.yaml files on every rebuild. With 19
Helm members totalling 14 MB, YAML parsing was 60% of merge time (~6.3s of
~9.6s). Parsing each member's YAML also produces msgpack and stores it in S3
alongside the raw index. Subsequent rebuilds load the compact msgpack and skip
YAML parsing entirely.

Before: warm rebuild ~9.6s (CSafeLoader baseline)
After:  warm rebuild ~5.9s (38% faster, merge=4.7s down from ~9.6s)
2026-05-02 17:04:19 +10:00

318 lines
12 KiB
Python

import asyncio
import base64
import logging
import time
from datetime import UTC, date, datetime
from typing import Protocol, runtime_checkable
import httpx
import msgpack as _msgpack
import yaml
from fastapi import HTTPException, Request, Response
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Use the libyaml-backed classes only when this PyYAML build exposes both of
# them; otherwise fall back to the pure-Python loader and dumper as a pair.
if hasattr(yaml, "CSafeLoader") and hasattr(yaml, "CDumper"):
    _YamlLoader = yaml.CSafeLoader
    _YamlDumperBase = yaml.CDumper
else:
    _YamlLoader = yaml.SafeLoader
    _YamlDumperBase = yaml.Dumper
class _HelmDumper(_YamlDumperBase):
    """Dumper for the merged index that writes datetime/date values as ISO 8601 strings.

    The YAML loader turns timestamp-shaped scalars (e.g. a chart's `created`
    field) into Python datetime objects. Dumped without a custom representer,
    those values are rendered as "2022-12-16 11:08:49+00:00" (space separator
    rather than "T"), which Go's YAML parser cannot unmarshal into time.Time.
    """
def _repr_datetime(dumper: yaml.Dumper, data: datetime) -> yaml.ScalarNode:
    """Represent a datetime as a "T"-separated ISO 8601 string scalar.

    Aware values are emitted in UTC with a trailing "Z". A value carrying a
    non-zero UTC offset is first converted to UTC so that the "Z" suffix
    describes the same instant; previously the local wall-clock time was
    labelled "Z" unchanged. Naive values are emitted without a zone suffix.

    Args:
        dumper: The YAML dumper performing the serialization.
        data: The datetime value to represent.

    Returns:
        A string-tagged scalar node holding the formatted timestamp.
    """
    suffix = ""
    if data.tzinfo is not None:
        suffix = "Z"
        # utcoffset() is falsy for both None and a zero offset; only a real
        # non-zero offset needs shifting to UTC.
        if data.utcoffset():
            data = data.astimezone(UTC)
    stamp = data.strftime("%Y-%m-%dT%H:%M:%S.%f") + suffix
    return dumper.represent_scalar("tag:yaml.org,2002:str", stamp)
def _repr_date(dumper: yaml.Dumper, data: date) -> yaml.ScalarNode:
    """Represent a date as a string scalar holding its isoformat() text."""
    iso_text = data.isoformat()
    return dumper.represent_scalar("tag:yaml.org,2002:str", iso_text)
# Register the custom representers on _HelmDumper for datetime and date values.
_HelmDumper.add_representer(datetime, _repr_datetime)
_HelmDumper.add_representer(date, _repr_date)
def _entries_to_msgpack_safe(entries: dict) -> dict:
"""Convert datetime/date values to ISO strings for msgpack serialization."""
result = {}
for chart, versions in entries.items():
safe_versions = []
for v in versions:
safe_v = {}
for k, val in v.items():
if isinstance(val, datetime):
safe_v[k] = val.isoformat()
elif isinstance(val, date):
safe_v[k] = val.isoformat()
else:
safe_v[k] = val
safe_versions.append(safe_v)
result[chart] = safe_versions
return result
async def _get_member_index(
    member_name: str,
    member_cfg: dict,
    path: str,
    storage,
    cache,
) -> tuple[str, dict, int, bytes | None, dict | None]:
    """Fetch or retrieve the cached index.yaml for one member remote.

    The cached copy in storage is used when it exists and the cache reports
    the index as still valid; otherwise the index is fetched from the
    member's `base_url` and stored. When no usable msgpack copy of the parsed
    entries exists, the raw index is parsed once and a msgpack copy is
    uploaded alongside it.

    Args:
        member_name: Name of the member remote.
        member_cfg: Configuration of the member; `base_url`, `username`,
            `password` and `cache.mutable_ttl` are read from it.
        path: Index path used for the storage key and cache bookkeeping.
        storage: Object storage used for the raw index and its msgpack copy.
        cache: Cache tracking index validity.

    Returns:
        (member_name, member_cfg, ttl, raw_bytes, parsed_entries).
        raw_bytes is None if the member is unreachable and not in storage.
        parsed_entries is the pre-parsed entries dict, or None when it could
        not be loaded or built.
    """
    member_ttl = member_cfg.get("cache", {}).get("mutable_ttl", 3600)
    s3_key = storage.get_object_key(member_name, path)
    msgpack_key = storage.get_object_key(member_name, "index.msgpack")

    raw_data: bytes | None = None
    parsed_entries: dict | None = None

    if storage.exists(s3_key) and cache.is_index_valid(member_name, path):
        try:
            raw_data = storage.download_object(s3_key)
            logger.info(f"Virtual: cache hit for member '{member_name}'")
        except Exception as e:
            # Fall through to an upstream fetch, but leave a trace of why.
            logger.warning(f"Virtual: failed to read cached index.yaml for member '{member_name}': {e}")
            raw_data = None

        if raw_data is not None and storage.exists(msgpack_key):
            try:
                packed = storage.download_object(msgpack_key)
                unpacked = _msgpack.unpackb(packed, raw=False)
            except Exception as e:
                logger.warning(f"Virtual: failed to load msgpack cache for member '{member_name}': {e}")
            else:
                # Only a mapping is usable as entries; anything else would
                # break the merge later, so it is discarded and rebuilt below.
                if isinstance(unpacked, dict):
                    parsed_entries = unpacked
                    logger.debug(f"Virtual: msgpack hit for member '{member_name}'")
                else:
                    logger.warning(f"Virtual: ignoring malformed msgpack cache for member '{member_name}'")

    if raw_data is None:
        base_url = member_cfg.get("base_url", "").rstrip("/")
        upstream_url = f"{base_url}/index.yaml"
        headers = {}
        username = member_cfg.get("username")
        password = member_cfg.get("password")
        if username and password:
            token = base64.b64encode(f"{username}:{password}".encode()).decode()
            headers["Authorization"] = f"Basic {token}"
        try:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                response = await client.get(upstream_url, headers=headers, timeout=30.0)
                response.raise_for_status()
                raw_data = response.content
        except Exception as e:
            logger.warning(f"Virtual: failed to fetch index.yaml from member '{member_name}': {e}")
            return member_name, member_cfg, member_ttl, None, None
        # Caching the fetched index is best-effort; a failure must not lose the data.
        try:
            storage.upload(s3_key, raw_data)
            cache.mark_index_cached(member_name, path, member_ttl)
        except Exception as e:
            logger.warning(f"Virtual: failed to cache index.yaml for member '{member_name}': {e}")

    if parsed_entries is None and raw_data is not None:
        # No usable msgpack copy: parse the YAML once and store the result.
        try:
            index = yaml.load(raw_data, Loader=_YamlLoader)
            safe_entries = _entries_to_msgpack_safe(index.get("entries") or {})
            storage.upload(msgpack_key, _msgpack.packb(safe_entries, use_bin_type=True))
            parsed_entries = safe_entries
        except Exception as e:
            logger.warning(f"Virtual: failed to build msgpack cache for '{member_name}': {e}")

    return member_name, member_cfg, member_ttl, raw_data, parsed_entries
def _rewrite_urls(urls: list, base_url: str, proxy_base: str, member_name: str) -> list:
proxy_remote = f"{proxy_base}/api/v1/remote/{member_name}"
rewritten = []
for url in urls:
if url.startswith(("http://", "https://")):
if base_url and url.startswith(base_url):
url = proxy_remote + url[len(base_url) :]
else:
url = f"{proxy_remote}/{url.lstrip('/')}"
rewritten.append(url)
return rewritten
def _merge_helm_indexes(
    raw_indexes: list[bytes],
    parsed_entries_list: list[dict | None],
    member_names: list[str],
    member_configs: list[dict],
    proxy_base: str,
) -> bytes:
    """Merge helm index.yaml files with per-member URL rewriting.

    Priority is determined by position in member_names: earlier members win
    when the same chart name + version appears in multiple remotes.
    Uses pre-parsed entries when available to skip YAML parsing.

    A member whose index cannot be parsed, or whose parsed content is not a
    mapping (for example an empty response body), is skipped with a warning
    rather than failing the whole merge.

    Note: the version entries of the supplied pre-parsed dicts are modified
    in place (their `urls` are rewritten).

    Args:
        raw_indexes: Raw index.yaml bytes, one per member.
        parsed_entries_list: Pre-parsed entries per member, or None to parse
            the corresponding raw bytes.
        member_names: Member names, in priority order.
        member_configs: Member configurations; `base_url` is read from each.
        proxy_base: Base URL of this proxy used for URL rewriting.

    Returns:
        The merged index serialized as UTF-8 encoded YAML.
    """
    merged_entries: dict[str, list] = {}
    for raw_data, pre_parsed, member_name, member_cfg in zip(raw_indexes, parsed_entries_list, member_names, member_configs):
        base_url = member_cfg.get("base_url", "").rstrip("/")
        if pre_parsed is not None:
            entries = pre_parsed
        else:
            try:
                index = yaml.load(raw_data, Loader=_YamlLoader)
            except Exception as e:
                logger.warning(f"Virtual: failed to parse index.yaml from member '{member_name}': {e}")
                continue
            # An empty or non-mapping document parses without error but has
            # no entries to read; skip it instead of raising AttributeError.
            if not isinstance(index, dict):
                logger.warning(f"Virtual: index.yaml from member '{member_name}' is not a YAML mapping, skipping")
                continue
            entries = index.get("entries") or {}
        if not isinstance(entries, dict):
            logger.warning(f"Virtual: entries from member '{member_name}' are not a mapping, skipping")
            continue
        for chart_name, versions in entries.items():
            # A chart listed with a null version list contributes nothing.
            versions = versions or []
            for version_entry in versions:
                version_entry["urls"] = _rewrite_urls(
                    version_entry.get("urls") or [],
                    base_url,
                    proxy_base,
                    member_name,
                )
            if chart_name not in merged_entries:
                merged_entries[chart_name] = list(versions)
            else:
                # Earlier members take priority: only add versions not yet present.
                existing = {(v.get("name"), v.get("version")) for v in merged_entries[chart_name]}
                for version_entry in versions:
                    key = (version_entry.get("name"), version_entry.get("version"))
                    if key not in existing:
                        merged_entries[chart_name].append(version_entry)
                        existing.add(key)
    merged = {
        "apiVersion": "v1",
        "entries": merged_entries,
        "generated": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
    }
    return yaml.dump(merged, Dumper=_HelmDumper, default_flow_style=False, allow_unicode=True).encode()
@runtime_checkable
class _VirtualHandler(Protocol):
def accepts_path(self, path: str) -> bool: ...
def merge(
self,
raw_indexes: list[bytes],
parsed_entries: list[dict | None],
member_names: list[str],
member_configs: list[dict],
proxy_base: str,
) -> bytes: ...
def path_error(self) -> str: ...
class _HelmHandler:
def accepts_path(self, path: str) -> bool:
return path == "index.yaml"
def merge(
self,
raw_indexes: list[bytes],
parsed_entries: list[dict | None],
member_names: list[str],
member_configs: list[dict],
proxy_base: str,
) -> bytes:
return _merge_helm_indexes(raw_indexes, parsed_entries, member_names, member_configs, proxy_base)
def path_error(self) -> str:
return "Virtual helm repositories only serve index.yaml; chart tarballs are served directly by member remotes"
# Handler registry keyed by the `package` value of a virtual repository config.
_HANDLERS: dict[str, _VirtualHandler] = {"helm": _HelmHandler()}
async def handle(request: Request, virtual_name: str, path: str, storage, cache, config) -> Response:
    """Serve a path from a virtual repository, rebuilding the merged index when needed.

    A cached merged index is returned when the cache reports it valid and it
    exists in storage. If reading that cached copy fails, the index is
    rebuilt instead of failing the request. A rebuild fetches every
    configured member index concurrently, merges them with the handler for
    the repository's package type, stores the result (best-effort) and
    returns it.

    Args:
        request: Incoming request; its base URL is used for URL rewriting.
        virtual_name: Name of the virtual repository.
        path: Requested path within the repository.
        storage: Object storage for cached indexes.
        cache: Cache tracking index validity.
        config: Configuration source for virtual and remote repositories.

    Returns:
        A text/yaml response containing the merged index.

    Raises:
        HTTPException: 404 if the virtual repository is not configured or the
            path is not served by the handler; 400 if the package type is
            unsupported; 500 if no members are configured; 502 if no member
            index could be obtained.
    """
    virtual_cfg = config.get_virtual_config(virtual_name)
    if not virtual_cfg:
        raise HTTPException(status_code=404, detail=f"Virtual repository '{virtual_name}' not configured")

    package = virtual_cfg.get("package")
    handler = _HANDLERS.get(package)
    if handler is None:
        raise HTTPException(status_code=400, detail=f"Virtual repositories with package '{package}' are not yet supported")
    if not handler.accepts_path(path):
        raise HTTPException(status_code=404, detail=handler.path_error())

    members = virtual_cfg.get("members", [])
    if not members:
        raise HTTPException(status_code=500, detail=f"Virtual repository '{virtual_name}' has no members configured")

    virtual_key = storage.get_object_key(virtual_name, path)
    if cache.is_index_valid(virtual_name, path) and storage.exists(virtual_key):
        try:
            data = storage.download_object(virtual_key)
        except Exception as e:
            # A broken cached copy should not fail the request; rebuild instead.
            logger.warning(f"Virtual: failed to read cached index for '{virtual_name}', rebuilding: {e}")
        else:
            logger.info(f"Virtual HIT: {virtual_name}/{path}")
            return Response(content=data, media_type="text/yaml")

    # Resolve configs first (config reads are sync/cheap)
    member_entries = []
    for member_name in members:
        member_cfg = config.get_remote_config(member_name)
        if not member_cfg:
            logger.warning(f"Virtual '{virtual_name}': member '{member_name}' not found in config, skipping")
            continue
        member_entries.append((member_name, member_cfg))

    # Fetch all member indexes in parallel; asyncio.gather preserves input order
    proxy_base = str(request.base_url).rstrip("/")
    t_fetch = time.perf_counter()
    results = await asyncio.gather(*[_get_member_index(name, cfg, path, storage, cache) for name, cfg in member_entries])
    fetch_ms = int((time.perf_counter() - t_fetch) * 1000)

    raw_indexes: list[bytes] = []
    used_parsed: list[dict | None] = []
    used_members: list[str] = []
    used_configs: list[dict] = []
    min_ttl: int | None = None
    for member_name, member_cfg, member_ttl, raw_data, parsed_entries in results:
        # The merged index lives no longer than the shortest member TTL,
        # counting members that turned out to be unreachable as well.
        if min_ttl is None or member_ttl < min_ttl:
            min_ttl = member_ttl
        if raw_data is None:
            logger.warning(f"Virtual '{virtual_name}': skipping unreachable member '{member_name}'")
            continue
        raw_indexes.append(raw_data)
        used_parsed.append(parsed_entries)
        used_members.append(member_name)
        used_configs.append(member_cfg)

    if not raw_indexes:
        raise HTTPException(status_code=502, detail=f"Virtual repository '{virtual_name}': no member indices could be fetched")
    if min_ttl is None:
        min_ttl = 3600

    # The merge runs in a worker thread so it does not block the event loop.
    t_merge = time.perf_counter()
    merged = await asyncio.to_thread(handler.merge, raw_indexes, used_parsed, used_members, used_configs, proxy_base)
    merge_ms = int((time.perf_counter() - t_merge) * 1000)

    # Storing the merged index is best-effort; the response is returned regardless.
    try:
        t_store = time.perf_counter()
        storage.upload(virtual_key, merged)
        cache.mark_index_cached(virtual_name, path, min_ttl)
        store_ms = int((time.perf_counter() - t_store) * 1000)
        msgpack_hits = sum(1 for p in used_parsed if p is not None)
        logger.info(
            f"Virtual MISS: {virtual_name}/{path} rebuilt from {used_members} "
            f"(fetch={fetch_ms}ms merge={merge_ms}ms store={store_ms}ms ttl={min_ttl}s "
            f"msgpack={msgpack_hits}/{len(used_members)})"
        )
    except Exception as e:
        logger.warning(f"Virtual: failed to store merged index for '{virtual_name}': {e}")

    return Response(content=merged, media_type="text/yaml")