Compare commits
3 Commits
ad6dfbdc5b
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 1b585af14e | |||
| e7c9387bcc | |||
| 7e07eaa758 |
@@ -138,16 +138,22 @@ func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
|
// FindOrphanedBlobs returns blobs no longer referenced by any artifact or
|
||||||
|
// local file, restricted to those created before now()-minAge. The age cutoff
|
||||||
|
// is a grace period that avoids a TOCTOU race with in-flight dedup uploads,
|
||||||
|
// which insert the blob row before the referencing artifact/local_files row.
|
||||||
|
func (db *DB) FindOrphanedBlobs(ctx context.Context, minAge time.Duration) ([]models.Blob, error) {
|
||||||
|
cutoff := time.Now().Add(-minAge)
|
||||||
rows, err := db.Pool.Query(ctx, `
|
rows, err := db.Pool.Query(ctx, `
|
||||||
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
|
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
|
||||||
FROM blobs b
|
FROM blobs b
|
||||||
WHERE b.content_hash NOT IN (
|
WHERE b.created_at < $1
|
||||||
|
AND b.content_hash NOT IN (
|
||||||
SELECT content_hash FROM artifacts
|
SELECT content_hash FROM artifacts
|
||||||
UNION
|
UNION
|
||||||
SELECT content_hash FROM local_files
|
SELECT content_hash FROM local_files
|
||||||
)
|
)
|
||||||
`)
|
`, cutoff)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
+6
-1
@@ -9,6 +9,11 @@ import (
|
|||||||
"git.unkin.net/unkin/artifactapi/internal/storage"
|
"git.unkin.net/unkin/artifactapi/internal/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// blobGracePeriod is how old an orphaned blob must be before GC will delete
|
||||||
|
// it. This avoids racing in-flight dedup uploads that insert the blob row
|
||||||
|
// before the referencing artifact/local_files row exists.
|
||||||
|
const blobGracePeriod = 1 * time.Hour
|
||||||
|
|
||||||
type Collector struct {
|
type Collector struct {
|
||||||
db *database.DB
|
db *database.DB
|
||||||
store *storage.S3
|
store *storage.S3
|
||||||
@@ -38,7 +43,7 @@ func (c *Collector) Run(ctx context.Context) {
|
|||||||
func (c *Collector) sweep(ctx context.Context) {
|
func (c *Collector) sweep(ctx context.Context) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
orphaned, err := c.db.FindOrphanedBlobs(ctx)
|
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("gc: find orphaned blobs", "error", err)
|
slog.Error("gc: find orphaned blobs", "error", err)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -31,6 +33,7 @@ type Engine struct {
|
|||||||
cache *cache.Redis
|
cache *cache.Redis
|
||||||
store *storage.S3
|
store *storage.S3
|
||||||
cas *storage.CAS
|
cas *storage.CAS
|
||||||
|
circuit *CircuitBreaker
|
||||||
accessLog chan database.AccessLogEntry
|
accessLog chan database.AccessLogEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,6 +43,7 @@ func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
|||||||
cache: c,
|
cache: c,
|
||||||
store: s,
|
store: s,
|
||||||
cas: storage.NewCAS(s),
|
cas: storage.NewCAS(s),
|
||||||
|
circuit: NewCircuitBreaker(c),
|
||||||
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
|
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
|
||||||
}
|
}
|
||||||
go e.runAccessLogWriter()
|
go e.runAccessLogWriter()
|
||||||
@@ -154,10 +158,26 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
fwdHeaders = clientHeaders[0]
|
fwdHeaders = clientHeaders[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Short-circuit upstream calls when the remote's breaker is open: serve
|
||||||
|
// stale from the store if we have it, otherwise fail fast rather than
|
||||||
|
// hammering a known-bad upstream.
|
||||||
|
if e.circuit.IsOpen(ctx, remote.Name) {
|
||||||
|
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
|
||||||
|
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
|
||||||
|
stale.Source = "cache"
|
||||||
|
e.logAccess(remote.Name, path, true, stale.Size, 0)
|
||||||
|
return stale, nil
|
||||||
|
}
|
||||||
|
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
|
||||||
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
|
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
|
||||||
upstreamMS := int(time.Since(start).Milliseconds())
|
upstreamMS := int(time.Since(start).Milliseconds())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if isNetworkError(err) {
|
||||||
|
e.circuit.RecordFailure(ctx, remote.Name)
|
||||||
|
}
|
||||||
if remote.StaleOnError && isNetworkError(err) {
|
if remote.StaleOnError && isNetworkError(err) {
|
||||||
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
||||||
stale, serr := e.serveFromStore(ctx, remote, path)
|
stale, serr := e.serveFromStore(ctx, remote, path)
|
||||||
@@ -171,6 +191,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.circuit.RecordSuccess(ctx, remote.Name)
|
||||||
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@@ -233,7 +254,7 @@ func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path st
|
|||||||
}
|
}
|
||||||
if resp.StatusCode == http.StatusUnauthorized {
|
if resp.StatusCode == http.StatusUnauthorized {
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
token, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
|
token, _, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
|
||||||
if terr == nil && token != "" {
|
if terr == nil && token != "" {
|
||||||
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
|
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -514,6 +535,11 @@ const (
|
|||||||
bearerTokenTTLMargin = 10 * time.Second
|
bearerTokenTTLMargin = 10 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func sha256Hash(data []byte) string {
|
||||||
|
h := sha256.Sum256(data)
|
||||||
|
return hex.EncodeToString(h[:])
|
||||||
|
}
|
||||||
|
|
||||||
// cachedBearerToken returns a bearer token for the given challenge, reusing a
|
// cachedBearerToken returns a bearer token for the given challenge, reusing a
|
||||||
// Redis-cached token for the same remote+challenge while it is still valid.
|
// Redis-cached token for the same remote+challenge while it is still valid.
|
||||||
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
|
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user