feat: add check_mutable_updates flag for conditional upstream revalidation
When check_mutable_updates: true is set on a remote, expired user-defined mutable files are revalidated before re-downloading: - On expiry a conditional HEAD is sent with If-None-Match / If-Modified-Since - 304 Not Modified: TTL is refreshed in Redis, S3 cache is untouched - 200 / no conditional support: cache is invalidated and file re-downloaded - Network error: safe fallback — assume changed, re-download ETag and Last-Modified from upstream responses are stored in Redis under mutable:meta:<remote>:<hash> (no expiry, cleaned up on re-download or cache flush). The flag only applies to user-configured mutable_patterns; built-in package-type defaults (APKINDEX, repomd.xml, Docker manifests) are always re-fetched unconditionally. cache/flush also clears mutable:meta:* keys alongside index:* keys.
This commit is contained in:
@@ -26,11 +26,13 @@ class RedisCache:
|
||||
return any(re.search(p, file_path) for p in patterns)
|
||||
|
||||
def get_index_cache_key(self, remote_name: str, path: str) -> str:
|
||||
"""Generate cache key for index files"""
|
||||
return f"index:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}"
|
||||
|
||||
def is_index_valid(self, remote_name: str, path: str, ttl_override: int = None) -> bool:
|
||||
"""Check if index file is still valid (not expired)"""
|
||||
def get_mutable_meta_key(self, remote_name: str, path: str) -> str:
|
||||
return f"mutable:meta:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}"
|
||||
|
||||
def is_index_valid(self, remote_name: str, path: str) -> bool:
|
||||
"""Check if mutable file is still within its TTL window."""
|
||||
if not self.available:
|
||||
return False
|
||||
|
||||
@@ -41,7 +43,7 @@ class RedisCache:
|
||||
return False
|
||||
|
||||
def mark_index_cached(self, remote_name: str, path: str, ttl: int = 300) -> None:
|
||||
"""Mark index file as cached with TTL"""
|
||||
"""Set or refresh the TTL key for a mutable file."""
|
||||
if not self.available:
|
||||
return
|
||||
|
||||
@@ -51,13 +53,45 @@ class RedisCache:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def store_mutable_meta(self, remote_name: str, path: str, etag: str | None, last_modified: str | None) -> None:
|
||||
"""Persist ETag and Last-Modified for future conditional requests."""
|
||||
if not self.available:
|
||||
return
|
||||
data = {}
|
||||
if etag:
|
||||
data["etag"] = etag
|
||||
if last_modified:
|
||||
data["last_modified"] = last_modified
|
||||
if not data:
|
||||
return
|
||||
try:
|
||||
self.client.hset(self.get_mutable_meta_key(remote_name, path), mapping=data)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def get_mutable_meta(self, remote_name: str, path: str) -> dict:
|
||||
"""Return stored ETag/Last-Modified for a mutable file, or {}."""
|
||||
if not self.available:
|
||||
return {}
|
||||
try:
|
||||
return self.client.hgetall(self.get_mutable_meta_key(remote_name, path)) or {}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
def delete_mutable_meta(self, remote_name: str, path: str) -> None:
|
||||
if not self.available:
|
||||
return
|
||||
try:
|
||||
self.client.delete(self.get_mutable_meta_key(remote_name, path))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def cleanup_expired_index(self, storage, remote_name: str, path: str) -> None:
|
||||
"""Remove expired index from S3 storage"""
|
||||
"""Remove an expired mutable file from S3 and clear its Redis meta."""
|
||||
if not self.available:
|
||||
return
|
||||
|
||||
try:
|
||||
# Construct the URL the same way as in the main flow
|
||||
import os
|
||||
|
||||
from .config import ConfigManager
|
||||
@@ -69,9 +103,10 @@ class RedisCache:
|
||||
if remote_config:
|
||||
base_url = remote_config.get("base_url")
|
||||
if base_url:
|
||||
# Use hierarchical path-based key (same as cache_single_artifact)
|
||||
s3_key = storage.get_object_key(remote_name, path)
|
||||
if storage.exists(s3_key):
|
||||
storage.client.delete_object(Bucket=storage.bucket, Key=s3_key)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.delete_mutable_meta(remote_name, path)
|
||||
|
||||
@@ -124,6 +124,13 @@ class ConfigManager:
|
||||
db_url = f"postgresql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
|
||||
return {"url": db_url}
|
||||
|
||||
def get_user_mutable_patterns(self, remote_name: str) -> list[str]:
|
||||
"""Return only user-configured mutable_patterns, excluding package-type defaults."""
|
||||
remote_config = self.get_remote_config(remote_name)
|
||||
if not remote_config:
|
||||
return []
|
||||
return remote_config.get("mutable_patterns", [])
|
||||
|
||||
def get_mutable_patterns(self, remote_name: str) -> list[str]:
|
||||
"""Return mutable-file patterns for a remote (TTL is configured per-remote in cache.index_ttl).
|
||||
|
||||
|
||||
+64
-6
@@ -87,8 +87,10 @@ def flush_cache(
|
||||
if cache_type in ["all", "index"]:
|
||||
if remote:
|
||||
patterns.append(f"index:{remote}:*")
|
||||
patterns.append(f"mutable:meta:{remote}:*")
|
||||
else:
|
||||
patterns.append("index:*")
|
||||
patterns.append("mutable:meta:*")
|
||||
|
||||
if cache_type in ["all", "metrics"]:
|
||||
if remote:
|
||||
@@ -240,12 +242,36 @@ async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
|
||||
"storage_path": storage_path,
|
||||
"size": len(response.content),
|
||||
"status": "cached",
|
||||
"etag": response.headers.get("ETag"),
|
||||
"last_modified": response.headers.get("Last-Modified"),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {"url": url, "status": "error", "error": str(e)}
|
||||
|
||||
|
||||
async def check_upstream_changed(remote_url: str, remote_name: str, path: str) -> bool:
|
||||
"""Conditional HEAD against upstream. Returns False only on a definitive 304."""
|
||||
meta = cache.get_mutable_meta(remote_name, path)
|
||||
if not meta:
|
||||
return True
|
||||
|
||||
headers = {}
|
||||
if meta.get("etag"):
|
||||
headers["If-None-Match"] = meta["etag"]
|
||||
if meta.get("last_modified"):
|
||||
headers["If-Modified-Since"] = meta["last_modified"]
|
||||
if not headers:
|
||||
return True
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||
response = await client.head(remote_url, headers=headers)
|
||||
return response.status_code != 304
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
@app.get("/api/v1/remote/{remote_name}/{path:path}")
|
||||
async def get_artifact(remote_name: str, path: str):
|
||||
# Check if remote is configured
|
||||
@@ -302,9 +328,23 @@ async def get_artifact(remote_name: str, path: str):
|
||||
|
||||
if cached_key and is_mutable:
|
||||
if not cache.is_index_valid(remote_name, path):
|
||||
logger.info(f"Mutable file EXPIRED: {remote_name}/{path} - removing from cache")
|
||||
cache.cleanup_expired_index(storage, remote_name, path)
|
||||
cached_key = None # Force re-download
|
||||
remote_cfg = config.get_remote_config(remote_name) or {}
|
||||
check_updates = remote_cfg.get("check_mutable_updates", False)
|
||||
user_mutable = check_updates and cache.is_mutable_file(path, config.get_user_mutable_patterns(remote_name))
|
||||
if user_mutable:
|
||||
changed = await check_upstream_changed(remote_url, remote_name, path)
|
||||
if not changed:
|
||||
mutable_ttl = config.get_cache_config(remote_name).get("mutable_ttl", 3600)
|
||||
cache.mark_index_cached(remote_name, path, mutable_ttl)
|
||||
logger.info(f"Mutable file UNCHANGED: {remote_name}/{path} - TTL refreshed ({mutable_ttl}s)")
|
||||
else:
|
||||
logger.info(f"Mutable file CHANGED: {remote_name}/{path} - re-downloading")
|
||||
cache.cleanup_expired_index(storage, remote_name, path)
|
||||
cached_key = None
|
||||
else:
|
||||
logger.info(f"Mutable file EXPIRED: {remote_name}/{path} - removing from cache")
|
||||
cache.cleanup_expired_index(storage, remote_name, path)
|
||||
cached_key = None
|
||||
|
||||
if cached_key:
|
||||
# Return cached artifact
|
||||
@@ -362,6 +402,8 @@ async def get_artifact(remote_name: str, path: str):
|
||||
mutable_ttl = cache_config.get("mutable_ttl", 3600)
|
||||
cache.mark_index_cached(remote_name, path, mutable_ttl)
|
||||
logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)")
|
||||
if result.get("etag") or result.get("last_modified"):
|
||||
cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified"))
|
||||
|
||||
# Now return the cached artifact
|
||||
try:
|
||||
@@ -439,9 +481,23 @@ async def docker_v2_proxy(request: Request, remote_name: str, path: str):
|
||||
|
||||
if cached_key and is_mutable:
|
||||
if not cache.is_index_valid(remote_name, path):
|
||||
logger.info(f"Mutable file EXPIRED: {remote_name}/{path} - removing from cache")
|
||||
cache.cleanup_expired_index(storage, remote_name, path)
|
||||
cached_key = None
|
||||
remote_cfg = config.get_remote_config(remote_name) or {}
|
||||
check_updates = remote_cfg.get("check_mutable_updates", False)
|
||||
user_mutable = check_updates and cache.is_mutable_file(path, config.get_user_mutable_patterns(remote_name))
|
||||
if user_mutable:
|
||||
changed = await check_upstream_changed(remote_url, remote_name, path)
|
||||
if not changed:
|
||||
mutable_ttl = config.get_cache_config(remote_name).get("mutable_ttl", 3600)
|
||||
cache.mark_index_cached(remote_name, path, mutable_ttl)
|
||||
logger.info(f"Mutable file UNCHANGED: {remote_name}/{path} - TTL refreshed ({mutable_ttl}s)")
|
||||
else:
|
||||
logger.info(f"Mutable file CHANGED: {remote_name}/{path} - re-downloading")
|
||||
cache.cleanup_expired_index(storage, remote_name, path)
|
||||
cached_key = None
|
||||
else:
|
||||
logger.info(f"Mutable file EXPIRED: {remote_name}/{path} - removing from cache")
|
||||
cache.cleanup_expired_index(storage, remote_name, path)
|
||||
cached_key = None
|
||||
|
||||
if not cached_key:
|
||||
logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
|
||||
@@ -453,6 +509,8 @@ async def docker_v2_proxy(request: Request, remote_name: str, path: str):
|
||||
mutable_ttl = cache_config.get("mutable_ttl", 3600)
|
||||
cache.mark_index_cached(remote_name, path, mutable_ttl)
|
||||
logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)")
|
||||
if result.get("etag") or result.get("last_modified"):
|
||||
cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified"))
|
||||
|
||||
artifact_data = storage.download_object(storage.get_object_key(remote_name, path))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user