diff --git a/internal/proxy/engine.go b/internal/proxy/engine.go index 94d0fca..6dbfbd1 100644 --- a/internal/proxy/engine.go +++ b/internal/proxy/engine.go @@ -2,8 +2,6 @@ package proxy import ( "context" - "crypto/sha256" - "encoding/hex" "encoding/json" "errors" "fmt" @@ -185,65 +183,74 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)} } - body, err := io.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - return nil, fmt.Errorf("read upstream body: %w", err) - } - - rewritten, err := prov.RewriteResponse(body, remote, "") - if err != nil { - return nil, fmt.Errorf("rewrite response: %w", err) - } - if rewritten != nil { - body = rewritten - } - contentType := prov.ContentType(path) if ct := resp.Header.Get("Content-Type"); ct != "" { contentType = ct } + // Mutable indexes are small and may be rewritten, so buffer them in memory. if class == ClassMutable { + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("read upstream body: %w", err) + } + + rewritten, err := prov.RewriteResponse(body, remote, "") + if err != nil { + return nil, fmt.Errorf("rewrite response: %w", err) + } + if rewritten != nil { + body = rewritten + } + s3Key := storage.IndexKey(remote.Name, path) if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil { return nil, fmt.Errorf("upload index: %w", err) } - etag := resp.Header.Get("ETag") - _ = e.cache.SetTTL(ctx, remote.Name, path, ttl) - if etag != "" { - _ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl) - } - } else { - hash := sha256Hash(body) - s3Key := storage.BlobKey(hash) - - exists, _ := e.store.Exists(ctx, s3Key) - if !exists { - if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil { - return nil, fmt.Errorf("upload blob: %w", err) - } - } - - contentHash := fmt.Sprintf("sha256:%s", hash) - if err := e.db.UpsertBlob(ctx, contentHash, s3Key, int64(len(body)), contentType); err != nil { - slog.Warn("upsert blob failed", "error", err) - } - if err := e.db.UpsertArtifact(ctx, remote.Name, path, contentHash, resp.Header.Get("ETag")); err != nil { - slog.Warn("upsert artifact failed", "error", err) - } - _ = e.cache.SetTTL(ctx, remote.Name, path, ttl) if etag := resp.Header.Get("ETag"); etag != "" { _ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl) } + + return &FetchResult{ + Reader: io.NopCloser(bytesReader(body)), + ContentType: contentType, + Size: int64(len(body)), + Source: "remote", + }, nil } + // Immutable blobs are streamed through the content-addressable store + // (tempfile -> sha256 -> S3) so arbitrarily large artifacts never sit + // fully in memory. Immutable content is never rewritten in the proxy path. + casResult, err := e.cas.Store(ctx, resp.Body, contentType) + resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("store blob: %w", err) + } + + if err := e.db.UpsertBlob(ctx, casResult.ContentHash, casResult.S3Key, casResult.SizeBytes, contentType); err != nil { + slog.Warn("upsert blob failed", "error", err) + } + if err := e.db.UpsertArtifact(ctx, remote.Name, path, casResult.ContentHash, resp.Header.Get("ETag")); err != nil { + slog.Warn("upsert artifact failed", "error", err) + } + + _ = e.cache.SetTTL(ctx, remote.Name, path, ttl) + if etag := resp.Header.Get("ETag"); etag != "" { + _ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl) + } + + reader, info, err := e.store.Download(ctx, casResult.S3Key) + if err != nil { + return nil, fmt.Errorf("serve stored blob: %w", err) + } return &FetchResult{ - Reader: io.NopCloser(bytesReader(body)), - ContentType: contentType, - Size: int64(len(body)), + Reader: reader, + ContentType: info.ContentType, + Size: casResult.SizeBytes, Source: "remote", }, nil } @@ -321,11 +328,6 @@ func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, u _ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "") } -func sha256Hash(data []byte) string { - h := sha256.Sum256(data) - return hex.EncodeToString(h[:]) -} - func bytesReader(data []byte) io.Reader { return io.NewSectionReader(readerAt(data), 0, int64(len(data))) }