diff --git a/src/artifactapi/artifact/docker.py b/src/artifactapi/artifact/docker.py index ba40612..fd849d6 100644 --- a/src/artifactapi/artifact/docker.py +++ b/src/artifactapi/artifact/docker.py @@ -1,3 +1,4 @@ +import asyncio import hashlib import json import logging @@ -47,23 +48,39 @@ async def proxy(request: Request, remote_name: str, path: str, storage, cache, c if not await _proxy.handle_expired_mutable(remote_name, path, remote_url, config, cache, storage): cached_key = None + lock_acquired = False + if not cached_key: + lock_acquired = cache.acquire_fetch_lock(remote_name, path) + if not lock_acquired: + # Another pod is already fetching — poll storage briefly before issuing a duplicate upstream request + for _ in range(10): + await asyncio.sleep(0.5) + probe_key = storage.get_object_key(remote_name, path) + if storage.exists(probe_key): + cached_key = probe_key + break + if not cached_key: logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}") - result = await _proxy.cache_single_artifact(remote_url, remote_name, path, storage, remote_config) - if result["status"] == "error": - raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}") - if result["status"] == "cached" and is_mutable: - cache_config = config.get_cache_config(remote_name) - mutable_ttl = cache_config.get("mutable_ttl", 3600) - cache.mark_index_cached(remote_name, path, mutable_ttl) - logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)") - if result.get("etag") or result.get("last_modified"): - cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified")) - if not is_mutable: - published = result.get("last_modified") - if published: - cache.store_artifact_published(remote_name, path, published) - _proxy._check_quarantine(remote_name, published, config) + try: + result = await _proxy.cache_single_artifact(remote_url, remote_name, path, storage, remote_config) + if result["status"] == "error": + raise HTTPException(status_code=502, detail=f"Failed to fetch: {result['error']}") + if result["status"] == "cached" and is_mutable: + cache_config = config.get_cache_config(remote_name) + mutable_ttl = cache_config.get("mutable_ttl", 3600) + cache.mark_index_cached(remote_name, path, mutable_ttl) + logger.info(f"Mutable file cached with TTL: {remote_name}/{path} (ttl: {mutable_ttl}s)") + if result.get("etag") or result.get("last_modified"): + cache.store_mutable_meta(remote_name, path, result.get("etag"), result.get("last_modified")) + if not is_mutable: + published = result.get("last_modified") + if published: + cache.store_artifact_published(remote_name, path, published) + _proxy._check_quarantine(remote_name, published, config) + finally: + if lock_acquired: + cache.release_fetch_lock(remote_name, path) elif not is_mutable: published = cache.get_artifact_published(remote_name, path) if not published: @@ -90,6 +107,14 @@ async def proxy(request: Request, remote_name: str, path: str, storage, cache, c content_type = "application/vnd.oci.image.manifest.v1+json" digest = f"sha256:{hashlib.sha256(artifact_data).hexdigest()}" + + # Cross-link tag manifests to their sha256 digest key so digest-addressed pulls hit cache + if is_mutable and "/manifests/" in path: + digest_path = re.sub(r"/manifests/[^/]+$", f"/manifests/{digest}", path) + digest_key = storage.get_object_key(remote_name, digest_path) + if not storage.exists(digest_key): + storage.upload(digest_key, artifact_data) + headers = { "Docker-Distribution-Api-Version": "registry/2.0", "Docker-Content-Digest": digest, diff --git a/src/artifactapi/cache/redis.py b/src/artifactapi/cache/redis.py index a3012ae..8c7534b 100644 --- a/src/artifactapi/cache/redis.py +++ b/src/artifactapi/cache/redis.py @@ -99,6 +99,25 @@ class RedisCache: except Exception: return None + def acquire_fetch_lock(self, remote_name: str, path: str, ttl: int = 30) -> bool: + """Try to acquire a short-lived fetch lock. Returns True if acquired, False if held by another caller.""" + if not self.available: + return True # fail open: no Redis → behave as if we always hold the lock + key = f"fetchlock:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}" + try: + return bool(self.client.set(key, 1, nx=True, ex=ttl)) + except Exception: + return True + + def release_fetch_lock(self, remote_name: str, path: str) -> None: + if not self.available: + return + key = f"fetchlock:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}" + try: + self.client.delete(key) + except Exception: + pass + def cleanup_expired_index(self, storage, remote_name: str, path: str) -> None: if not self.available: return diff --git a/tests/test_cache.py b/tests/test_cache.py index d30169c..2c19593 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -327,3 +327,74 @@ class TestArtifactPublished: def test_get_returns_none_when_unavailable(self, unavailable_cache): assert unavailable_cache.get_artifact_published("remote", "path") is None + + +# --------------------------------------------------------------------------- +# fetch lock (thundering-herd deduplication) +# --------------------------------------------------------------------------- + + +class TestFetchLock: + def test_acquire_returns_true_when_lock_obtained(self, cache_with_redis, mock_redis_client): + mock_redis_client.set.return_value = True + result = cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest") + assert result is True + + def test_acquire_calls_set_nx_with_ttl(self, cache_with_redis, mock_redis_client): + mock_redis_client.set.return_value = True + cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest", ttl=15) + _, kwargs = mock_redis_client.set.call_args + assert kwargs.get("nx") is True + assert kwargs.get("ex") == 15 + + def test_acquire_returns_false_when_lock_already_held(self, cache_with_redis, mock_redis_client): + mock_redis_client.set.return_value = None # Redis SET NX → None when key exists + result = cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest") + assert result is False + + def test_acquire_fails_open_when_unavailable(self, unavailable_cache): + # caller must be allowed to proceed when Redis is down + assert unavailable_cache.acquire_fetch_lock("myremote", "some/path") is True + + def test_acquire_fails_open_on_redis_exception(self, cache_with_redis, mock_redis_client): + mock_redis_client.set.side_effect = Exception("connection reset") + assert cache_with_redis.acquire_fetch_lock("myremote", "some/path") is True + + def test_lock_key_embeds_path_hash(self, cache_with_redis, mock_redis_client): + mock_redis_client.set.return_value = True + path = "library/nginx/manifests/latest" + cache_with_redis.acquire_fetch_lock("myremote", path) + args, _ = mock_redis_client.set.call_args + expected_hash = hashlib.sha256(path.encode()).hexdigest()[:16] + assert args[0] == f"fetchlock:myremote:{expected_hash}" + + def test_lock_key_hash_is_16_chars(self, cache_with_redis, mock_redis_client): + mock_redis_client.set.return_value = True + cache_with_redis.acquire_fetch_lock("myremote", "some/long/path/file.tar.gz") + args, _ = mock_redis_client.set.call_args + # key format: fetchlock::<16-char hash> + parts = args[0].split(":") + assert len(parts) == 3 + assert len(parts[2]) == 16 + + def test_different_paths_produce_different_lock_keys(self, cache_with_redis, mock_redis_client): + mock_redis_client.set.return_value = True + cache_with_redis.acquire_fetch_lock("myremote", "path/a/manifests/latest") + key_a = mock_redis_client.set.call_args[0][0] + mock_redis_client.set.reset_mock() + cache_with_redis.acquire_fetch_lock("myremote", "path/b/manifests/latest") + key_b = mock_redis_client.set.call_args[0][0] + assert key_a != key_b + + def test_release_deletes_correct_key(self, cache_with_redis, mock_redis_client): + path = "library/nginx/manifests/latest" + cache_with_redis.release_fetch_lock("myremote", path) + expected_hash = hashlib.sha256(path.encode()).hexdigest()[:16] + mock_redis_client.delete.assert_called_once_with(f"fetchlock:myremote:{expected_hash}") + + def test_release_no_op_when_unavailable(self, unavailable_cache): + unavailable_cache.release_fetch_lock("myremote", "some/path") # must not raise + + def test_release_no_op_on_redis_exception(self, cache_with_redis, mock_redis_client): + mock_redis_client.delete.side_effect = Exception("timeout") + cache_with_redis.release_fetch_lock("myremote", "some/path") # must not raise diff --git a/tests/test_routes.py b/tests/test_routes.py index 1bea394..25cdde8 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -260,6 +260,135 @@ class TestDockerProxy: mock_fetch.assert_called_once() assert response.status_code == 200 + # --- Issue 1: sha256 digest cross-linking --- + + def test_tag_manifest_is_stored_under_digest_key_on_cache_hit(self, client, patched_deps): + # When serving a cached tag manifest the handler must also write the content + # under the sha256 digest key so subsequent sha256-addressed pulls hit cache. + deps = patched_deps + manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode() + # First exists call (tag manifest): hit. Second (digest key): miss → triggers upload. + deps["storage"].exists.side_effect = [True, False] + deps["storage"].download_object.return_value = manifest + deps["cache"].is_mutable_file.return_value = True + deps["cache"].is_index_valid.return_value = True + + response = client.get("/v2/docker-test/library/nginx/manifests/v1.25.3") + + assert response.status_code == 200 + deps["storage"].upload.assert_called_once_with( + deps["storage"].get_object_key.return_value, manifest + ) + + def test_tag_manifest_digest_key_not_written_when_already_exists(self, client, patched_deps): + # When the digest key already exists in storage upload must not be called. + deps = patched_deps + manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode() + # Both the tag key and the digest key already present. + deps["storage"].exists.return_value = True + deps["storage"].download_object.return_value = manifest + deps["cache"].is_mutable_file.return_value = True + deps["cache"].is_index_valid.return_value = True + + client.get("/v2/docker-test/library/nginx/manifests/v1.25.3") + + deps["storage"].upload.assert_not_called() + + def test_sha256_manifest_request_is_not_cross_linked(self, client, patched_deps): + # sha256-addressed manifests are immutable — the cross-link logic must not apply. + deps = patched_deps + manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode() + deps["storage"].exists.return_value = True + deps["storage"].download_object.return_value = manifest + deps["cache"].is_mutable_file.return_value = False # sha256 manifest is immutable + + with patch("artifactapi.artifact.proxy._fetch_last_modified", new_callable=AsyncMock, return_value=None): + client.get("/v2/docker-test/library/nginx/manifests/sha256:" + "a" * 64) + + deps["storage"].upload.assert_not_called() + + # --- Issue 2: thundering herd distributed lock --- + + def test_lock_acquired_and_released_on_upstream_fetch(self, client, patched_deps): + deps = patched_deps + manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode() + deps["storage"].exists.side_effect = [False, False] # initial miss; digest key also absent + deps["storage"].download_object.return_value = manifest + deps["cache"].is_mutable_file.return_value = True + deps["cache"].acquire_fetch_lock.return_value = True + + with patch( + "artifactapi.artifact.proxy.cache_single_artifact", + new_callable=AsyncMock, + return_value={"status": "cached"}, + ): + response = client.get("/v2/docker-test/library/nginx/manifests/latest") + + deps["cache"].acquire_fetch_lock.assert_called_once() + deps["cache"].release_fetch_lock.assert_called_once() + assert response.status_code == 200 + + def test_lock_released_even_when_fetch_returns_error(self, client, patched_deps): + deps = patched_deps + deps["storage"].exists.return_value = False + deps["cache"].is_mutable_file.return_value = True + deps["cache"].acquire_fetch_lock.return_value = True + + with patch( + "artifactapi.artifact.proxy.cache_single_artifact", + new_callable=AsyncMock, + return_value={"status": "error", "error": "upstream down"}, + ): + response = client.get("/v2/docker-test/library/nginx/manifests/latest") + + deps["cache"].release_fetch_lock.assert_called_once() + assert response.status_code == 502 + + def test_thundering_herd_polls_storage_when_lock_not_acquired(self, client, patched_deps): + # When the lock is held by another pod the handler must poll storage and serve + # from cache once the competing fetch completes, without issuing its own upstream request. + deps = patched_deps + manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode() + # Initial cache check: miss. First poll iteration: another pod has written it. + # Third call is for the digest cross-link check (is_mutable=True path); digest key exists. + deps["storage"].exists.side_effect = [False, True, True] + deps["storage"].download_object.return_value = manifest + deps["cache"].is_mutable_file.return_value = True + deps["cache"].is_index_valid.return_value = True + deps["cache"].acquire_fetch_lock.return_value = False # lock held by peer + + with patch("artifactapi.artifact.docker.asyncio.sleep", new_callable=AsyncMock): + with patch( + "artifactapi.artifact.proxy.cache_single_artifact", + new_callable=AsyncMock, + ) as mock_fetch: + response = client.get("/v2/docker-test/library/nginx/manifests/latest") + + mock_fetch.assert_not_called() + assert response.status_code == 200 + + def test_thundering_herd_falls_through_to_fetch_if_poll_times_out(self, client, patched_deps): + # If the item never appears in storage during the poll window the handler must + # still issue its own upstream fetch as a fallback. + deps = patched_deps + manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode() + # All exists calls return False — item never appears during polling. + deps["storage"].exists.return_value = False + deps["storage"].download_object.return_value = manifest + deps["cache"].is_mutable_file.return_value = True + deps["cache"].acquire_fetch_lock.return_value = False # lock held by peer + + with patch("artifactapi.artifact.docker.asyncio.sleep", new_callable=AsyncMock): + with patch( + "artifactapi.artifact.proxy.cache_single_artifact", + new_callable=AsyncMock, + return_value={"status": "cached"}, + ) as mock_fetch: + response = client.get("/v2/docker-test/library/nginx/manifests/latest") + + mock_fetch.assert_called_once() + assert response.status_code == 200 + # --------------------------------------------------------------------------- # Generic artifact route /api/v1/remote/{remote}/{path}