perf: batch access-log writes instead of goroutine+insert per request #91
@@ -4,6 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
|
||||||
"git.unkin.net/unkin/artifactapi/pkg/models"
|
"git.unkin.net/unkin/artifactapi/pkg/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -109,6 +111,33 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AccessLogEntry is one buffered access-log record.
|
||||||
|
type AccessLogEntry struct {
|
||||||
|
RemoteName string
|
||||||
|
Path string
|
||||||
|
CacheHit bool
|
||||||
|
SizeBytes int64
|
||||||
|
UpstreamMS int
|
||||||
|
ClientIP string
|
||||||
|
}
|
||||||
|
|
||||||
|
// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY.
|
||||||
|
func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error {
|
||||||
|
if len(entries) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rows := make([][]any, len(entries))
|
||||||
|
for i, e := range entries {
|
||||||
|
rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP}
|
||||||
|
}
|
||||||
|
_, err := db.Pool.CopyFrom(ctx,
|
||||||
|
pgx.Identifier{"access_log"},
|
||||||
|
[]string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"},
|
||||||
|
pgx.CopyFromRows(rows),
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
|
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
|
||||||
rows, err := db.Pool.Query(ctx, `
|
rows, err := db.Pool.Query(ctx, `
|
||||||
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
|
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
|
||||||
|
|||||||
@@ -21,19 +21,63 @@ import (
|
|||||||
|
|
||||||
const fetchLockTTL = 30 * time.Second
|
const fetchLockTTL = 30 * time.Second
|
||||||
|
|
||||||
|
const (
|
||||||
|
accessLogBufferSize = 4096
|
||||||
|
accessLogBatchSize = 128
|
||||||
|
accessLogFlushEvery = 2 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
db *database.DB
|
db *database.DB
|
||||||
cache *cache.Redis
|
cache *cache.Redis
|
||||||
store *storage.S3
|
store *storage.S3
|
||||||
cas *storage.CAS
|
cas *storage.CAS
|
||||||
|
accessLog chan database.AccessLogEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
||||||
return &Engine{
|
e := &Engine{
|
||||||
db: db,
|
db: db,
|
||||||
cache: c,
|
cache: c,
|
||||||
store: s,
|
store: s,
|
||||||
cas: storage.NewCAS(s),
|
cas: storage.NewCAS(s),
|
||||||
|
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
|
||||||
|
}
|
||||||
|
go e.runAccessLogWriter()
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
// runAccessLogWriter drains the access-log channel and writes rows in batches,
|
||||||
|
// replacing a goroutine-per-request insert. It runs for the process lifetime;
|
||||||
|
// access logs are best-effort telemetry, so a small tail may be lost on abrupt
|
||||||
|
// shutdown.
|
||||||
|
func (e *Engine) runAccessLogWriter() {
|
||||||
|
ticker := time.NewTicker(accessLogFlushEvery)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
batch := make([]database.AccessLogEntry, 0, accessLogBatchSize)
|
||||||
|
flush := func() {
|
||||||
|
if len(batch) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil {
|
||||||
|
slog.Warn("access log batch insert failed", "error", err, "count", len(batch))
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
batch = batch[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case entry := <-e.accessLog:
|
||||||
|
batch = append(batch, entry)
|
||||||
|
if len(batch) >= accessLogBatchSize {
|
||||||
|
flush()
|
||||||
|
}
|
||||||
|
case <-ticker.C:
|
||||||
|
flush()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,7 +107,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
result, err := e.serveFromStore(ctx, remote, path)
|
result, err := e.serveFromStore(ctx, remote, path)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
result.Source = "cache"
|
result.Source = "cache"
|
||||||
go e.logAccess(remote.Name, path, true, result.Size, 0)
|
e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
|
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
|
||||||
@@ -79,7 +123,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
result, err := e.serveFromStore(ctx, remote, path)
|
result, err := e.serveFromStore(ctx, remote, path)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
result.Source = "cache"
|
result.Source = "cache"
|
||||||
go e.logAccess(remote.Name, path, true, result.Size, 0)
|
e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -98,7 +142,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
result, err := e.serveFromStore(ctx, remote, path)
|
result, err := e.serveFromStore(ctx, remote, path)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
result.Source = "cache"
|
result.Source = "cache"
|
||||||
go e.logAccess(remote.Name, path, true, result.Size, 0)
|
e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -120,14 +164,14 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
if serr == nil {
|
if serr == nil {
|
||||||
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
|
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
|
||||||
stale.Source = "cache"
|
stale.Source = "cache"
|
||||||
go e.logAccess(remote.Name, path, true, stale.Size, 0)
|
e.logAccess(remote.Name, path, true, stale.Size, 0)
|
||||||
return stale, nil
|
return stale, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -323,10 +367,20 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// logAccess enqueues an access-log entry for the batch writer. It never blocks
|
||||||
|
// the request path: if the buffer is full the entry is dropped.
|
||||||
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
|
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
select {
|
||||||
defer cancel()
|
case e.accessLog <- database.AccessLogEntry{
|
||||||
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
|
RemoteName: remoteName,
|
||||||
|
Path: path,
|
||||||
|
CacheHit: cacheHit,
|
||||||
|
SizeBytes: size,
|
||||||
|
UpstreamMS: upstreamMS,
|
||||||
|
}:
|
||||||
|
default:
|
||||||
|
slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sha256Hash(data []byte) string {
|
func sha256Hash(data []byte) string {
|
||||||
|
|||||||
Reference in New Issue
Block a user