feat: quarantine new releases to prevent supply chain attacks
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful

Add per-remote quarantine support: when quarantine_new=true and quarantine_days=N,
immutable artifacts published within the last N days are blocked with 404 until
the quarantine window expires.

- ConfigManager.get_quarantine_config() reads quarantine_new/quarantine_days
- RedisCache.store/get_artifact_published() persist Last-Modified per artifact
- proxy._check_quarantine() enforces the window; fails open when date is unknown
- proxy._fetch_last_modified() HEAD-requests upstream to discover publish date
- Docker proxy route wires quarantine checks on both cache-hit and cache-miss
- remotes.yaml: quarantine_new/quarantine_days added to pypi example (3-day window)
- README: documents quarantine configuration
This commit is contained in:
2026-04-28 23:01:52 +10:00
parent 373366e695
commit 3bd3ca8b74
10 changed files with 414 additions and 0 deletions
+151
View File
@@ -2,6 +2,7 @@
import hashlib
import json
from datetime import UTC
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
@@ -924,3 +925,153 @@ class TestHelmRemote:
response = client.get("/api/v1/remote/helm-test/vault.zip")
assert response.status_code == 403
# ---------------------------------------------------------------------------
# Quarantine (quarantine-test remote: quarantine_new=True, quarantine_days=3)
# ---------------------------------------------------------------------------
class TestQuarantine:
def _recent_date(self, days_ago=1):
"""Return an HTTP-format date string N days in the past (within quarantine window)."""
from datetime import datetime, timedelta
from email.utils import format_datetime
dt = datetime.now(UTC) - timedelta(days=days_ago)
return format_datetime(dt, usegmt=True)
def _old_date(self, days_ago=10):
"""Return an HTTP-format date string N days in the past (outside quarantine window)."""
from datetime import datetime, timedelta
from email.utils import format_datetime
dt = datetime.now(UTC) - timedelta(days=days_ago)
return format_datetime(dt, usegmt=True)
def test_cache_miss_recent_artifact_quarantined(self, client, patched_deps):
"""Cache miss: artifact published within quarantine window → 404."""
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_mutable_file.return_value = False
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached", "last_modified": self._recent_date()},
):
response = client.get("/api/v1/remote/quarantine-test/some/path/package-1.0.tar.gz")
assert response.status_code == 404
assert "quarantined" in response.json()["detail"].lower()
def test_cache_miss_old_artifact_allowed(self, client, patched_deps):
"""Cache miss: artifact published outside quarantine window → 200."""
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_mutable_file.return_value = False
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached", "last_modified": self._old_date()},
):
response = client.get("/api/v1/remote/quarantine-test/some/path/package-1.0.tar.gz")
assert response.status_code == 200
def test_cache_miss_no_last_modified_fails_open(self, client, patched_deps):
"""Cache miss: no Last-Modified header → fail open (200, not quarantined)."""
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_mutable_file.return_value = False
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached", "last_modified": None},
):
response = client.get("/api/v1/remote/quarantine-test/some/path/package-1.0.tar.gz")
assert response.status_code == 200
def test_cache_hit_recent_artifact_quarantined(self, client, patched_deps):
"""Cache hit: stored publish date within quarantine window → 404."""
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_mutable_file.return_value = False
deps["cache"].get_artifact_published.return_value = self._recent_date()
response = client.get("/api/v1/remote/quarantine-test/some/path/package-1.0.tar.gz")
assert response.status_code == 404
assert "quarantined" in response.json()["detail"].lower()
def test_cache_hit_old_artifact_allowed(self, client, patched_deps):
"""Cache hit: stored publish date outside quarantine window → 200."""
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_mutable_file.return_value = False
deps["cache"].get_artifact_published.return_value = self._old_date()
response = client.get("/api/v1/remote/quarantine-test/some/path/package-1.0.tar.gz")
assert response.status_code == 200
def test_cache_hit_no_stored_date_fetches_upstream(self, client, patched_deps):
"""Cache hit: no stored date → HEAD upstream to get Last-Modified."""
deps = patched_deps
deps["storage"].exists.return_value = True
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_mutable_file.return_value = False
deps["cache"].get_artifact_published.return_value = None
with patch(
"artifactapi.artifact.proxy._fetch_last_modified",
new_callable=AsyncMock,
return_value=self._old_date(),
) as mock_fetch:
response = client.get("/api/v1/remote/quarantine-test/some/path/package-1.0.tar.gz")
mock_fetch.assert_called_once()
assert response.status_code == 200
def test_quarantine_disabled_allows_recent_artifact(self, client, patched_deps):
"""quarantine_new=False: recent artifacts are not blocked."""
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_mutable_file.return_value = False
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached", "last_modified": self._recent_date()},
):
response = client.get("/api/v1/remote/quarantine-disabled/some/path/package-1.0.tar.gz")
assert response.status_code == 200
def test_quarantine_detail_includes_available_date(self, client, patched_deps):
"""The 404 detail should include the date when the artifact becomes available."""
deps = patched_deps
deps["storage"].exists.return_value = False
deps["storage"].download_object.return_value = b"content"
deps["cache"].is_mutable_file.return_value = False
with patch(
"artifactapi.artifact.proxy.cache_single_artifact",
new_callable=AsyncMock,
return_value={"status": "cached", "last_modified": self._recent_date()},
):
response = client.get("/api/v1/remote/quarantine-test/some/path/package-1.0.tar.gz")
assert response.status_code == 404
detail = response.json()["detail"]
assert "available after" in detail
assert "3-day" in detail