fix: cross-link tag manifests to digest keys and add fetch lock to prevent thundering herd #42

Merged
unkinben merged 1 commits from fix/docker-manifest-digest-cache-and-thundering-herd into master 2026-05-10 22:12:54 +10:00
4 changed files with 257 additions and 15 deletions
+40 -15
View File
@@ -1,3 +1,4 @@
import asyncio
import hashlib import hashlib
import json import json
import logging import logging
@@ -47,23 +48,39 @@ async def proxy(request: Request, remote_name: str, path: str, storage, cache, c
if not await _proxy.handle_expired_mutable(remote_name, path, remote_url, config, cache, storage): if not await _proxy.handle_expired_mutable(remote_name, path, remote_url, config, cache, storage):
cached_key = None cached_key = None
lock_acquired = False
if not cached_key:
lock_acquired = cache.acquire_fetch_lock(remote_name, path)
if not lock_acquired:
# Another pod is already fetching — poll storage briefly before issuing a duplicate upstream request
for _ in range(10):
await asyncio.sleep(0.5)
probe_key = storage.get_object_key(remote_name, path)
if storage.exists(probe_key):
cached_key = probe_key
break
if not cached_key: if not cached_key:
logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}") logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
result = await _proxy.cache_single_artifact(remote_url, remote_name, path, storage, remote_config) try:
if result["status"] == "error": result = await _proxy.cache_single_artifact(remote_url, remote_name, path, storage, remote_config)
raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}") if result["status"] == "error":
if result["status"] == "cached" and is_mutable: raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}")
cache_config = config.get_cache_config(remote_name) if result["status"] == "cached" and is_mutable:
mutable_ttl = cache_config.get("mutable_ttl", 3600) cache_config = config.get_cache_config(remote_name)
cache.mark_index_cached(remote_name, path, mutable_ttl) mutable_ttl = cache_config.get("mutable_ttl", 3600)
logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)") cache.mark_index_cached(remote_name, path, mutable_ttl)
if result.get("etag") or result.get("last_modified"): logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)")
cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified")) if result.get("etag") or result.get("last_modified"):
if not is_mutable: cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified"))
published = result.get("last_modified") if not is_mutable:
if published: published = result.get("last_modified")
cache.store_artifact_published(remote_name, path, published) if published:
_proxy._check_quarantine(remote_name, published, config) cache.store_artifact_published(remote_name, path, published)
_proxy._check_quarantine(remote_name, published, config)
finally:
if lock_acquired:
cache.release_fetch_lock(remote_name, path)
elif not is_mutable: elif not is_mutable:
published = cache.get_artifact_published(remote_name, path) published = cache.get_artifact_published(remote_name, path)
if not published: if not published:
@@ -90,6 +107,14 @@ async def proxy(request: Request, remote_name: str, path: str, storage, cache, c
content_type = "application/vnd.oci.image.manifest.v1+json" content_type = "application/vnd.oci.image.manifest.v1+json"
digest = f"sha256:{hashlib.sha256(artifact_data).hexdigest()}" digest = f"sha256:{hashlib.sha256(artifact_data).hexdigest()}"
# Cross-link tag manifests to their sha256 digest key so digest-addressed pulls hit cache
if is_mutable and "/manifests/" in path:
digest_path = re.sub(r"/manifests/[^/]+$", f"/manifests/{digest}", path)
digest_key = storage.get_object_key(remote_name, digest_path)
if not storage.exists(digest_key):
storage.upload(digest_key, artifact_data)
headers = { headers = {
"Docker-Distribution-Api-Version": "registry/2.0", "Docker-Distribution-Api-Version": "registry/2.0",
"Docker-Content-Digest": digest, "Docker-Content-Digest": digest,
+19
View File
@@ -99,6 +99,25 @@ class RedisCache:
except Exception: except Exception:
return None return None
def acquire_fetch_lock(self, remote_name: str, path: str, ttl: int = 30) -> bool:
"""Try to acquire a short-lived fetch lock. Returns True if acquired, False if held by another caller."""
if not self.available:
return True # fail open: no Redis → behave as if we always hold the lock
key = f"fetchlock:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}"
try:
return bool(self.client.set(key, 1, nx=True, ex=ttl))
except Exception:
return True
def release_fetch_lock(self, remote_name: str, path: str) -> None:
if not self.available:
return
key = f"fetchlock:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}"
try:
self.client.delete(key)
except Exception:
pass
def cleanup_expired_index(self, storage, remote_name: str, path: str) -> None: def cleanup_expired_index(self, storage, remote_name: str, path: str) -> None:
if not self.available: if not self.available:
return return
+71
View File
@@ -327,3 +327,74 @@ class TestArtifactPublished:
def test_get_returns_none_when_unavailable(self, unavailable_cache): def test_get_returns_none_when_unavailable(self, unavailable_cache):
assert unavailable_cache.get_artifact_published("remote", "path") is None assert unavailable_cache.get_artifact_published("remote", "path") is None
# ---------------------------------------------------------------------------
# fetch lock (thundering-herd deduplication)
# ---------------------------------------------------------------------------
class TestFetchLock:
def test_acquire_returns_true_when_lock_obtained(self, cache_with_redis, mock_redis_client):
mock_redis_client.set.return_value = True
result = cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest")
assert result is True
def test_acquire_calls_set_nx_with_ttl(self, cache_with_redis, mock_redis_client):
mock_redis_client.set.return_value = True
cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest", ttl=15)
_, kwargs = mock_redis_client.set.call_args
assert kwargs.get("nx") is True
assert kwargs.get("ex") == 15
def test_acquire_returns_false_when_lock_already_held(self, cache_with_redis, mock_redis_client):
mock_redis_client.set.return_value = None # Redis SET NX → None when key exists
result = cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest")
assert result is False
def test_acquire_fails_open_when_unavailable(self, unavailable_cache):
# caller must be allowed to proceed when Redis is down
assert unavailable_cache.acquire_fetch_lock("myremote", "some/path") is True
def test_acquire_fails_open_on_redis_exception(self, cache_with_redis, mock_redis_client):
mock_redis_client.set.side_effect = Exception("connection reset")
assert cache_with_redis.acquire_fetch_lock("myremote", "some/path") is True
def test_lock_key_embeds_path_hash(self, cache_with_redis, mock_redis_client):
mock_redis_client.set.return_value = True
path = "library/nginx/manifests/latest"
cache_with_redis.acquire_fetch_lock("myremote", path)
args, _ = mock_redis_client.set.call_args
expected_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
assert args[0] == f"fetchlock:myremote:{expected_hash}"
def test_lock_key_hash_is_16_chars(self, cache_with_redis, mock_redis_client):
mock_redis_client.set.return_value = True
cache_with_redis.acquire_fetch_lock("myremote", "some/long/path/file.tar.gz")
args, _ = mock_redis_client.set.call_args
# key format: fetchlock:<remote>:<16-char hash>
parts = args[0].split(":")
assert len(parts) == 3
assert len(parts[2]) == 16
def test_different_paths_produce_different_lock_keys(self, cache_with_redis, mock_redis_client):
mock_redis_client.set.return_value = True
cache_with_redis.acquire_fetch_lock("myremote", "path/a/manifests/latest")
key_a = mock_redis_client.set.call_args[0][0]
mock_redis_client.set.reset_mock()
cache_with_redis.acquire_fetch_lock("myremote", "path/b/manifests/latest")
key_b = mock_redis_client.set.call_args[0][0]
assert key_a != key_b
def test_release_deletes_correct_key(self, cache_with_redis, mock_redis_client):
path = "library/nginx/manifests/latest"
cache_with_redis.release_fetch_lock("myremote", path)
expected_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
mock_redis_client.delete.assert_called_once_with(f"fetchlock:myremote:{expected_hash}")
def test_release_no_op_when_unavailable(self, unavailable_cache):
unavailable_cache.release_fetch_lock("myremote", "some/path") # must not raise
def test_release_no_op_on_redis_exception(self, cache_with_redis, mock_redis_client):
mock_redis_client.delete.side_effect = Exception("timeout")
cache_with_redis.release_fetch_lock("myremote", "some/path") # must not raise
+127
View File
@@ -260,6 +260,133 @@ class TestDockerProxy:
mock_fetch.assert_called_once() mock_fetch.assert_called_once()
assert response.status_code == 200 assert response.status_code == 200
# --- Issue 1: sha256 digest cross-linking ---
def test_tag_manifest_is_stored_under_digest_key_on_cache_hit(self, client, patched_deps):
# When serving a cached tag manifest the handler must also write the content
# under the sha256 digest key so subsequent sha256-addressed pulls hit cache.
deps = patched_deps
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
# First exists call (tag manifest): hit. Second (digest key): miss → triggers upload.
deps["storage"].exists.side_effect = [True, False]
deps["storage"].download_object.return_value = manifest
deps["cache"].is_mutable_file.return_value = True
deps["cache"].is_index_valid.return_value = True
response = client.get("/v2/docker-test/library/nginx/manifests/v1.25.3")
assert response.status_code == 200
deps["storage"].upload.assert_called_once_with(deps["storage"].get_object_key.return_value, manifest)
def test_tag_manifest_digest_key_not_written_when_already_exists(self, client, patched_deps):
# When the digest key already exists in storage upload must not be called.
deps = patched_deps
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
# Both the tag key and the digest key already present.
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_mutable_file.return_value = True
deps["cache"].is_index_valid.return_value = True
client.get("/v2/docker-test/library/nginx/manifests/v1.25.3")
deps["storage"].upload.assert_not_called()
def test_sha256_manifest_request_is_not_cross_linked(self, client, patched_deps):
# sha256-addressed manifests are immutable — the cross-link logic must not apply.
deps = patched_deps
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = manifest
deps["cache"].is_mutable_file.return_value = False # sha256 manifest is immutable
with patch("artifactapi.artifact.proxy._fetch_last_modified", new_callable=AsyncMock, return_value=None):
client.get("/v2/docker-test/library/nginx/manifests/sha256:" + "a" * 64)
deps["storage"].upload.assert_not_called()
# --- Issue 2: thundering herd distributed lock ---
def test_lock_acquired_and_released_on_upstream_fetch(self, client, patched_deps):
deps = patched_deps
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
deps["storage"].exists.side_effect = [False, False] # initial miss; digest key also absent
deps["storage"].download_object.return_value = manifest
deps["cache"].is_mutable_file.return_value = True
deps["cache"].acquire_fetch_lock.return_value = True
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
):
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
deps["cache"].acquire_fetch_lock.assert_called_once()
deps["cache"].release_fetch_lock.assert_called_once()
assert response.status_code == 200
def test_lock_released_even_when_fetch_returns_error(self, client, patched_deps):
deps = patched_deps
deps["storage"].exists.return_value = False
deps["cache"].is_mutable_file.return_value = True
deps["cache"].acquire_fetch_lock.return_value = True
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "error", "error": "upstream down"},
):
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
deps["cache"].release_fetch_lock.assert_called_once()
assert response.status_code == 502
def test_thundering_herd_polls_storage_when_lock_not_acquired(self, client, patched_deps):
# When the lock is held by another pod the handler must poll storage and serve
# from cache once the competing fetch completes, without issuing its own upstream request.
deps = patched_deps
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
# Initial cache check: miss. First poll iteration: another pod has written it.
# Third call is for the digest cross-link check (is_mutable=True path); digest key exists.
deps["storage"].exists.side_effect = [False, True, True]
deps["storage"].download_object.return_value = manifest
deps["cache"].is_mutable_file.return_value = True
deps["cache"].is_index_valid.return_value = True
deps["cache"].acquire_fetch_lock.return_value = False # lock held by peer
with patch("artifactapi.artifact.docker.asyncio.sleep", new_callable=AsyncMock):
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
) as mock_fetch:
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
mock_fetch.assert_not_called()
assert response.status_code == 200
def test_thundering_herd_falls_through_to_fetch_if_poll_times_out(self, client, patched_deps):
# If the item never appears in storage during the poll window the handler must
# still issue its own upstream fetch as a fallback.
deps = patched_deps
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
# All exists calls return False — item never appears during polling.
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = manifest
deps["cache"].is_mutable_file.return_value = True
deps["cache"].acquire_fetch_lock.return_value = False # lock held by peer
with patch("artifactapi.artifact.docker.asyncio.sleep", new_callable=AsyncMock):
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached"},
) as mock_fetch:
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
mock_fetch.assert_called_once()
assert response.status_code == 200
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Generic artifact route /api/v1/remote/{remote}/{path} # Generic artifact route /api/v1/remote/{remote}/{path}