Compare commits

..

1 Commits

Author SHA1 Message Date
unkinben b698d1bdc0 perf: memoise regex compilation in the classifier
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful
Classify runs on every proxied request and recompiled each remote's
pattern lists every time. Cache compiled regexes keyed by pattern text so
each distinct pattern compiles once and is reused thereafter.

Refs #73
2026-07-02 00:33:46 +10:00
15 changed files with 106 additions and 597 deletions
+1 -1
View File
@@ -95,7 +95,7 @@ func TestMain(m *testing.M) {
}
cfg.ListenAddr = "127.0.0.1:0"
srv, err := server.New(cfg, "e2e-test")
srv, err := server.New(cfg)
if err != nil {
log.Fatalf("server: %v", err)
}
-24
View File
@@ -24,30 +24,6 @@ func TestRoot(t *testing.T) {
}
}
func TestRemoteUpstreamTimeouts(t *testing.T) {
createRemote(t, `{
"name": "timeout-test",
"package_type": "generic",
"base_url": "https://example.com",
"stale_on_error": true,
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5
}`)
defer deleteRemote(t, "timeout-test")
remote := getJSON(t, apiURL("/api/v2/remotes/timeout-test"))
for field, want := range map[string]float64{
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5,
} {
if got, _ := remote[field].(float64); got != want {
t.Errorf("%s: got %v, want %v", field, remote[field], want)
}
}
}
func TestRemoteCRUD(t *testing.T) {
createRemote(t, `{
"name": "test-generic",
-33
View File
@@ -24,39 +24,6 @@ func TestProxyBlocklist(t *testing.T) {
assertStatus(t, apiURL("/api/v1/remote/blocklist-test/malware.exe"), http.StatusForbidden)
}
func TestProxyHeadBlocklist(t *testing.T) {
createRemote(t, `{
"name": "head-block-test",
"package_type": "generic",
"base_url": "https://example.com",
"blocklist": ["\\.exe$"],
"stale_on_error": true
}`)
defer deleteRemote(t, "head-block-test")
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/head-block-test/malware.exe"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("HEAD blocklisted path: got %d, want 403", resp.StatusCode)
}
}
func TestProxyHeadUnknownRemote(t *testing.T) {
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/nonexistent/some/path"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("HEAD unknown remote: got %d, want 404", resp.StatusCode)
}
}
func TestProxyPatterns(t *testing.T) {
createRemote(t, `{
"name": "patterns-test",
+1 -37
View File
@@ -42,7 +42,7 @@ func (h *ProxyHandler) DockerV2Routes() chi.Router {
r.Get("/", h.handleDockerPing)
r.Head("/", h.handleDockerPing)
r.Get("/{remoteName}/*", h.handleProxy)
r.Head("/{remoteName}/*", h.handleProxyHead)
r.Head("/{remoteName}/*", h.handleProxy)
return r
}
@@ -89,42 +89,6 @@ func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
io.Copy(w, result.Reader)
}
func (h *ProxyHandler) handleProxyHead(w http.ResponseWriter, r *http.Request) {
remoteName := chi.URLParam(r, "remoteName")
path := chi.URLParam(r, "*")
remote, err := h.db.GetRemote(r.Context(), remoteName)
if err != nil {
http.Error(w, fmt.Sprintf("remote %q not found", remoteName), http.StatusNotFound)
return
}
prov, err := provider.Get(remote.PackageType)
if err != nil {
http.Error(w, fmt.Sprintf("no provider for %q", remote.PackageType), http.StatusInternalServerError)
return
}
result, err := h.engine.Head(r.Context(), *remote, path, prov)
if err != nil {
var proxyErr *proxy.ProxyError
if errors.As(err, &proxyErr) {
http.Error(w, proxyErr.Message, proxyErr.Status)
return
}
slog.Error("proxy head failed", "remote", remoteName, "path", path, "error", err)
http.Error(w, "bad gateway", http.StatusBadGateway)
return
}
w.Header().Set("Content-Type", result.ContentType)
w.Header().Set("X-Artifact-Source", result.Source)
if result.Size > 0 {
w.Header().Set("Content-Length", fmt.Sprintf("%d", result.Size))
}
w.WriteHeader(http.StatusOK)
}
func (h *ProxyHandler) handleVirtual(w http.ResponseWriter, r *http.Request) {
virtualName := chi.URLParam(r, "virtualName")
path := chi.URLParam(r, "*")
-8
View File
@@ -69,10 +69,6 @@ func (h *RemotesHandler) create(w http.ResponseWriter, r *http.Request) {
http.Error(w, "base_url is required for remote repositories", http.StatusBadRequest)
return
}
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.CreateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -88,10 +84,6 @@ func (h *RemotesHandler) update(w http.ResponseWriter, r *http.Request) {
return
}
remote.Name = name
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.UpdateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
-12
View File
@@ -70,18 +70,6 @@ func (r *Redis) GetETag(ctx context.Context, remote, path string) (string, error
return val, err
}
func (r *Redis) GetToken(ctx context.Context, key string) (string, error) {
val, err := r.client.Get(ctx, "token:"+key).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
func (r *Redis) SetToken(ctx context.Context, key, token string, ttl time.Duration) error {
return r.client.Set(ctx, "token:"+key, token, ttl).Err()
}
func (r *Redis) IncrCircuitFailure(ctx context.Context, remote string, cooldown time.Duration) (int64, error) {
key := fmt.Sprintf("circuit:%s", remote)
pipe := r.client.Pipeline()
+1 -1
View File
@@ -65,7 +65,7 @@ func Load() (*Config, error) {
}
func getenv(key, fallback string) string {
if v, ok := os.LookupEnv(key); ok {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
+3 -38
View File
@@ -4,8 +4,6 @@ import (
"context"
"time"
"github.com/jackc/pgx/v5"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
@@ -111,49 +109,16 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach
return err
}
// AccessLogEntry is one buffered access-log record.
type AccessLogEntry struct {
RemoteName string
Path string
CacheHit bool
SizeBytes int64
UpstreamMS int
ClientIP string
}
// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY.
func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error {
if len(entries) == 0 {
return nil
}
rows := make([][]any, len(entries))
for i, e := range entries {
rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP}
}
_, err := db.Pool.CopyFrom(ctx,
pgx.Identifier{"access_log"},
[]string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"},
pgx.CopyFromRows(rows),
)
return err
}
// FindOrphanedBlobs returns blobs no longer referenced by any artifact or
// local file, restricted to those created before now()-minAge. The age cutoff
// is a grace period that avoids a TOCTOU race with in-flight dedup uploads,
// which insert the blob row before the referencing artifact/local_files row.
func (db *DB) FindOrphanedBlobs(ctx context.Context, minAge time.Duration) ([]models.Blob, error) {
cutoff := time.Now().Add(-minAge)
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
rows, err := db.Pool.Query(ctx, `
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
FROM blobs b
WHERE b.created_at < $1
AND b.content_hash NOT IN (
WHERE b.content_hash NOT IN (
SELECT content_hash FROM artifacts
UNION
SELECT content_hash FROM local_files
)
`, cutoff)
`)
if err != nil {
return nil, err
}
-3
View File
@@ -124,9 +124,6 @@ func (db *DB) migrate() error {
CREATE INDEX IF NOT EXISTS idx_access_log_remote_time ON access_log(remote_name, created_at);
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS repo_type TEXT DEFAULT 'remote';
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_dial_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_tls_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_response_header_timeout INTEGER DEFAULT 0;
CREATE TABLE IF NOT EXISTS rpm_metadata (
id BIGSERIAL PRIMARY KEY,
+5 -14
View File
@@ -11,9 +11,7 @@ const remoteCols = `name, package_type, repo_type, base_url, description, userna
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
releases_remote, managed_by,
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout,
created_at, updated_at`
releases_remote, managed_by, created_at, updated_at`
func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error {
return scanner.Scan(
@@ -22,9 +20,7 @@ func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error
&r.Patterns, &r.Blocklist, &r.MutablePatterns, &r.ImmutablePatterns,
&r.BanTagsEnabled, &r.BanTags,
&r.QuarantineEnabled, &r.QuarantineDays, &r.StaleOnError,
&r.ReleasesRemote, &r.ManagedBy,
&r.UpstreamDialTimeout, &r.UpstreamTLSTimeout, &r.UpstreamResponseHeaderTimeout,
&r.CreatedAt, &r.UpdatedAt,
&r.ReleasesRemote, &r.ManagedBy, &r.CreatedAt, &r.UpdatedAt,
)
}
@@ -63,9 +59,8 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
releases_remote, managed_by,
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)
releases_remote, managed_by
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21)
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
r.ImmutableTTL, r.MutableTTL, r.CheckMutable,
@@ -73,7 +68,6 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
@@ -86,9 +80,7 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
patterns=$11, blocklist=$12, mutable_patterns=$13, immutable_patterns=$14,
ban_tags_enabled=$15, ban_tags=$16,
quarantine_enabled=$17, quarantine_days=$18, stale_on_error=$19,
releases_remote=$20, managed_by=$21,
upstream_dial_timeout=$22, upstream_tls_timeout=$23, upstream_response_header_timeout=$24,
updated_at=NOW()
releases_remote=$20, managed_by=$21, updated_at=NOW()
WHERE name=$1
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
@@ -97,7 +89,6 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
+1 -6
View File
@@ -9,11 +9,6 @@ import (
"git.unkin.net/unkin/artifactapi/internal/storage"
)
// blobGracePeriod is how old an orphaned blob must be before GC will delete
// it. This avoids racing in-flight dedup uploads that insert the blob row
// before the referencing artifact/local_files row exists.
const blobGracePeriod = 1 * time.Hour
type Collector struct {
db *database.DB
store *storage.S3
@@ -43,7 +38,7 @@ func (c *Collector) Run(ctx context.Context) {
func (c *Collector) sweep(ctx context.Context) {
start := time.Now()
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
orphaned, err := c.db.FindOrphanedBlobs(ctx)
if err != nil {
slog.Error("gc: find orphaned blobs", "error", err)
return
+94 -288
View File
@@ -5,7 +5,6 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
@@ -22,63 +21,19 @@ import (
const fetchLockTTL = 30 * time.Second
const (
accessLogBufferSize = 4096
accessLogBatchSize = 128
accessLogFlushEvery = 2 * time.Second
)
type Engine struct {
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
accessLog chan database.AccessLogEntry
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
}
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
e := &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
}
go e.runAccessLogWriter()
return e
}
// runAccessLogWriter drains the access-log channel and writes rows in batches,
// replacing a goroutine-per-request insert. It runs for the process lifetime;
// access logs are best-effort telemetry, so a small tail may be lost on abrupt
// shutdown.
func (e *Engine) runAccessLogWriter() {
ticker := time.NewTicker(accessLogFlushEvery)
defer ticker.Stop()
batch := make([]database.AccessLogEntry, 0, accessLogBatchSize)
flush := func() {
if len(batch) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil {
slog.Warn("access log batch insert failed", "error", err, "count", len(batch))
}
cancel()
batch = batch[:0]
}
for {
select {
case entry := <-e.accessLog:
batch = append(batch, entry)
if len(batch) >= accessLogBatchSize {
flush()
}
case <-ticker.C:
flush()
}
return &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
}
}
@@ -108,7 +63,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
@@ -120,12 +75,11 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
}
if !locked {
// Another request holds the fetch lock. Poll the store until the leader
// populates it rather than immediately racing to fetch upstream too; a
// cold-cache stampede otherwise hits upstream once per waiter.
if result := e.waitForStore(ctx, remote, path); result != nil {
time.Sleep(500 * time.Millisecond)
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -144,7 +98,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -166,98 +120,17 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
if serr == nil {
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
go e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
}
return nil, err
}
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil
}
// HeadResult carries artifact metadata for a HEAD request. There is no body.
type HeadResult struct {
ContentType string
Size int64
Source string // "cache" or "remote"
}
// Head resolves artifact metadata without fetching or streaming the body.
// Cached artifacts/indexes are answered from the store metadata; on a miss it
// issues an upstream HEAD. It never downloads or caches the body.
func (e *Engine) Head(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
class := NewClassifier(prov).Classify(remote, path)
if class == ClassDenied {
return nil, &ProxyError{Status: http.StatusForbidden, Message: "access denied"}
}
if artifact, err := e.db.GetArtifact(ctx, remote.Name, path); err == nil && artifact != nil {
return &HeadResult{ContentType: artifact.ContentType, Size: artifact.SizeBytes, Source: "cache"}, nil
}
if info, err := e.store.Stat(ctx, storage.IndexKey(remote.Name, path)); err == nil {
return &HeadResult{ContentType: info.ContentType, Size: info.Size, Source: "cache"}, nil
}
return e.headUpstream(ctx, remote, path, prov)
}
func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
url := prov.UpstreamURL(remote, path)
authHeaders, err := prov.AuthHeaders(ctx, remote)
if err != nil {
return nil, fmt.Errorf("auth headers: %w", err)
}
doHead := func(extra http.Header) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
for k, vv := range authHeaders {
for _, v := range vv {
req.Header.Add(k, v)
}
}
for k, vv := range extra {
for _, v := range vv {
req.Header.Set(k, v)
}
}
return http.DefaultClient.Do(req)
}
resp, err := doHead(nil)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, _, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if terr == nil && token != "" {
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
if err != nil {
return nil, &UpstreamError{Err: err}
}
} else {
return nil, &ProxyError{Status: http.StatusUnauthorized, Message: "upstream returned 401"}
}
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
contentType = ct
}
return &HeadResult{ContentType: contentType, Size: resp.ContentLength, Source: "remote"}, nil
}
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration, clientHeaders http.Header) (*FetchResult, error) {
url := prov.UpstreamURL(remote, path)
@@ -281,14 +154,14 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
}
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, err := e.cachedBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
token, err := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if err == nil && token != "" {
req2, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
req2.Header.Set("Authorization", "Bearer "+token)
@@ -297,7 +170,7 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
req2.Header.Set("Accept", accept)
}
}
resp, err = clientForRemote(remote).Do(req2)
resp, err = http.DefaultClient.Do(req2)
if err != nil {
return nil, &UpstreamError{Err: err}
}
@@ -311,108 +184,83 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
contentType = ct
}
// Mutable indexes are small and may be rewritten, so buffer them in memory.
if class == ClassMutable {
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
s3Key := storage.IndexKey(remote.Name, path)
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload index: %w", err)
}
etag := resp.Header.Get("ETag")
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
} else {
hash := sha256Hash(body)
s3Key := storage.BlobKey(hash)
exists, _ := e.store.Exists(ctx, s3Key)
if !exists {
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload blob: %w", err)
}
}
contentHash := fmt.Sprintf("sha256:%s", hash)
if err := e.db.UpsertBlob(ctx, contentHash, s3Key, int64(len(body)), contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, contentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
return &FetchResult{
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// Immutable blobs are streamed through the content-addressable store
// (tempfile -> sha256 -> S3) so arbitrarily large artifacts never sit
// fully in memory. Immutable content is never rewritten in the proxy path.
casResult, err := e.cas.Store(ctx, resp.Body, contentType)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("store blob: %w", err)
}
if err := e.db.UpsertBlob(ctx, casResult.ContentHash, casResult.S3Key, casResult.SizeBytes, contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, casResult.ContentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
reader, info, err := e.store.Download(ctx, casResult.S3Key)
if err != nil {
return nil, fmt.Errorf("serve stored blob: %w", err)
}
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: casResult.SizeBytes,
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// waitForStore polls the store for an artifact populated by the request that
// holds the fetch lock, returning it once available or nil if it does not
// appear within the wait budget (after which the caller fetches upstream
// itself). It stops early if the request context is cancelled.
func (e *Engine) waitForStore(ctx context.Context, remote models.Remote, path string) *FetchResult {
const (
pollInterval = 100 * time.Millisecond
maxWait = 5 * time.Second
)
deadline := time.Now().Add(maxWait)
for {
if result, err := e.serveFromStore(ctx, remote, path); err == nil {
return result
}
if time.Now().After(deadline) {
return nil
}
select {
case <-ctx.Done():
return nil
case <-time.After(pollInterval):
}
}
}
func (e *Engine) serveFromStore(ctx context.Context, remote models.Remote, path string) (*FetchResult, error) {
artifact, err := e.db.GetArtifact(ctx, remote.Name, path)
if err == nil && artifact != nil {
reader, info, err := e.store.Download(ctx, artifact.ContentHash[len("sha256:"):])
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: info.Size,
}, nil
}
s3Key := storage.BlobKey(artifact.ContentHash[len("sha256:"):])
reader, info, err := e.store.Download(ctx, s3Key)
reader, info, err = e.store.Download(ctx, s3Key)
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
@@ -454,7 +302,7 @@ func (e *Engine) checkUpstream(ctx context.Context, remote models.Remote, path,
}
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, &UpstreamError{Err: err}
}
@@ -475,20 +323,15 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio
}
}
// logAccess enqueues an access-log entry for the batch writer. It never blocks
// the request path: if the buffer is full the entry is dropped.
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
select {
case e.accessLog <- database.AccessLogEntry{
RemoteName: remoteName,
Path: path,
CacheHit: cacheHit,
SizeBytes: size,
UpstreamMS: upstreamMS,
}:
default:
slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
}
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
func bytesReader(data []byte) io.Reader {
@@ -508,46 +351,9 @@ func (r readerAt) ReadAt(p []byte, off int64) (n int, err error) {
return
}
// bearerTokenTTLDefault/Margin bound how long a token is cached: the default
// is used when the token endpoint omits expires_in, and the margin is
// subtracted so a cached token is refreshed slightly before it actually expires.
const (
bearerTokenTTLDefault = 60 * time.Second
bearerTokenTTLMargin = 10 * time.Second
)
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
// cachedBearerToken returns a bearer token for the given challenge, reusing a
// Redis-cached token for the same remote+challenge while it is still valid.
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
key := remote.Name + ":" + sha256Hash([]byte(wwwAuth))
if tok, err := e.cache.GetToken(ctx, key); err == nil && tok != "" {
return tok, nil
}
tok, ttl, err := fetchBearerToken(ctx, wwwAuth, remote)
if err != nil {
return "", err
}
if tok != "" {
if ttl <= 0 {
ttl = bearerTokenTTLDefault
}
if ttl > bearerTokenTTLMargin {
ttl -= bearerTokenTTLMargin
}
_ = e.cache.SetToken(ctx, key, tok, ttl)
}
return tok, nil
}
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, time.Duration, error) {
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
if !strings.HasPrefix(wwwAuth, "Bearer ") {
return "", 0, fmt.Errorf("not a Bearer challenge")
return "", fmt.Errorf("not a Bearer challenge")
}
params := map[string]string{}
@@ -564,7 +370,7 @@ func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote)
realm := params["realm"]
if realm == "" {
return "", 0, fmt.Errorf("no realm in Bearer challenge")
return "", fmt.Errorf("no realm in Bearer challenge")
}
tokenURL := realm
@@ -579,37 +385,35 @@ func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, tokenURL, nil)
if err != nil {
return "", 0, err
return "", err
}
if remote.Username != "" && remote.Password != "" {
req.SetBasicAuth(remote.Username, remote.Password)
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", 0, err
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", 0, fmt.Errorf("token endpoint returned %d", resp.StatusCode)
return "", fmt.Errorf("token endpoint returned %d", resp.StatusCode)
}
var tokenResp struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", 0, err
return "", err
}
ttl := time.Duration(tokenResp.ExpiresIn) * time.Second
if tokenResp.Token != "" {
return tokenResp.Token, ttl, nil
return tokenResp.Token, nil
}
return tokenResp.AccessToken, ttl, nil
return tokenResp.AccessToken, nil
}
type ProxyError struct {
@@ -627,6 +431,8 @@ func (e *UpstreamError) Error() string { return fmt.Sprintf("upstream error: %v"
func (e *UpstreamError) Unwrap() error { return e.Err }
func isNetworkError(err error) bool {
var ue *UpstreamError
return errors.As(err, &ue)
if _, ok := err.(*UpstreamError); ok {
return true
}
return false
}
-83
View File
@@ -1,83 +0,0 @@
package proxy
import (
"net"
"net/http"
"sync"
"time"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
// Default upstream timeouts. A remote may override any of these; a zero
// override falls back to the default here. There is deliberately no overall
// Client.Timeout: the proxy streams arbitrarily large artifacts and total time
// is bounded by the request context instead. We only constrain the phases that
// must never hang — connect, TLS handshake, and time-to-first-response-header —
// so a slow or wedged upstream cannot pin a goroutine and connection.
const (
defaultDialTimeout = 10 * time.Second
defaultTLSTimeout = 10 * time.Second
defaultResponseHeaderTimeout = 30 * time.Second
)
type clientKey struct {
dial time.Duration
tls time.Duration
respHeader time.Duration
}
var (
clientCacheMu sync.Mutex
clientCache = map[clientKey]*http.Client{}
)
// upstreamClientFor returns an HTTP client configured with the given timeouts,
// reusing a cached client (and its connection pool) for identical timeout sets.
// Zero values fall back to the defaults.
func upstreamClientFor(dial, tls, respHeader time.Duration) *http.Client {
if dial <= 0 {
dial = defaultDialTimeout
}
if tls <= 0 {
tls = defaultTLSTimeout
}
if respHeader <= 0 {
respHeader = defaultResponseHeaderTimeout
}
key := clientKey{dial: dial, tls: tls, respHeader: respHeader}
clientCacheMu.Lock()
defer clientCacheMu.Unlock()
if c, ok := clientCache[key]; ok {
return c
}
c := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: dial,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: tls,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: respHeader,
},
}
clientCache[key] = c
return c
}
// clientForRemote returns the upstream client for a remote, applying its
// per-remote timeout overrides (in seconds) on top of the defaults.
func clientForRemote(remote models.Remote) *http.Client {
return upstreamClientFor(
time.Duration(remote.UpstreamDialTimeout)*time.Second,
time.Duration(remote.UpstreamTLSTimeout)*time.Second,
time.Duration(remote.UpstreamResponseHeaderTimeout)*time.Second,
)
}
-30
View File
@@ -2,7 +2,6 @@ package models
import (
"fmt"
"regexp"
"time"
)
@@ -47,11 +46,6 @@ type Remote struct {
MutableTTL int `json:"mutable_ttl"`
CheckMutable bool `json:"check_mutable"`
// Upstream HTTP timeouts in seconds. 0 means use the server default.
UpstreamDialTimeout int `json:"upstream_dial_timeout,omitempty"`
UpstreamTLSTimeout int `json:"upstream_tls_timeout,omitempty"`
UpstreamResponseHeaderTimeout int `json:"upstream_response_header_timeout,omitempty"`
Patterns []string `json:"patterns,omitempty"`
Blocklist []string `json:"blocklist,omitempty"`
MutablePatterns []string `json:"mutable_patterns,omitempty"`
@@ -72,30 +66,6 @@ type Remote struct {
UpdatedAt time.Time `json:"updated_at"`
}
// ValidatePatterns ensures every configured regex compiles. Storing an
// invalid pattern would otherwise be silently dropped at match time, which
// for the blocklist is a fail-open: a mistyped deny rule becomes a no-op.
func (r *Remote) ValidatePatterns() error {
groups := []struct {
field string
patterns []string
}{
{"patterns", r.Patterns},
{"blocklist", r.Blocklist},
{"mutable_patterns", r.MutablePatterns},
{"immutable_patterns", r.ImmutablePatterns},
{"ban_tags", r.BanTags},
}
for _, g := range groups {
for _, p := range g.patterns {
if _, err := regexp.Compile(p); err != nil {
return fmt.Errorf("invalid regex in %s: %q: %w", g.field, p, err)
}
}
}
return nil
}
type RemoteWithStats struct {
Remote
Stats RemoteStats `json:"stats"`
-19
View File
@@ -1,19 +0,0 @@
package models
import "testing"
func TestRemote_ValidatePatterns(t *testing.T) {
valid := &Remote{
Patterns: []string{`.*\.tar\.gz$`},
Blocklist: []string{`^secret/`},
ImmutablePatterns: []string{`\.rpm$`},
}
if err := valid.ValidatePatterns(); err != nil {
t.Fatalf("expected valid patterns, got %v", err)
}
bad := &Remote{Blocklist: []string{`[unterminated`}}
if err := bad.ValidatePatterns(); err == nil {
t.Fatal("expected error for invalid blocklist regex, got nil")
}
}