Compare commits

..

1 Commits

Author SHA1 Message Date
unkinben bb172276ba feat: add local RPM repository with on-demand repodata
ci/woodpecker/pr/pre-commit Pipeline failed
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful
Upload RPMs to local repos. Metadata is parsed async after upload
using cavaliergopher/rpm and stored in rpm_metadata table. Repodata
(repomd.xml, primary.xml.gz, filelists.xml.gz, other.xml.gz) is
generated on-demand from the DB — nothing stored in S3.

- RPM provider implements LocalUploader (validates .rpm extension,
  stores under Packages/)
- RPM provider implements PostUploadHook (async goroutine parses RPM
  headers, extracts name/version/arch/deps/etc into rpm_metadata)
- RPM provider implements LocalIndexer (serves repodata/* paths by
  querying rpm_metadata and generating XML on the fly)
- New provider interfaces: PostUploadHook, BlobReader, MetadataStore,
  RPMMetadataReader
- New rpm_metadata table with JSONB columns for requires/provides/
  files/changelogs

Tested e2e: upload cowsay RPM → repodata generated → dnf install
from local repo
2026-06-23 23:08:59 +10:00
39 changed files with 175 additions and 1077 deletions
-24
View File
@@ -1,24 +0,0 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-merge-conflict
- repo: https://github.com/dnephin/pre-commit-golang
rev: v0.5.1
hooks:
- id: go-fmt
- id: go-mod-tidy
- repo: local
hooks:
- id: go-vet
name: go vet
entry: go vet ./...
language: system
types: [go]
pass_filenames: false
-4
View File
@@ -8,8 +8,6 @@ steps:
settings:
registry: git.unkin.net
repo: git.unkin.net/unkin/artifactapi
build_args:
VERSION: ${CI_COMMIT_TAG}
username: droneci
password:
from_secret: DRONECI_PASSWORD
@@ -24,8 +22,6 @@ steps:
repo: git.unkin.net/unkin/artifactapi-ui
dockerfile: ui/Dockerfile.ui
context: ui
build_args:
BASE_PATH: /ui
username: droneci
password:
from_secret: DRONECI_PASSWORD
+3 -11
View File
@@ -3,15 +3,7 @@ when:
steps:
- name: pre-commit
image: git.unkin.net/unkin/almalinux9-gobuilder:20260606
image: golang:1.25
commands:
- uvx pre-commit run --all-files
backend_options:
kubernetes:
resources:
requests:
memory: 512Mi
cpu: 1
limits:
memory: 2Gi
cpu: 2
- test -z "$(gofmt -l .)"
- go vet ./...
+1 -2
View File
@@ -9,8 +9,7 @@ RUN go mod download
COPY . .
ARG VERSION=dev
RUN CGO_ENABLED=0 go build -ldflags="-s -w -X main.version=${VERSION}" -o artifactapi ./cmd/artifactapi
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o artifactapi ./cmd/artifactapi
FROM gcr.io/distroless/static-debian12:nonroot
+1 -1
View File
@@ -12,7 +12,7 @@ check-go:
fi
build: check-go tidy
go build -ldflags="-s -w -X main.version=$(VERSION)" -o $(BINARY) ./cmd/artifactapi
go build -ldflags="-s -w" -o $(BINARY) ./cmd/artifactapi
test: check-go
go test -race -count=1 ./pkg/... ./internal/...
+1 -3
View File
@@ -13,8 +13,6 @@ import (
"git.unkin.net/unkin/artifactapi/internal/tui"
)
var version = "dev"
func main() {
if len(os.Args) > 1 && os.Args[1] == "tui" {
endpoint := os.Getenv("ARTIFACTAPI_ENDPOINT")
@@ -44,7 +42,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
srv, err := server.New(cfg, version)
srv, err := server.New(cfg)
if err != nil {
slog.Error("failed to create server", "error", err)
os.Exit(1)
+1 -1
View File
@@ -95,7 +95,7 @@ func TestMain(m *testing.M) {
}
cfg.ListenAddr = "127.0.0.1:0"
srv, err := server.New(cfg, "e2e-test")
srv, err := server.New(cfg)
if err != nil {
log.Fatalf("server: %v", err)
}
-24
View File
@@ -24,30 +24,6 @@ func TestRoot(t *testing.T) {
}
}
func TestRemoteUpstreamTimeouts(t *testing.T) {
createRemote(t, `{
"name": "timeout-test",
"package_type": "generic",
"base_url": "https://example.com",
"stale_on_error": true,
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5
}`)
defer deleteRemote(t, "timeout-test")
remote := getJSON(t, apiURL("/api/v2/remotes/timeout-test"))
for field, want := range map[string]float64{
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5,
} {
if got, _ := remote[field].(float64); got != want {
t.Errorf("%s: got %v, want %v", field, remote[field], want)
}
}
}
func TestRemoteCRUD(t *testing.T) {
createRemote(t, `{
"name": "test-generic",
-33
View File
@@ -24,39 +24,6 @@ func TestProxyBlocklist(t *testing.T) {
assertStatus(t, apiURL("/api/v1/remote/blocklist-test/malware.exe"), http.StatusForbidden)
}
func TestProxyHeadBlocklist(t *testing.T) {
createRemote(t, `{
"name": "head-block-test",
"package_type": "generic",
"base_url": "https://example.com",
"blocklist": ["\\.exe$"],
"stale_on_error": true
}`)
defer deleteRemote(t, "head-block-test")
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/head-block-test/malware.exe"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("HEAD blocklisted path: got %d, want 403", resp.StatusCode)
}
}
func TestProxyHeadUnknownRemote(t *testing.T) {
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/nonexistent/some/path"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("HEAD unknown remote: got %d, want 404", resp.StatusCode)
}
}
func TestProxyPatterns(t *testing.T) {
createRemote(t, `{
"name": "patterns-test",
+1 -51
View File
@@ -37,20 +37,6 @@ func (h *ProxyHandler) Routes() chi.Router {
return r
}
func (h *ProxyHandler) DockerV2Routes() chi.Router {
r := chi.NewRouter()
r.Get("/", h.handleDockerPing)
r.Head("/", h.handleDockerPing)
r.Get("/{remoteName}/*", h.handleProxy)
r.Head("/{remoteName}/*", h.handleProxyHead)
return r
}
func (h *ProxyHandler) handleDockerPing(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Docker-Distribution-Api-Version", "registry/2.0")
w.WriteHeader(http.StatusOK)
}
func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
remoteName := chi.URLParam(r, "remoteName")
path := chi.URLParam(r, "*")
@@ -67,7 +53,7 @@ func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
return
}
result, err := h.engine.Fetch(r.Context(), *remote, path, prov, r.Header)
result, err := h.engine.Fetch(r.Context(), *remote, path, prov)
if err != nil {
var proxyErr *proxy.ProxyError
if errors.As(err, &proxyErr) {
@@ -89,42 +75,6 @@ func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
io.Copy(w, result.Reader)
}
func (h *ProxyHandler) handleProxyHead(w http.ResponseWriter, r *http.Request) {
remoteName := chi.URLParam(r, "remoteName")
path := chi.URLParam(r, "*")
remote, err := h.db.GetRemote(r.Context(), remoteName)
if err != nil {
http.Error(w, fmt.Sprintf("remote %q not found", remoteName), http.StatusNotFound)
return
}
prov, err := provider.Get(remote.PackageType)
if err != nil {
http.Error(w, fmt.Sprintf("no provider for %q", remote.PackageType), http.StatusInternalServerError)
return
}
result, err := h.engine.Head(r.Context(), *remote, path, prov)
if err != nil {
var proxyErr *proxy.ProxyError
if errors.As(err, &proxyErr) {
http.Error(w, proxyErr.Message, proxyErr.Status)
return
}
slog.Error("proxy head failed", "remote", remoteName, "path", path, "error", err)
http.Error(w, "bad gateway", http.StatusBadGateway)
return
}
w.Header().Set("Content-Type", result.ContentType)
w.Header().Set("X-Artifact-Source", result.Source)
if result.Size > 0 {
w.Header().Set("Content-Length", fmt.Sprintf("%d", result.Size))
}
w.WriteHeader(http.StatusOK)
}
func (h *ProxyHandler) handleVirtual(w http.ResponseWriter, r *http.Request) {
virtualName := chi.URLParam(r, "virtualName")
path := chi.URLParam(r, "*")
-8
View File
@@ -69,10 +69,6 @@ func (h *RemotesHandler) create(w http.ResponseWriter, r *http.Request) {
http.Error(w, "base_url is required for remote repositories", http.StatusBadRequest)
return
}
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.CreateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -88,10 +84,6 @@ func (h *RemotesHandler) update(w http.ResponseWriter, r *http.Request) {
return
}
remote.Name = name
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.UpdateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
-12
View File
@@ -70,18 +70,6 @@ func (r *Redis) GetETag(ctx context.Context, remote, path string) (string, error
return val, err
}
func (r *Redis) GetToken(ctx context.Context, key string) (string, error) {
val, err := r.client.Get(ctx, "token:"+key).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
func (r *Redis) SetToken(ctx context.Context, key, token string, ttl time.Duration) error {
return r.client.Set(ctx, "token:"+key, token, ttl).Err()
}
func (r *Redis) IncrCircuitFailure(ctx context.Context, remote string, cooldown time.Duration) (int64, error) {
key := fmt.Sprintf("circuit:%s", remote)
pipe := r.client.Pipeline()
+1 -1
View File
@@ -65,7 +65,7 @@ func Load() (*Config, error) {
}
func getenv(key, fallback string) string {
if v, ok := os.LookupEnv(key); ok {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
+3 -38
View File
@@ -4,8 +4,6 @@ import (
"context"
"time"
"github.com/jackc/pgx/v5"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
@@ -111,49 +109,16 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach
return err
}
// AccessLogEntry is one buffered access-log record.
type AccessLogEntry struct {
RemoteName string
Path string
CacheHit bool
SizeBytes int64
UpstreamMS int
ClientIP string
}
// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY.
func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error {
if len(entries) == 0 {
return nil
}
rows := make([][]any, len(entries))
for i, e := range entries {
rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP}
}
_, err := db.Pool.CopyFrom(ctx,
pgx.Identifier{"access_log"},
[]string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"},
pgx.CopyFromRows(rows),
)
return err
}
// FindOrphanedBlobs returns blobs no longer referenced by any artifact or
// local file, restricted to those created before now()-minAge. The age cutoff
// is a grace period that avoids a TOCTOU race with in-flight dedup uploads,
// which insert the blob row before the referencing artifact/local_files row.
func (db *DB) FindOrphanedBlobs(ctx context.Context, minAge time.Duration) ([]models.Blob, error) {
cutoff := time.Now().Add(-minAge)
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
rows, err := db.Pool.Query(ctx, `
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
FROM blobs b
WHERE b.created_at < $1
AND b.content_hash NOT IN (
WHERE b.content_hash NOT IN (
SELECT content_hash FROM artifacts
UNION
SELECT content_hash FROM local_files
)
`, cutoff)
`)
if err != nil {
return nil, err
}
-3
View File
@@ -124,9 +124,6 @@ func (db *DB) migrate() error {
CREATE INDEX IF NOT EXISTS idx_access_log_remote_time ON access_log(remote_name, created_at);
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS repo_type TEXT DEFAULT 'remote';
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_dial_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_tls_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_response_header_timeout INTEGER DEFAULT 0;
CREATE TABLE IF NOT EXISTS rpm_metadata (
id BIGSERIAL PRIMARY KEY,
+5 -14
View File
@@ -11,9 +11,7 @@ const remoteCols = `name, package_type, repo_type, base_url, description, userna
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
releases_remote, managed_by,
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout,
created_at, updated_at`
releases_remote, managed_by, created_at, updated_at`
func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error {
return scanner.Scan(
@@ -22,9 +20,7 @@ func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error
&r.Patterns, &r.Blocklist, &r.MutablePatterns, &r.ImmutablePatterns,
&r.BanTagsEnabled, &r.BanTags,
&r.QuarantineEnabled, &r.QuarantineDays, &r.StaleOnError,
&r.ReleasesRemote, &r.ManagedBy,
&r.UpstreamDialTimeout, &r.UpstreamTLSTimeout, &r.UpstreamResponseHeaderTimeout,
&r.CreatedAt, &r.UpdatedAt,
&r.ReleasesRemote, &r.ManagedBy, &r.CreatedAt, &r.UpdatedAt,
)
}
@@ -63,9 +59,8 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
releases_remote, managed_by,
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)
releases_remote, managed_by
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21)
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
r.ImmutableTTL, r.MutableTTL, r.CheckMutable,
@@ -73,7 +68,6 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
@@ -86,9 +80,7 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
patterns=$11, blocklist=$12, mutable_patterns=$13, immutable_patterns=$14,
ban_tags_enabled=$15, ban_tags=$16,
quarantine_enabled=$17, quarantine_days=$18, stale_on_error=$19,
releases_remote=$20, managed_by=$21,
upstream_dial_timeout=$22, upstream_tls_timeout=$23, upstream_response_header_timeout=$24,
updated_at=NOW()
releases_remote=$20, managed_by=$21, updated_at=NOW()
WHERE name=$1
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
@@ -97,7 +89,6 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
+22 -22
View File
@@ -33,29 +33,29 @@ func (db *DB) InsertRPMMetadata(ctx context.Context, meta *provider.RPMMetadata)
}
type RPMMetadataRow struct {
RepoName string
FilePath string
ContentHash string
Name string
Epoch int
Version string
Release string
Arch string
Summary string
Description string
RPMSize int64
RepoName string
FilePath string
ContentHash string
Name string
Epoch int
Version string
Release string
Arch string
Summary string
Description string
RPMSize int64
InstalledSize int64
License string
Vendor string
Group string
BuildHost string
SourceRPM string
URL string
Packager string
Requires json.RawMessage
Provides json.RawMessage
Files json.RawMessage
Changelogs json.RawMessage
License string
Vendor string
Group string
BuildHost string
SourceRPM string
URL string
Packager string
Requires json.RawMessage
Provides json.RawMessage
Files json.RawMessage
Changelogs json.RawMessage
}
func (db *DB) ListRPMMetadataEntries(ctx context.Context, repoName string) ([]provider.RPMMetadata, error) {
-9
View File
@@ -30,15 +30,6 @@ func (db *DB) GetOverviewStats(ctx context.Context) (*models.OverviewStats, erro
return nil, err
}
err = db.Pool.QueryRow(ctx, `
SELECT COALESCE(SUM(size_bytes), 0)
FROM access_log
WHERE cache_hit = TRUE AND created_at > NOW() - INTERVAL '30 days'
`).Scan(&stats.BandwidthSaved30d)
if err != nil {
return nil, err
}
return &stats, nil
}
+1 -6
View File
@@ -9,11 +9,6 @@ import (
"git.unkin.net/unkin/artifactapi/internal/storage"
)
// blobGracePeriod is how old an orphaned blob must be before GC will delete
// it. This avoids racing in-flight dedup uploads that insert the blob row
// before the referencing artifact/local_files row exists.
const blobGracePeriod = 1 * time.Hour
type Collector struct {
db *database.DB
store *storage.S3
@@ -43,7 +38,7 @@ func (c *Collector) Run(ctx context.Context) {
func (c *Collector) sweep(ctx context.Context) {
start := time.Now()
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
orphaned, err := c.db.FindOrphanedBlobs(ctx)
if err != nil {
slog.Error("gc: find orphaned blobs", "error", err)
return
+1 -21
View File
@@ -2,7 +2,6 @@ package proxy
import (
"regexp"
"sync"
"git.unkin.net/unkin/artifactapi/internal/provider"
"git.unkin.net/unkin/artifactapi/pkg/models"
@@ -61,29 +60,10 @@ func (c *Classifier) Classify(remote models.Remote, path string) Classification
return ClassImmutable
}
// patternCache memoises regex compilation. Classify runs on every proxied
// request and previously recompiled each remote's pattern lists every time;
// keying by the pattern string lets each distinct pattern compile once and
// then be reused, with no invalidation needed (the pattern text is the key).
// A pattern that fails to compile is cached as a typed nil so we don't retry.
var patternCache sync.Map // map[string]*regexp.Regexp
func compileCached(pattern string) *regexp.Regexp {
if v, ok := patternCache.Load(pattern); ok {
return v.(*regexp.Regexp)
}
re, err := regexp.Compile(pattern)
if err != nil {
re = nil
}
patternCache.Store(pattern, re)
return re
}
func compilePatterns(patterns []string) []*regexp.Regexp {
compiled := make([]*regexp.Regexp, 0, len(patterns))
for _, p := range patterns {
if re := compileCached(p); re != nil {
if re, err := regexp.Compile(p); err == nil {
compiled = append(compiled, re)
}
}
+86 -396
View File
@@ -4,13 +4,10 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
"git.unkin.net/unkin/artifactapi/internal/cache"
@@ -22,65 +19,19 @@ import (
const fetchLockTTL = 30 * time.Second
const (
accessLogBufferSize = 4096
accessLogBatchSize = 128
accessLogFlushEvery = 2 * time.Second
)
type Engine struct {
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
circuit *CircuitBreaker
accessLog chan database.AccessLogEntry
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
}
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
e := &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
circuit: NewCircuitBreaker(c),
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
}
go e.runAccessLogWriter()
return e
}
// runAccessLogWriter drains the access-log channel and writes rows in batches,
// replacing a goroutine-per-request insert. It runs for the process lifetime;
// access logs are best-effort telemetry, so a small tail may be lost on abrupt
// shutdown.
func (e *Engine) runAccessLogWriter() {
ticker := time.NewTicker(accessLogFlushEvery)
defer ticker.Stop()
batch := make([]database.AccessLogEntry, 0, accessLogBatchSize)
flush := func() {
if len(batch) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil {
slog.Warn("access log batch insert failed", "error", err, "count", len(batch))
}
cancel()
batch = batch[:0]
}
for {
select {
case entry := <-e.accessLog:
batch = append(batch, entry)
if len(batch) >= accessLogBatchSize {
flush()
}
case <-ticker.C:
flush()
}
return &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
}
}
@@ -91,7 +42,7 @@ type FetchResult struct {
Source string // "cache" or "remote"
}
func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, prov provider.Provider, clientHeaders ...http.Header) (*FetchResult, error) {
func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*FetchResult, error) {
classifier := NewClassifier(prov)
class := classifier.Classify(remote, path)
@@ -110,7 +61,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
@@ -122,12 +73,11 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
}
if !locked {
// Another request holds the fetch lock. Poll the store until the leader
// populates it rather than immediately racing to fetch upstream too; a
// cold-cache stampede otherwise hits upstream once per waiter.
if result := e.waitForStore(ctx, remote, path); result != nil {
time.Sleep(500 * time.Millisecond)
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -146,138 +96,35 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
}
}
var fwdHeaders http.Header
if len(clientHeaders) > 0 && clientHeaders[0] != nil {
fwdHeaders = clientHeaders[0]
}
// Short-circuit upstream calls when the remote's breaker is open: serve
// stale from the store if we have it, otherwise fail fast rather than
// hammering a known-bad upstream.
if e.circuit.IsOpen(ctx, remote.Name) {
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
}
start := time.Now()
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl)
upstreamMS := int(time.Since(start).Milliseconds())
if err != nil {
if isNetworkError(err) {
e.circuit.RecordFailure(ctx, remote.Name)
}
if remote.StaleOnError && isNetworkError(err) {
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
stale, serr := e.serveFromStore(ctx, remote, path)
if serr == nil {
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
go e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
}
return nil, err
}
e.circuit.RecordSuccess(ctx, remote.Name)
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil
}
// HeadResult carries artifact metadata for a HEAD request. There is no body.
type HeadResult struct {
ContentType string
Size int64
Source string // "cache" or "remote"
}
// Head resolves artifact metadata without fetching or streaming the body.
// Cached artifacts/indexes are answered from the store metadata; on a miss it
// issues an upstream HEAD. It never downloads or caches the body.
func (e *Engine) Head(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
class := NewClassifier(prov).Classify(remote, path)
if class == ClassDenied {
return nil, &ProxyError{Status: http.StatusForbidden, Message: "access denied"}
}
if artifact, err := e.db.GetArtifact(ctx, remote.Name, path); err == nil && artifact != nil {
return &HeadResult{ContentType: artifact.ContentType, Size: artifact.SizeBytes, Source: "cache"}, nil
}
if info, err := e.store.Stat(ctx, storage.IndexKey(remote.Name, path)); err == nil {
return &HeadResult{ContentType: info.ContentType, Size: info.Size, Source: "cache"}, nil
}
return e.headUpstream(ctx, remote, path, prov)
}
func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
url := prov.UpstreamURL(remote, path)
authHeaders, err := prov.AuthHeaders(ctx, remote)
if err != nil {
return nil, fmt.Errorf("auth headers: %w", err)
}
doHead := func(extra http.Header) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
for k, vv := range authHeaders {
for _, v := range vv {
req.Header.Add(k, v)
}
}
for k, vv := range extra {
for _, v := range vv {
req.Header.Set(k, v)
}
}
return http.DefaultClient.Do(req)
}
resp, err := doHead(nil)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, _, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if terr == nil && token != "" {
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
if err != nil {
return nil, &UpstreamError{Err: err}
}
} else {
return nil, &ProxyError{Status: http.StatusUnauthorized, Message: "upstream returned 401"}
}
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
contentType = ct
}
return &HeadResult{ContentType: contentType, Size: resp.ContentLength, Source: "remote"}, nil
}
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration, clientHeaders http.Header) (*FetchResult, error) {
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration) (*FetchResult, error) {
url := prov.UpstreamURL(remote, path)
authHeaders, err := prov.AuthHeaders(ctx, remote)
@@ -294,144 +141,94 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
req.Header.Add(k, v)
}
}
if clientHeaders != nil {
if accept := clientHeaders.Get("Accept"); accept != "" {
req.Header.Set("Accept", accept)
}
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, err := e.cachedBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if err == nil && token != "" {
req2, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
req2.Header.Set("Authorization", "Bearer "+token)
if clientHeaders != nil {
if accept := clientHeaders.Get("Accept"); accept != "" {
req2.Header.Set("Accept", accept)
}
}
resp, err = clientForRemote(remote).Do(req2)
if err != nil {
return nil, &UpstreamError{Err: err}
}
} else {
return nil, &ProxyError{Status: http.StatusUnauthorized, Message: "upstream returned 401"}
}
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
if ct := resp.Header.Get("Content-Type"); ct != "" && contentType == "application/octet-stream" {
contentType = ct
}
// Mutable indexes are small and may be rewritten, so buffer them in memory.
if class == ClassMutable {
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
s3Key := storage.IndexKey(remote.Name, path)
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload index: %w", err)
}
etag := resp.Header.Get("ETag")
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
} else {
hash := sha256Hash(body)
s3Key := storage.BlobKey(hash)
exists, _ := e.store.Exists(ctx, s3Key)
if !exists {
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload blob: %w", err)
}
}
contentHash := fmt.Sprintf("sha256:%s", hash)
if err := e.db.UpsertBlob(ctx, contentHash, s3Key, int64(len(body)), contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, contentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
return &FetchResult{
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// Immutable blobs are streamed through the content-addressable store
// (tempfile -> sha256 -> S3) so arbitrarily large artifacts never sit
// fully in memory. Immutable content is never rewritten in the proxy path.
casResult, err := e.cas.Store(ctx, resp.Body, contentType)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("store blob: %w", err)
}
if err := e.db.UpsertBlob(ctx, casResult.ContentHash, casResult.S3Key, casResult.SizeBytes, contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, casResult.ContentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
reader, info, err := e.store.Download(ctx, casResult.S3Key)
if err != nil {
return nil, fmt.Errorf("serve stored blob: %w", err)
}
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: casResult.SizeBytes,
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// waitForStore polls the store for an artifact populated by the request that
// holds the fetch lock, returning it once available or nil if it does not
// appear within the wait budget (after which the caller fetches upstream
// itself). It stops early if the request context is cancelled.
func (e *Engine) waitForStore(ctx context.Context, remote models.Remote, path string) *FetchResult {
const (
pollInterval = 100 * time.Millisecond
maxWait = 5 * time.Second
)
deadline := time.Now().Add(maxWait)
for {
if result, err := e.serveFromStore(ctx, remote, path); err == nil {
return result
}
if time.Now().After(deadline) {
return nil
}
select {
case <-ctx.Done():
return nil
case <-time.After(pollInterval):
}
}
}
func (e *Engine) serveFromStore(ctx context.Context, remote models.Remote, path string) (*FetchResult, error) {
artifact, err := e.db.GetArtifact(ctx, remote.Name, path)
if err == nil && artifact != nil {
reader, info, err := e.store.Download(ctx, artifact.ContentHash[len("sha256:"):])
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: info.Size,
}, nil
}
s3Key := storage.BlobKey(artifact.ContentHash[len("sha256:"):])
reader, info, err := e.store.Download(ctx, s3Key)
reader, info, err = e.store.Download(ctx, s3Key)
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
@@ -473,7 +270,7 @@ func (e *Engine) checkUpstream(ctx context.Context, remote models.Remote, path,
}
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, &UpstreamError{Err: err}
}
@@ -494,20 +291,15 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio
}
}
// logAccess enqueues an access-log entry for the batch writer. It never blocks
// the request path: if the buffer is full the entry is dropped.
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
select {
case e.accessLog <- database.AccessLogEntry{
RemoteName: remoteName,
Path: path,
CacheHit: cacheHit,
SizeBytes: size,
UpstreamMS: upstreamMS,
}:
default:
slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
}
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
func bytesReader(data []byte) io.Reader {
@@ -527,110 +319,6 @@ func (r readerAt) ReadAt(p []byte, off int64) (n int, err error) {
return
}
// bearerTokenTTLDefault/Margin bound how long a token is cached: the default
// is used when the token endpoint omits expires_in, and the margin is
// subtracted so a cached token is refreshed slightly before it actually expires.
const (
bearerTokenTTLDefault = 60 * time.Second
bearerTokenTTLMargin = 10 * time.Second
)
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
// cachedBearerToken returns a bearer token for the given challenge, reusing a
// Redis-cached token for the same remote+challenge while it is still valid.
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
key := remote.Name + ":" + sha256Hash([]byte(wwwAuth))
if tok, err := e.cache.GetToken(ctx, key); err == nil && tok != "" {
return tok, nil
}
tok, ttl, err := fetchBearerToken(ctx, wwwAuth, remote)
if err != nil {
return "", err
}
if tok != "" {
if ttl <= 0 {
ttl = bearerTokenTTLDefault
}
if ttl > bearerTokenTTLMargin {
ttl -= bearerTokenTTLMargin
}
_ = e.cache.SetToken(ctx, key, tok, ttl)
}
return tok, nil
}
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, time.Duration, error) {
if !strings.HasPrefix(wwwAuth, "Bearer ") {
return "", 0, fmt.Errorf("not a Bearer challenge")
}
params := map[string]string{}
for _, part := range strings.Split(wwwAuth[7:], ",") {
part = strings.TrimSpace(part)
eq := strings.Index(part, "=")
if eq < 0 {
continue
}
key := part[:eq]
val := strings.Trim(part[eq+1:], `"`)
params[key] = val
}
realm := params["realm"]
if realm == "" {
return "", 0, fmt.Errorf("no realm in Bearer challenge")
}
tokenURL := realm
sep := "?"
if s, ok := params["service"]; ok {
tokenURL += sep + "service=" + s
sep = "&"
}
if s, ok := params["scope"]; ok {
tokenURL += sep + "scope=" + s
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, tokenURL, nil)
if err != nil {
return "", 0, err
}
if remote.Username != "" && remote.Password != "" {
req.SetBasicAuth(remote.Username, remote.Password)
}
resp, err := clientForRemote(remote).Do(req)
if err != nil {
return "", 0, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", 0, fmt.Errorf("token endpoint returned %d", resp.StatusCode)
}
var tokenResp struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", 0, err
}
ttl := time.Duration(tokenResp.ExpiresIn) * time.Second
if tokenResp.Token != "" {
return tokenResp.Token, ttl, nil
}
return tokenResp.AccessToken, ttl, nil
}
type ProxyError struct {
Status int
Message string
@@ -646,6 +334,8 @@ func (e *UpstreamError) Error() string { return fmt.Sprintf("upstream error: %v"
func (e *UpstreamError) Unwrap() error { return e.Err }
func isNetworkError(err error) bool {
var ue *UpstreamError
return errors.As(err, &ue)
if _, ok := err.(*UpstreamError); ok {
return true
}
return false
}
-83
View File
@@ -1,83 +0,0 @@
package proxy
import (
"net"
"net/http"
"sync"
"time"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
// Default upstream timeouts. A remote may override any of these; a zero
// override falls back to the default here. There is deliberately no overall
// Client.Timeout: the proxy streams arbitrarily large artifacts and total time
// is bounded by the request context instead. We only constrain the phases that
// must never hang — connect, TLS handshake, and time-to-first-response-header —
// so a slow or wedged upstream cannot pin a goroutine and connection.
const (
defaultDialTimeout = 10 * time.Second
defaultTLSTimeout = 10 * time.Second
defaultResponseHeaderTimeout = 30 * time.Second
)
type clientKey struct {
dial time.Duration
tls time.Duration
respHeader time.Duration
}
var (
clientCacheMu sync.Mutex
clientCache = map[clientKey]*http.Client{}
)
// upstreamClientFor returns an HTTP client configured with the given timeouts,
// reusing a cached client (and its connection pool) for identical timeout sets.
// Zero values fall back to the defaults.
func upstreamClientFor(dial, tls, respHeader time.Duration) *http.Client {
if dial <= 0 {
dial = defaultDialTimeout
}
if tls <= 0 {
tls = defaultTLSTimeout
}
if respHeader <= 0 {
respHeader = defaultResponseHeaderTimeout
}
key := clientKey{dial: dial, tls: tls, respHeader: respHeader}
clientCacheMu.Lock()
defer clientCacheMu.Unlock()
if c, ok := clientCache[key]; ok {
return c
}
c := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: dial,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: tls,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: respHeader,
},
}
clientCache[key] = c
return c
}
// clientForRemote returns the upstream client for a remote, applying its
// per-remote timeout overrides (in seconds) on top of the defaults.
func clientForRemote(remote models.Remote) *http.Client {
return upstreamClientFor(
time.Duration(remote.UpstreamDialTimeout)*time.Second,
time.Duration(remote.UpstreamTLSTimeout)*time.Second,
time.Duration(remote.UpstreamResponseHeaderTimeout)*time.Second,
)
}
+2 -5
View File
@@ -35,7 +35,6 @@ import (
type Server struct {
cfg *config.Config
version string
router chi.Router
db *database.DB
cache *cache.Redis
@@ -46,7 +45,7 @@ type Server struct {
gc *gc.Collector
}
func New(cfg *config.Config, version string) (*Server, error) {
func New(cfg *config.Config) (*Server, error) {
db, err := database.New(cfg.DatabaseDSN())
if err != nil {
return nil, fmt.Errorf("database: %w", err)
@@ -69,7 +68,6 @@ func New(cfg *config.Config, version string) (*Server, error) {
s := &Server{
cfg: cfg,
version: version,
db: db,
cache: redis,
store: s3,
@@ -98,7 +96,6 @@ func (s *Server) routes() chi.Router {
proxyHandler := v1.NewProxyHandler(s.engine, s.virtEngine, s.db, s.store, s.localHandler)
r.Mount("/api/v1", proxyHandler.Routes())
r.Mount("/v2", proxyHandler.DockerV2Routes())
remotesHandler := v2.NewRemotesHandler(s.db)
virtualsHandler := v2.NewVirtualsHandler(s.db)
@@ -140,7 +137,7 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"name":"artifactapi","version":"%s"}`, s.version)
fmt.Fprint(w, `{"name":"artifactapi","version":"3.0.0-dev"}`)
}
func (s *Server) newHTTPServer() *http.Server {
+2 -2
View File
@@ -79,7 +79,7 @@ func (e *Engine) fetchMemberIndexes(ctx context.Context, virt models.Virtual, pa
results[idx] = result{err: fmt.Errorf("local index %q: %w", name, err)}
return
}
results[idx] = result{index: MemberIndex{RemoteName: name, RepoType: remote.RepoType, BaseURL: remote.BaseURL, Body: body}}
results[idx] = result{index: MemberIndex{RemoteName: name, RepoType: remote.RepoType, Body: body}}
return
}
@@ -102,7 +102,7 @@ func (e *Engine) fetchMemberIndexes(ctx context.Context, virt models.Virtual, pa
return
}
results[idx] = result{index: MemberIndex{RemoteName: name, RepoType: remote.RepoType, BaseURL: remote.BaseURL, Body: body}}
results[idx] = result{index: MemberIndex{RemoteName: name, RepoType: remote.RepoType, Body: body}}
}(i, memberName)
}
+3 -40
View File
@@ -54,27 +54,15 @@ func (m *HelmMerger) MergeIndexes(members []MemberIndex, proxyBaseURL string) ([
seen[chart][ver.Version] = true
if proxyBaseURL != "" {
routePrefix := "remote"
if member.RepoType == "local" {
routePrefix = "local"
}
baseHost := extractHost(member.BaseURL)
for i, u := range ver.URLs {
if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") {
if baseHost != "" && extractHost(u) != baseHost {
continue
}
relPath := extractPathRelativeToBase(u, member.BaseURL)
ver.URLs[i] = fmt.Sprintf("%s/api/v1/%s/%s/%s",
ver.URLs[i] = fmt.Sprintf("%s/api/v1/remote/%s/%s",
strings.TrimRight(proxyBaseURL, "/"),
routePrefix,
member.RemoteName,
relPath)
extractPath(u))
} else {
ver.URLs[i] = fmt.Sprintf("%s/api/v1/%s/%s/%s",
ver.URLs[i] = fmt.Sprintf("%s/api/v1/remote/%s/%s",
strings.TrimRight(proxyBaseURL, "/"),
routePrefix,
member.RemoteName,
u)
}
@@ -90,31 +78,6 @@ func (m *HelmMerger) MergeIndexes(members []MemberIndex, proxyBaseURL string) ([
return yaml.Marshal(merged)
}
func extractHost(rawURL string) string {
idx := strings.Index(rawURL, "://")
if idx == -1 {
return ""
}
rest := rawURL[idx+3:]
slashIdx := strings.Index(rest, "/")
if slashIdx == -1 {
return rest
}
return rest[:slashIdx]
}
func extractPathRelativeToBase(rawURL, baseURL string) string {
fullPath := extractPath(rawURL)
basePath := extractPath(baseURL)
if basePath != "" {
basePath = strings.TrimRight(basePath, "/") + "/"
if strings.HasPrefix(fullPath, basePath) {
return fullPath[len(basePath):]
}
}
return fullPath
}
func extractPath(rawURL string) string {
idx := strings.Index(rawURL, "://")
if idx == -1 {
-1
View File
@@ -9,7 +9,6 @@ import (
type MemberIndex struct {
RemoteName string
RepoType models.RepoType
BaseURL string
Body []byte
}
-30
View File
@@ -2,7 +2,6 @@ package models
import (
"fmt"
"regexp"
"time"
)
@@ -47,11 +46,6 @@ type Remote struct {
MutableTTL int `json:"mutable_ttl"`
CheckMutable bool `json:"check_mutable"`
// Upstream HTTP timeouts in seconds. 0 means use the server default.
UpstreamDialTimeout int `json:"upstream_dial_timeout,omitempty"`
UpstreamTLSTimeout int `json:"upstream_tls_timeout,omitempty"`
UpstreamResponseHeaderTimeout int `json:"upstream_response_header_timeout,omitempty"`
Patterns []string `json:"patterns,omitempty"`
Blocklist []string `json:"blocklist,omitempty"`
MutablePatterns []string `json:"mutable_patterns,omitempty"`
@@ -72,30 +66,6 @@ type Remote struct {
UpdatedAt time.Time `json:"updated_at"`
}
// ValidatePatterns ensures every configured regex compiles. Storing an
// invalid pattern would otherwise be silently dropped at match time, which
// for the blocklist is a fail-open: a mistyped deny rule becomes a no-op.
func (r *Remote) ValidatePatterns() error {
groups := []struct {
field string
patterns []string
}{
{"patterns", r.Patterns},
{"blocklist", r.Blocklist},
{"mutable_patterns", r.MutablePatterns},
{"immutable_patterns", r.ImmutablePatterns},
{"ban_tags", r.BanTags},
}
for _, g := range groups {
for _, p := range g.patterns {
if _, err := regexp.Compile(p); err != nil {
return fmt.Errorf("invalid regex in %s: %q: %w", g.field, p, err)
}
}
}
return nil
}
type RemoteWithStats struct {
Remote
Stats RemoteStats `json:"stats"`
-19
View File
@@ -1,19 +0,0 @@
package models
import "testing"
func TestRemote_ValidatePatterns(t *testing.T) {
valid := &Remote{
Patterns: []string{`.*\.tar\.gz$`},
Blocklist: []string{`^secret/`},
ImmutablePatterns: []string{`\.rpm$`},
}
if err := valid.ValidatePatterns(); err != nil {
t.Fatalf("expected valid patterns, got %v", err)
}
bad := &Remote{Blocklist: []string{`[unterminated`}}
if err := bad.ValidatePatterns(); err == nil {
t.Fatal("expected error for invalid blocklist regex, got nil")
}
}
-7
View File
@@ -6,20 +6,13 @@ COPY package.json package-lock.json* ./
RUN npm ci
COPY . .
ARG BASE_PATH=/
ENV BASE_PATH=${BASE_PATH}
RUN npm run build
FROM nginx:alpine
ARG BASE_PATH=/
COPY --from=builder /app/dist /usr/share/nginx/html
COPY nginx.conf /etc/nginx/conf.d/default.conf
RUN sed -i "s|\${BASE_PATH}|${BASE_PATH}|g" /etc/nginx/conf.d/default.conf
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
+26 -5
View File
@@ -5,12 +5,33 @@ server {
root /usr/share/nginx/html;
index index.html;
location ${BASE_PATH}/ {
rewrite ^${BASE_PATH}(/.*)$ $1 break;
try_files $uri $uri/ /index.html;
location /api/ {
proxy_pass http://artifactapi:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
}
location = ${BASE_PATH} {
return 301 ${BASE_PATH}/;
location /v2/ {
proxy_pass http://artifactapi:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
}
location /health {
proxy_pass http://artifactapi:8000;
}
location /metrics {
proxy_pass http://artifactapi:8000;
}
location / {
try_files $uri $uri/ /index.html;
}
}
-6
View File
@@ -2,8 +2,6 @@ import { Routes, Route, NavLink } from 'react-router-dom';
import { Dashboard } from './pages/Dashboard';
import { Remotes } from './pages/Remotes';
import { RemoteDetail } from './pages/RemoteDetail';
import { Locals } from './pages/Locals';
import { LocalDetail } from './pages/LocalDetail';
import { Virtuals } from './pages/Virtuals';
import { Objects } from './pages/Objects';
import { Probe } from './pages/Probe';
@@ -20,7 +18,6 @@ export function App() {
<div className="sidebar-nav">
<NavLink to="/" end>Dashboard</NavLink>
<NavLink to="/remotes">Remotes</NavLink>
<NavLink to="/locals">Locals</NavLink>
<NavLink to="/virtuals">Virtuals</NavLink>
<NavLink to="/probe">Test Remote</NavLink>
</div>
@@ -34,9 +31,6 @@ export function App() {
<Route path="/remotes" element={<Remotes />} />
<Route path="/remotes/:name" element={<RemoteDetail />} />
<Route path="/remotes/:name/objects" element={<Objects />} />
<Route path="/locals" element={<Locals />} />
<Route path="/locals/:name" element={<LocalDetail />} />
<Route path="/locals/:name/objects" element={<Objects />} />
<Route path="/virtuals" element={<Virtuals />} />
<Route path="/probe" element={<Probe />} />
</Routes>
+1 -5
View File
@@ -4,13 +4,9 @@ import { BrowserRouter } from 'react-router-dom';
import { App } from './App';
import './index.css';
declare const __BASE_PATH__: string;
const basename = __BASE_PATH__.replace(/\/+$/, '') || '/';
createRoot(document.getElementById('root')!).render(
<StrictMode>
<BrowserRouter basename={basename}>
<BrowserRouter>
<App />
</BrowserRouter>
</StrictMode>,
-5
View File
@@ -50,11 +50,6 @@ export function Dashboard() {
value={formatNumber(stats.total_blobs_deduped)}
sub="shared blobs"
/>
<StatsCard
label="Bandwidth Saved"
value={formatBytes(stats.bandwidth_saved_30d)}
sub="last 30 days"
/>
</div>
{health && (
-46
View File
@@ -1,46 +0,0 @@
import { useEffect, useState } from 'react';
import { useParams, Link } from 'react-router-dom';
import { api } from '../api/client';
import type { Remote } from '../api/types';
import { Badge } from '../components/Badge';
import './RemoteDetail.css';
export function LocalDetail() {
const { name } = useParams<{ name: string }>();
const [remote, setRemote] = useState<Remote | null>(null);
const [error, setError] = useState<string | null>(null);
useEffect(() => {
if (!name) return;
api.getRemote(name)
.then(setRemote)
.catch(e => setError(e.message));
}, [name]);
if (error) return <div className="error-banner">{error}</div>;
if (!remote) return <div className="loading">Loading...</div>;
return (
<div>
<div className="detail-header">
<Link to="/locals" className="back-link">&larr; Locals</Link>
<h1 className="page-title">{remote.name}</h1>
<div className="detail-badges">
<Badge variant="blue">{remote.package_type}</Badge>
<Badge variant="default">local</Badge>
{remote.managed_by && <Badge variant="green">managed by {remote.managed_by}</Badge>}
</div>
</div>
{remote.description && (
<p className="detail-description">{remote.description}</p>
)}
<div className="detail-actions">
<Link to={`/locals/${remote.name}/objects`} className="btn btn-primary">
Browse Files
</Link>
</div>
</div>
);
}
-93
View File
@@ -1,93 +0,0 @@
import { useEffect, useState } from 'react';
import { useNavigate } from 'react-router-dom';
import { api } from '../api/client';
import type { Remote } from '../api/types';
import { Badge } from '../components/Badge';
import { DataTable } from '../components/DataTable';
import './Remotes.css';
const typeColors: Record<string, 'blue' | 'green' | 'yellow' | 'red' | 'default'> = {
docker: 'blue',
helm: 'green',
rpm: 'yellow',
pypi: 'blue',
npm: 'red',
generic: 'default',
alpine: 'green',
puppet: 'yellow',
terraform: 'blue',
goproxy: 'green',
};
export function Locals() {
const navigate = useNavigate();
const [remotes, setRemotes] = useState<Remote[]>([]);
const [filter, setFilter] = useState('');
const [loading, setLoading] = useState(true);
useEffect(() => {
api.listRemotes()
.then(r => setRemotes((r || []).filter(x => x.repo_type === 'local')))
.finally(() => setLoading(false));
}, []);
const filtered = remotes.filter(r => {
if (filter && !r.name.toLowerCase().includes(filter.toLowerCase())) return false;
return true;
});
return (
<div>
<h1 className="page-title">Local Repositories</h1>
<div className="remotes-toolbar">
<input
className="search-input"
placeholder="Filter by name..."
value={filter}
onChange={e => setFilter(e.target.value)}
/>
<span className="result-count">{filtered.length} locals</span>
</div>
{loading ? (
<div className="loading">Loading...</div>
) : (
<DataTable
columns={[
{
key: 'name',
header: 'Name',
render: (r: Remote) => <span className="mono">{r.name}</span>,
},
{
key: 'type',
header: 'Type',
render: (r: Remote) => (
<Badge variant={typeColors[r.package_type] || 'default'}>
{r.package_type}
</Badge>
),
width: '110px',
},
{
key: 'description',
header: 'Description',
render: (r: Remote) => r.description || <span className="text-muted"></span>,
},
{
key: 'managed',
header: 'Managed',
render: (r: Remote) =>
r.managed_by ? <Badge variant="blue">{r.managed_by}</Badge> : <span className="text-muted"></span>,
width: '100px',
},
]}
data={filtered}
emptyMessage="No local repositories configured"
onRowClick={(r) => navigate(`/locals/${r.name}`)}
/>
)}
</div>
);
}
+2 -5
View File
@@ -1,5 +1,5 @@
import { useEffect, useState, useCallback, useMemo } from 'react';
import { useParams, useLocation, Link } from 'react-router-dom';
import { useParams, Link } from 'react-router-dom';
import { api } from '../api/client';
import type { Artifact } from '../api/types';
import { formatBytes, timeAgo, truncateHash } from '../components/format';
@@ -171,9 +171,6 @@ function TreeRow({ node, depth, expanded, onToggle, onEvict }: TreeRowProps) {
export function Objects() {
const { name } = useParams<{ name: string }>();
const location = useLocation();
const isLocal = location.pathname.startsWith('/locals/');
const backLink = isLocal ? `/locals/${name}` : `/remotes/${name}`;
const [artifacts, setArtifacts] = useState<Artifact[]>([]);
const [loading, setLoading] = useState(true);
const [filter, setFilter] = useState('');
@@ -236,7 +233,7 @@ export function Objects() {
return (
<div>
<div className="detail-header">
<Link to={backLink} className="back-link">&larr; {name}</Link>
<Link to={`/remotes/${name}`} className="back-link">&larr; {name}</Link>
<h1 className="page-title">Cached Objects</h1>
</div>
+2 -3
View File
@@ -32,10 +32,9 @@ export function Remotes() {
.finally(() => setLoading(false));
}, []);
const remoteOnly = remotes.filter(r => r.repo_type !== 'local');
const types = [...new Set(remoteOnly.map(r => r.package_type))].sort();
const types = [...new Set(remotes.map(r => r.package_type))].sort();
const filtered = remoteOnly.filter(r => {
const filtered = remotes.filter(r => {
if (typeFilter && r.package_type !== typeFilter) return false;
if (filter && !r.name.toLowerCase().includes(filter.toLowerCase())) return false;
return true;
+10 -32
View File
@@ -1,38 +1,21 @@
import { useEffect, useState } from 'react';
import { Link } from 'react-router-dom';
import { api } from '../api/client';
import type { Remote, Virtual } from '../api/types';
import type { Virtual } from '../api/types';
import { Badge } from '../components/Badge';
import { DataTable } from '../components/DataTable';
import './Virtuals.css';
export function Virtuals() {
const [virtuals, setVirtuals] = useState<Virtual[]>([]);
const [remoteMap, setRemoteMap] = useState<Record<string, Remote>>({});
const [loading, setLoading] = useState(true);
const [expanded, setExpanded] = useState<string | null>(null);
useEffect(() => {
Promise.all([api.listVirtuals(), api.listRemotes()])
.then(([v, r]) => {
setVirtuals(v || []);
const map: Record<string, Remote> = {};
for (const remote of r || []) {
map[remote.name] = remote;
}
setRemoteMap(map);
})
api.listVirtuals()
.then(v => setVirtuals(v || []))
.finally(() => setLoading(false));
}, []);
function memberLink(name: string) {
const remote = remoteMap[name];
if (remote?.repo_type === 'local') {
return `/locals/${name}`;
}
return `/remotes/${name}`;
}
return (
<div>
<h1 className="page-title">Virtual Repositories</h1>
@@ -57,7 +40,7 @@ export function Virtuals() {
key: 'members',
header: 'Members',
render: (v: Virtual) => (
<span className="member-count">{v.members?.length || 0} repos</span>
<span className="member-count">{v.members?.length || 0} remotes</span>
),
width: '110px',
},
@@ -86,17 +69,12 @@ export function Virtuals() {
<ul className="member-list">
{virtuals
.find(v => v.name === expanded)
?.members?.map((m, i) => {
const remote = remoteMap[m];
const typeLabel = remote?.repo_type === 'local' ? 'local' : 'remote';
return (
<li key={m}>
<span className="member-priority">{i + 1}</span>
<Link to={memberLink(m)} className="mono">{m}</Link>
<Badge variant={typeLabel === 'local' ? 'yellow' : 'default'}>{typeLabel}</Badge>
</li>
);
})}
?.members?.map((m, i) => (
<li key={m}>
<span className="member-priority">{i + 1}</span>
<a href={`/remotes/${m}`} className="mono">{m}</a>
</li>
))}
</ul>
</div>
)}
-6
View File
@@ -1,10 +1,7 @@
import { defineConfig } from 'vite'
import react from '@vitejs/plugin-react'
const basePath = process.env.BASE_PATH || '/'
export default defineConfig({
base: basePath,
plugins: [react()],
server: {
proxy: {
@@ -14,7 +11,4 @@ export default defineConfig({
'/metrics': 'http://localhost:8000',
},
},
define: {
'__BASE_PATH__': JSON.stringify(basePath),
},
})