perf: batch access-log writes instead of goroutine+insert per request #91

Merged
benvin merged 1 commits from benvin/batch-access-log into master 2026-07-02 22:07:56 +10:00
2 changed files with 100 additions and 17 deletions
+29
View File
@@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/jackc/pgx/v5"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
@@ -109,6 +111,33 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach
return err
}
// AccessLogEntry is one buffered access-log record.
type AccessLogEntry struct {
RemoteName string
Path string
CacheHit bool
SizeBytes int64
UpstreamMS int
ClientIP string
}
// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY.
func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error {
if len(entries) == 0 {
return nil
}
rows := make([][]any, len(entries))
for i, e := range entries {
rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP}
}
_, err := db.Pool.CopyFrom(ctx,
pgx.Identifier{"access_log"},
[]string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"},
pgx.CopyFromRows(rows),
)
return err
}
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
rows, err := db.Pool.Query(ctx, `
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
+71 -17
View File
@@ -21,19 +21,63 @@ import (
const fetchLockTTL = 30 * time.Second
const (
accessLogBufferSize = 4096
accessLogBatchSize = 128
accessLogFlushEvery = 2 * time.Second
)
type Engine struct {
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
accessLog chan database.AccessLogEntry
}
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
return &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
e := &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
}
go e.runAccessLogWriter()
return e
}
// runAccessLogWriter drains the access-log channel and writes rows in batches,
// replacing a goroutine-per-request insert. It runs for the process lifetime;
// access logs are best-effort telemetry, so a small tail may be lost on abrupt
// shutdown.
func (e *Engine) runAccessLogWriter() {
ticker := time.NewTicker(accessLogFlushEvery)
defer ticker.Stop()
batch := make([]database.AccessLogEntry, 0, accessLogBatchSize)
flush := func() {
if len(batch) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil {
slog.Warn("access log batch insert failed", "error", err, "count", len(batch))
}
cancel()
batch = batch[:0]
}
for {
select {
case entry := <-e.accessLog:
batch = append(batch, entry)
if len(batch) >= accessLogBatchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
@@ -63,7 +107,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
go e.logAccess(remote.Name, path, true, result.Size, 0)
e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
@@ -79,7 +123,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
go e.logAccess(remote.Name, path, true, result.Size, 0)
e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -98,7 +142,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
go e.logAccess(remote.Name, path, true, result.Size, 0)
e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -120,14 +164,14 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
if serr == nil {
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
stale.Source = "cache"
go e.logAccess(remote.Name, path, true, stale.Size, 0)
e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
}
return nil, err
}
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil
}
@@ -323,10 +367,20 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio
}
}
// logAccess enqueues an access-log entry for the batch writer. It never blocks
// the request path: if the buffer is full the entry is dropped.
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
select {
case e.accessLog <- database.AccessLogEntry{
RemoteName: remoteName,
Path: path,
CacheHit: cacheHit,
SizeBytes: size,
UpstreamMS: upstreamMS,
}:
default:
slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path)
}
}
func sha256Hash(data []byte) string {