From 1b585af14e5c8ec54add62da410cab3b10081874 Mon Sep 17 00:00:00 2001 From: Ben Vincent Date: Thu, 2 Jul 2026 22:43:22 +1000 Subject: [PATCH] feat: wire the circuit breaker into the proxy fetch path (#90) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #74 ## Why `internal/proxy/circuit.go` implemented and tested a circuit breaker, but nothing ever called it — a repeatedly-failing upstream was still hit on every request. ## Changes - Construct a `CircuitBreaker` in `NewEngine`. - In `Engine.Fetch`: short-circuit when the breaker is open (serve stale from the store if present, otherwise return 503), `RecordFailure` on each `UpstreamError`, and `RecordSuccess` on a successful fetch. ## Validation - `go test ./internal/proxy/` and `make e2e` pass. --------- Co-authored-by: BenVincent Reviewed-on: https://git.unkin.net/unkin/artifactapi/pulls/90 Co-authored-by: Ben Vincent Co-committed-by: Ben Vincent --- internal/proxy/engine.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/internal/proxy/engine.go b/internal/proxy/engine.go index 7997a96..662d04d 100644 --- a/internal/proxy/engine.go +++ b/internal/proxy/engine.go @@ -33,6 +33,7 @@ type Engine struct { cache *cache.Redis store *storage.S3 cas *storage.CAS + circuit *CircuitBreaker accessLog chan database.AccessLogEntry } @@ -42,6 +43,7 @@ func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine { cache: c, store: s, cas: storage.NewCAS(s), + circuit: NewCircuitBreaker(c), accessLog: make(chan database.AccessLogEntry, accessLogBufferSize), } go e.runAccessLogWriter() @@ -156,10 +158,26 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p fwdHeaders = clientHeaders[0] } + // Short-circuit upstream calls when the remote's breaker is open: serve + // stale from the store if we have it, otherwise fail fast rather than + // hammering a known-bad upstream. + if e.circuit.IsOpen(ctx, remote.Name) { + if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil { + slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path) + stale.Source = "cache" + e.logAccess(remote.Name, path, true, stale.Size, 0) + return stale, nil + } + return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"} + } + start := time.Now() result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders) upstreamMS := int(time.Since(start).Milliseconds()) if err != nil { + if isNetworkError(err) { + e.circuit.RecordFailure(ctx, remote.Name) + } if remote.StaleOnError && isNetworkError(err) { _ = e.cache.SetTTL(ctx, remote.Name, path, ttl) stale, serr := e.serveFromStore(ctx, remote, path) @@ -173,6 +191,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p return nil, err } + e.circuit.RecordSuccess(ctx, remote.Name) e.logAccess(remote.Name, path, false, result.Size, upstreamMS) return result, nil }