Compare commits

..

3 Commits

Author SHA1 Message Date
unkinben 6d6cc4b78c feat: wire the circuit breaker into the proxy fetch path
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful
The circuit breaker was fully implemented and tested but never called, so
a failing upstream was hit on every request. Engine.Fetch now short-
circuits when the breaker is open (serving stale if available, else 503),
records a failure on each UpstreamError, and resets on success.

Refs #74
2026-07-02 00:38:36 +10:00
unkinben 8d9bc1c422 feat: add bandwidth saved stat to dashboard (#65)
ci/woodpecker/tag/docker Pipeline was successful
Shows total bytes served from cache (instead of upstream) over the last 30 days. Queries `SUM(size_bytes) WHERE cache_hit = TRUE` from access_log.

Reviewed-on: #65
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-06-27 22:18:02 +10:00
unkinben 30b7cef026 fix: strip base URL path prefix from helm chart download URLs (#64)
ci/woodpecker/tag/docker Pipeline was successful
When a helm repo base URL includes a path component (e.g. \`stakater.github.io/stakater-charts\`), the merger was extracting the full URL path (\`stakater-charts/reloader-2.2.8.tgz\`) and the proxy then constructed \`base_url/stakater-charts/reloader-2.2.8.tgz\` = double path = 404.

Fix: \`extractPathRelativeToBase()\` strips the shared base path prefix so only the filename portion is used as the proxy path.
Reviewed-on: #64
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
2026-06-27 08:02:52 +10:00
4 changed files with 55 additions and 9 deletions
+9
View File
@@ -30,6 +30,15 @@ func (db *DB) GetOverviewStats(ctx context.Context) (*models.OverviewStats, erro
return nil, err
}
err = db.Pool.QueryRow(ctx, `
SELECT COALESCE(SUM(size_bytes), 0)
FROM access_log
WHERE cache_hit = TRUE AND created_at > NOW() - INTERVAL '30 days'
`).Scan(&stats.BandwidthSaved30d)
if err != nil {
return nil, err
}
return &stats, nil
}
+27 -8
View File
@@ -22,18 +22,20 @@ import (
const fetchLockTTL = 30 * time.Second
type Engine struct {
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
circuit *CircuitBreaker
}
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
return &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
circuit: NewCircuitBreaker(c),
}
}
@@ -110,10 +112,26 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
fwdHeaders = clientHeaders[0]
}
// Short-circuit upstream calls when the remote's breaker is open: serve
// stale from the store if we have it, otherwise fail fast rather than
// hammering a known-bad upstream.
if e.circuit.IsOpen(ctx, remote.Name) {
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
stale.Source = "cache"
go e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
}
start := time.Now()
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
upstreamMS := int(time.Since(start).Milliseconds())
if err != nil {
if isNetworkError(err) {
e.circuit.RecordFailure(ctx, remote.Name)
}
if remote.StaleOnError && isNetworkError(err) {
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
stale, serr := e.serveFromStore(ctx, remote, path)
@@ -127,6 +145,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
return nil, err
}
e.circuit.RecordSuccess(ctx, remote.Name)
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil
}
+14 -1
View File
@@ -65,11 +65,12 @@ func (m *HelmMerger) MergeIndexes(members []MemberIndex, proxyBaseURL string) ([
if baseHost != "" && extractHost(u) != baseHost {
continue
}
relPath := extractPathRelativeToBase(u, member.BaseURL)
ver.URLs[i] = fmt.Sprintf("%s/api/v1/%s/%s/%s",
strings.TrimRight(proxyBaseURL, "/"),
routePrefix,
member.RemoteName,
extractPath(u))
relPath)
} else {
ver.URLs[i] = fmt.Sprintf("%s/api/v1/%s/%s/%s",
strings.TrimRight(proxyBaseURL, "/"),
@@ -102,6 +103,18 @@ func extractHost(rawURL string) string {
return rest[:slashIdx]
}
func extractPathRelativeToBase(rawURL, baseURL string) string {
fullPath := extractPath(rawURL)
basePath := extractPath(baseURL)
if basePath != "" {
basePath = strings.TrimRight(basePath, "/") + "/"
if strings.HasPrefix(fullPath, basePath) {
return fullPath[len(basePath):]
}
}
return fullPath
}
func extractPath(rawURL string) string {
idx := strings.Index(rawURL, "://")
if idx == -1 {
+5
View File
@@ -50,6 +50,11 @@ export function Dashboard() {
value={formatNumber(stats.total_blobs_deduped)}
sub="shared blobs"
/>
<StatsCard
label="Bandwidth Saved"
value={formatBytes(stats.bandwidth_saved_30d)}
sub="last 30 days"
/>
</div>
{health && (