feat: wire the circuit breaker into the proxy fetch path (#90)
Fixes #74 ## Why `internal/proxy/circuit.go` implemented and tested a circuit breaker, but nothing ever called it — a repeatedly-failing upstream was still hit on every request. ## Changes - Construct a `CircuitBreaker` in `NewEngine`. - In `Engine.Fetch`: short-circuit when the breaker is open (serve stale from the store if present, otherwise return 503), `RecordFailure` on each `UpstreamError`, and `RecordSuccess` on a successful fetch. ## Validation - `go test ./internal/proxy/` and `make e2e` pass. --------- Co-authored-by: BenVincent <benvin@main.unkin.net> Reviewed-on: #90 Co-authored-by: Ben Vincent <ben@unkin.net> Co-committed-by: Ben Vincent <ben@unkin.net>
This commit was merged in pull request #90.
This commit is contained in:
@@ -33,6 +33,7 @@ type Engine struct {
|
||||
cache *cache.Redis
|
||||
store *storage.S3
|
||||
cas *storage.CAS
|
||||
circuit *CircuitBreaker
|
||||
accessLog chan database.AccessLogEntry
|
||||
}
|
||||
|
||||
@@ -42,6 +43,7 @@ func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
||||
cache: c,
|
||||
store: s,
|
||||
cas: storage.NewCAS(s),
|
||||
circuit: NewCircuitBreaker(c),
|
||||
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
|
||||
}
|
||||
go e.runAccessLogWriter()
|
||||
@@ -156,10 +158,26 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
fwdHeaders = clientHeaders[0]
|
||||
}
|
||||
|
||||
// Short-circuit upstream calls when the remote's breaker is open: serve
|
||||
// stale from the store if we have it, otherwise fail fast rather than
|
||||
// hammering a known-bad upstream.
|
||||
if e.circuit.IsOpen(ctx, remote.Name) {
|
||||
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
|
||||
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
|
||||
stale.Source = "cache"
|
||||
e.logAccess(remote.Name, path, true, stale.Size, 0)
|
||||
return stale, nil
|
||||
}
|
||||
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
|
||||
upstreamMS := int(time.Since(start).Milliseconds())
|
||||
if err != nil {
|
||||
if isNetworkError(err) {
|
||||
e.circuit.RecordFailure(ctx, remote.Name)
|
||||
}
|
||||
if remote.StaleOnError && isNetworkError(err) {
|
||||
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
||||
stale, serr := e.serveFromStore(ctx, remote, path)
|
||||
@@ -173,6 +191,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.circuit.RecordSuccess(ctx, remote.Name)
|
||||
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user