From 6d6cc4b78cd6e682be1b09279e6c47cfb280c739 Mon Sep 17 00:00:00 2001 From: Ben Vincent Date: Thu, 2 Jul 2026 00:38:36 +1000 Subject: [PATCH] feat: wire the circuit breaker into the proxy fetch path The circuit breaker was fully implemented and tested but never called, so a failing upstream was hit on every request. Engine.Fetch now short- circuits when the breaker is open (serving stale if available, else 503), records a failure on each UpstreamError, and resets on success. Refs #74 --- internal/proxy/engine.go | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/internal/proxy/engine.go b/internal/proxy/engine.go index ba63e78..90903e5 100644 --- a/internal/proxy/engine.go +++ b/internal/proxy/engine.go @@ -22,18 +22,20 @@ import ( const fetchLockTTL = 30 * time.Second type Engine struct { - db *database.DB - cache *cache.Redis - store *storage.S3 - cas *storage.CAS + db *database.DB + cache *cache.Redis + store *storage.S3 + cas *storage.CAS + circuit *CircuitBreaker } func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine { return &Engine{ - db: db, - cache: c, - store: s, - cas: storage.NewCAS(s), + db: db, + cache: c, + store: s, + cas: storage.NewCAS(s), + circuit: NewCircuitBreaker(c), } } @@ -110,10 +112,26 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p fwdHeaders = clientHeaders[0] } + // Short-circuit upstream calls when the remote's breaker is open: serve + // stale from the store if we have it, otherwise fail fast rather than + // hammering a known-bad upstream. + if e.circuit.IsOpen(ctx, remote.Name) { + if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil { + slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path) + stale.Source = "cache" + go e.logAccess(remote.Name, path, true, stale.Size, 0) + return stale, nil + } + return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"} + } + start := time.Now() result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders) upstreamMS := int(time.Since(start).Milliseconds()) if err != nil { + if isNetworkError(err) { + e.circuit.RecordFailure(ctx, remote.Name) + } if remote.StaleOnError && isNetworkError(err) { _ = e.cache.SetTTL(ctx, remote.Name, path, ttl) stale, serr := e.serveFromStore(ctx, remote, path) @@ -127,6 +145,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p return nil, err } + e.circuit.RecordSuccess(ctx, remote.Name) go e.logAccess(remote.Name, path, false, result.Size, upstreamMS) return result, nil }