Compare commits

..

1 Commits

Author SHA1 Message Date
unkinben ad6dfbdc5b feat: wire the circuit breaker into the proxy fetch path
ci/woodpecker/pr/test Pipeline failed
ci/woodpecker/pr/pre-commit Pipeline failed
ci/woodpecker/pr/build Pipeline failed
The circuit breaker was fully implemented and tested but never called, so
a failing upstream was hit on every request. Engine.Fetch now short-
circuits when the breaker is open (serving stale if available, else 503),
records a failure on each UpstreamError, and resets on success.

Refs #74
2026-07-02 22:33:38 +10:00
3 changed files with 5 additions and 23 deletions
+3 -9
View File
@@ -138,22 +138,16 @@ func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry
return err return err
} }
// FindOrphanedBlobs returns blobs no longer referenced by any artifact or func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
// local file, restricted to those created before now()-minAge. The age cutoff
// is a grace period that avoids a TOCTOU race with in-flight dedup uploads,
// which insert the blob row before the referencing artifact/local_files row.
func (db *DB) FindOrphanedBlobs(ctx context.Context, minAge time.Duration) ([]models.Blob, error) {
cutoff := time.Now().Add(-minAge)
rows, err := db.Pool.Query(ctx, ` rows, err := db.Pool.Query(ctx, `
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
FROM blobs b FROM blobs b
WHERE b.created_at < $1 WHERE b.content_hash NOT IN (
AND b.content_hash NOT IN (
SELECT content_hash FROM artifacts SELECT content_hash FROM artifacts
UNION UNION
SELECT content_hash FROM local_files SELECT content_hash FROM local_files
) )
`, cutoff) `)
if err != nil { if err != nil {
return nil, err return nil, err
} }
+1 -6
View File
@@ -9,11 +9,6 @@ import (
"git.unkin.net/unkin/artifactapi/internal/storage" "git.unkin.net/unkin/artifactapi/internal/storage"
) )
// blobGracePeriod is how old an orphaned blob must be before GC will delete
// it. This avoids racing in-flight dedup uploads that insert the blob row
// before the referencing artifact/local_files row exists.
const blobGracePeriod = 1 * time.Hour
type Collector struct { type Collector struct {
db *database.DB db *database.DB
store *storage.S3 store *storage.S3
@@ -43,7 +38,7 @@ func (c *Collector) Run(ctx context.Context) {
func (c *Collector) sweep(ctx context.Context) { func (c *Collector) sweep(ctx context.Context) {
start := time.Now() start := time.Now()
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod) orphaned, err := c.db.FindOrphanedBlobs(ctx)
if err != nil { if err != nil {
slog.Error("gc: find orphaned blobs", "error", err) slog.Error("gc: find orphaned blobs", "error", err)
return return
+1 -8
View File
@@ -2,8 +2,6 @@ package proxy
import ( import (
"context" "context"
"crypto/sha256"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -254,7 +252,7 @@ func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path st
} }
if resp.StatusCode == http.StatusUnauthorized { if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close() resp.Body.Close()
token, _, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote) token, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if terr == nil && token != "" { if terr == nil && token != "" {
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}}) resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
if err != nil { if err != nil {
@@ -535,11 +533,6 @@ const (
bearerTokenTTLMargin = 10 * time.Second bearerTokenTTLMargin = 10 * time.Second
) )
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
// cachedBearerToken returns a bearer token for the given challenge, reusing a // cachedBearerToken returns a bearer token for the given challenge, reusing a
// Redis-cached token for the same remote+challenge while it is still valid. // Redis-cached token for the same remote+challenge while it is still valid.
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) { func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {