Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b698d1bdc0 |
@@ -2,6 +2,7 @@ package proxy
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"sync"
|
||||
|
||||
"git.unkin.net/unkin/artifactapi/internal/provider"
|
||||
"git.unkin.net/unkin/artifactapi/pkg/models"
|
||||
@@ -60,10 +61,29 @@ func (c *Classifier) Classify(remote models.Remote, path string) Classification
|
||||
return ClassImmutable
|
||||
}
|
||||
|
||||
// patternCache memoises regex compilation. Classify runs on every proxied
|
||||
// request and previously recompiled each remote's pattern lists every time;
|
||||
// keying by the pattern string lets each distinct pattern compile once and
|
||||
// then be reused, with no invalidation needed (the pattern text is the key).
|
||||
// A pattern that fails to compile is cached as a typed nil so we don't retry.
|
||||
var patternCache sync.Map // map[string]*regexp.Regexp
|
||||
|
||||
func compileCached(pattern string) *regexp.Regexp {
|
||||
if v, ok := patternCache.Load(pattern); ok {
|
||||
return v.(*regexp.Regexp)
|
||||
}
|
||||
re, err := regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
re = nil
|
||||
}
|
||||
patternCache.Store(pattern, re)
|
||||
return re
|
||||
}
|
||||
|
||||
func compilePatterns(patterns []string) []*regexp.Regexp {
|
||||
compiled := make([]*regexp.Regexp, 0, len(patterns))
|
||||
for _, p := range patterns {
|
||||
if re, err := regexp.Compile(p); err == nil {
|
||||
if re := compileCached(p); re != nil {
|
||||
compiled = append(compiled, re)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,20 +22,18 @@ import (
|
||||
const fetchLockTTL = 30 * time.Second
|
||||
|
||||
type Engine struct {
|
||||
db *database.DB
|
||||
cache *cache.Redis
|
||||
store *storage.S3
|
||||
cas *storage.CAS
|
||||
circuit *CircuitBreaker
|
||||
db *database.DB
|
||||
cache *cache.Redis
|
||||
store *storage.S3
|
||||
cas *storage.CAS
|
||||
}
|
||||
|
||||
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
||||
return &Engine{
|
||||
db: db,
|
||||
cache: c,
|
||||
store: s,
|
||||
cas: storage.NewCAS(s),
|
||||
circuit: NewCircuitBreaker(c),
|
||||
db: db,
|
||||
cache: c,
|
||||
store: s,
|
||||
cas: storage.NewCAS(s),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,26 +110,10 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
fwdHeaders = clientHeaders[0]
|
||||
}
|
||||
|
||||
// Short-circuit upstream calls when the remote's breaker is open: serve
|
||||
// stale from the store if we have it, otherwise fail fast rather than
|
||||
// hammering a known-bad upstream.
|
||||
if e.circuit.IsOpen(ctx, remote.Name) {
|
||||
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
|
||||
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
|
||||
stale.Source = "cache"
|
||||
go e.logAccess(remote.Name, path, true, stale.Size, 0)
|
||||
return stale, nil
|
||||
}
|
||||
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
|
||||
upstreamMS := int(time.Since(start).Milliseconds())
|
||||
if err != nil {
|
||||
if isNetworkError(err) {
|
||||
e.circuit.RecordFailure(ctx, remote.Name)
|
||||
}
|
||||
if remote.StaleOnError && isNetworkError(err) {
|
||||
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
||||
stale, serr := e.serveFromStore(ctx, remote, path)
|
||||
@@ -145,7 +127,6 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.circuit.RecordSuccess(ctx, remote.Name)
|
||||
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user