Compare commits

..

1 Commits

Author SHA1 Message Date
unkinben c47daca1f1 fix: add a grace period before GC deletes orphaned blobs
ci/woodpecker/pr/pre-commit Pipeline failed
ci/woodpecker/pr/test Pipeline failed
ci/woodpecker/pr/build Pipeline failed
FindOrphanedBlobs returned any unreferenced blob, so a concurrent dedup
upload (which inserts the blob row before its artifact/local_files row)
could have its S3 object deleted mid-flight. Restrict collection to blobs
older than a 1h grace period.

Refs #71
2026-07-02 22:26:19 +10:00
+1 -27
View File
@@ -2,8 +2,6 @@ package proxy
import ( import (
"context" "context"
"crypto/sha256"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -33,7 +31,6 @@ type Engine struct {
cache *cache.Redis cache *cache.Redis
store *storage.S3 store *storage.S3
cas *storage.CAS cas *storage.CAS
circuit *CircuitBreaker
accessLog chan database.AccessLogEntry accessLog chan database.AccessLogEntry
} }
@@ -43,7 +40,6 @@ func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
cache: c, cache: c,
store: s, store: s,
cas: storage.NewCAS(s), cas: storage.NewCAS(s),
circuit: NewCircuitBreaker(c),
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize), accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
} }
go e.runAccessLogWriter() go e.runAccessLogWriter()
@@ -158,26 +154,10 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
fwdHeaders = clientHeaders[0] fwdHeaders = clientHeaders[0]
} }
// Short-circuit upstream calls when the remote's breaker is open: serve
// stale from the store if we have it, otherwise fail fast rather than
// hammering a known-bad upstream.
if e.circuit.IsOpen(ctx, remote.Name) {
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
}
start := time.Now() start := time.Now()
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders) result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
upstreamMS := int(time.Since(start).Milliseconds()) upstreamMS := int(time.Since(start).Milliseconds())
if err != nil { if err != nil {
if isNetworkError(err) {
e.circuit.RecordFailure(ctx, remote.Name)
}
if remote.StaleOnError && isNetworkError(err) { if remote.StaleOnError && isNetworkError(err) {
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl) _ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
stale, serr := e.serveFromStore(ctx, remote, path) stale, serr := e.serveFromStore(ctx, remote, path)
@@ -191,7 +171,6 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
return nil, err return nil, err
} }
e.circuit.RecordSuccess(ctx, remote.Name)
e.logAccess(remote.Name, path, false, result.Size, upstreamMS) e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil return result, nil
} }
@@ -254,7 +233,7 @@ func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path st
} }
if resp.StatusCode == http.StatusUnauthorized { if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close() resp.Body.Close()
token, _, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote) token, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if terr == nil && token != "" { if terr == nil && token != "" {
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}}) resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
if err != nil { if err != nil {
@@ -535,11 +514,6 @@ const (
bearerTokenTTLMargin = 10 * time.Second bearerTokenTTLMargin = 10 * time.Second
) )
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
// cachedBearerToken returns a bearer token for the given challenge, reusing a // cachedBearerToken returns a bearer token for the given challenge, reusing a
// Redis-cached token for the same remote+challenge while it is still valid. // Redis-cached token for the same remote+challenge while it is still valid.
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) { func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {