From 45d6cdbc6490b5db6379c5f4bffeb5b5f781a642 Mon Sep 17 00:00:00 2001 From: Ben Vincent Date: Thu, 2 Jul 2026 22:07:56 +1000 Subject: [PATCH] perf: batch access-log writes instead of goroutine+insert per request (#91) Fixes #76 ## Why Every proxied request spawned a goroutine running a 5s-timeout single-row INSERT. Under load this is unbounded goroutines and connection-pool pressure. ## Changes - Add `database.AccessLogEntry` + `InsertAccessLogBatch` (bulk `COPY`). - The engine starts one background writer that drains a buffered channel and flushes every 128 entries or 2s. - `logAccess` is now a non-blocking channel send (drops on full buffer), so the request path never blocks on the DB. Best-effort telemetry: a small tail may be lost on abrupt shutdown. ## Validation - `make e2e` passes. Reviewed-on: https://git.unkin.net/unkin/artifactapi/pulls/91 Co-authored-by: Ben Vincent Co-committed-by: Ben Vincent --- internal/database/artifacts.go | 29 +++++++++++ internal/proxy/engine.go | 88 +++++++++++++++++++++++++++------- 2 files changed, 100 insertions(+), 17 deletions(-) diff --git a/internal/database/artifacts.go b/internal/database/artifacts.go index fcd6e21..5d73c15 100644 --- a/internal/database/artifacts.go +++ b/internal/database/artifacts.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/jackc/pgx/v5" + "git.unkin.net/unkin/artifactapi/pkg/models" ) @@ -109,6 +111,33 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach return err } +// AccessLogEntry is one buffered access-log record. +type AccessLogEntry struct { + RemoteName string + Path string + CacheHit bool + SizeBytes int64 + UpstreamMS int + ClientIP string +} + +// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY. +func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error { + if len(entries) == 0 { + return nil + } + rows := make([][]any, len(entries)) + for i, e := range entries { + rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP} + } + _, err := db.Pool.CopyFrom(ctx, + pgx.Identifier{"access_log"}, + []string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"}, + pgx.CopyFromRows(rows), + ) + return err +} + func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) { rows, err := db.Pool.Query(ctx, ` SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at diff --git a/internal/proxy/engine.go b/internal/proxy/engine.go index afaf82d..4c17cea 100644 --- a/internal/proxy/engine.go +++ b/internal/proxy/engine.go @@ -20,19 +20,63 @@ import ( const fetchLockTTL = 30 * time.Second +const ( + accessLogBufferSize = 4096 + accessLogBatchSize = 128 + accessLogFlushEvery = 2 * time.Second +) + type Engine struct { - db *database.DB - cache *cache.Redis - store *storage.S3 - cas *storage.CAS + db *database.DB + cache *cache.Redis + store *storage.S3 + cas *storage.CAS + accessLog chan database.AccessLogEntry } func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine { - return &Engine{ - db: db, - cache: c, - store: s, - cas: storage.NewCAS(s), + e := &Engine{ + db: db, + cache: c, + store: s, + cas: storage.NewCAS(s), + accessLog: make(chan database.AccessLogEntry, accessLogBufferSize), + } + go e.runAccessLogWriter() + return e +} + +// runAccessLogWriter drains the access-log channel and writes rows in batches, +// replacing a goroutine-per-request insert. It runs for the process lifetime; +// access logs are best-effort telemetry, so a small tail may be lost on abrupt +// shutdown. +func (e *Engine) runAccessLogWriter() { + ticker := time.NewTicker(accessLogFlushEvery) + defer ticker.Stop() + + batch := make([]database.AccessLogEntry, 0, accessLogBatchSize) + flush := func() { + if len(batch) == 0 { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil { + slog.Warn("access log batch insert failed", "error", err, "count", len(batch)) + } + cancel() + batch = batch[:0] + } + + for { + select { + case entry := <-e.accessLog: + batch = append(batch, entry) + if len(batch) >= accessLogBatchSize { + flush() + } + case <-ticker.C: + flush() + } } } @@ -62,7 +106,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p result, err := e.serveFromStore(ctx, remote, path) if err == nil { result.Source = "cache" - go e.logAccess(remote.Name, path, true, result.Size, 0) + e.logAccess(remote.Name, path, true, result.Size, 0) return result, nil } slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path) @@ -78,7 +122,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p result, err := e.serveFromStore(ctx, remote, path) if err == nil { result.Source = "cache" - go e.logAccess(remote.Name, path, true, result.Size, 0) + e.logAccess(remote.Name, path, true, result.Size, 0) return result, nil } } @@ -97,7 +141,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p result, err := e.serveFromStore(ctx, remote, path) if err == nil { result.Source = "cache" - go e.logAccess(remote.Name, path, true, result.Size, 0) + e.logAccess(remote.Name, path, true, result.Size, 0) return result, nil } } @@ -119,14 +163,14 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p if serr == nil { slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err) stale.Source = "cache" - go e.logAccess(remote.Name, path, true, stale.Size, 0) + e.logAccess(remote.Name, path, true, stale.Size, 0) return stale, nil } } return nil, err } - go e.logAccess(remote.Name, path, false, result.Size, upstreamMS) + e.logAccess(remote.Name, path, false, result.Size, upstreamMS) return result, nil } @@ -403,10 +447,20 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio } } +// logAccess enqueues an access-log entry for the batch writer. It never blocks +// the request path: if the buffer is full the entry is dropped. func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "") + select { + case e.accessLog <- database.AccessLogEntry{ + RemoteName: remoteName, + Path: path, + CacheHit: cacheHit, + SizeBytes: size, + UpstreamMS: upstreamMS, + }: + default: + slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path) + } } func bytesReader(data []byte) io.Reader {