perf: batch access-log writes instead of goroutine+insert per request (#91)
Fixes #76

## Why

Every proxied request spawned a goroutine that ran a single-row INSERT with a 5s timeout. Under load this means unbounded goroutines and connection-pool pressure.

## Changes

- Add `database.AccessLogEntry` + `InsertAccessLogBatch` (bulk `COPY`).
- The engine starts one background writer that drains a buffered channel and flushes every 128 entries or 2s.
- `logAccess` is now a non-blocking channel send (drops on full buffer), so the request path never blocks on the DB. Best-effort telemetry: a small tail may be lost on abrupt shutdown.

## Validation

- `make e2e` passes.

Reviewed-on: #91
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
This commit was merged in pull request #91.
This commit is contained in:
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"git.unkin.net/unkin/artifactapi/pkg/models"
|
||||
)
|
||||
|
||||
@@ -109,6 +111,33 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach
|
||||
return err
|
||||
}
|
||||
|
||||
// AccessLogEntry is one buffered access-log record. Its fields correspond, in
// order, to the access_log columns written by InsertAccessLogBatch.
type AccessLogEntry struct {
	RemoteName string // written to the remote_name column
	Path       string // written to the path column
	CacheHit   bool   // written to the cache_hit column
	SizeBytes  int64  // written to the size_bytes column
	UpstreamMS int    // written to the upstream_ms column
	ClientIP   string // written to the client_ip column
}
|
||||
|
||||
// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY.
|
||||
func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error {
|
||||
if len(entries) == 0 {
|
||||
return nil
|
||||
}
|
||||
rows := make([][]any, len(entries))
|
||||
for i, e := range entries {
|
||||
rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP}
|
||||
}
|
||||
_, err := db.Pool.CopyFrom(ctx,
|
||||
pgx.Identifier{"access_log"},
|
||||
[]string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"},
|
||||
pgx.CopyFromRows(rows),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
|
||||
rows, err := db.Pool.Query(ctx, `
|
||||
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
|
||||
|
||||
+71
-17
@@ -20,19 +20,63 @@ import (
|
||||
|
||||
const fetchLockTTL = 30 * time.Second
|
||||
|
||||
// Tuning knobs for the batched access-log writer.
const (
	// accessLogBufferSize is the capacity of the channel between logAccess
	// and the writer; entries are dropped once it is full.
	accessLogBufferSize = 4096
	// accessLogBatchSize is the number of buffered entries that triggers an
	// immediate flush.
	accessLogBatchSize = 128
	// accessLogFlushEvery is the ticker interval at which a partial batch is
	// flushed.
	accessLogFlushEvery = 2 * time.Second
)
|
||||
|
||||
type Engine struct {
|
||||
db *database.DB
|
||||
cache *cache.Redis
|
||||
store *storage.S3
|
||||
cas *storage.CAS
|
||||
db *database.DB
|
||||
cache *cache.Redis
|
||||
store *storage.S3
|
||||
cas *storage.CAS
|
||||
accessLog chan database.AccessLogEntry
|
||||
}
|
||||
|
||||
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
||||
return &Engine{
|
||||
db: db,
|
||||
cache: c,
|
||||
store: s,
|
||||
cas: storage.NewCAS(s),
|
||||
e := &Engine{
|
||||
db: db,
|
||||
cache: c,
|
||||
store: s,
|
||||
cas: storage.NewCAS(s),
|
||||
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
|
||||
}
|
||||
go e.runAccessLogWriter()
|
||||
return e
|
||||
}
|
||||
|
||||
// runAccessLogWriter drains the access-log channel and writes rows in batches,
|
||||
// replacing a goroutine-per-request insert. It runs for the process lifetime;
|
||||
// access logs are best-effort telemetry, so a small tail may be lost on abrupt
|
||||
// shutdown.
|
||||
func (e *Engine) runAccessLogWriter() {
|
||||
ticker := time.NewTicker(accessLogFlushEvery)
|
||||
defer ticker.Stop()
|
||||
|
||||
batch := make([]database.AccessLogEntry, 0, accessLogBatchSize)
|
||||
flush := func() {
|
||||
if len(batch) == 0 {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil {
|
||||
slog.Warn("access log batch insert failed", "error", err, "count", len(batch))
|
||||
}
|
||||
cancel()
|
||||
batch = batch[:0]
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case entry := <-e.accessLog:
|
||||
batch = append(batch, entry)
|
||||
if len(batch) >= accessLogBatchSize {
|
||||
flush()
|
||||
}
|
||||
case <-ticker.C:
|
||||
flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,7 +106,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
result, err := e.serveFromStore(ctx, remote, path)
|
||||
if err == nil {
|
||||
result.Source = "cache"
|
||||
go e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||
e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||
return result, nil
|
||||
}
|
||||
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
|
||||
@@ -78,7 +122,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
result, err := e.serveFromStore(ctx, remote, path)
|
||||
if err == nil {
|
||||
result.Source = "cache"
|
||||
go e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||
e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
@@ -97,7 +141,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
result, err := e.serveFromStore(ctx, remote, path)
|
||||
if err == nil {
|
||||
result.Source = "cache"
|
||||
go e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||
e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
@@ -119,14 +163,14 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
if serr == nil {
|
||||
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
|
||||
stale.Source = "cache"
|
||||
go e.logAccess(remote.Name, path, true, stale.Size, 0)
|
||||
e.logAccess(remote.Name, path, true, stale.Size, 0)
|
||||
return stale, nil
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
||||
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -403,10 +447,20 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio
|
||||
}
|
||||
}
|
||||
|
||||
// logAccess enqueues an access-log entry for the batch writer. It never blocks
|
||||
// the request path: if the buffer is full the entry is dropped.
|
||||
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
|
||||
select {
|
||||
case e.accessLog <- database.AccessLogEntry{
|
||||
RemoteName: remoteName,
|
||||
Path: path,
|
||||
CacheHit: cacheHit,
|
||||
SizeBytes: size,
|
||||
UpstreamMS: upstreamMS,
|
||||
}:
|
||||
default:
|
||||
slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path)
|
||||
}
|
||||
}
|
||||
|
||||
func bytesReader(data []byte) io.Reader {
|
||||
|
||||
Reference in New Issue
Block a user