perf: stream proxied artifacts instead of buffering the full body in memory (#94)
Fixes #66

## Why

`fetchFromUpstream` read every upstream response with `io.ReadAll`, hashed it in memory, uploaded it from memory, and served it from memory. A single large immutable blob (Docker layer, RPM, tarball, Go module zip) — or several concurrent ones — could OOM the process. The streaming, tempfile-backed CAS already existed, but the proxy path bypassed it (and `Engine.cas` was assigned but unused).

## Changes

- Immutable fetches now stream through `CAS.Store` (tempfile -> sha256 -> S3), so memory stays bounded regardless of artifact size, and are served back from the store.
- Mutable indexes stay on the in-memory path (small, and subject to `RewriteResponse`).
- Skipping `RewriteResponse` for immutable content is behaviour-preserving: the proxy path always passes an empty `proxyBaseURL`, under which every provider's `RewriteResponse` is a no-op.
- Remove the now-unused in-memory `sha256Hash` helper.

## Validation

- `make e2e` passes.
- Live smoke test against Postgres/Redis/MinIO: proxied a 12 MB blob through a generic remote — fetch #1 `X-Artifact-Source: remote`, fetch #2 `X-Artifact-Source: cache`, both byte-identical (sha256) to the origin.

Reviewed-on: #94
Co-authored-by: Ben Vincent <ben@unkin.net>
Co-committed-by: Ben Vincent <ben@unkin.net>
This commit was merged in pull request #94.
This commit is contained in:
+50
-48
@@ -2,8 +2,6 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
|
||||||
"encoding/hex"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -185,65 +183,74 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
|
|||||||
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
|
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
|
||||||
}
|
}
|
||||||
|
|
||||||
body, err := io.ReadAll(resp.Body)
|
|
||||||
resp.Body.Close()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("read upstream body: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rewritten, err := prov.RewriteResponse(body, remote, "")
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("rewrite response: %w", err)
|
|
||||||
}
|
|
||||||
if rewritten != nil {
|
|
||||||
body = rewritten
|
|
||||||
}
|
|
||||||
|
|
||||||
contentType := prov.ContentType(path)
|
contentType := prov.ContentType(path)
|
||||||
if ct := resp.Header.Get("Content-Type"); ct != "" {
|
if ct := resp.Header.Get("Content-Type"); ct != "" {
|
||||||
contentType = ct
|
contentType = ct
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mutable indexes are small and may be rewritten, so buffer them in memory.
|
||||||
if class == ClassMutable {
|
if class == ClassMutable {
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read upstream body: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rewritten, err := prov.RewriteResponse(body, remote, "")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("rewrite response: %w", err)
|
||||||
|
}
|
||||||
|
if rewritten != nil {
|
||||||
|
body = rewritten
|
||||||
|
}
|
||||||
|
|
||||||
s3Key := storage.IndexKey(remote.Name, path)
|
s3Key := storage.IndexKey(remote.Name, path)
|
||||||
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
|
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
|
||||||
return nil, fmt.Errorf("upload index: %w", err)
|
return nil, fmt.Errorf("upload index: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
etag := resp.Header.Get("ETag")
|
|
||||||
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
|
||||||
if etag != "" {
|
|
||||||
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
hash := sha256Hash(body)
|
|
||||||
s3Key := storage.BlobKey(hash)
|
|
||||||
|
|
||||||
exists, _ := e.store.Exists(ctx, s3Key)
|
|
||||||
if !exists {
|
|
||||||
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
|
|
||||||
return nil, fmt.Errorf("upload blob: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
contentHash := fmt.Sprintf("sha256:%s", hash)
|
|
||||||
if err := e.db.UpsertBlob(ctx, contentHash, s3Key, int64(len(body)), contentType); err != nil {
|
|
||||||
slog.Warn("upsert blob failed", "error", err)
|
|
||||||
}
|
|
||||||
if err := e.db.UpsertArtifact(ctx, remote.Name, path, contentHash, resp.Header.Get("ETag")); err != nil {
|
|
||||||
slog.Warn("upsert artifact failed", "error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
||||||
if etag := resp.Header.Get("ETag"); etag != "" {
|
if etag := resp.Header.Get("ETag"); etag != "" {
|
||||||
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
|
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return &FetchResult{
|
||||||
|
Reader: io.NopCloser(bytesReader(body)),
|
||||||
|
ContentType: contentType,
|
||||||
|
Size: int64(len(body)),
|
||||||
|
Source: "remote",
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Immutable blobs are streamed through the content-addressable store
|
||||||
|
// (tempfile -> sha256 -> S3) so arbitrarily large artifacts never sit
|
||||||
|
// fully in memory. Immutable content is never rewritten in the proxy path.
|
||||||
|
casResult, err := e.cas.Store(ctx, resp.Body, contentType)
|
||||||
|
resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("store blob: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.db.UpsertBlob(ctx, casResult.ContentHash, casResult.S3Key, casResult.SizeBytes, contentType); err != nil {
|
||||||
|
slog.Warn("upsert blob failed", "error", err)
|
||||||
|
}
|
||||||
|
if err := e.db.UpsertArtifact(ctx, remote.Name, path, casResult.ContentHash, resp.Header.Get("ETag")); err != nil {
|
||||||
|
slog.Warn("upsert artifact failed", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
||||||
|
if etag := resp.Header.Get("ETag"); etag != "" {
|
||||||
|
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, info, err := e.store.Download(ctx, casResult.S3Key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("serve stored blob: %w", err)
|
||||||
|
}
|
||||||
return &FetchResult{
|
return &FetchResult{
|
||||||
Reader: io.NopCloser(bytesReader(body)),
|
Reader: reader,
|
||||||
ContentType: contentType,
|
ContentType: info.ContentType,
|
||||||
Size: int64(len(body)),
|
Size: casResult.SizeBytes,
|
||||||
Source: "remote",
|
Source: "remote",
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@@ -321,11 +328,6 @@ func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, u
|
|||||||
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
|
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func sha256Hash(data []byte) string {
|
|
||||||
h := sha256.Sum256(data)
|
|
||||||
return hex.EncodeToString(h[:])
|
|
||||||
}
|
|
||||||
|
|
||||||
func bytesReader(data []byte) io.Reader {
|
func bytesReader(data []byte) io.Reader {
|
||||||
return io.NewSectionReader(readerAt(data), 0, int64(len(data)))
|
return io.NewSectionReader(readerAt(data), 0, int64(len(data)))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user