fix: cross-link tag manifests to digest keys and add fetch lock to prevent thundering herd (#42)
Tag manifests (e.g. library/nginx/manifests/latest) and their sha256-addressed counterparts were stored at separate S3 keys with no cross-reference, so a sha256 manifest request always missed cache even when the identical content had just been stored under the tag key. After serving any mutable (tag) manifest, compute the sha256 of the response body and write it under the digest key (manifests/sha256:<hex>) if absent. The next sha256-addressed pull hits cache immediately. Also adds a short-lived Redis distributed lock (SET NX EX 30) around upstream fetches so that concurrent pods racing for the same cold key poll storage for up to 5 s before issuing a duplicate upstream request, eliminating the thundering herd on deploy events. Includes unit tests for both the lock primitives (acquire/release, fail-open when Redis is unavailable) and the docker proxy behaviour (cross-link written on tag hit, not written for sha256 requests, lock acquired/released, poll path serves from cache without upstream fetch, fallback fetch when poll times out). Reviewed-on: #42
This commit was merged in pull request #42.
This commit is contained in:
@@ -260,6 +260,133 @@ class TestDockerProxy:
|
||||
mock_fetch.assert_called_once()
|
||||
assert response.status_code == 200
|
||||
|
||||
# --- Issue 1: sha256 digest cross-linking ---
|
||||
|
||||
def test_tag_manifest_is_stored_under_digest_key_on_cache_hit(self, client, patched_deps):
|
||||
# When serving a cached tag manifest the handler must also write the content
|
||||
# under the sha256 digest key so subsequent sha256-addressed pulls hit cache.
|
||||
deps = patched_deps
|
||||
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
|
||||
# First exists call (tag manifest): hit. Second (digest key): miss → triggers upload.
|
||||
deps["storage"].exists.side_effect = [True, False]
|
||||
deps["storage"].download_object.return_value = manifest
|
||||
deps["cache"].is_mutable_file.return_value = True
|
||||
deps["cache"].is_index_valid.return_value = True
|
||||
|
||||
response = client.get("/v2/docker-test/library/nginx/manifests/v1.25.3")
|
||||
|
||||
assert response.status_code == 200
|
||||
deps["storage"].upload.assert_called_once_with(deps["storage"].get_object_key.return_value, manifest)
|
||||
|
||||
def test_tag_manifest_digest_key_not_written_when_already_exists(self, client, patched_deps):
|
||||
# When the digest key already exists in storage upload must not be called.
|
||||
deps = patched_deps
|
||||
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
|
||||
# Both the tag key and the digest key already present.
|
||||
deps["storage"].exists.return_value = True
|
||||
deps["storage"].download_object.return_value = manifest
|
||||
deps["cache"].is_mutable_file.return_value = True
|
||||
deps["cache"].is_index_valid.return_value = True
|
||||
|
||||
client.get("/v2/docker-test/library/nginx/manifests/v1.25.3")
|
||||
|
||||
deps["storage"].upload.assert_not_called()
|
||||
|
||||
def test_sha256_manifest_request_is_not_cross_linked(self, client, patched_deps):
|
||||
# sha256-addressed manifests are immutable — the cross-link logic must not apply.
|
||||
deps = patched_deps
|
||||
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
|
||||
deps["storage"].exists.return_value = True
|
||||
deps["storage"].download_object.return_value = manifest
|
||||
deps["cache"].is_mutable_file.return_value = False # sha256 manifest is immutable
|
||||
|
||||
with patch("artifactapi.artifact.proxy._fetch_last_modified", new_callable=AsyncMock, return_value=None):
|
||||
client.get("/v2/docker-test/library/nginx/manifests/sha256:" + "a" * 64)
|
||||
|
||||
deps["storage"].upload.assert_not_called()
|
||||
|
||||
# --- Issue 2: thundering herd distributed lock ---
|
||||
|
||||
def test_lock_acquired_and_released_on_upstream_fetch(self, client, patched_deps):
|
||||
deps = patched_deps
|
||||
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
|
||||
deps["storage"].exists.side_effect = [False, False] # initial miss; digest key also absent
|
||||
deps["storage"].download_object.return_value = manifest
|
||||
deps["cache"].is_mutable_file.return_value = True
|
||||
deps["cache"].acquire_fetch_lock.return_value = True
|
||||
|
||||
with patch(
|
||||
"artifactapi.artifact.proxy.cache_single_artifact",
|
||||
new_callable=AsyncMock,
|
||||
return_value={"status": "cached"},
|
||||
):
|
||||
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
|
||||
|
||||
deps["cache"].acquire_fetch_lock.assert_called_once()
|
||||
deps["cache"].release_fetch_lock.assert_called_once()
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_lock_released_even_when_fetch_returns_error(self, client, patched_deps):
|
||||
deps = patched_deps
|
||||
deps["storage"].exists.return_value = False
|
||||
deps["cache"].is_mutable_file.return_value = True
|
||||
deps["cache"].acquire_fetch_lock.return_value = True
|
||||
|
||||
with patch(
|
||||
"artifactapi.artifact.proxy.cache_single_artifact",
|
||||
new_callable=AsyncMock,
|
||||
return_value={"status": "error", "error": "upstream down"},
|
||||
):
|
||||
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
|
||||
|
||||
deps["cache"].release_fetch_lock.assert_called_once()
|
||||
assert response.status_code == 502
|
||||
|
||||
def test_thundering_herd_polls_storage_when_lock_not_acquired(self, client, patched_deps):
|
||||
# When the lock is held by another pod the handler must poll storage and serve
|
||||
# from cache once the competing fetch completes, without issuing its own upstream request.
|
||||
deps = patched_deps
|
||||
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
|
||||
# Initial cache check: miss. First poll iteration: another pod has written it.
|
||||
# Third call is for the digest cross-link check (is_mutable=True path); digest key exists.
|
||||
deps["storage"].exists.side_effect = [False, True, True]
|
||||
deps["storage"].download_object.return_value = manifest
|
||||
deps["cache"].is_mutable_file.return_value = True
|
||||
deps["cache"].is_index_valid.return_value = True
|
||||
deps["cache"].acquire_fetch_lock.return_value = False # lock held by peer
|
||||
|
||||
with patch("artifactapi.artifact.docker.asyncio.sleep", new_callable=AsyncMock):
|
||||
with patch(
|
||||
"artifactapi.artifact.proxy.cache_single_artifact",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_fetch:
|
||||
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
|
||||
|
||||
mock_fetch.assert_not_called()
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_thundering_herd_falls_through_to_fetch_if_poll_times_out(self, client, patched_deps):
|
||||
# If the item never appears in storage during the poll window the handler must
|
||||
# still issue its own upstream fetch as a fallback.
|
||||
deps = patched_deps
|
||||
manifest = json.dumps({"mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": []}).encode()
|
||||
# All exists calls return False — item never appears during polling.
|
||||
deps["storage"].exists.return_value = False
|
||||
deps["storage"].download_object.return_value = manifest
|
||||
deps["cache"].is_mutable_file.return_value = True
|
||||
deps["cache"].acquire_fetch_lock.return_value = False # lock held by peer
|
||||
|
||||
with patch("artifactapi.artifact.docker.asyncio.sleep", new_callable=AsyncMock):
|
||||
with patch(
|
||||
"artifactapi.artifact.proxy.cache_single_artifact",
|
||||
new_callable=AsyncMock,
|
||||
return_value={"status": "cached"},
|
||||
) as mock_fetch:
|
||||
response = client.get("/v2/docker-test/library/nginx/manifests/latest")
|
||||
|
||||
mock_fetch.assert_called_once()
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Generic artifact route /api/v1/remote/{remote}/{path}
|
||||
|
||||
Reference in New Issue
Block a user