Compare commits
2 Commits
v3.6.4
...
6d6cc4b78c
| Author | SHA1 | Date | |
|---|---|---|---|
| 6d6cc4b78c | |||
| 8d9bc1c422 |
@@ -30,6 +30,15 @@ func (db *DB) GetOverviewStats(ctx context.Context) (*models.OverviewStats, erro
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = db.Pool.QueryRow(ctx, `
|
||||||
|
SELECT COALESCE(SUM(size_bytes), 0)
|
||||||
|
FROM access_log
|
||||||
|
WHERE cache_hit = TRUE AND created_at > NOW() - INTERVAL '30 days'
|
||||||
|
`).Scan(&stats.BandwidthSaved30d)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &stats, nil
|
return &stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -22,18 +22,20 @@ import (
|
|||||||
const fetchLockTTL = 30 * time.Second
|
const fetchLockTTL = 30 * time.Second
|
||||||
|
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
db *database.DB
|
db *database.DB
|
||||||
cache *cache.Redis
|
cache *cache.Redis
|
||||||
store *storage.S3
|
store *storage.S3
|
||||||
cas *storage.CAS
|
cas *storage.CAS
|
||||||
|
circuit *CircuitBreaker
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
||||||
return &Engine{
|
return &Engine{
|
||||||
db: db,
|
db: db,
|
||||||
cache: c,
|
cache: c,
|
||||||
store: s,
|
store: s,
|
||||||
cas: storage.NewCAS(s),
|
cas: storage.NewCAS(s),
|
||||||
|
circuit: NewCircuitBreaker(c),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -110,10 +112,26 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
fwdHeaders = clientHeaders[0]
|
fwdHeaders = clientHeaders[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Short-circuit upstream calls when the remote's breaker is open: serve
|
||||||
|
// stale from the store if we have it, otherwise fail fast rather than
|
||||||
|
// hammering a known-bad upstream.
|
||||||
|
if e.circuit.IsOpen(ctx, remote.Name) {
|
||||||
|
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
|
||||||
|
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
|
||||||
|
stale.Source = "cache"
|
||||||
|
go e.logAccess(remote.Name, path, true, stale.Size, 0)
|
||||||
|
return stale, nil
|
||||||
|
}
|
||||||
|
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
|
||||||
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
|
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
|
||||||
upstreamMS := int(time.Since(start).Milliseconds())
|
upstreamMS := int(time.Since(start).Milliseconds())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if isNetworkError(err) {
|
||||||
|
e.circuit.RecordFailure(ctx, remote.Name)
|
||||||
|
}
|
||||||
if remote.StaleOnError && isNetworkError(err) {
|
if remote.StaleOnError && isNetworkError(err) {
|
||||||
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
||||||
stale, serr := e.serveFromStore(ctx, remote, path)
|
stale, serr := e.serveFromStore(ctx, remote, path)
|
||||||
@@ -127,6 +145,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.circuit.RecordSuccess(ctx, remote.Name)
|
||||||
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,6 +50,11 @@ export function Dashboard() {
|
|||||||
value={formatNumber(stats.total_blobs_deduped)}
|
value={formatNumber(stats.total_blobs_deduped)}
|
||||||
sub="shared blobs"
|
sub="shared blobs"
|
||||||
/>
|
/>
|
||||||
|
<StatsCard
|
||||||
|
label="Bandwidth Saved"
|
||||||
|
value={formatBytes(stats.bandwidth_saved_30d)}
|
||||||
|
sub="last 30 days"
|
||||||
|
/>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{health && (
|
{health && (
|
||||||
|
|||||||
Reference in New Issue
Block a user