Compare commits

..

1 Commits

Author SHA1 Message Date
unkinben 6d6cc4b78c feat: wire the circuit breaker into the proxy fetch path
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful
The circuit breaker was fully implemented and tested but never called, so
a failing upstream was hit on every request. Engine.Fetch now short-
circuits when the breaker is open (serving stale if available, else 503),
records a failure on each UpstreamError, and resets on success.

Refs #74
2026-07-02 00:38:36 +10:00
2 changed files with 28 additions and 29 deletions
+1 -21
View File
@@ -2,7 +2,6 @@ package proxy
import (
"regexp"
"sync"
"git.unkin.net/unkin/artifactapi/internal/provider"
"git.unkin.net/unkin/artifactapi/pkg/models"
@@ -61,29 +60,10 @@ func (c *Classifier) Classify(remote models.Remote, path string) Classification
return ClassImmutable
}
// patternCache memoises regex compilation. Classify runs on every proxied
// request and previously recompiled each remote's pattern lists every time;
// keying by the pattern string lets each distinct pattern compile once and
// then be reused, with no invalidation needed (the pattern text is the key).
// A pattern that fails to compile is cached as a typed nil so we don't retry.
var patternCache sync.Map // map[string]*regexp.Regexp
func compileCached(pattern string) *regexp.Regexp {
if v, ok := patternCache.Load(pattern); ok {
return v.(*regexp.Regexp)
}
re, err := regexp.Compile(pattern)
if err != nil {
re = nil
}
patternCache.Store(pattern, re)
return re
}
func compilePatterns(patterns []string) []*regexp.Regexp {
compiled := make([]*regexp.Regexp, 0, len(patterns))
for _, p := range patterns {
if re := compileCached(p); re != nil {
if re, err := regexp.Compile(p); err == nil {
compiled = append(compiled, re)
}
}
+27 -8
View File
@@ -22,18 +22,20 @@ import (
const fetchLockTTL = 30 * time.Second
type Engine struct {
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
circuit *CircuitBreaker
}
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
return &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
circuit: NewCircuitBreaker(c),
}
}
@@ -110,10 +112,26 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
fwdHeaders = clientHeaders[0]
}
// Short-circuit upstream calls when the remote's breaker is open: serve
// stale from the store if we have it, otherwise fail fast rather than
// hammering a known-bad upstream.
if e.circuit.IsOpen(ctx, remote.Name) {
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
stale.Source = "cache"
go e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
}
start := time.Now()
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
upstreamMS := int(time.Since(start).Milliseconds())
if err != nil {
if isNetworkError(err) {
e.circuit.RecordFailure(ctx, remote.Name)
}
if remote.StaleOnError && isNetworkError(err) {
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
stale, serr := e.serveFromStore(ctx, remote, path)
@@ -127,6 +145,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
return nil, err
}
e.circuit.RecordSuccess(ctx, remote.Name)
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil
}