Compare commits

...

3 Commits

Author SHA1 Message Date
unkinben 1b585af14e feat: wire the circuit breaker into the proxy fetch path (#90)
ci/woodpecker/tag/docker Pipeline was successful
Fixes #74

## Why
`internal/proxy/circuit.go` implemented and tested a circuit breaker, but nothing ever called it — a repeatedly-failing upstream was still hit on every request.

## Changes
- Construct a `CircuitBreaker` in `NewEngine`.
- In `Engine.Fetch`: short-circuit when the breaker is open (serve stale from the store if present, otherwise return 503), `RecordFailure` on each `UpstreamError`, and `RecordSuccess` on a successful fetch.

## Validation
- `go test ./internal/proxy/` and `make e2e` pass.

---------

Co-authored-by: BenVincent <benvin@main.unkin.net>
Reviewed-on: #90
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 22:43:22 +10:00
unkinben e7c9387bcc fix: GC has no grace period (TOCTOU with dedup uploads) (#86)
Fixes #71

## Why
`FindOrphanedBlobs` returned any blob not currently referenced. Because CAS dedups (the blob row can exist before its artifact/local_files row is written), a concurrent upload reusing an existing hash could have its S3 object deleted mid-flight by the GC.

## Changes
- `FindOrphanedBlobs` now takes a `minAge` and only returns blobs with `created_at < now()-minAge`.
- The collector passes a 1h `blobGracePeriod`.

## Validation
- `go test ./internal/gc/...` and `make e2e` pass.

---------

Co-authored-by: BenVincent <benvin@main.unkin.net>
Reviewed-on: #86
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 22:43:18 +10:00
unkinben 7e07eaa758 fix: repair master build after conflicting merges (#96)
## Why
`master` does not compile. Three PRs that each built individually combined badly:
- #92 changed `fetchBearerToken` to return `(string, time.Duration, error)` and added `cachedBearerToken` (which hashes the challenge via `sha256Hash`).
- #94 (streaming) removed the `sha256Hash` helper, which was unused within that PR, along with its `crypto/sha256` / `encoding/hex` imports.
- #89 (HEAD) added `headUpstream`, which calls `fetchBearerToken` expecting two return values.

Result on `master`: `internal/proxy/engine.go` fails to build (`assignment mismatch: 2 variables but fetchBearerToken returns 3 values`; `undefined: sha256Hash`).

## Changes
- Re-add the `sha256Hash` helper and its `crypto/sha256` + `encoding/hex` imports.
- Fix the `headUpstream` 401 path to handle `fetchBearerToken`'s three return values.

## Validation
- `go build ./...`, `go vet`, and `make e2e` all pass.

This PR should merge before the other in-flight branches so that they rebase onto a compiling `master`.

Reviewed-on: #96
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-07-02 22:36:09 +10:00
3 changed files with 42 additions and 5 deletions
+9 -3
View File
@@ -138,16 +138,22 @@ func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry
return err return err
} }
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) { // FindOrphanedBlobs returns blobs no longer referenced by any artifact or
// local file, restricted to those created before now()-minAge. The age cutoff
// is a grace period that avoids a TOCTOU race with in-flight dedup uploads,
// which insert the blob row before the referencing artifact/local_files row.
func (db *DB) FindOrphanedBlobs(ctx context.Context, minAge time.Duration) ([]models.Blob, error) {
cutoff := time.Now().Add(-minAge)
rows, err := db.Pool.Query(ctx, ` rows, err := db.Pool.Query(ctx, `
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
FROM blobs b FROM blobs b
WHERE b.content_hash NOT IN ( WHERE b.created_at < $1
AND b.content_hash NOT IN (
SELECT content_hash FROM artifacts SELECT content_hash FROM artifacts
UNION UNION
SELECT content_hash FROM local_files SELECT content_hash FROM local_files
) )
`) `, cutoff)
if err != nil { if err != nil {
return nil, err return nil, err
} }
+6 -1
View File
@@ -9,6 +9,11 @@ import (
"git.unkin.net/unkin/artifactapi/internal/storage" "git.unkin.net/unkin/artifactapi/internal/storage"
) )
// blobGracePeriod is the minimum age an orphaned blob must reach before GC
// will delete it. Dedup uploads insert the blob row before the referencing
// artifact/local_files row exists, so a freshly written blob can look
// orphaned while its upload is still in flight.
const blobGracePeriod = time.Hour
type Collector struct { type Collector struct {
db *database.DB db *database.DB
store *storage.S3 store *storage.S3
@@ -38,7 +43,7 @@ func (c *Collector) Run(ctx context.Context) {
func (c *Collector) sweep(ctx context.Context) { func (c *Collector) sweep(ctx context.Context) {
start := time.Now() start := time.Now()
orphaned, err := c.db.FindOrphanedBlobs(ctx) orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
if err != nil { if err != nil {
slog.Error("gc: find orphaned blobs", "error", err) slog.Error("gc: find orphaned blobs", "error", err)
return return
+27 -1
View File
@@ -2,6 +2,8 @@ package proxy
import ( import (
"context" "context"
"crypto/sha256"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -31,6 +33,7 @@ type Engine struct {
cache *cache.Redis cache *cache.Redis
store *storage.S3 store *storage.S3
cas *storage.CAS cas *storage.CAS
circuit *CircuitBreaker
accessLog chan database.AccessLogEntry accessLog chan database.AccessLogEntry
} }
@@ -40,6 +43,7 @@ func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
cache: c, cache: c,
store: s, store: s,
cas: storage.NewCAS(s), cas: storage.NewCAS(s),
circuit: NewCircuitBreaker(c),
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize), accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
} }
go e.runAccessLogWriter() go e.runAccessLogWriter()
@@ -154,10 +158,26 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
fwdHeaders = clientHeaders[0] fwdHeaders = clientHeaders[0]
} }
// Short-circuit upstream calls when the remote's breaker is open: serve
// stale from the store if we have it, otherwise fail fast rather than
// hammering a known-bad upstream.
if e.circuit.IsOpen(ctx, remote.Name) {
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
}
start := time.Now() start := time.Now()
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders) result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
upstreamMS := int(time.Since(start).Milliseconds()) upstreamMS := int(time.Since(start).Milliseconds())
if err != nil { if err != nil {
if isNetworkError(err) {
e.circuit.RecordFailure(ctx, remote.Name)
}
if remote.StaleOnError && isNetworkError(err) { if remote.StaleOnError && isNetworkError(err) {
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl) _ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
stale, serr := e.serveFromStore(ctx, remote, path) stale, serr := e.serveFromStore(ctx, remote, path)
@@ -171,6 +191,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
return nil, err return nil, err
} }
e.circuit.RecordSuccess(ctx, remote.Name)
e.logAccess(remote.Name, path, false, result.Size, upstreamMS) e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil return result, nil
} }
@@ -233,7 +254,7 @@ func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path st
} }
if resp.StatusCode == http.StatusUnauthorized { if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close() resp.Body.Close()
token, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote) token, _, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if terr == nil && token != "" { if terr == nil && token != "" {
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}}) resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
if err != nil { if err != nil {
@@ -514,6 +535,11 @@ const (
bearerTokenTTLMargin = 10 * time.Second bearerTokenTTLMargin = 10 * time.Second
) )
// sha256Hash returns the lowercase hex encoding of the SHA-256 digest of data.
func sha256Hash(data []byte) string {
	hasher := sha256.New()
	// hash.Hash.Write is documented to never return an error.
	hasher.Write(data)
	digest := hasher.Sum(nil)
	return hex.EncodeToString(digest)
}
// cachedBearerToken returns a bearer token for the given challenge, reusing a // cachedBearerToken returns a bearer token for the given challenge, reusing a
// Redis-cached token for the same remote+challenge while it is still valid. // Redis-cached token for the same remote+challenge while it is still valid.
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) { func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {