fix: coalesce concurrent cache-miss fetches (thundering herd) #93
@@ -75,9 +75,10 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !locked {
|
if !locked {
|
||||||
time.Sleep(500 * time.Millisecond)
|
// Another request holds the fetch lock. Poll the store until the leader
|
||||||
result, err := e.serveFromStore(ctx, remote, path)
|
// populates it rather than immediately racing to fetch upstream too; a
|
||||||
if err == nil {
|
// cold-cache stampede otherwise hits upstream once per waiter.
|
||||||
|
if result := e.waitForStore(ctx, remote, path); result != nil {
|
||||||
result.Source = "cache"
|
result.Source = "cache"
|
||||||
go e.logAccess(remote.Name, path, true, result.Size, 0)
|
go e.logAccess(remote.Name, path, true, result.Size, 0)
|
||||||
return result, nil
|
return result, nil
|
||||||
@@ -247,6 +248,31 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForStore polls the store for an artifact populated by the request that
|
||||||
|
// holds the fetch lock, returning it once available or nil if it does not
|
||||||
|
// appear within the wait budget (after which the caller fetches upstream
|
||||||
|
// itself). It stops early if the request context is cancelled.
|
||||||
|
func (e *Engine) waitForStore(ctx context.Context, remote models.Remote, path string) *FetchResult {
|
||||||
|
const (
|
||||||
|
pollInterval = 100 * time.Millisecond
|
||||||
|
maxWait = 5 * time.Second
|
||||||
|
)
|
||||||
|
deadline := time.Now().Add(maxWait)
|
||||||
|
for {
|
||||||
|
if result, err := e.serveFromStore(ctx, remote, path); err == nil {
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case <-time.After(pollInterval):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (e *Engine) serveFromStore(ctx context.Context, remote models.Remote, path string) (*FetchResult, error) {
|
func (e *Engine) serveFromStore(ctx context.Context, remote models.Remote, path string) (*FetchResult, error) {
|
||||||
artifact, err := e.db.GetArtifact(ctx, remote.Name, path)
|
artifact, err := e.db.GetArtifact(ctx, remote.Name, path)
|
||||||
if err == nil && artifact != nil {
|
if err == nil && artifact != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user