fix: cross-link tag manifests to digest keys and add fetch lock to prevent thundering herd
Tag manifests (e.g. library/nginx/manifests/latest) and their sha256-addressed counterparts were stored at separate S3 keys with no cross-reference, so a sha256 manifest request always missed cache even when the identical content had just been stored under the tag key. After serving any mutable (tag) manifest, compute the sha256 of the response body and write it under the digest key (manifests/sha256:<hex>) if absent. The next sha256-addressed pull hits cache immediately. Also adds a short-lived Redis distributed lock (SET NX EX 30) around upstream fetches so that concurrent pods racing for the same cold key poll storage for up to 5 s before issuing a duplicate upstream request, eliminating the thundering herd on deploy events. Includes unit tests for both the lock primitives (acquire/release, fail-open when Redis is unavailable) and the docker proxy behaviour (cross-link written on tag hit, not written for sha256 requests, lock acquired/released, poll path serves from cache without upstream fetch, fallback fetch when poll times out).
This commit is contained in:
@@ -327,3 +327,74 @@ class TestArtifactPublished:
|
||||
|
||||
def test_get_returns_none_when_unavailable(self, unavailable_cache):
|
||||
assert unavailable_cache.get_artifact_published("remote", "path") is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# fetch lock (thundering-herd deduplication)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFetchLock:
|
||||
def test_acquire_returns_true_when_lock_obtained(self, cache_with_redis, mock_redis_client):
|
||||
mock_redis_client.set.return_value = True
|
||||
result = cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest")
|
||||
assert result is True
|
||||
|
||||
def test_acquire_calls_set_nx_with_ttl(self, cache_with_redis, mock_redis_client):
|
||||
mock_redis_client.set.return_value = True
|
||||
cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest", ttl=15)
|
||||
_, kwargs = mock_redis_client.set.call_args
|
||||
assert kwargs.get("nx") is True
|
||||
assert kwargs.get("ex") == 15
|
||||
|
||||
def test_acquire_returns_false_when_lock_already_held(self, cache_with_redis, mock_redis_client):
|
||||
mock_redis_client.set.return_value = None # Redis SET NX → None when key exists
|
||||
result = cache_with_redis.acquire_fetch_lock("myremote", "library/nginx/manifests/latest")
|
||||
assert result is False
|
||||
|
||||
def test_acquire_fails_open_when_unavailable(self, unavailable_cache):
|
||||
# caller must be allowed to proceed when Redis is down
|
||||
assert unavailable_cache.acquire_fetch_lock("myremote", "some/path") is True
|
||||
|
||||
def test_acquire_fails_open_on_redis_exception(self, cache_with_redis, mock_redis_client):
|
||||
mock_redis_client.set.side_effect = Exception("connection reset")
|
||||
assert cache_with_redis.acquire_fetch_lock("myremote", "some/path") is True
|
||||
|
||||
def test_lock_key_embeds_path_hash(self, cache_with_redis, mock_redis_client):
|
||||
mock_redis_client.set.return_value = True
|
||||
path = "library/nginx/manifests/latest"
|
||||
cache_with_redis.acquire_fetch_lock("myremote", path)
|
||||
args, _ = mock_redis_client.set.call_args
|
||||
expected_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
|
||||
assert args[0] == f"fetchlock:myremote:{expected_hash}"
|
||||
|
||||
def test_lock_key_hash_is_16_chars(self, cache_with_redis, mock_redis_client):
|
||||
mock_redis_client.set.return_value = True
|
||||
cache_with_redis.acquire_fetch_lock("myremote", "some/long/path/file.tar.gz")
|
||||
args, _ = mock_redis_client.set.call_args
|
||||
# key format: fetchlock:<remote>:<16-char hash>
|
||||
parts = args[0].split(":")
|
||||
assert len(parts) == 3
|
||||
assert len(parts[2]) == 16
|
||||
|
||||
def test_different_paths_produce_different_lock_keys(self, cache_with_redis, mock_redis_client):
|
||||
mock_redis_client.set.return_value = True
|
||||
cache_with_redis.acquire_fetch_lock("myremote", "path/a/manifests/latest")
|
||||
key_a = mock_redis_client.set.call_args[0][0]
|
||||
mock_redis_client.set.reset_mock()
|
||||
cache_with_redis.acquire_fetch_lock("myremote", "path/b/manifests/latest")
|
||||
key_b = mock_redis_client.set.call_args[0][0]
|
||||
assert key_a != key_b
|
||||
|
||||
def test_release_deletes_correct_key(self, cache_with_redis, mock_redis_client):
|
||||
path = "library/nginx/manifests/latest"
|
||||
cache_with_redis.release_fetch_lock("myremote", path)
|
||||
expected_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
|
||||
mock_redis_client.delete.assert_called_once_with(f"fetchlock:myremote:{expected_hash}")
|
||||
|
||||
def test_release_no_op_when_unavailable(self, unavailable_cache):
|
||||
unavailable_cache.release_fetch_lock("myremote", "some/path") # must not raise
|
||||
|
||||
def test_release_no_op_on_redis_exception(self, cache_with_redis, mock_redis_client):
|
||||
mock_redis_client.delete.side_effect = Exception("timeout")
|
||||
cache_with_redis.release_fetch_lock("myremote", "some/path") # must not raise
|
||||
|
||||
Reference in New Issue
Block a user