Compare commits


13 Commits

Author SHA1 Message Date
unkinben c47daca1f1 fix: add a grace period before GC deletes orphaned blobs
ci/woodpecker/pr/pre-commit Pipeline failed
ci/woodpecker/pr/test Pipeline failed
ci/woodpecker/pr/build Pipeline failed
FindOrphanedBlobs returned any unreferenced blob, so a concurrent dedup
upload (which inserts the blob row before its artifact/local_files row)
could have its S3 object deleted mid-flight. Restrict collection to blobs
older than a 1h grace period.

Refs #71
2026-07-02 22:26:19 +10:00
unkinben f61ab99ae8 fix: set timeouts on the upstream HTTP client (#83)
Fixes #67

## Why
The proxy used `http.DefaultClient` for all upstream GET/HEAD and bearer-token requests. It has no timeouts, so a slow or hung upstream holds a goroutine and connection indefinitely.

## Changes
- Add a shared `upstreamClient` (`internal/proxy/httpclient.go`) with dial, TLS-handshake, response-header and idle-connection timeouts, plus connection pooling.
- Deliberately no overall `Client.Timeout`, so large artifact bodies can still stream; total time is bounded by the request context.
- Route all four upstream calls in the engine through it.
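
A minimal sketch of the kind of client these bullets describe. The constructor name `newUpstreamClient` and every duration below are illustrative assumptions; the real values in `internal/proxy/httpclient.go` are not shown on this page.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"
)

// newUpstreamClient builds a client with per-phase timeouts and connection
// pooling. Client.Timeout is deliberately left at zero so that a large
// response body can keep streaming; the request context bounds total time.
// All durations are assumed values for illustration only.
func newUpstreamClient() *http.Client {
	dialer := &net.Dialer{
		Timeout:   10 * time.Second, // TCP connect
		KeepAlive: 30 * time.Second,
	}
	transport := &http.Transport{
		DialContext:           dialer.DialContext,
		TLSHandshakeTimeout:   10 * time.Second,
		ResponseHeaderTimeout: 30 * time.Second, // waits for headers only, not the body
		IdleConnTimeout:       90 * time.Second,
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   10,
	}
	return &http.Client{Transport: transport}
}

func main() {
	c := newUpstreamClient()
	t := c.Transport.(*http.Transport)
	fmt.Println(c.Timeout, t.TLSHandshakeTimeout, t.ResponseHeaderTimeout)
}
```

With `Client.Timeout` unset, a hung upstream is still cut off during dial, TLS handshake, or header wait, while a slow but healthy body transfer is only ended by cancelling the context passed to `http.NewRequestWithContext`.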

## Validation
- `make e2e` passes.

Reviewed-on: #83
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 22:24:49 +10:00
unkinben c39703ed0d fix: getenv treats an explicitly-empty value as unset (#85)
Fixes #69

## Why
`getenv` returned the fallback whenever `os.Getenv` was empty, so an intentionally-empty env var could not override a non-empty default.

## Changes
- Use `os.LookupEnv` to distinguish unset from set-but-empty.
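
The difference can be seen in a small standalone sketch. `getenvOld` mirrors the previous behaviour and `getenv` the new one; the variable name `EXAMPLE_SETTING` and the `demo` helper are made up for illustration.

```go
package main

import (
	"fmt"
	"os"
)

// getenvOld treats an empty value the same as an unset variable.
func getenvOld(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// getenv returns the fallback only when the variable is not set at all.
func getenv(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return fallback
}

// demo sets a hypothetical variable to the empty string and reports what
// each variant returns for it.
func demo() (string, string) {
	const key = "EXAMPLE_SETTING"
	os.Setenv(key, "")
	defer os.Unsetenv(key)
	return getenvOld(key, "default"), getenv(key, "default")
}

func main() {
	oldVal, newVal := demo()
	fmt.Printf("old=%q new=%q\n", oldVal, newVal)
}
```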

## Validation
- `make e2e` passes.

Reviewed-on: #85
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 22:09:09 +10:00
unkinben 5261af4c63 fix: coalesce concurrent cache-miss fetches (thundering herd) (#93)
Fixes #75

## Why
On a fetch-lock miss, `Engine.Fetch` slept a flat 500ms once, tried the store, and otherwise fell through to fetch upstream unlocked. A cold-cache stampede therefore still hit upstream once per waiter.

## Changes
- Add `waitForStore`, which polls the store every 100ms for up to 5s (stopping on context cancellation) so waiters pick up the lock leader's populated result.
- Only fall through to an upstream fetch if the leader has not populated the store within the wait budget.
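
The polling pattern behind these two bullets can be sketched generically. This is not the project's `waitForStore`; `pollUntil`, `simulate`, and the short intervals are hypothetical stand-ins chosen so the example runs quickly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntil calls lookup immediately and then once per interval until it
// succeeds, the wait budget elapses, or ctx is cancelled. A false result
// tells the caller to fall back (here: fetch upstream itself).
func pollUntil(ctx context.Context, interval, budget time.Duration, lookup func() (string, bool)) (string, bool) {
	deadline := time.NewTimer(budget)
	defer deadline.Stop()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		if v, ok := lookup(); ok {
			return v, true
		}
		select {
		case <-ctx.Done():
			return "", false
		case <-deadline.C:
			return "", false
		case <-ticker.C:
			// next poll
		}
	}
}

// simulate models a lock leader that populates the store after readyAfter
// lookups have been made.
func simulate(readyAfter int) (string, bool) {
	calls := 0
	lookup := func() (string, bool) {
		calls++
		if calls >= readyAfter {
			return "blob", true
		}
		return "", false
	}
	return pollUntil(context.Background(), 5*time.Millisecond, 100*time.Millisecond, lookup)
}

func main() {
	fmt.Println(simulate(3))    // leader finishes within the budget
	fmt.Println(simulate(1000)) // leader never finishes; waiter gives up
}
```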

## Validation
- `make e2e` passes.

Reviewed-on: #93
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 22:08:29 +10:00
unkinben 45d6cdbc64 perf: batch access-log writes instead of goroutine+insert per request (#91)
Fixes #76

## Why
Every proxied request spawned a goroutine running a single-row INSERT with a 5s timeout. Under load this means an unbounded number of goroutines and pressure on the connection pool.

## Changes
- Add `database.AccessLogEntry` + `InsertAccessLogBatch` (bulk `COPY`).
- The engine starts one background writer that drains a buffered channel and flushes every 128 entries or 2s.
- `logAccess` is now a non-blocking channel send (drops on full buffer), so the request path never blocks on the DB. Best-effort telemetry: a small tail may be lost on abrupt shutdown.
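
The "non-blocking channel send" in the last bullet is the usual `select` with a `default` case. A minimal sketch, using a hypothetical `logEntry` type and `tryEnqueue` helper rather than the project's own names:

```go
package main

import "fmt"

// logEntry is a hypothetical stand-in for an access-log record.
type logEntry struct {
	Path string
}

// tryEnqueue performs a non-blocking send. It reports false, dropping the
// entry, when the buffer is full, so the caller never waits on the consumer.
func tryEnqueue(ch chan<- logEntry, e logEntry) bool {
	select {
	case ch <- e:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan logEntry, 2) // tiny buffer to make the drop visible
	for _, p := range []string{"/a", "/b", "/c"} {
		fmt.Println(p, tryEnqueue(ch, logEntry{Path: p}))
	}
}
```

The trade-off is the one the bullet states: entries are lost when the buffer is full, in exchange for a request path that cannot stall on the database.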

## Validation
- `make e2e` passes.

Reviewed-on: #91
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 22:07:56 +10:00
unkinben b59cc45765 fix: HEAD requests fetch and stream the full body (#89)
Fixes #70

## Why
Docker `HEAD` routes mapped to `handleProxy`, which ran a full `Fetch` + `io.Copy` — downloading the entire blob (and fetching upstream on a miss) only for net/http to discard the body. HEAD existence checks (manifests, blobs) are common.

## Changes
- Add `Engine.Head`: answers cached artifacts/indexes from store metadata (no blob download); on a miss issues an upstream `HEAD` (with bearer-token handling) and never caches a body.
- Route `HEAD /v2/{remote}/*` to a dedicated `handleProxyHead` that writes headers only.
- Add e2e tests for HEAD on a blocklisted path (403) and an unknown remote (404).

## Note
`headUpstream` uses `http.DefaultClient` to build cleanly on master; it will pick up the shared timeout-configured client from #67 once that merges.

## Validation
- `make e2e` passes (includes new HEAD tests).

Reviewed-on: #89
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 22:06:50 +10:00
unkinben e7027c8ccc feat: cache upstream bearer tokens (#92)
Fixes #77

## Why
Each upstream 401 re-ran the full token-endpoint request, even though a single Docker pull triggers many blob/manifest requests sharing one scope.

## Changes
- Add Redis `GetToken`/`SetToken`.
- `fetchBearerToken` now also parses `expires_in` and returns a TTL.
- New `Engine.cachedBearerToken` reuses a cached token keyed by remote + challenge (hashed), caching for `expires_in` minus a safety margin (default 60s when absent).
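
A rough sketch of the TTL and cache-key arithmetic. The 10s safety margin, the reading of "default 60s" as the assumed token lifetime when `expires_in` is absent, and the helper names `cacheTTL` and `tokenKey` are all assumptions made for illustration; the commit does not spell them out.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"time"
)

const (
	defaultExpirySeconds = 60               // assumed lifetime when expires_in is absent
	safetyMargin         = 10 * time.Second // assumed margin; the real value is not shown
)

// cacheTTL turns an expires_in value (seconds) into a cache lifetime that
// ends slightly before the token itself expires. It never returns a
// negative duration.
func cacheTTL(expiresIn int) time.Duration {
	if expiresIn <= 0 {
		expiresIn = defaultExpirySeconds
	}
	ttl := time.Duration(expiresIn)*time.Second - safetyMargin
	if ttl < 0 {
		ttl = 0
	}
	return ttl
}

// tokenKey derives a fixed-length cache key from the remote name and the
// raw Www-Authenticate challenge.
func tokenKey(remote, challenge string) string {
	sum := sha256.Sum256([]byte(remote + "\x00" + challenge))
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(cacheTTL(300), cacheTTL(0), cacheTTL(5))
	fmt.Println(tokenKey("dockerhub", `Bearer realm="https://auth.example/token"`))
}
```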

## Validation
- `make e2e` passes.

Reviewed-on: #92
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 21:35:46 +10:00
unkinben f3680951b7 perf: stream proxied artifacts instead of buffering the full body in memory (#94)
Fixes #66

## Why
`fetchFromUpstream` read every upstream response with `io.ReadAll`, hashed it in memory, uploaded from memory and served from memory. A single large immutable blob (Docker layer, RPM, tarball, Go module zip) — or several concurrent ones — could OOM the process. The streaming, tempfile-backed CAS already existed but the proxy path bypassed it (and `Engine.cas` was assigned but unused).

## Changes
- Immutable fetches now stream through `CAS.Store` (tempfile -> sha256 -> S3), so memory stays bounded regardless of artifact size, and are served back from the store.
- Mutable indexes stay on the in-memory path (small, and subject to `RewriteResponse`).
- Skipping `RewriteResponse` for immutable content is behaviour-preserving: the proxy path always passes an empty `proxyBaseURL`, under which every provider's `RewriteResponse` is a no-op.
- Remove the now-unused in-memory `sha256Hash` helper.
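
The "tempfile -> sha256" part of the streaming path can be illustrated with standard-library pieces only. This is a sketch of the general technique, not the project's `CAS.Store`; `spoolAndHash` and `hashString` are hypothetical names, and the S3 upload step is omitted.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"strings"
)

// spoolAndHash copies r into a temporary file while feeding the same bytes to
// a SHA-256 hasher, so memory use is bounded by io.Copy's buffer rather than
// by the size of the input. The caller owns (and must remove) the file.
func spoolAndHash(r io.Reader) (string, string, int64, error) {
	f, err := os.CreateTemp("", "blob-*")
	if err != nil {
		return "", "", 0, err
	}
	defer f.Close()

	h := sha256.New()
	n, err := io.Copy(io.MultiWriter(f, h), r)
	if err != nil {
		os.Remove(f.Name())
		return "", "", 0, err
	}
	return f.Name(), hex.EncodeToString(h.Sum(nil)), n, nil
}

// hashString is a demo wrapper: it spools s, removes the temporary file, and
// returns the digest and byte count.
func hashString(s string) (string, int64) {
	path, sum, n, err := spoolAndHash(strings.NewReader(s))
	if err != nil {
		panic(err)
	}
	os.Remove(path)
	return sum, n
}

func main() {
	sum, n := hashString("hello")
	fmt.Println(sum, n)
}
```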

## Validation
- `make e2e` passes.
- Live smoke test against Postgres/Redis/MinIO: proxied a 12 MB blob through a generic remote — fetch #1 `X-Artifact-Source: remote`, fetch #2 `X-Artifact-Source: cache`, both byte-identical (sha256) to the origin.

Reviewed-on: #94
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 21:33:42 +10:00
unkinben 61a1a99112 perf: compile remote match patterns once instead of per-request (#88)
Fixes #73

## Why
`Classifier.Classify` runs on every proxied request and recompiled the Blocklist/Patterns/Immutable/Mutable regex lists each time. Regex compilation is expensive and fully redundant.

## Changes
- Memoise compilation in a `sync.Map` keyed by pattern text (`compileCached`); each distinct pattern compiles once and is reused. Patterns that fail to compile are cached as a typed nil so they are not retried. No invalidation needed since the pattern text is the key.

## Validation
- `go test ./internal/proxy/` and `make e2e` pass.

Reviewed-on: #88
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 20:20:00 +10:00
unkinben f0e44d6810 fix: blocklist fails open when a regex fails to compile (#87)
Fixes #72

## Why
`compilePatterns` silently discards any pattern that fails to compile. A typo in a blocklist entry therefore turns a deny rule into a no-op — a fail-open with security impact.

## Changes
- Add `Remote.ValidatePatterns`, which compiles every pattern list (patterns, blocklist, mutable/immutable patterns, ban_tags) and returns an error on the first invalid regex.
- Reject invalid patterns with 400 at remote create and update time.
- Unit test for valid and invalid patterns.
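
The fail-closed check could look roughly like the following. `Remote.ValidatePatterns` itself is not shown in this diff, so the `patternList` type and free function `validatePatterns` here are hypothetical stand-ins for it.

```go
package main

import (
	"fmt"
	"regexp"
)

// patternList pairs a field name with its regex strings so the error can say
// where the bad pattern came from.
type patternList struct {
	Field    string
	Patterns []string
}

// validatePatterns compiles every pattern and returns an error for the first
// one that is not a valid regular expression.
func validatePatterns(lists ...patternList) error {
	for _, l := range lists {
		for _, p := range l.Patterns {
			if _, err := regexp.Compile(p); err != nil {
				return fmt.Errorf("%s: invalid pattern %q: %w", l.Field, p, err)
			}
		}
	}
	return nil
}

func main() {
	good := patternList{Field: "blocklist", Patterns: []string{`\.exe$`}}
	bad := patternList{Field: "blocklist", Patterns: []string{`*.exe`}} // a glob, not a valid regex

	fmt.Println(validatePatterns(good))
	fmt.Println(validatePatterns(good, bad))
}
```

Rejecting the request up front means a mistyped deny rule surfaces as a 400 to whoever is editing the remote, instead of being dropped silently at match time.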

## Validation
- `go test ./pkg/models/` and `make e2e` pass.

Reviewed-on: #87
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 20:19:27 +10:00
unkinben 0a89b2005c fix: isNetworkError should use errors.As, not a bare type assertion (#84)
Fixes #68

## Why
`isNetworkError` type-asserted `err.(*UpstreamError)` directly. If the error is ever wrapped, stale-on-error handling silently stops triggering.

## Changes
- Use `errors.As` to detect `*UpstreamError` through wrapping.
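
A standalone sketch of why the bare assertion breaks under wrapping. The `UpstreamError` type is redeclared here with assumed `Error` and `Unwrap` methods, and `isNetworkErrorOld` and `wrapped` are illustrative helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// UpstreamError is redeclared for the example; its methods are assumptions.
type UpstreamError struct{ Err error }

func (e *UpstreamError) Error() string { return "upstream: " + e.Err.Error() }
func (e *UpstreamError) Unwrap() error { return e.Err }

// isNetworkErrorOld only matches when err is exactly *UpstreamError.
func isNetworkErrorOld(err error) bool {
	_, ok := err.(*UpstreamError)
	return ok
}

// isNetworkError walks the wrap chain, so it also matches wrapped errors.
func isNetworkError(err error) bool {
	var ue *UpstreamError
	return errors.As(err, &ue)
}

// wrapped returns an *UpstreamError wrapped with extra context via %w.
func wrapped() error {
	return fmt.Errorf("fetch failed: %w", &UpstreamError{Err: errors.New("connection refused")})
}

func main() {
	err := wrapped()
	fmt.Println(isNetworkErrorOld(err), isNetworkError(err))
}
```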

## Validation
- `make e2e` passes.

Reviewed-on: #84
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 20:18:23 +10:00
unkinben f23bf2a6d9 fix: serveFromStore does a guaranteed-miss S3 lookup on every cache hit (#82)
Fixes #78

## Why
`serveFromStore` first called `store.Download` with the bare content hash as the S3 key, which never matches real object keys (`blobs/sha256/<hash>`). Every cached blob serve therefore paid an extra guaranteed-404 round-trip before retrying with the correct `BlobKey`.

## Changes
- Remove the dead first `Download` attempt; go straight to the `BlobKey` lookup, then fall back to the index key.

## Validation
- `make e2e` passes (proxy cache-hit paths exercised end-to-end).

Reviewed-on: #82
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 20:07:30 +10:00
unkinben b9098bf19c fix: e2e suite fails to build (stale server.New call) (#81)
Fixes #80

## Why
`make e2e` did not compile against master: `e2e/e2e_test.go` called `server.New(cfg)` but the signature is `New(cfg, version string)`. This blocked all end-to-end validation.

## Changes
- Pass a static `"e2e-test"` version to `server.New` in the e2e bootstrap.

## Validation
- `make e2e` builds and passes (testcontainers: postgres/redis/minio).

Reviewed-on: #81
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 20:00:24 +10:00
16 changed files with 613 additions and 109 deletions
+1 -1
@@ -95,7 +95,7 @@ func TestMain(m *testing.M) {
}
cfg.ListenAddr = "127.0.0.1:0"
-srv, err := server.New(cfg)
+srv, err := server.New(cfg, "e2e-test")
if err != nil {
log.Fatalf("server: %v", err)
}
+24
@@ -24,6 +24,30 @@ func TestRoot(t *testing.T) {
}
}
func TestRemoteUpstreamTimeouts(t *testing.T) {
createRemote(t, `{
"name": "timeout-test",
"package_type": "generic",
"base_url": "https://example.com",
"stale_on_error": true,
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5
}`)
defer deleteRemote(t, "timeout-test")
remote := getJSON(t, apiURL("/api/v2/remotes/timeout-test"))
for field, want := range map[string]float64{
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5,
} {
if got, _ := remote[field].(float64); got != want {
t.Errorf("%s: got %v, want %v", field, remote[field], want)
}
}
}
func TestRemoteCRUD(t *testing.T) {
createRemote(t, `{
"name": "test-generic",
+33
@@ -24,6 +24,39 @@ func TestProxyBlocklist(t *testing.T) {
assertStatus(t, apiURL("/api/v1/remote/blocklist-test/malware.exe"), http.StatusForbidden)
}
func TestProxyHeadBlocklist(t *testing.T) {
createRemote(t, `{
"name": "head-block-test",
"package_type": "generic",
"base_url": "https://example.com",
"blocklist": ["\\.exe$"],
"stale_on_error": true
}`)
defer deleteRemote(t, "head-block-test")
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/head-block-test/malware.exe"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("HEAD blocklisted path: got %d, want 403", resp.StatusCode)
}
}
func TestProxyHeadUnknownRemote(t *testing.T) {
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/nonexistent/some/path"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("HEAD unknown remote: got %d, want 404", resp.StatusCode)
}
}
func TestProxyPatterns(t *testing.T) {
createRemote(t, `{
"name": "patterns-test",
+37 -1
@@ -42,7 +42,7 @@ func (h *ProxyHandler) DockerV2Routes() chi.Router {
r.Get("/", h.handleDockerPing)
r.Head("/", h.handleDockerPing)
r.Get("/{remoteName}/*", h.handleProxy)
-r.Head("/{remoteName}/*", h.handleProxy)
+r.Head("/{remoteName}/*", h.handleProxyHead)
return r
}
@@ -89,6 +89,42 @@ func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
io.Copy(w, result.Reader)
}
func (h *ProxyHandler) handleProxyHead(w http.ResponseWriter, r *http.Request) {
remoteName := chi.URLParam(r, "remoteName")
path := chi.URLParam(r, "*")
remote, err := h.db.GetRemote(r.Context(), remoteName)
if err != nil {
http.Error(w, fmt.Sprintf("remote %q not found", remoteName), http.StatusNotFound)
return
}
prov, err := provider.Get(remote.PackageType)
if err != nil {
http.Error(w, fmt.Sprintf("no provider for %q", remote.PackageType), http.StatusInternalServerError)
return
}
result, err := h.engine.Head(r.Context(), *remote, path, prov)
if err != nil {
var proxyErr *proxy.ProxyError
if errors.As(err, &proxyErr) {
http.Error(w, proxyErr.Message, proxyErr.Status)
return
}
slog.Error("proxy head failed", "remote", remoteName, "path", path, "error", err)
http.Error(w, "bad gateway", http.StatusBadGateway)
return
}
w.Header().Set("Content-Type", result.ContentType)
w.Header().Set("X-Artifact-Source", result.Source)
if result.Size > 0 {
w.Header().Set("Content-Length", fmt.Sprintf("%d", result.Size))
}
w.WriteHeader(http.StatusOK)
}
func (h *ProxyHandler) handleVirtual(w http.ResponseWriter, r *http.Request) {
virtualName := chi.URLParam(r, "virtualName")
path := chi.URLParam(r, "*")
+8
@@ -69,6 +69,10 @@ func (h *RemotesHandler) create(w http.ResponseWriter, r *http.Request) {
http.Error(w, "base_url is required for remote repositories", http.StatusBadRequest)
return
}
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.CreateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -84,6 +88,10 @@ func (h *RemotesHandler) update(w http.ResponseWriter, r *http.Request) {
return
}
remote.Name = name
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.UpdateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
+12
@@ -70,6 +70,18 @@ func (r *Redis) GetETag(ctx context.Context, remote, path string) (string, error
return val, err
}
func (r *Redis) GetToken(ctx context.Context, key string) (string, error) {
val, err := r.client.Get(ctx, "token:"+key).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
func (r *Redis) SetToken(ctx context.Context, key, token string, ttl time.Duration) error {
return r.client.Set(ctx, "token:"+key, token, ttl).Err()
}
func (r *Redis) IncrCircuitFailure(ctx context.Context, remote string, cooldown time.Duration) (int64, error) {
key := fmt.Sprintf("circuit:%s", remote)
pipe := r.client.Pipeline()
+1 -1
@@ -65,7 +65,7 @@ func Load() (*Config, error) {
}
func getenv(key, fallback string) string {
-if v := os.Getenv(key); v != "" {
+if v, ok := os.LookupEnv(key); ok {
return v
}
return fallback
+38 -3
@@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/jackc/pgx/v5"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
@@ -109,16 +111,49 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach
return err
}
-func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
+// AccessLogEntry is one buffered access-log record.
+type AccessLogEntry struct {
+RemoteName string
+Path string
+CacheHit bool
+SizeBytes int64
+UpstreamMS int
+ClientIP string
+}
+// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY.
+func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error {
+if len(entries) == 0 {
+return nil
+}
+rows := make([][]any, len(entries))
+for i, e := range entries {
+rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP}
+}
+_, err := db.Pool.CopyFrom(ctx,
+pgx.Identifier{"access_log"},
+[]string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"},
+pgx.CopyFromRows(rows),
+)
+return err
+}
+// FindOrphanedBlobs returns blobs no longer referenced by any artifact or
+// local file, restricted to those created before now()-minAge. The age cutoff
+// is a grace period that avoids a TOCTOU race with in-flight dedup uploads,
+// which insert the blob row before the referencing artifact/local_files row.
+func (db *DB) FindOrphanedBlobs(ctx context.Context, minAge time.Duration) ([]models.Blob, error) {
+cutoff := time.Now().Add(-minAge)
rows, err := db.Pool.Query(ctx, `
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
FROM blobs b
-WHERE b.content_hash NOT IN (
+WHERE b.created_at < $1
+AND b.content_hash NOT IN (
SELECT content_hash FROM artifacts
UNION
SELECT content_hash FROM local_files
)
-`)
+`, cutoff)
if err != nil {
return nil, err
}
+3
@@ -124,6 +124,9 @@ func (db *DB) migrate() error {
CREATE INDEX IF NOT EXISTS idx_access_log_remote_time ON access_log(remote_name, created_at);
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS repo_type TEXT DEFAULT 'remote';
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_dial_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_tls_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_response_header_timeout INTEGER DEFAULT 0;
CREATE TABLE IF NOT EXISTS rpm_metadata (
id BIGSERIAL PRIMARY KEY,
+14 -5
@@ -11,7 +11,9 @@ const remoteCols = `name, package_type, repo_type, base_url, description, userna
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
-releases_remote, managed_by, created_at, updated_at`
+releases_remote, managed_by,
+upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout,
+created_at, updated_at`
func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error {
return scanner.Scan(
@@ -20,7 +22,9 @@ func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error
&r.Patterns, &r.Blocklist, &r.MutablePatterns, &r.ImmutablePatterns,
&r.BanTagsEnabled, &r.BanTags,
&r.QuarantineEnabled, &r.QuarantineDays, &r.StaleOnError,
-&r.ReleasesRemote, &r.ManagedBy, &r.CreatedAt, &r.UpdatedAt,
+&r.ReleasesRemote, &r.ManagedBy,
+&r.UpstreamDialTimeout, &r.UpstreamTLSTimeout, &r.UpstreamResponseHeaderTimeout,
+&r.CreatedAt, &r.UpdatedAt,
)
}
@@ -59,8 +63,9 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
-releases_remote, managed_by
-) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21)
+releases_remote, managed_by,
+upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout
+) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
r.ImmutableTTL, r.MutableTTL, r.CheckMutable,
@@ -68,6 +73,7 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
@@ -80,7 +86,9 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
patterns=$11, blocklist=$12, mutable_patterns=$13, immutable_patterns=$14,
ban_tags_enabled=$15, ban_tags=$16,
quarantine_enabled=$17, quarantine_days=$18, stale_on_error=$19,
-releases_remote=$20, managed_by=$21, updated_at=NOW()
+releases_remote=$20, managed_by=$21,
+upstream_dial_timeout=$22, upstream_tls_timeout=$23, upstream_response_header_timeout=$24,
+updated_at=NOW()
WHERE name=$1
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
@@ -89,6 +97,7 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
+6 -1
@@ -9,6 +9,11 @@ import (
"git.unkin.net/unkin/artifactapi/internal/storage"
)
// blobGracePeriod is how old an orphaned blob must be before GC will delete
// it. This avoids racing in-flight dedup uploads that insert the blob row
// before the referencing artifact/local_files row exists.
const blobGracePeriod = 1 * time.Hour
type Collector struct {
db *database.DB
store *storage.S3
@@ -38,7 +43,7 @@ func (c *Collector) Run(ctx context.Context) {
func (c *Collector) sweep(ctx context.Context) {
start := time.Now()
-orphaned, err := c.db.FindOrphanedBlobs(ctx)
+orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
if err != nil {
slog.Error("gc: find orphaned blobs", "error", err)
return
+21 -1
@@ -2,6 +2,7 @@ package proxy
import (
"regexp"
"sync"
"git.unkin.net/unkin/artifactapi/internal/provider"
"git.unkin.net/unkin/artifactapi/pkg/models"
@@ -60,10 +61,29 @@ func (c *Classifier) Classify(remote models.Remote, path string) Classification
return ClassImmutable
}
// patternCache memoises regex compilation. Classify runs on every proxied
// request and previously recompiled each remote's pattern lists every time;
// keying by the pattern string lets each distinct pattern compile once and
// then be reused, with no invalidation needed (the pattern text is the key).
// A pattern that fails to compile is cached as a typed nil so we don't retry.
var patternCache sync.Map // map[string]*regexp.Regexp
func compileCached(pattern string) *regexp.Regexp {
if v, ok := patternCache.Load(pattern); ok {
return v.(*regexp.Regexp)
}
re, err := regexp.Compile(pattern)
if err != nil {
re = nil
}
patternCache.Store(pattern, re)
return re
}
func compilePatterns(patterns []string) []*regexp.Regexp {
compiled := make([]*regexp.Regexp, 0, len(patterns))
for _, p := range patterns {
-if re, err := regexp.Compile(p); err == nil {
+if re := compileCached(p); re != nil {
compiled = append(compiled, re)
}
}
+283 -96
@@ -2,9 +2,8 @@ package proxy
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
@@ -21,19 +20,63 @@ import (
const fetchLockTTL = 30 * time.Second
const (
accessLogBufferSize = 4096
accessLogBatchSize = 128
accessLogFlushEvery = 2 * time.Second
)
type Engine struct {
-db *database.DB
-cache *cache.Redis
-store *storage.S3
-cas *storage.CAS
+db *database.DB
+cache *cache.Redis
+store *storage.S3
+cas *storage.CAS
+accessLog chan database.AccessLogEntry
}
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
-return &Engine{
-db: db,
-cache: c,
-store: s,
-cas: storage.NewCAS(s),
+e := &Engine{
+db: db,
+cache: c,
+store: s,
+cas: storage.NewCAS(s),
+accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
}
+go e.runAccessLogWriter()
+return e
}
// runAccessLogWriter drains the access-log channel and writes rows in batches,
// replacing a goroutine-per-request insert. It runs for the process lifetime;
// access logs are best-effort telemetry, so a small tail may be lost on abrupt
// shutdown.
func (e *Engine) runAccessLogWriter() {
ticker := time.NewTicker(accessLogFlushEvery)
defer ticker.Stop()
batch := make([]database.AccessLogEntry, 0, accessLogBatchSize)
flush := func() {
if len(batch) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil {
slog.Warn("access log batch insert failed", "error", err, "count", len(batch))
}
cancel()
batch = batch[:0]
}
for {
select {
case entry := <-e.accessLog:
batch = append(batch, entry)
if len(batch) >= accessLogBatchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
@@ -63,7 +106,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
-go e.logAccess(remote.Name, path, true, result.Size, 0)
+e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
@@ -75,11 +118,12 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
}
if !locked {
-time.Sleep(500 * time.Millisecond)
-result, err := e.serveFromStore(ctx, remote, path)
-if err == nil {
+// Another request holds the fetch lock. Poll the store until the leader
+// populates it rather than immediately racing to fetch upstream too; a
+// cold-cache stampede otherwise hits upstream once per waiter.
+if result := e.waitForStore(ctx, remote, path); result != nil {
result.Source = "cache"
-go e.logAccess(remote.Name, path, true, result.Size, 0)
+e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -98,7 +142,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
-go e.logAccess(remote.Name, path, true, result.Size, 0)
+e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -120,17 +164,98 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
if serr == nil {
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
stale.Source = "cache"
-go e.logAccess(remote.Name, path, true, stale.Size, 0)
+e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
}
return nil, err
}
-go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
+e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil
}
// HeadResult carries artifact metadata for a HEAD request. There is no body.
type HeadResult struct {
ContentType string
Size int64
Source string // "cache" or "remote"
}
// Head resolves artifact metadata without fetching or streaming the body.
// Cached artifacts/indexes are answered from the store metadata; on a miss it
// issues an upstream HEAD. It never downloads or caches the body.
func (e *Engine) Head(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
class := NewClassifier(prov).Classify(remote, path)
if class == ClassDenied {
return nil, &ProxyError{Status: http.StatusForbidden, Message: "access denied"}
}
if artifact, err := e.db.GetArtifact(ctx, remote.Name, path); err == nil && artifact != nil {
return &HeadResult{ContentType: artifact.ContentType, Size: artifact.SizeBytes, Source: "cache"}, nil
}
if info, err := e.store.Stat(ctx, storage.IndexKey(remote.Name, path)); err == nil {
return &HeadResult{ContentType: info.ContentType, Size: info.Size, Source: "cache"}, nil
}
return e.headUpstream(ctx, remote, path, prov)
}
func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
url := prov.UpstreamURL(remote, path)
authHeaders, err := prov.AuthHeaders(ctx, remote)
if err != nil {
return nil, fmt.Errorf("auth headers: %w", err)
}
doHead := func(extra http.Header) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
for k, vv := range authHeaders {
for _, v := range vv {
req.Header.Add(k, v)
}
}
for k, vv := range extra {
for _, v := range vv {
req.Header.Set(k, v)
}
}
return http.DefaultClient.Do(req)
}
resp, err := doHead(nil)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if terr == nil && token != "" {
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
if err != nil {
return nil, &UpstreamError{Err: err}
}
} else {
return nil, &ProxyError{Status: http.StatusUnauthorized, Message: "upstream returned 401"}
}
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
contentType = ct
}
return &HeadResult{ContentType: contentType, Size: resp.ContentLength, Source: "remote"}, nil
}
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration, clientHeaders http.Header) (*FetchResult, error) {
url := prov.UpstreamURL(remote, path)
@@ -154,14 +279,14 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
}
}
-resp, err := http.DefaultClient.Do(req)
+resp, err := clientForRemote(remote).Do(req)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
-token, err := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
+token, err := e.cachedBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if err == nil && token != "" {
req2, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
req2.Header.Set("Authorization", "Bearer "+token)
@@ -170,7 +295,7 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
req2.Header.Set("Accept", accept)
}
}
-resp, err = http.DefaultClient.Do(req2)
+resp, err = clientForRemote(remote).Do(req2)
if err != nil {
return nil, &UpstreamError{Err: err}
}
@@ -184,83 +309,108 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
contentType = ct
}
// Mutable indexes are small and may be rewritten, so buffer them in memory.
if class == ClassMutable {
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
s3Key := storage.IndexKey(remote.Name, path)
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload index: %w", err)
}
etag := resp.Header.Get("ETag")
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
} else {
hash := sha256Hash(body)
s3Key := storage.BlobKey(hash)
exists, _ := e.store.Exists(ctx, s3Key)
if !exists {
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload blob: %w", err)
}
}
contentHash := fmt.Sprintf("sha256:%s", hash)
if err := e.db.UpsertBlob(ctx, contentHash, s3Key, int64(len(body)), contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, contentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
return &FetchResult{
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// Immutable blobs are streamed through the content-addressable store
// (tempfile -> sha256 -> S3) so arbitrarily large artifacts never sit
// fully in memory. Immutable content is never rewritten in the proxy path.
casResult, err := e.cas.Store(ctx, resp.Body, contentType)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("store blob: %w", err)
}
if err := e.db.UpsertBlob(ctx, casResult.ContentHash, casResult.S3Key, casResult.SizeBytes, contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, casResult.ContentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
reader, info, err := e.store.Download(ctx, casResult.S3Key)
if err != nil {
return nil, fmt.Errorf("serve stored blob: %w", err)
}
return &FetchResult{
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Reader: reader,
ContentType: info.ContentType,
Size: casResult.SizeBytes,
Source: "remote",
}, nil
}
// waitForStore polls the store for an artifact populated by the request that
// holds the fetch lock, returning it once available or nil if it does not
// appear within the wait budget (after which the caller fetches upstream
// itself). It stops early if the request context is cancelled.
func (e *Engine) waitForStore(ctx context.Context, remote models.Remote, path string) *FetchResult {
const (
pollInterval = 100 * time.Millisecond
maxWait = 5 * time.Second
)
deadline := time.Now().Add(maxWait)
for {
if result, err := e.serveFromStore(ctx, remote, path); err == nil {
return result
}
if time.Now().After(deadline) {
return nil
}
select {
case <-ctx.Done():
return nil
case <-time.After(pollInterval):
}
}
}
func (e *Engine) serveFromStore(ctx context.Context, remote models.Remote, path string) (*FetchResult, error) {
artifact, err := e.db.GetArtifact(ctx, remote.Name, path)
if err == nil && artifact != nil {
reader, info, err := e.store.Download(ctx, artifact.ContentHash[len("sha256:"):])
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: info.Size,
}, nil
}
s3Key := storage.BlobKey(artifact.ContentHash[len("sha256:"):])
reader, info, err = e.store.Download(ctx, s3Key)
reader, info, err := e.store.Download(ctx, s3Key)
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
@@ -302,7 +452,7 @@ func (e *Engine) checkUpstream(ctx context.Context, remote models.Remote, path,
}
}
resp, err := http.DefaultClient.Do(req)
resp, err := clientForRemote(remote).Do(req)
if err != nil {
return false, &UpstreamError{Err: err}
}
@@ -323,15 +473,20 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio
}
}
// logAccess enqueues an access-log entry for the batch writer. It never blocks
// the request path: if the buffer is full the entry is dropped.
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
}
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
select {
case e.accessLog <- database.AccessLogEntry{
RemoteName: remoteName,
Path: path,
CacheHit: cacheHit,
SizeBytes: size,
UpstreamMS: upstreamMS,
}:
default:
slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path)
}
}
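The new `logAccess` relies on a `select` with a `default` branch so that a full buffer never stalls the request path. A minimal sketch of that idiom follows, assuming a plain buffered `chan string` in place of the real `database.AccessLogEntry` channel; `tryEnqueue` is a hypothetical name.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send. It reports whether the entry was
// accepted; when the buffer is full the entry is dropped instead of waiting.
// Hypothetical helper, not part of the diff.
func tryEnqueue(buf chan<- string, entry string) bool {
	select {
	case buf <- entry:
		return true
	default:
		return false
	}
}

func main() {
	buf := make(chan string, 2) // room for two pending entries, no consumer
	for _, p := range []string{"a", "b", "c"} {
		fmt.Println(p, tryEnqueue(buf, p))
	}
}
```

With no consumer draining the channel, the first two sends succeed and the third is dropped, which mirrors the "buffer full, dropping entry" branch above.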
func bytesReader(data []byte) io.Reader {
@@ -351,9 +506,41 @@ func (r readerAt) ReadAt(p []byte, off int64) (n int, err error) {
return
}
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
// bearerTokenTTLDefault/Margin bound how long a token is cached: the default
// is used when the token endpoint omits expires_in, and the margin is
// subtracted so a cached token is refreshed slightly before it actually expires.
const (
bearerTokenTTLDefault = 60 * time.Second
bearerTokenTTLMargin = 10 * time.Second
)
// cachedBearerToken returns a bearer token for the given challenge, reusing a
// Redis-cached token for the same remote+challenge while it is still valid.
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
key := remote.Name + ":" + sha256Hash([]byte(wwwAuth))
if tok, err := e.cache.GetToken(ctx, key); err == nil && tok != "" {
return tok, nil
}
tok, ttl, err := fetchBearerToken(ctx, wwwAuth, remote)
if err != nil {
return "", err
}
if tok != "" {
if ttl <= 0 {
ttl = bearerTokenTTLDefault
}
if ttl > bearerTokenTTLMargin {
ttl -= bearerTokenTTLMargin
}
_ = e.cache.SetToken(ctx, key, tok, ttl)
}
return tok, nil
}
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, time.Duration, error) {
if !strings.HasPrefix(wwwAuth, "Bearer ") {
return "", fmt.Errorf("not a Bearer challenge")
return "", 0, fmt.Errorf("not a Bearer challenge")
}
params := map[string]string{}
@@ -370,7 +557,7 @@ func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote)
realm := params["realm"]
if realm == "" {
return "", fmt.Errorf("no realm in Bearer challenge")
return "", 0, fmt.Errorf("no realm in Bearer challenge")
}
tokenURL := realm
@@ -385,35 +572,37 @@ func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, tokenURL, nil)
if err != nil {
return "", err
return "", 0, err
}
if remote.Username != "" && remote.Password != "" {
req.SetBasicAuth(remote.Username, remote.Password)
}
resp, err := http.DefaultClient.Do(req)
resp, err := clientForRemote(remote).Do(req)
if err != nil {
return "", err
return "", 0, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token endpoint returned %d", resp.StatusCode)
return "", 0, fmt.Errorf("token endpoint returned %d", resp.StatusCode)
}
var tokenResp struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", err
return "", 0, err
}
ttl := time.Duration(tokenResp.ExpiresIn) * time.Second
if tokenResp.Token != "" {
return tokenResp.Token, nil
return tokenResp.Token, ttl, nil
}
return tokenResp.AccessToken, nil
return tokenResp.AccessToken, ttl, nil
}
type ProxyError struct {
@@ -431,8 +620,6 @@ func (e *UpstreamError) Error() string { return fmt.Sprintf("upstream error: %v"
func (e *UpstreamError) Unwrap() error { return e.Err }
func isNetworkError(err error) bool {
if _, ok := err.(*UpstreamError); ok {
return true
}
return false
var ue *UpstreamError
return errors.As(err, &ue)
}
+83
@@ -0,0 +1,83 @@
package proxy
import (
"net"
"net/http"
"sync"
"time"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
// Default upstream timeouts. A remote may override any of these; a zero
// override falls back to the default here. There is deliberately no overall
// Client.Timeout: the proxy streams arbitrarily large artifacts and total time
// is bounded by the request context instead. We only constrain the phases that
// must never hang — connect, TLS handshake, and time-to-first-response-header —
// so a slow or wedged upstream cannot pin a goroutine and connection.
const (
defaultDialTimeout = 10 * time.Second
defaultTLSTimeout = 10 * time.Second
defaultResponseHeaderTimeout = 30 * time.Second
)
type clientKey struct {
dial time.Duration
tls time.Duration
respHeader time.Duration
}
var (
clientCacheMu sync.Mutex
clientCache = map[clientKey]*http.Client{}
)
// upstreamClientFor returns an HTTP client configured with the given timeouts,
// reusing a cached client (and its connection pool) for identical timeout sets.
// Zero or negative values fall back to the defaults.
func upstreamClientFor(dial, tls, respHeader time.Duration) *http.Client {
if dial <= 0 {
dial = defaultDialTimeout
}
if tls <= 0 {
tls = defaultTLSTimeout
}
if respHeader <= 0 {
respHeader = defaultResponseHeaderTimeout
}
key := clientKey{dial: dial, tls: tls, respHeader: respHeader}
clientCacheMu.Lock()
defer clientCacheMu.Unlock()
if c, ok := clientCache[key]; ok {
return c
}
c := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: dial,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: tls,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: respHeader,
},
}
clientCache[key] = c
return c
}
// clientForRemote returns the upstream client for a remote, applying its
// per-remote timeout overrides (in seconds) on top of the defaults.
func clientForRemote(remote models.Remote) *http.Client {
return upstreamClientFor(
time.Duration(remote.UpstreamDialTimeout)*time.Second,
time.Duration(remote.UpstreamTLSTimeout)*time.Second,
time.Duration(remote.UpstreamResponseHeaderTimeout)*time.Second,
)
}
+30
@@ -2,6 +2,7 @@ package models
import (
"fmt"
"regexp"
"time"
)
@@ -46,6 +47,11 @@ type Remote struct {
MutableTTL int `json:"mutable_ttl"`
CheckMutable bool `json:"check_mutable"`
// Upstream HTTP timeouts in seconds. 0 means use the server default.
UpstreamDialTimeout int `json:"upstream_dial_timeout,omitempty"`
UpstreamTLSTimeout int `json:"upstream_tls_timeout,omitempty"`
UpstreamResponseHeaderTimeout int `json:"upstream_response_header_timeout,omitempty"`
Patterns []string `json:"patterns,omitempty"`
Blocklist []string `json:"blocklist,omitempty"`
MutablePatterns []string `json:"mutable_patterns,omitempty"`
@@ -66,6 +72,30 @@ type Remote struct {
UpdatedAt time.Time `json:"updated_at"`
}
// ValidatePatterns ensures every configured regex compiles. An invalid
// pattern that was stored would otherwise be silently dropped at match time,
// which for the blocklist means failing open: a mistyped deny rule becomes a no-op.
func (r *Remote) ValidatePatterns() error {
groups := []struct {
field string
patterns []string
}{
{"patterns", r.Patterns},
{"blocklist", r.Blocklist},
{"mutable_patterns", r.MutablePatterns},
{"immutable_patterns", r.ImmutablePatterns},
{"ban_tags", r.BanTags},
}
for _, g := range groups {
for _, p := range g.patterns {
if _, err := regexp.Compile(p); err != nil {
return fmt.Errorf("invalid regex in %s: %q: %w", g.field, p, err)
}
}
}
return nil
}
type RemoteWithStats struct {
Remote
Stats RemoteStats `json:"stats"`
+19
@@ -0,0 +1,19 @@
package models
import "testing"
func TestRemote_ValidatePatterns(t *testing.T) {
valid := &Remote{
Patterns: []string{`.*\.tar\.gz$`},
Blocklist: []string{`^secret/`},
ImmutablePatterns: []string{`\.rpm$`},
}
if err := valid.ValidatePatterns(); err != nil {
t.Fatalf("expected valid patterns, got %v", err)
}
bad := &Remote{Blocklist: []string{`[unterminated`}}
if err := bad.ValidatePatterns(); err == nil {
t.Fatal("expected error for invalid blocklist regex, got nil")
}
}
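For readers who want to see what `ValidatePatterns` rejects, the sketch below runs the same `regexp.Compile` check over a flat list of patterns. `firstInvalid` is a hypothetical helper; the real method iterates the named fields of `Remote` and wraps the error with the field name.

```go
package main

import (
	"fmt"
	"regexp"
)

// firstInvalid returns the first pattern that fails to compile together with
// the compile error, or an empty string and nil if all patterns are valid.
// Hypothetical helper, not part of the diff.
func firstInvalid(patterns []string) (string, error) {
	for _, p := range patterns {
		if _, err := regexp.Compile(p); err != nil {
			return p, err
		}
	}
	return "", nil
}

func main() {
	good := []string{`.*\.tar\.gz$`, `^secret/`, `\.rpm$`}
	bad := []string{`^secret/`, `[unterminated`}

	if p, err := firstInvalid(good); err == nil {
		fmt.Println("all valid")
	} else {
		fmt.Printf("invalid %q: %v\n", p, err)
	}
	if p, err := firstInvalid(bad); err != nil {
		fmt.Printf("invalid %q: %v\n", p, err)
	}
}
```

Rejecting the pattern at write time, rather than skipping it at match time, is what turns a mistyped blocklist entry from a silent no-op into a visible configuration error.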