Compare commits

..

2 Commits

Author SHA1 Message Date
benvin 756846c0ba Merge branch 'master' into benvin/local-terraform-registry 2026-06-22 23:32:12 +10:00
unkinben ab44271e82 feat: add local repository type with repo_type field
Introduces repo_type (remote/local) as a separate axis from package_type
so that any package type can be hosted locally. A terraform local repo
is package_type=terraform + repo_type=local.

- Remote model gains RepoType field (defaults to "remote")
- Database schema adds repo_type column with migration for existing DBs
- V1 proxy adds /api/v1/local/{name}/* route for serving local files
- V2 upload via PUT /api/v2/remotes/{name}/files/{ns}/{type}/{file}.zip
  validates filename matches terraform-provider-{type}_{ver}_{os}_{arch}.zip
  and returns 409 on duplicate (no overwrites)
- index.json and {version}.json are computed on-the-fly from uploaded zips
  rather than stored as separate files
- V2 create validates repo_type and requires base_url only for remotes
2026-06-22 22:51:41 +10:00
70 changed files with 325 additions and 2728 deletions
-4
View File
@@ -1,6 +1,2 @@
bin/
terraform/
# e2e-docker fixtures are real package files (.rpm, .tgz, .whl, .zip, ...) that
# are intentionally tracked, overriding any global ignore of those extensions.
!e2e-docker/fixtures/**
-24
View File
@@ -1,24 +0,0 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-merge-conflict
- repo: https://github.com/dnephin/pre-commit-golang
rev: v0.5.1
hooks:
- id: go-fmt
- id: go-mod-tidy
- repo: local
hooks:
- id: go-vet
name: go vet
entry: go vet ./...
language: system
types: [go]
pass_filenames: false
-4
View File
@@ -8,8 +8,6 @@ steps:
settings:
registry: git.unkin.net
repo: git.unkin.net/unkin/artifactapi
build_args:
VERSION: ${CI_COMMIT_TAG}
username: droneci
password:
from_secret: DRONECI_PASSWORD
@@ -24,8 +22,6 @@ steps:
repo: git.unkin.net/unkin/artifactapi-ui
dockerfile: ui/Dockerfile.ui
context: ui
build_args:
BASE_PATH: /ui
username: droneci
password:
from_secret: DRONECI_PASSWORD
+3 -11
View File
@@ -3,15 +3,7 @@ when:
steps:
- name: pre-commit
image: git.unkin.net/unkin/almalinux9-gobuilder:20260606
image: golang:1.25
commands:
- uvx pre-commit run --all-files
backend_options:
kubernetes:
resources:
requests:
memory: 512Mi
cpu: 1
limits:
memory: 2Gi
cpu: 2
- test -z "$(gofmt -l .)"
- go vet ./...
+1 -2
View File
@@ -9,8 +9,7 @@ RUN go mod download
COPY . .
ARG VERSION=dev
RUN CGO_ENABLED=0 go build -ldflags="-s -w -X main.version=${VERSION}" -o artifactapi ./cmd/artifactapi
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o artifactapi ./cmd/artifactapi
FROM gcr.io/distroless/static-debian12:nonroot
+2 -7
View File
@@ -1,4 +1,4 @@
.PHONY: build test lint fmt e2e docker-e2e docker docker-ui compose clean tidy check-go
.PHONY: build test lint fmt e2e docker docker-ui compose clean tidy check-go
BINARY := bin/artifactapi
MODULE := git.unkin.net/unkin/artifactapi
@@ -12,7 +12,7 @@ check-go:
fi
build: check-go tidy
go build -ldflags="-s -w -X main.version=$(VERSION)" -o $(BINARY) ./cmd/artifactapi
go build -ldflags="-s -w" -o $(BINARY) ./cmd/artifactapi
test: check-go
go test -race -count=1 ./pkg/... ./internal/...
@@ -28,11 +28,6 @@ fmt: check-go
e2e: check-go
TESTCONTAINERS_RYUK_DISABLED=true go test -tags=e2e -race -count=1 -timeout=5m ./e2e/...
# Build the container, bring up the full docker-compose stack + a mock upstream,
# and run the black-box suite against the running product.
docker-e2e: check-go
./scripts/docker-e2e.sh
docker:
docker build -t artifactapi:$(VERSION) .
+1 -3
View File
@@ -13,8 +13,6 @@ import (
"git.unkin.net/unkin/artifactapi/internal/tui"
)
var version = "dev"
func main() {
if len(os.Args) > 1 && os.Args[1] == "tui" {
endpoint := os.Getenv("ARTIFACTAPI_ENDPOINT")
@@ -44,7 +42,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
srv, err := server.New(cfg, version)
srv, err := server.New(cfg)
if err != nil {
slog.Error("failed to create server", "error", err)
os.Exit(1)
-18
View File
@@ -1,18 +0,0 @@
# Overlay for the dockerised end-to-end suite (scripts/docker-e2e.sh).
# Adds a static mock upstream that the artifactapi container proxies, so the
# caching tests are hermetic and need no internet access.
services:
mockupstream:
image: nginx:alpine
volumes:
- ./e2e-docker/fixtures:/usr/share/nginx/html:ro,z
# No host port needed: only the artifactapi container talks to it, and the
# tests compare served bytes against the on-disk fixtures.
artifactapi:
# The host port is set via ARTIFACTAPI_PORT (see scripts/docker-e2e.sh),
# defaulting to 8000; the e2e run uses 8001 to avoid colliding with a
# locally-running instance.
depends_on:
mockupstream:
condition: service_started
+1 -1
View File
@@ -2,7 +2,7 @@ services:
artifactapi:
build: .
ports:
- "${ARTIFACTAPI_PORT:-8000}:8000"
- "8000:8000"
environment:
LISTEN_ADDR: ":8000"
DBHOST: postgres
-39
View File
@@ -1,39 +0,0 @@
# Dockerised end-to-end suite
Black-box tests that run against a fully **containerised** artifactapi stack
(built image + Postgres + Redis + MinIO) plus a static mock upstream. Unlike the
in-process `e2e/` suite (testcontainers, server run in-process), these only speak
HTTP to the running product, so they exercise the shipped container image.
## Run
```bash
make docker-e2e # build image, compose up, run suite, compose down
```
`scripts/docker-e2e.sh` builds and starts `docker-compose.yml` +
`docker-compose.e2e.yml`, waits for `/health`, then runs
`go test -tags=dockere2e ./e2e-docker/...` and tears everything down.
The stack publishes artifactapi on host port **8001** (to avoid colliding with a
local instance on 8000). Override with `ARTIFACTAPI_URL` to point the tests at an
already-running stack.
## Coverage
- **Repository lifecycle** — add / change / delete for remote, local and virtual repos.
- **Caching** — one immutable artifact per remote package type (generic, docker,
helm, pypi, npm, rpm, alpine, puppet, terraform, goproxy) proxied through the
mock upstream: first fetch `X-Artifact-Source: remote`, second `cache`, bytes
verified against the origin fixture.
- **Local uploads** — generic (upload/download), pypi (wheel + generated `simple/`
index), rpm (real package + **automatic repodata** generation).
- **Virtual repositories** — pypi simple-index merge and helm `index.yaml` merge
across two members.
## Fixtures
`fixtures/` is served by the mock upstream at its web root. Paths mirror each
provider's upstream URL layout (e.g. `v2/...` for docker, `v1/providers/...` for
terraform). The RPM under `fixtures/rpmrepo/Packages/` is a real package so the
rpm provider can parse its metadata for repodata generation.
-76
View File
@@ -1,76 +0,0 @@
//go:build dockere2e
package e2edocker
import (
"bytes"
"fmt"
"net/http"
"testing"
)
// TestCachingPerProvider proxies one immutable artifact for every remote
// package type through the mock upstream and asserts: first fetch is served
// from the remote, the second from cache, and the bytes match the origin.
func TestCachingPerProvider(t *testing.T) {
cases := []struct {
pkgType string
// path is the request path under /api/v1/remote/<name>/. The provider
// derives the upstream URL from it (docker prepends /v2/, terraform
// prepends /v1/providers/), and the fixture lives at that resolved path.
path string
fixture string
}{
{"generic", "blobs/hello.bin", "blobs/hello.bin"},
{"npm", "mypkg/-/mypkg-1.0.0.tgz", "mypkg/-/mypkg-1.0.0.tgz"},
{"helm", "charts/mychart-1.0.0.tgz", "charts/mychart-1.0.0.tgz"},
{"pypi", "packages/foo-1.0-py3-none-any.whl", "packages/foo-1.0-py3-none-any.whl"},
{"rpm", "rpmrepo/Packages/e2e-testpkg-1.0-1.noarch.rpm", "rpmrepo/Packages/e2e-testpkg-1.0-1.noarch.rpm"},
{"alpine", "alpine/x86_64/testpkg-1.0-r0.apk", "alpine/x86_64/testpkg-1.0-r0.apk"},
{"puppet", "puppet-releases/author-mod-1.0.0.tar.gz", "puppet-releases/author-mod-1.0.0.tar.gz"},
{"goproxy", "goproxy/example.com/mod/@v/v1.0.0.zip", "goproxy/example.com/mod/@v/v1.0.0.zip"},
{"terraform", "hashicorp/aws/download/pkg.zip", "v1/providers/hashicorp/aws/download/pkg.zip"},
{"docker", "library/testimg/blobs/blobdata", "v2/library/testimg/blobs/blobdata"},
}
for _, tc := range cases {
t.Run(tc.pkgType, func(t *testing.T) {
name := "cache-" + tc.pkgType
createRepo(t, fmt.Sprintf(`{
"name": %q,
"package_type": %q,
"repo_type": "remote",
"base_url": %q,
"stale_on_error": true
}`, name, tc.pkgType, mockUpstream()))
defer deleteRepo(t, name)
want := fixtureBytes(t, tc.fixture)
url := api("/api/v1/remote/" + name + "/" + tc.path)
// First fetch: from remote.
resp, body := doRequest(t, http.MethodGet, url, nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("first fetch: status %d: %s", resp.StatusCode, body)
}
if src := resp.Header.Get("X-Artifact-Source"); src != "remote" {
t.Fatalf("first fetch source = %q, want remote", src)
}
if !bytes.Equal(body, want) {
t.Fatalf("first fetch body mismatch: got %d bytes, want %d", len(body), len(want))
}
// Second fetch: from cache, identical bytes.
resp, body = doRequest(t, http.MethodGet, url, nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("second fetch: status %d: %s", resp.StatusCode, body)
}
if src := resp.Header.Get("X-Artifact-Source"); src != "cache" {
t.Fatalf("second fetch source = %q, want cache", src)
}
if !bytes.Equal(body, want) {
t.Fatalf("cached body mismatch: got %d bytes, want %d", len(body), len(want))
}
})
}
}
Binary file not shown.
-1
View File
@@ -1 +0,0 @@
hello artifactapi generic blob
Binary file not shown.
-8
View File
@@ -1,8 +0,0 @@
apiVersion: v1
entries:
alpha:
- name: alpha
version: 1.0.0
urls:
- charts/alpha-1.0.0.tgz
generated: "2026-01-01T00:00:00Z"
-8
View File
@@ -1,8 +0,0 @@
apiVersion: v1
entries:
beta:
- name: beta
version: 2.0.0
urls:
- charts/beta-2.0.0.tgz
generated: "2026-01-01T00:00:00Z"
Binary file not shown.
-108
View File
@@ -1,108 +0,0 @@
//go:build dockere2e
// Package e2edocker holds the black-box end-to-end suite that runs against a
// fully dockerised artifactapi stack (see scripts/docker-e2e.sh). Unlike the
// in-process e2e suite, these tests only speak HTTP to the running container.
package e2edocker
import (
"bytes"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"time"
)
func baseURL() string {
if v := os.Getenv("ARTIFACTAPI_URL"); v != "" {
return strings.TrimRight(v, "/")
}
return "http://localhost:8000"
}
// mockUpstream is the base URL the artifactapi *container* uses to reach the
// static mock upstream. It is resolved on the compose network, not the host.
func mockUpstream() string {
if v := os.Getenv("MOCK_UPSTREAM_INTERNAL"); v != "" {
return strings.TrimRight(v, "/")
}
return "http://mockupstream"
}
func api(path string) string { return baseURL() + path }
func fixtureBytes(t *testing.T, rel string) []byte {
t.Helper()
b, err := os.ReadFile(filepath.Join("fixtures", rel))
if err != nil {
t.Fatalf("read fixture %s: %v", rel, err)
}
return b
}
func doRequest(t *testing.T, method, url string, body []byte, contentType string) (*http.Response, []byte) {
t.Helper()
var r io.Reader
if body != nil {
r = bytes.NewReader(body)
}
req, err := http.NewRequest(method, url, r)
if err != nil {
t.Fatalf("%s %s: %v", method, url, err)
}
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("%s %s: %v", method, url, err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
return resp, respBody
}
func createRepo(t *testing.T, jsonBody string) {
t.Helper()
resp, body := doRequest(t, http.MethodPost, api("/api/v2/remotes"), []byte(jsonBody), "application/json")
if resp.StatusCode != http.StatusCreated {
t.Fatalf("create repo: status %d: %s", resp.StatusCode, body)
}
}
func deleteRepo(t *testing.T, name string) {
t.Helper()
doRequest(t, http.MethodDelete, api("/api/v2/remotes/"+name), nil, "")
}
func createVirtual(t *testing.T, jsonBody string) {
t.Helper()
resp, body := doRequest(t, http.MethodPost, api("/api/v2/virtuals"), []byte(jsonBody), "application/json")
if resp.StatusCode != http.StatusCreated {
t.Fatalf("create virtual: status %d: %s", resp.StatusCode, body)
}
}
func deleteVirtual(t *testing.T, name string) {
t.Helper()
doRequest(t, http.MethodDelete, api("/api/v2/virtuals/"+name), nil, "")
}
// getEventually retries a GET until it returns 200 or the deadline passes. Used
// for asynchronously-generated artifacts (e.g. rpm repodata after upload).
func getEventually(t *testing.T, url string, timeout time.Duration) (*http.Response, []byte) {
t.Helper()
deadline := time.Now().Add(timeout)
var resp *http.Response
var body []byte
for {
resp, body = doRequest(t, http.MethodGet, url, nil, "")
if resp.StatusCode == http.StatusOK || time.Now().After(deadline) {
return resp, body
}
time.Sleep(250 * time.Millisecond)
}
}
-93
View File
@@ -1,93 +0,0 @@
//go:build dockere2e
package e2edocker
import (
"bytes"
"net/http"
"strings"
"testing"
"time"
)
func uploadFile(t *testing.T, repo, filePath string, body []byte, contentType string) {
t.Helper()
url := api("/api/v2/remotes/" + repo + "/files/" + filePath)
resp, respBody := doRequest(t, http.MethodPut, url, body, contentType)
if resp.StatusCode != http.StatusCreated {
t.Fatalf("upload %s: status %d: %s", filePath, resp.StatusCode, respBody)
}
}
// TestLocalGenericUpload uploads a generic file and downloads it back.
func TestLocalGenericUpload(t *testing.T) {
createRepo(t, `{"name":"local-generic","package_type":"generic","repo_type":"local"}`)
defer deleteRepo(t, "local-generic")
content := []byte("artifactapi local generic upload payload")
uploadFile(t, "local-generic", "data/hello.bin", content, "application/octet-stream")
resp, body := doRequest(t, http.MethodGet, api("/api/v1/local/local-generic/data/hello.bin"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("download: status %d: %s", resp.StatusCode, body)
}
if !bytes.Equal(body, content) {
t.Fatalf("downloaded content mismatch")
}
}
// TestLocalPyPIUpload uploads a wheel and validates the generated simple index.
func TestLocalPyPIUpload(t *testing.T) {
createRepo(t, `{"name":"local-pypi","package_type":"pypi","repo_type":"local"}`)
defer deleteRepo(t, "local-pypi")
wheel := fixtureBytes(t, "packages/foo-1.0-py3-none-any.whl")
uploadFile(t, "local-pypi", "foo-1.0-py3-none-any.whl", wheel, "application/zip")
// Root index lists the package.
resp, body := doRequest(t, http.MethodGet, api("/api/v1/local/local-pypi/simple/"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("simple index: status %d: %s", resp.StatusCode, body)
}
if !strings.Contains(string(body), "foo") {
t.Fatalf("simple index missing package 'foo': %s", body)
}
// Per-package index lists the wheel file.
resp, body = doRequest(t, http.MethodGet, api("/api/v1/local/local-pypi/simple/foo/"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("package index: status %d: %s", resp.StatusCode, body)
}
if !strings.Contains(string(body), "foo-1.0-py3-none-any.whl") {
t.Fatalf("package index missing wheel: %s", body)
}
// The wheel downloads back byte-identical.
resp, body = doRequest(t, http.MethodGet, api("/api/v1/local/local-pypi/foo/foo-1.0-py3-none-any.whl"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("download wheel: status %d: %s", resp.StatusCode, body)
}
if !bytes.Equal(body, wheel) {
t.Fatalf("wheel content mismatch")
}
}
// TestLocalRPMRepodata uploads a real RPM and validates that repodata is
// generated automatically (the special rpm-local feature).
func TestLocalRPMRepodata(t *testing.T) {
createRepo(t, `{"name":"local-rpm","package_type":"rpm","repo_type":"local"}`)
defer deleteRepo(t, "local-rpm")
rpm := fixtureBytes(t, "rpmrepo/Packages/e2e-testpkg-1.0-1.noarch.rpm")
uploadFile(t, "local-rpm", "e2e-testpkg-1.0-1.noarch.rpm", rpm, "application/x-rpm")
// repodata is generated asynchronously after upload; poll for it.
resp, body := getEventually(t, api("/api/v1/local/local-rpm/repodata/repomd.xml"), 15*time.Second)
if resp.StatusCode != http.StatusOK {
t.Fatalf("repomd.xml: status %d: %s", resp.StatusCode, body)
}
s := string(body)
if !strings.Contains(s, "<repomd") || !strings.Contains(s, "primary") {
t.Fatalf("repomd.xml not a valid repodata document: %s", s)
}
}
-134
View File
@@ -1,134 +0,0 @@
//go:build dockere2e
package e2edocker
import (
"encoding/json"
"net/http"
"testing"
)
func TestHealth(t *testing.T) {
resp, body := doRequest(t, http.MethodGet, api("/health"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("health: status %d: %s", resp.StatusCode, body)
}
}
// TestRemoteLifecycle covers add/change/delete for a remote repository.
func TestRemoteLifecycle(t *testing.T) {
createRepo(t, `{
"name": "crud-remote",
"package_type": "generic",
"repo_type": "remote",
"base_url": "https://example.com",
"mutable_ttl": 600,
"stale_on_error": true
}`)
defer deleteRepo(t, "crud-remote")
got := getRepo(t, "crud-remote")
if got["base_url"] != "https://example.com" || got["mutable_ttl"].(float64) != 600 {
t.Fatalf("unexpected created remote: %v", got)
}
// change
resp, body := doRequest(t, http.MethodPut, api("/api/v2/remotes/crud-remote"), []byte(`{
"package_type": "generic",
"base_url": "https://updated.example.com",
"mutable_ttl": 120,
"stale_on_error": true
}`), "application/json")
if resp.StatusCode != http.StatusOK {
t.Fatalf("update remote: status %d: %s", resp.StatusCode, body)
}
got = getRepo(t, "crud-remote")
if got["base_url"] != "https://updated.example.com" || got["mutable_ttl"].(float64) != 120 {
t.Fatalf("update not applied: %v", got)
}
// delete
resp, _ = doRequest(t, http.MethodDelete, api("/api/v2/remotes/crud-remote"), nil, "")
if resp.StatusCode != http.StatusNoContent {
t.Fatalf("delete remote: status %d", resp.StatusCode)
}
resp, _ = doRequest(t, http.MethodGet, api("/api/v2/remotes/crud-remote"), nil, "")
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("expected 404 after delete, got %d", resp.StatusCode)
}
}
// TestLocalLifecycle covers add/delete for a local repository.
func TestLocalLifecycle(t *testing.T) {
createRepo(t, `{
"name": "crud-local",
"package_type": "generic",
"repo_type": "local"
}`)
defer deleteRepo(t, "crud-local")
got := getRepo(t, "crud-local")
if got["repo_type"] != "local" {
t.Fatalf("expected repo_type local, got %v", got["repo_type"])
}
resp, _ := doRequest(t, http.MethodDelete, api("/api/v2/remotes/crud-local"), nil, "")
if resp.StatusCode != http.StatusNoContent {
t.Fatalf("delete local: status %d", resp.StatusCode)
}
}
// TestVirtualLifecycle covers add/change/delete for a virtual repository.
func TestVirtualLifecycle(t *testing.T) {
createRepo(t, `{"name":"vmem-a","package_type":"helm","repo_type":"remote","base_url":"https://a.example.com","stale_on_error":true}`)
createRepo(t, `{"name":"vmem-b","package_type":"helm","repo_type":"remote","base_url":"https://b.example.com","stale_on_error":true}`)
defer deleteRepo(t, "vmem-a")
defer deleteRepo(t, "vmem-b")
createVirtual(t, `{
"name": "crud-virtual",
"package_type": "helm",
"members": ["vmem-a"]
}`)
defer deleteVirtual(t, "crud-virtual")
// change members
resp, body := doRequest(t, http.MethodPut, api("/api/v2/virtuals/crud-virtual"), []byte(`{
"package_type": "helm",
"members": ["vmem-a", "vmem-b"]
}`), "application/json")
if resp.StatusCode != http.StatusOK {
t.Fatalf("update virtual: status %d: %s", resp.StatusCode, body)
}
resp, body = doRequest(t, http.MethodGet, api("/api/v2/virtuals/crud-virtual"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("get virtual: status %d: %s", resp.StatusCode, body)
}
var v map[string]any
if err := json.Unmarshal(body, &v); err != nil {
t.Fatalf("decode virtual: %v", err)
}
members, _ := v["members"].([]any)
if len(members) != 2 {
t.Fatalf("expected 2 members after update, got %v", v["members"])
}
resp, _ = doRequest(t, http.MethodDelete, api("/api/v2/virtuals/crud-virtual"), nil, "")
if resp.StatusCode != http.StatusNoContent {
t.Fatalf("delete virtual: status %d", resp.StatusCode)
}
}
func getRepo(t *testing.T, name string) map[string]any {
t.Helper()
resp, body := doRequest(t, http.MethodGet, api("/api/v2/remotes/"+name), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("get remote %s: status %d: %s", name, resp.StatusCode, body)
}
var m map[string]any
if err := json.Unmarshal(body, &m); err != nil {
t.Fatalf("decode remote %s: %v", name, err)
}
return m
}
-54
View File
@@ -1,54 +0,0 @@
//go:build dockere2e
package e2edocker
import (
"net/http"
"strings"
"testing"
)
// TestVirtualPyPIMerge uploads different packages to two pypi locals and
// checks that a virtual over them serves a merged simple index.
func TestVirtualPyPIMerge(t *testing.T) {
createRepo(t, `{"name":"pmerge-a","package_type":"pypi","repo_type":"local"}`)
createRepo(t, `{"name":"pmerge-b","package_type":"pypi","repo_type":"local"}`)
defer deleteRepo(t, "pmerge-a")
defer deleteRepo(t, "pmerge-b")
uploadFile(t, "pmerge-a", "foo-1.0-py3-none-any.whl", fixtureBytes(t, "packages/foo-1.0-py3-none-any.whl"), "application/zip")
uploadFile(t, "pmerge-b", "bar-2.0-py3-none-any.whl", []byte("bar wheel payload"), "application/zip")
createVirtual(t, `{"name":"pmerge-v","package_type":"pypi","members":["pmerge-a","pmerge-b"]}`)
defer deleteVirtual(t, "pmerge-v")
resp, body := doRequest(t, http.MethodGet, api("/api/v1/virtual/pmerge-v/simple/"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("virtual simple index: status %d: %s", resp.StatusCode, body)
}
s := string(body)
if !strings.Contains(s, "foo") || !strings.Contains(s, "bar") {
t.Fatalf("merged index missing a member package (want foo and bar): %s", s)
}
}
// TestVirtualHelmMerge points two helm remotes at mock index.yaml documents
// with distinct charts and checks the virtual merges both into one index.
func TestVirtualHelmMerge(t *testing.T) {
createRepo(t, `{"name":"hmerge-a","package_type":"helm","repo_type":"remote","base_url":"`+mockUpstream()+`/helm-a","stale_on_error":true}`)
createRepo(t, `{"name":"hmerge-b","package_type":"helm","repo_type":"remote","base_url":"`+mockUpstream()+`/helm-b","stale_on_error":true}`)
defer deleteRepo(t, "hmerge-a")
defer deleteRepo(t, "hmerge-b")
createVirtual(t, `{"name":"hmerge-v","package_type":"helm","members":["hmerge-a","hmerge-b"]}`)
defer deleteVirtual(t, "hmerge-v")
resp, body := doRequest(t, http.MethodGet, api("/api/v1/virtual/hmerge-v/index.yaml"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("virtual index.yaml: status %d: %s", resp.StatusCode, body)
}
s := string(body)
if !strings.Contains(s, "alpha") || !strings.Contains(s, "beta") {
t.Fatalf("merged helm index missing a member chart (want alpha and beta): %s", s)
}
}
+1 -1
View File
@@ -95,7 +95,7 @@ func TestMain(m *testing.M) {
}
cfg.ListenAddr = "127.0.0.1:0"
srv, err := server.New(cfg, "e2e-test")
srv, err := server.New(cfg)
if err != nil {
log.Fatalf("server: %v", err)
}
-24
View File
@@ -24,30 +24,6 @@ func TestRoot(t *testing.T) {
}
}
func TestRemoteUpstreamTimeouts(t *testing.T) {
createRemote(t, `{
"name": "timeout-test",
"package_type": "generic",
"base_url": "https://example.com",
"stale_on_error": true,
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5
}`)
defer deleteRemote(t, "timeout-test")
remote := getJSON(t, apiURL("/api/v2/remotes/timeout-test"))
for field, want := range map[string]float64{
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5,
} {
if got, _ := remote[field].(float64); got != want {
t.Errorf("%s: got %v, want %v", field, remote[field], want)
}
}
}
func TestRemoteCRUD(t *testing.T) {
createRemote(t, `{
"name": "test-generic",
-33
View File
@@ -24,39 +24,6 @@ func TestProxyBlocklist(t *testing.T) {
assertStatus(t, apiURL("/api/v1/remote/blocklist-test/malware.exe"), http.StatusForbidden)
}
func TestProxyHeadBlocklist(t *testing.T) {
createRemote(t, `{
"name": "head-block-test",
"package_type": "generic",
"base_url": "https://example.com",
"blocklist": ["\\.exe$"],
"stale_on_error": true
}`)
defer deleteRemote(t, "head-block-test")
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/head-block-test/malware.exe"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("HEAD blocklisted path: got %d, want 403", resp.StatusCode)
}
}
func TestProxyHeadUnknownRemote(t *testing.T) {
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/nonexistent/some/path"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("HEAD unknown remote: got %d, want 404", resp.StatusCode)
}
}
func TestProxyPatterns(t *testing.T) {
createRemote(t, `{
"name": "patterns-test",
-1
View File
@@ -3,7 +3,6 @@ module git.unkin.net/unkin/artifactapi
go 1.25.9
require (
github.com/cavaliergopher/rpm v1.3.0
github.com/charmbracelet/bubbletea v1.3.10
github.com/charmbracelet/lipgloss v1.1.0
github.com/go-chi/chi/v5 v5.3.0
-2
View File
@@ -12,8 +12,6 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cavaliergopher/rpm v1.3.0 h1:UHX46sasX8MesUXXQ+UbkFLUX4eUWTlEcX8jcnRBIgI=
github.com/cavaliergopher/rpm v1.3.0/go.mod h1:vEumo1vvtrHM1Ov86f6+k8j7zNKOxQfHDCAIcR/36ZI=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+33 -54
View File
@@ -6,6 +6,8 @@ import (
"io"
"log/slog"
"net/http"
"regexp"
"strings"
"github.com/go-chi/chi/v5"
@@ -15,8 +17,11 @@ import (
"git.unkin.net/unkin/artifactapi/internal/proxy"
"git.unkin.net/unkin/artifactapi/internal/storage"
"git.unkin.net/unkin/artifactapi/internal/virtual"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
var semverRe = regexp.MustCompile(`^[0-9]+\.[0-9]+\.[0-9]+(?:-[a-zA-Z0-9.]+)?$`)
type ProxyHandler struct {
engine *proxy.Engine
virtualEngine *virtual.Engine
@@ -37,20 +42,6 @@ func (h *ProxyHandler) Routes() chi.Router {
return r
}
func (h *ProxyHandler) DockerV2Routes() chi.Router {
r := chi.NewRouter()
r.Get("/", h.handleDockerPing)
r.Head("/", h.handleDockerPing)
r.Get("/{remoteName}/*", h.handleProxy)
r.Head("/{remoteName}/*", h.handleProxyHead)
return r
}
func (h *ProxyHandler) handleDockerPing(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Docker-Distribution-Api-Version", "registry/2.0")
w.WriteHeader(http.StatusOK)
}
func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
remoteName := chi.URLParam(r, "remoteName")
path := chi.URLParam(r, "*")
@@ -67,7 +58,7 @@ func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
return
}
result, err := h.engine.Fetch(r.Context(), *remote, path, prov, r.Header)
result, err := h.engine.Fetch(r.Context(), *remote, path, prov)
if err != nil {
var proxyErr *proxy.ProxyError
if errors.As(err, &proxyErr) {
@@ -89,42 +80,6 @@ func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
io.Copy(w, result.Reader)
}
func (h *ProxyHandler) handleProxyHead(w http.ResponseWriter, r *http.Request) {
remoteName := chi.URLParam(r, "remoteName")
path := chi.URLParam(r, "*")
remote, err := h.db.GetRemote(r.Context(), remoteName)
if err != nil {
http.Error(w, fmt.Sprintf("remote %q not found", remoteName), http.StatusNotFound)
return
}
prov, err := provider.Get(remote.PackageType)
if err != nil {
http.Error(w, fmt.Sprintf("no provider for %q", remote.PackageType), http.StatusInternalServerError)
return
}
result, err := h.engine.Head(r.Context(), *remote, path, prov)
if err != nil {
var proxyErr *proxy.ProxyError
if errors.As(err, &proxyErr) {
http.Error(w, proxyErr.Message, proxyErr.Status)
return
}
slog.Error("proxy head failed", "remote", remoteName, "path", path, "error", err)
http.Error(w, "bad gateway", http.StatusBadGateway)
return
}
w.Header().Set("Content-Type", result.ContentType)
w.Header().Set("X-Artifact-Source", result.Source)
if result.Size > 0 {
w.Header().Set("Content-Length", fmt.Sprintf("%d", result.Size))
}
w.WriteHeader(http.StatusOK)
}
func (h *ProxyHandler) handleVirtual(w http.ResponseWriter, r *http.Request) {
virtualName := chi.URLParam(r, "virtualName")
path := chi.URLParam(r, "*")
@@ -160,9 +115,8 @@ func (h *ProxyHandler) handleLocal(w http.ResponseWriter, r *http.Request) {
return
}
prov, _ := provider.Get(remote.PackageType)
if indexer, ok := prov.(provider.LocalIndexer); ok {
if indexer.ServeLocalIndex(w, r, h.db, remote.Name, path) {
if remote.PackageType == models.PackageTerraform {
if h.serveTerraformMirror(w, r, remote, path) {
return
}
}
@@ -170,6 +124,31 @@ func (h *ProxyHandler) handleLocal(w http.ResponseWriter, r *http.Request) {
h.serveLocalFile(w, r, localName, path)
}
func (h *ProxyHandler) serveTerraformMirror(w http.ResponseWriter, r *http.Request, remote *models.Remote, path string) bool {
parts := strings.Split(path, "/")
if len(parts) < 3 {
return false
}
namespace, typeName := parts[0], parts[1]
tail := parts[2]
if tail == "index.json" {
h.local.ServeTerraformIndex(w, r, remote.Name, namespace, typeName)
return true
}
if strings.HasSuffix(tail, ".json") {
version := strings.TrimSuffix(tail, ".json")
if semverRe.MatchString(version) {
h.local.ServeTerraformVersionDoc(w, r, remote.Name, namespace, typeName, version)
return true
}
}
return false
}
func (h *ProxyHandler) serveLocalFile(w http.ResponseWriter, r *http.Request, repoName, path string) {
file, err := h.db.GetLocalFile(r.Context(), repoName, path)
if err != nil {
+117 -26
View File
@@ -1,20 +1,25 @@
package v2
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"github.com/go-chi/chi/v5"
"git.unkin.net/unkin/artifactapi/internal/database"
"git.unkin.net/unkin/artifactapi/internal/provider"
"git.unkin.net/unkin/artifactapi/internal/storage"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
var providerZipRe = regexp.MustCompile(
`^terraform-provider-([a-zA-Z0-9_-]+)_([0-9]+\.[0-9]+\.[0-9]+(?:-[a-zA-Z0-9.]+)?)_([a-z0-9]+)_([a-z0-9]+)\.zip$`,
)
type LocalHandler struct {
db *database.DB
store *storage.S3
@@ -56,22 +61,41 @@ func (h *LocalHandler) upload(w http.ResponseWriter, r *http.Request) {
return
}
prov, _ := provider.Get(remote.PackageType)
if uploader, ok := prov.(provider.LocalUploader); ok {
h.uploadValidated(w, r, remote, filePath, prov, uploader)
if remote.PackageType == models.PackageTerraform {
h.uploadTerraformProvider(w, r, remote, filePath)
return
}
h.uploadGeneric(w, r, remote, filePath)
}
func (h *LocalHandler) uploadValidated(w http.ResponseWriter, r *http.Request, remote *models.Remote, filePath string, prov provider.Provider, uploader provider.LocalUploader) {
storagePath, contentType, err := uploader.ValidateUpload(filePath)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
func (h *LocalHandler) uploadTerraformProvider(w http.ResponseWriter, r *http.Request, remote *models.Remote, filePath string) {
parts := strings.Split(filePath, "/")
if len(parts) != 3 {
http.Error(w, "path must be {namespace}/{type}/{filename}.zip", http.StatusBadRequest)
return
}
namespace, typeName, filename := parts[0], parts[1], parts[2]
m := providerZipRe.FindStringSubmatch(filename)
if m == nil {
http.Error(w, fmt.Sprintf(
"filename %q does not match terraform-provider-{type}_{version}_{os}_{arch}.zip",
filename,
), http.StatusBadRequest)
return
}
fileType, version, os, arch := m[1], m[2], m[3], m[4]
if fileType != typeName {
http.Error(w, fmt.Sprintf(
"provider type in filename %q does not match path type %q",
fileType, typeName,
), http.StatusBadRequest)
return
}
storagePath := fmt.Sprintf("%s/%s/%s", namespace, typeName, filename)
existing, err := h.db.GetLocalFile(r.Context(), remote.Name, storagePath)
if err != nil {
@@ -79,17 +103,20 @@ func (h *LocalHandler) uploadValidated(w http.ResponseWriter, r *http.Request, r
return
}
if existing != nil {
http.Error(w, fmt.Sprintf("file %q already exists; overwrites are not allowed", storagePath), http.StatusConflict)
http.Error(w, fmt.Sprintf(
"provider %s/%s version %s for %s_%s already exists; overwrites are not allowed",
namespace, typeName, version, os, arch,
), http.StatusConflict)
return
}
result, err := h.cas.Store(r.Context(), r.Body, contentType)
result, err := h.cas.Store(r.Context(), r.Body, "application/zip")
if err != nil {
http.Error(w, fmt.Sprintf("store failed: %v", err), http.StatusInternalServerError)
return
}
if err := h.db.UpsertBlob(r.Context(), result.ContentHash, result.S3Key, result.SizeBytes, contentType); err != nil {
if err := h.db.UpsertBlob(r.Context(), result.ContentHash, result.S3Key, result.SizeBytes, "application/zip"); err != nil {
http.Error(w, fmt.Sprintf("record blob: %v", err), http.StatusInternalServerError)
return
}
@@ -103,11 +130,15 @@ func (h *LocalHandler) uploadValidated(w http.ResponseWriter, r *http.Request, r
return
}
if hook, ok := prov.(provider.PostUploadHook); ok {
go hook.AfterUpload(context.Background(), remote.Name, storagePath, result.ContentHash, h, h.db)
}
writeJSON(w, http.StatusCreated, uploader.UploadResponse(storagePath, result.ContentHash, result.SizeBytes))
writeJSON(w, http.StatusCreated, map[string]any{
"namespace": namespace,
"type": typeName,
"version": version,
"os": os,
"arch": arch,
"content_hash": result.ContentHash,
"size_bytes": result.SizeBytes,
})
}
func (h *LocalHandler) uploadGeneric(w http.ResponseWriter, r *http.Request, remote *models.Remote, filePath string) {
@@ -192,14 +223,74 @@ func (h *LocalHandler) remove(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
func (h *LocalHandler) DB() *database.DB {
return h.db
type terraformIndex struct {
Versions map[string]json.RawMessage `json:"versions"`
}
func (h *LocalHandler) Download(ctx context.Context, key string) (io.ReadCloser, int64, error) {
reader, info, err := h.store.Download(ctx, key)
if err != nil {
return nil, 0, err
}
return reader, info.Size, nil
type terraformVersionDoc struct {
Archives map[string]terraformArchive `json:"archives"`
}
type terraformArchive struct {
URL string `json:"url"`
Hashes []string `json:"hashes,omitempty"`
}
func (h *LocalHandler) ServeTerraformIndex(w http.ResponseWriter, r *http.Request, repoName, namespace, typeName string) {
prefix := fmt.Sprintf("%s/%s/", namespace, typeName)
files, err := h.db.ListLocalFilesByPrefix(r.Context(), repoName, prefix)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
versions := map[string]json.RawMessage{}
for _, f := range files {
filename := strings.TrimPrefix(f.FilePath, prefix)
m := providerZipRe.FindStringSubmatch(filename)
if m == nil {
continue
}
versions[m[2]] = json.RawMessage(`{}`)
}
if len(versions) == 0 {
http.Error(w, "not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(terraformIndex{Versions: versions})
}
func (h *LocalHandler) ServeTerraformVersionDoc(w http.ResponseWriter, r *http.Request, repoName, namespace, typeName, version string) {
prefix := fmt.Sprintf("%s/%s/terraform-provider-%s_%s_", namespace, typeName, typeName, version)
files, err := h.db.ListLocalFilesByPrefix(r.Context(), repoName, prefix)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
archives := map[string]terraformArchive{}
for _, f := range files {
filename := strings.TrimPrefix(f.FilePath, fmt.Sprintf("%s/%s/", namespace, typeName))
m := providerZipRe.FindStringSubmatch(filename)
if m == nil || m[2] != version {
continue
}
platform := m[3] + "_" + m[4]
archive := terraformArchive{URL: filename}
if f.ContentHash != "" {
archive.Hashes = []string{"zh:" + strings.TrimPrefix(f.ContentHash, "sha256:")}
}
archives[platform] = archive
}
if len(archives) == 0 {
http.Error(w, "not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(terraformVersionDoc{Archives: archives})
}
-8
View File
@@ -69,10 +69,6 @@ func (h *RemotesHandler) create(w http.ResponseWriter, r *http.Request) {
http.Error(w, "base_url is required for remote repositories", http.StatusBadRequest)
return
}
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.CreateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -88,10 +84,6 @@ func (h *RemotesHandler) update(w http.ResponseWriter, r *http.Request) {
return
}
remote.Name = name
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.UpdateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
-12
View File
@@ -70,18 +70,6 @@ func (r *Redis) GetETag(ctx context.Context, remote, path string) (string, error
return val, err
}
func (r *Redis) GetToken(ctx context.Context, key string) (string, error) {
val, err := r.client.Get(ctx, "token:"+key).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
func (r *Redis) SetToken(ctx context.Context, key, token string, ttl time.Duration) error {
return r.client.Set(ctx, "token:"+key, token, ttl).Err()
}
func (r *Redis) IncrCircuitFailure(ctx context.Context, remote string, cooldown time.Duration) (int64, error) {
key := fmt.Sprintf("circuit:%s", remote)
pipe := r.client.Pipeline()
+1 -1
View File
@@ -65,7 +65,7 @@ func Load() (*Config, error) {
}
func getenv(key, fallback string) string {
if v, ok := os.LookupEnv(key); ok {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
+3 -38
View File
@@ -4,8 +4,6 @@ import (
"context"
"time"
"github.com/jackc/pgx/v5"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
@@ -111,49 +109,16 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach
return err
}
// AccessLogEntry is one buffered access-log record.
type AccessLogEntry struct {
RemoteName string
Path string
CacheHit bool
SizeBytes int64
UpstreamMS int
ClientIP string
}
// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY.
func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error {
if len(entries) == 0 {
return nil
}
rows := make([][]any, len(entries))
for i, e := range entries {
rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP}
}
_, err := db.Pool.CopyFrom(ctx,
pgx.Identifier{"access_log"},
[]string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"},
pgx.CopyFromRows(rows),
)
return err
}
// FindOrphanedBlobs returns blobs no longer referenced by any artifact or
// local file, restricted to those created before now()-minAge. The age cutoff
// is a grace period that avoids a TOCTOU race with in-flight dedup uploads,
// which insert the blob row before the referencing artifact/local_files row.
func (db *DB) FindOrphanedBlobs(ctx context.Context, minAge time.Duration) ([]models.Blob, error) {
cutoff := time.Now().Add(-minAge)
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
rows, err := db.Pool.Query(ctx, `
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
FROM blobs b
WHERE b.created_at < $1
AND b.content_hash NOT IN (
WHERE b.content_hash NOT IN (
SELECT content_hash FROM artifacts
UNION
SELECT content_hash FROM local_files
)
`, cutoff)
`)
if err != nil {
return nil, err
}
-41
View File
@@ -8,8 +8,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"git.unkin.net/unkin/artifactapi/internal/provider"
)
type LocalFile struct {
@@ -101,45 +99,6 @@ func (db *DB) ListLocalFilesByPrefix(ctx context.Context, repoName, prefix strin
return files, rows.Err()
}
func (db *DB) ListLocalFilePackages(ctx context.Context, repoName string) ([]string, error) {
rows, err := db.Pool.Query(ctx, `
SELECT DISTINCT split_part(file_path, '/', 1)
FROM local_files
WHERE repo_name = $1
ORDER BY 1
`, repoName)
if err != nil {
return nil, err
}
defer rows.Close()
var packages []string
for rows.Next() {
var pkg string
if err := rows.Scan(&pkg); err != nil {
return nil, err
}
packages = append(packages, pkg)
}
return packages, rows.Err()
}
func (db *DB) ListFilesByPrefix(ctx context.Context, repoName, prefix string) ([]provider.FileEntry, error) {
files, err := db.ListLocalFilesByPrefix(ctx, repoName, prefix)
if err != nil {
return nil, err
}
result := make([]provider.FileEntry, len(files))
for i, f := range files {
result[i] = provider.FileEntry{FilePath: f.FilePath, ContentHash: f.ContentHash}
}
return result, nil
}
func (db *DB) ListPackages(ctx context.Context, repoName string) ([]string, error) {
return db.ListLocalFilePackages(ctx, repoName)
}
func (db *DB) DeleteLocalFile(ctx context.Context, repoName, filePath string) error {
_, err := db.Pool.Exec(ctx, `DELETE FROM local_files WHERE repo_name = $1 AND file_path = $2`, repoName, filePath)
return err
-34
View File
@@ -124,40 +124,6 @@ func (db *DB) migrate() error {
CREATE INDEX IF NOT EXISTS idx_access_log_remote_time ON access_log(remote_name, created_at);
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS repo_type TEXT DEFAULT 'remote';
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_dial_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_tls_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_response_header_timeout INTEGER DEFAULT 0;
CREATE TABLE IF NOT EXISTS rpm_metadata (
id BIGSERIAL PRIMARY KEY,
repo_name TEXT NOT NULL,
file_path TEXT NOT NULL,
content_hash TEXT NOT NULL,
name TEXT NOT NULL,
epoch INTEGER DEFAULT 0,
version TEXT NOT NULL,
release TEXT NOT NULL,
arch TEXT NOT NULL,
summary TEXT DEFAULT '',
description TEXT DEFAULT '',
rpm_size BIGINT DEFAULT 0,
installed_size BIGINT DEFAULT 0,
license TEXT DEFAULT '',
vendor TEXT DEFAULT '',
build_group TEXT DEFAULT '',
build_host TEXT DEFAULT '',
source_rpm TEXT DEFAULT '',
url TEXT DEFAULT '',
packager TEXT DEFAULT '',
requires JSONB DEFAULT '[]',
provides JSONB DEFAULT '[]',
files JSONB DEFAULT '[]',
changelogs JSONB DEFAULT '[]',
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(repo_name, file_path)
);
CREATE INDEX IF NOT EXISTS idx_rpm_metadata_repo ON rpm_metadata(repo_name);
`)
return err
}
+5 -14
View File
@@ -11,9 +11,7 @@ const remoteCols = `name, package_type, repo_type, base_url, description, userna
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
releases_remote, managed_by,
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout,
created_at, updated_at`
releases_remote, managed_by, created_at, updated_at`
func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error {
return scanner.Scan(
@@ -22,9 +20,7 @@ func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error
&r.Patterns, &r.Blocklist, &r.MutablePatterns, &r.ImmutablePatterns,
&r.BanTagsEnabled, &r.BanTags,
&r.QuarantineEnabled, &r.QuarantineDays, &r.StaleOnError,
&r.ReleasesRemote, &r.ManagedBy,
&r.UpstreamDialTimeout, &r.UpstreamTLSTimeout, &r.UpstreamResponseHeaderTimeout,
&r.CreatedAt, &r.UpdatedAt,
&r.ReleasesRemote, &r.ManagedBy, &r.CreatedAt, &r.UpdatedAt,
)
}
@@ -63,9 +59,8 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
releases_remote, managed_by,
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)
releases_remote, managed_by
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21)
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
r.ImmutableTTL, r.MutableTTL, r.CheckMutable,
@@ -73,7 +68,6 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
@@ -86,9 +80,7 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
patterns=$11, blocklist=$12, mutable_patterns=$13, immutable_patterns=$14,
ban_tags_enabled=$15, ban_tags=$16,
quarantine_enabled=$17, quarantine_days=$18, stale_on_error=$19,
releases_remote=$20, managed_by=$21,
upstream_dial_timeout=$22, upstream_tls_timeout=$23, upstream_response_header_timeout=$24,
updated_at=NOW()
releases_remote=$20, managed_by=$21, updated_at=NOW()
WHERE name=$1
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
@@ -97,7 +89,6 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
-129
View File
@@ -1,129 +0,0 @@
package database
import (
"context"
"encoding/json"
"git.unkin.net/unkin/artifactapi/internal/provider"
)
func (db *DB) InsertRPMMetadata(ctx context.Context, meta *provider.RPMMetadata) error {
requiresJSON, _ := json.Marshal(meta.Requires)
providesJSON, _ := json.Marshal(meta.Provides)
filesJSON, _ := json.Marshal(meta.Files)
changelogsJSON, _ := json.Marshal(meta.Changelogs)
_, err := db.Pool.Exec(ctx, `
INSERT INTO rpm_metadata (
repo_name, file_path, content_hash,
name, epoch, version, release, arch,
summary, description, rpm_size, installed_size,
license, vendor, build_group, build_host, source_rpm, url, packager,
requires, provides, files, changelogs
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23)
ON CONFLICT (repo_name, file_path) DO NOTHING
`,
meta.RepoName, meta.FilePath, meta.ContentHash,
meta.Name, meta.Epoch, meta.Version, meta.Release, meta.Arch,
meta.Summary, meta.Description, meta.RPMSize, meta.InstalledSize,
meta.License, meta.Vendor, meta.Group, meta.BuildHost, meta.SourceRPM, meta.URL, meta.Packager,
requiresJSON, providesJSON, filesJSON, changelogsJSON,
)
return err
}
type RPMMetadataRow struct {
RepoName string
FilePath string
ContentHash string
Name string
Epoch int
Version string
Release string
Arch string
Summary string
Description string
RPMSize int64
InstalledSize int64
License string
Vendor string
Group string
BuildHost string
SourceRPM string
URL string
Packager string
Requires json.RawMessage
Provides json.RawMessage
Files json.RawMessage
Changelogs json.RawMessage
}
func (db *DB) ListRPMMetadataEntries(ctx context.Context, repoName string) ([]provider.RPMMetadata, error) {
rows, err := db.ListRPMMetadata(ctx, repoName)
if err != nil {
return nil, err
}
result := make([]provider.RPMMetadata, len(rows))
for i, r := range rows {
meta := provider.RPMMetadata{
RepoName: r.RepoName,
FilePath: r.FilePath,
ContentHash: r.ContentHash,
Name: r.Name,
Epoch: r.Epoch,
Version: r.Version,
Release: r.Release,
Arch: r.Arch,
Summary: r.Summary,
Description: r.Description,
RPMSize: r.RPMSize,
InstalledSize: r.InstalledSize,
License: r.License,
Vendor: r.Vendor,
Group: r.Group,
BuildHost: r.BuildHost,
SourceRPM: r.SourceRPM,
URL: r.URL,
Packager: r.Packager,
}
json.Unmarshal(r.Requires, &meta.Requires)
json.Unmarshal(r.Provides, &meta.Provides)
json.Unmarshal(r.Files, &meta.Files)
json.Unmarshal(r.Changelogs, &meta.Changelogs)
result[i] = meta
}
return result, nil
}
func (db *DB) ListRPMMetadata(ctx context.Context, repoName string) ([]RPMMetadataRow, error) {
rows, err := db.Pool.Query(ctx, `
SELECT repo_name, file_path, content_hash,
name, epoch, version, release, arch,
summary, description, rpm_size, installed_size,
license, vendor, build_group, build_host, source_rpm, url, packager,
requires, provides, files, changelogs
FROM rpm_metadata
WHERE repo_name = $1
ORDER BY name, epoch, version, release, arch
`, repoName)
if err != nil {
return nil, err
}
defer rows.Close()
var result []RPMMetadataRow
for rows.Next() {
var r RPMMetadataRow
if err := rows.Scan(
&r.RepoName, &r.FilePath, &r.ContentHash,
&r.Name, &r.Epoch, &r.Version, &r.Release, &r.Arch,
&r.Summary, &r.Description, &r.RPMSize, &r.InstalledSize,
&r.License, &r.Vendor, &r.Group, &r.BuildHost, &r.SourceRPM, &r.URL, &r.Packager,
&r.Requires, &r.Provides, &r.Files, &r.Changelogs,
); err != nil {
return nil, err
}
result = append(result, r)
}
return result, rows.Err()
}
-9
View File
@@ -30,15 +30,6 @@ func (db *DB) GetOverviewStats(ctx context.Context) (*models.OverviewStats, erro
return nil, err
}
err = db.Pool.QueryRow(ctx, `
SELECT COALESCE(SUM(size_bytes), 0)
FROM access_log
WHERE cache_hit = TRUE AND created_at > NOW() - INTERVAL '30 days'
`).Scan(&stats.BandwidthSaved30d)
if err != nil {
return nil, err
}
return &stats, nil
}
+1 -6
View File
@@ -9,11 +9,6 @@ import (
"git.unkin.net/unkin/artifactapi/internal/storage"
)
// blobGracePeriod is how old an orphaned blob must be before GC will delete
// it. This avoids racing in-flight dedup uploads that insert the blob row
// before the referencing artifact/local_files row exists.
const blobGracePeriod = 1 * time.Hour
type Collector struct {
db *database.DB
store *storage.S3
@@ -43,7 +38,7 @@ func (c *Collector) Run(ctx context.Context) {
func (c *Collector) sweep(ctx context.Context) {
start := time.Now()
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
orphaned, err := c.db.FindOrphanedBlobs(ctx)
if err != nil {
slog.Error("gc: find orphaned blobs", "error", err)
return
-82
View File
@@ -3,7 +3,6 @@ package provider
import (
"context"
"fmt"
"io"
"net/http"
"git.unkin.net/unkin/artifactapi/pkg/models"
@@ -25,87 +24,6 @@ type Provider interface {
AuthHeaders(ctx context.Context, remote models.Remote) (http.Header, error)
}
type FileEntry struct {
FilePath string
ContentHash string
}
type FileStore interface {
ListFilesByPrefix(ctx context.Context, repoName, prefix string) ([]FileEntry, error)
ListPackages(ctx context.Context, repoName string) ([]string, error)
}
type LocalUploader interface {
ValidateUpload(filePath string) (storagePath, contentType string, err error)
UploadResponse(storagePath, contentHash string, sizeBytes int64) map[string]any
}
type LocalIndexer interface {
ServeLocalIndex(w http.ResponseWriter, r *http.Request, files FileStore, repoName, path string) bool
GenerateLocalIndex(ctx context.Context, files FileStore, repoName, path string) ([]byte, error)
}
type BlobReader interface {
Download(ctx context.Context, key string) (io.ReadCloser, int64, error)
}
type PostUploadHook interface {
AfterUpload(ctx context.Context, repoName, storagePath, contentHash string, blobs BlobReader, db MetadataStore)
}
type MetadataStore interface {
InsertRPMMetadata(ctx context.Context, meta *RPMMetadata) error
}
type RPMMetadataReader interface {
ListRPMMetadataEntries(ctx context.Context, repoName string) ([]RPMMetadata, error)
}
type RPMMetadata struct {
RepoName string
FilePath string
ContentHash string
Name string
Epoch int
Version string
Release string
Arch string
Summary string
Description string
RPMSize int64
InstalledSize int64
License string
Vendor string
Group string
BuildHost string
SourceRPM string
URL string
Packager string
Requires []RPMDep
Provides []RPMDep
Files []RPMFile
Changelogs []RPMChangelog
}
type RPMDep struct {
Name string `json:"name"`
Flags string `json:"flags,omitempty"`
Epoch string `json:"epoch,omitempty"`
Version string `json:"version,omitempty"`
Release string `json:"release,omitempty"`
}
type RPMFile struct {
Path string `json:"path"`
Type string `json:"type,omitempty"`
}
type RPMChangelog struct {
Author string `json:"author"`
Date int64 `json:"date"`
Text string `json:"text"`
}
type IndexMerger interface {
MergeIndexes(members []MemberIndex, proxyBaseURL string) ([]byte, error)
}
-180
View File
@@ -2,10 +2,7 @@ package pypi
import (
"context"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"git.unkin.net/unkin/artifactapi/internal/auth"
@@ -17,9 +14,6 @@ func init() {
provider.Register(&Provider{})
}
var fileRe = regexp.MustCompile(`^[a-zA-Z0-9][a-zA-Z0-9._-]*\.(whl|tar\.gz|zip)$`)
var normalizeRe = regexp.MustCompile(`[-_.]+`)
type Provider struct{}
func (p *Provider) Type() models.PackageType { return models.PackagePyPI }
@@ -66,177 +60,3 @@ func (p *Provider) RewriteResponse(body []byte, remote models.Remote, proxyBaseU
func (p *Provider) AuthHeaders(_ context.Context, remote models.Remote) (http.Header, error) {
return auth.BasicHeaders(remote), nil
}
func normalize(name string) string {
return strings.ToLower(normalizeRe.ReplaceAllString(name, "-"))
}
func packageFromWheel(filename string) string {
parts := strings.SplitN(filename, "-", 3)
if len(parts) < 2 {
return ""
}
return normalize(parts[0])
}
func packageFromSdist(filename string) string {
name := filename
for _, suffix := range []string{".tar.gz", ".zip"} {
if strings.HasSuffix(name, suffix) {
name = strings.TrimSuffix(name, suffix)
break
}
}
idx := strings.LastIndex(name, "-")
if idx <= 0 {
return ""
}
return normalize(name[:idx])
}
func (p *Provider) ValidateUpload(filePath string) (storagePath, contentType string, err error) {
filename := filePath
if idx := strings.LastIndex(filePath, "/"); idx >= 0 {
filename = filePath[idx+1:]
}
if !fileRe.MatchString(filename) {
return "", "", fmt.Errorf("filename %q must be a .whl, .tar.gz, or .zip file", filename)
}
var pkgName string
if strings.HasSuffix(filename, ".whl") {
pkgName = packageFromWheel(filename)
} else {
pkgName = packageFromSdist(filename)
}
if pkgName == "" {
return "", "", fmt.Errorf("cannot parse package name from %q", filename)
}
ct := "application/zip"
if strings.HasSuffix(filename, ".tar.gz") {
ct = "application/gzip"
}
return pkgName + "/" + filename, ct, nil
}
func (p *Provider) UploadResponse(storagePath, contentHash string, sizeBytes int64) map[string]any {
parts := strings.SplitN(storagePath, "/", 2)
filename := storagePath
if len(parts) == 2 {
filename = parts[1]
}
return map[string]any{
"package": parts[0],
"filename": filename,
"content_hash": contentHash,
"size_bytes": sizeBytes,
}
}
func (p *Provider) ServeLocalIndex(w http.ResponseWriter, r *http.Request, files provider.FileStore, repoName, path string) bool {
if path == "simple" || path == "simple/" {
p.servePackageList(w, r, files, repoName)
return true
}
if strings.HasPrefix(path, "simple/") {
pkg := strings.TrimPrefix(path, "simple/")
pkg = strings.TrimSuffix(pkg, "/")
if pkg != "" && !strings.Contains(pkg, "/") {
p.servePackageFiles(w, r, files, repoName, pkg)
return true
}
}
return false
}
func (p *Provider) GenerateLocalIndex(ctx context.Context, files provider.FileStore, repoName, path string) ([]byte, error) {
if !strings.HasPrefix(path, "simple/") {
return nil, fmt.Errorf("unsupported index path: %q", path)
}
pkg := strings.TrimPrefix(path, "simple/")
pkg = strings.TrimSuffix(pkg, "/")
if pkg == "" {
return p.generatePackageListHTML(ctx, files, repoName)
}
return p.generatePackageFilesHTML(ctx, files, repoName, pkg)
}
func (p *Provider) servePackageList(w http.ResponseWriter, r *http.Request, files provider.FileStore, repoName string) {
body, err := p.generatePackageListHTML(r.Context(), files, repoName)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
w.Write(body)
}
func (p *Provider) servePackageFiles(w http.ResponseWriter, r *http.Request, files provider.FileStore, repoName, packageName string) {
normalized := normalize(packageName)
prefix := normalized + "/"
entries, err := files.ListFilesByPrefix(r.Context(), repoName, prefix)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if len(entries) == 0 {
http.Error(w, "not found", http.StatusNotFound)
return
}
var b strings.Builder
b.WriteString("<!DOCTYPE html>\n<html><body>\n")
for _, f := range entries {
filename := strings.TrimPrefix(f.FilePath, normalized+"/")
hash := strings.TrimPrefix(f.ContentHash, "sha256:")
fmt.Fprintf(&b, "<a href=\"../../%s/%s#sha256=%s\">%s</a>\n",
normalized, filename, hash, filename)
}
b.WriteString("</body></html>\n")
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
io.WriteString(w, b.String())
}
func (p *Provider) generatePackageListHTML(ctx context.Context, files provider.FileStore, repoName string) ([]byte, error) {
packages, err := files.ListPackages(ctx, repoName)
if err != nil {
return nil, err
}
var b strings.Builder
b.WriteString("<!DOCTYPE html>\n<html><body>\n")
for _, pkg := range packages {
fmt.Fprintf(&b, "<a href=\"%s/\">%s</a>\n", pkg, pkg)
}
b.WriteString("</body></html>\n")
return []byte(b.String()), nil
}
func (p *Provider) generatePackageFilesHTML(ctx context.Context, files provider.FileStore, repoName, packageName string) ([]byte, error) {
normalized := normalize(packageName)
prefix := normalized + "/"
entries, err := files.ListFilesByPrefix(ctx, repoName, prefix)
if err != nil {
return nil, err
}
var b strings.Builder
b.WriteString("<!DOCTYPE html>\n<html><body>\n")
for _, f := range entries {
filename := strings.TrimPrefix(f.FilePath, normalized+"/")
hash := strings.TrimPrefix(f.ContentHash, "sha256:")
fmt.Fprintf(&b, "<a href=\"%s/%s#sha256=%s\">%s</a>\n",
normalized, filename, hash, filename)
}
b.WriteString("</body></html>\n")
return []byte(b.String()), nil
}
-387
View File
@@ -1,24 +1,13 @@
package rpm
import (
"bytes"
"compress/gzip"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/xml"
"fmt"
"log/slog"
"net/http"
"regexp"
"strings"
"time"
rpmlib "github.com/cavaliergopher/rpm"
"git.unkin.net/unkin/artifactapi/internal/auth"
"git.unkin.net/unkin/artifactapi/internal/provider"
"git.unkin.net/unkin/artifactapi/internal/storage"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
@@ -66,379 +55,3 @@ func (p *Provider) RewriteResponse(_ []byte, _ models.Remote, _ string) ([]byte,
func (p *Provider) AuthHeaders(_ context.Context, remote models.Remote) (http.Header, error) {
return auth.BasicHeaders(remote), nil
}
func (p *Provider) ValidateUpload(filePath string) (storagePath, contentType string, err error) {
filename := filePath
if idx := strings.LastIndex(filePath, "/"); idx >= 0 {
filename = filePath[idx+1:]
}
if !strings.HasSuffix(strings.ToLower(filename), ".rpm") {
return "", "", fmt.Errorf("file must be an .rpm package")
}
return "Packages/" + filename, "application/x-rpm", nil
}
func (p *Provider) UploadResponse(storagePath, contentHash string, sizeBytes int64) map[string]any {
filename := strings.TrimPrefix(storagePath, "Packages/")
return map[string]any{
"filename": filename,
"content_hash": contentHash,
"size_bytes": sizeBytes,
}
}
func (p *Provider) AfterUpload(ctx context.Context, repoName, storagePath, contentHash string, blobs provider.BlobReader, db provider.MetadataStore) {
s3Key := storage.BlobKey(strings.TrimPrefix(contentHash, "sha256:"))
reader, blobSize, err := blobs.Download(ctx, s3Key)
if err != nil {
slog.Error("rpm metadata: download failed", "repo", repoName, "path", storagePath, "error", err)
return
}
defer reader.Close()
pkg, err := rpmlib.Read(reader)
if err != nil {
slog.Error("rpm metadata: parse failed", "repo", repoName, "path", storagePath, "error", err)
return
}
meta := &provider.RPMMetadata{
RepoName: repoName,
FilePath: storagePath,
ContentHash: contentHash,
Name: pkg.Name(),
Epoch: pkg.Epoch(),
Version: pkg.Version(),
Release: pkg.Release(),
Arch: pkg.Architecture(),
Summary: pkg.Summary(),
Description: pkg.Description(),
RPMSize: blobSize,
InstalledSize: int64(pkg.Size()),
License: pkg.License(),
Vendor: pkg.Vendor(),
Group: firstGroup(pkg.Groups()),
BuildHost: pkg.BuildHost(),
SourceRPM: pkg.SourceRPM(),
URL: pkg.URL(),
Packager: pkg.Packager(),
}
for _, req := range pkg.Requires() {
meta.Requires = append(meta.Requires, rpmDepFromEntry(req))
}
for _, prov := range pkg.Provides() {
meta.Provides = append(meta.Provides, rpmDepFromEntry(prov))
}
if meta.Requires == nil {
meta.Requires = []provider.RPMDep{}
}
if meta.Provides == nil {
meta.Provides = []provider.RPMDep{}
}
meta.Files = []provider.RPMFile{}
meta.Changelogs = []provider.RPMChangelog{}
if err := db.InsertRPMMetadata(ctx, meta); err != nil {
slog.Error("rpm metadata: insert failed", "repo", repoName, "path", storagePath, "error", err)
return
}
slog.Info("rpm metadata: parsed", "repo", repoName, "name", meta.Name, "version", meta.Version, "arch", meta.Arch)
}
func rpmDepFromEntry(e rpmlib.Dependency) provider.RPMDep {
dep := provider.RPMDep{Name: e.Name()}
if e.Flags() != 0 {
dep.Flags = rpmFlagString(e.Flags())
dep.Version = e.Version()
dep.Release = e.Release()
if e.Epoch() > 0 {
dep.Epoch = fmt.Sprintf("%d", e.Epoch())
}
}
return dep
}
func rpmFlagString(f int) string {
switch {
case f&0x08 != 0 && f&0x04 != 0:
return "GE"
case f&0x02 != 0 && f&0x04 != 0:
return "LE"
case f&0x08 != 0:
return "GT"
case f&0x02 != 0:
return "LT"
case f&0x04 != 0:
return "EQ"
default:
return ""
}
}
func firstGroup(groups []string) string {
if len(groups) > 0 {
return groups[0]
}
return "Unspecified"
}
func (p *Provider) ServeLocalIndex(w http.ResponseWriter, r *http.Request, files provider.FileStore, repoName, path string) bool {
if !strings.HasPrefix(path, "repodata/") {
return false
}
rpmReader, ok := files.(provider.RPMMetadataReader)
if !ok {
http.Error(w, "rpm metadata not available", http.StatusInternalServerError)
return true
}
tail := strings.TrimPrefix(path, "repodata/")
switch {
case tail == "repomd.xml":
p.serveRepomd(w, r, rpmReader, repoName)
case strings.HasSuffix(tail, "-primary.xml.gz"):
p.servePrimary(w, r, rpmReader, repoName)
case strings.HasSuffix(tail, "-filelists.xml.gz"):
p.serveFilelists(w, r, rpmReader, repoName)
case strings.HasSuffix(tail, "-other.xml.gz"):
p.serveOther(w, r, rpmReader, repoName)
default:
http.Error(w, "not found", http.StatusNotFound)
}
return true
}
func (p *Provider) GenerateLocalIndex(ctx context.Context, files provider.FileStore, repoName, path string) ([]byte, error) {
return nil, fmt.Errorf("rpm local index generation for virtual repos not supported")
}
func (p *Provider) serveRepomd(w http.ResponseWriter, r *http.Request, reader provider.RPMMetadataReader, repoName string) {
metas, err := reader.ListRPMMetadataEntries(r.Context(), repoName)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
primary := generatePrimaryXMLGZ(metas)
filelists := generateFilelistsXMLGZ(metas)
other := generateOtherXMLGZ(metas)
primaryHash := sha256Hex(primary)
filelistsHash := sha256Hex(filelists)
otherHash := sha256Hex(other)
repomd := generateRepomd(primaryHash, len(primary), filelistsHash, len(filelists), otherHash, len(other))
w.Header().Set("Content-Type", "application/xml")
w.WriteHeader(http.StatusOK)
w.Write(repomd)
}
func (p *Provider) servePrimary(w http.ResponseWriter, r *http.Request, reader provider.RPMMetadataReader, repoName string) {
metas, err := reader.ListRPMMetadataEntries(r.Context(), repoName)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/gzip")
w.WriteHeader(http.StatusOK)
w.Write(generatePrimaryXMLGZ(metas))
}
func (p *Provider) serveFilelists(w http.ResponseWriter, r *http.Request, reader provider.RPMMetadataReader, repoName string) {
metas, err := reader.ListRPMMetadataEntries(r.Context(), repoName)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/gzip")
w.WriteHeader(http.StatusOK)
w.Write(generateFilelistsXMLGZ(metas))
}
func (p *Provider) serveOther(w http.ResponseWriter, r *http.Request, reader provider.RPMMetadataReader, repoName string) {
metas, err := reader.ListRPMMetadataEntries(r.Context(), repoName)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/gzip")
w.WriteHeader(http.StatusOK)
w.Write(generateOtherXMLGZ(metas))
}
func generateRepomd(primaryHash string, primarySize int, filelistsHash string, filelistsSize int, otherHash string, otherSize int) []byte {
ts := fmt.Sprintf("%d", time.Now().Unix())
var b bytes.Buffer
b.WriteString(xml.Header)
b.WriteString(`<repomd xmlns="http://linux.duke.edu/metadata/repo" xmlns:rpm="http://linux.duke.edu/metadata/rpm">` + "\n")
fmt.Fprintf(&b, " <revision>%s</revision>\n", ts)
writeRepomdData(&b, "primary", primaryHash, primarySize, ts)
writeRepomdData(&b, "filelists", filelistsHash, filelistsSize, ts)
writeRepomdData(&b, "other", otherHash, otherSize, ts)
b.WriteString("</repomd>\n")
return b.Bytes()
}
func writeRepomdData(b *bytes.Buffer, dtype, hash string, size int, ts string) {
fmt.Fprintf(b, " <data type=\"%s\">\n", dtype)
fmt.Fprintf(b, " <checksum type=\"sha256\">%s</checksum>\n", hash)
fmt.Fprintf(b, " <location href=\"repodata/%s-%s.xml.gz\"/>\n", hash, dtype)
fmt.Fprintf(b, " <timestamp>%s</timestamp>\n", ts)
fmt.Fprintf(b, " <size>%d</size>\n", size)
fmt.Fprintf(b, " </data>\n")
}
func generatePrimaryXMLGZ(metas []provider.RPMMetadata) []byte {
var xmlBuf bytes.Buffer
xmlBuf.WriteString(xml.Header)
fmt.Fprintf(&xmlBuf, "<metadata xmlns=\"http://linux.duke.edu/metadata/common\" xmlns:rpm=\"http://linux.duke.edu/metadata/rpm\" packages=\"%d\">\n", len(metas))
for _, m := range metas {
pkgHash := strings.TrimPrefix(m.ContentHash, "sha256:")
fmt.Fprintf(&xmlBuf, "<package type=\"rpm\">\n")
fmt.Fprintf(&xmlBuf, " <name>%s</name>\n", xmlEscape(m.Name))
fmt.Fprintf(&xmlBuf, " <arch>%s</arch>\n", xmlEscape(m.Arch))
fmt.Fprintf(&xmlBuf, " <version epoch=\"%d\" ver=\"%s\" rel=\"%s\"/>\n", m.Epoch, xmlEscape(m.Version), xmlEscape(m.Release))
fmt.Fprintf(&xmlBuf, " <checksum type=\"sha256\" pkgid=\"YES\">%s</checksum>\n", pkgHash)
fmt.Fprintf(&xmlBuf, " <summary>%s</summary>\n", xmlEscape(m.Summary))
fmt.Fprintf(&xmlBuf, " <description>%s</description>\n", xmlEscape(m.Description))
if m.Packager != "" {
fmt.Fprintf(&xmlBuf, " <packager>%s</packager>\n", xmlEscape(m.Packager))
}
if m.URL != "" {
fmt.Fprintf(&xmlBuf, " <url>%s</url>\n", xmlEscape(m.URL))
}
fmt.Fprintf(&xmlBuf, " <time file=\"%d\" build=\"0\"/>\n", time.Now().Unix())
fmt.Fprintf(&xmlBuf, " <size package=\"%d\" installed=\"%d\" archive=\"0\"/>\n", m.RPMSize, m.InstalledSize)
fmt.Fprintf(&xmlBuf, " <location href=\"%s\"/>\n", xmlEscape(m.FilePath))
fmt.Fprintf(&xmlBuf, " <format>\n")
if m.License != "" {
fmt.Fprintf(&xmlBuf, " <rpm:license>%s</rpm:license>\n", xmlEscape(m.License))
}
if m.Vendor != "" {
fmt.Fprintf(&xmlBuf, " <rpm:vendor>%s</rpm:vendor>\n", xmlEscape(m.Vendor))
}
fmt.Fprintf(&xmlBuf, " <rpm:group>%s</rpm:group>\n", xmlEscape(m.Group))
if m.BuildHost != "" {
fmt.Fprintf(&xmlBuf, " <rpm:buildhost>%s</rpm:buildhost>\n", xmlEscape(m.BuildHost))
}
if m.SourceRPM != "" {
fmt.Fprintf(&xmlBuf, " <rpm:sourcerpm>%s</rpm:sourcerpm>\n", xmlEscape(m.SourceRPM))
}
if len(m.Provides) > 0 {
xmlBuf.WriteString(" <rpm:provides>\n")
for _, d := range m.Provides {
writeRPMEntry(&xmlBuf, d)
}
xmlBuf.WriteString(" </rpm:provides>\n")
}
if len(m.Requires) > 0 {
xmlBuf.WriteString(" <rpm:requires>\n")
for _, d := range m.Requires {
writeRPMEntry(&xmlBuf, d)
}
xmlBuf.WriteString(" </rpm:requires>\n")
}
fmt.Fprintf(&xmlBuf, " </format>\n")
fmt.Fprintf(&xmlBuf, "</package>\n")
}
xmlBuf.WriteString("</metadata>\n")
return gzipBytes(xmlBuf.Bytes())
}
func generateFilelistsXMLGZ(metas []provider.RPMMetadata) []byte {
var xmlBuf bytes.Buffer
xmlBuf.WriteString(xml.Header)
fmt.Fprintf(&xmlBuf, "<filelists xmlns=\"http://linux.duke.edu/metadata/filelists\" packages=\"%d\">\n", len(metas))
for _, m := range metas {
pkgHash := strings.TrimPrefix(m.ContentHash, "sha256:")
fmt.Fprintf(&xmlBuf, "<package pkgid=\"%s\" name=\"%s\" arch=\"%s\">\n", pkgHash, xmlEscape(m.Name), xmlEscape(m.Arch))
fmt.Fprintf(&xmlBuf, " <version epoch=\"%d\" ver=\"%s\" rel=\"%s\"/>\n", m.Epoch, xmlEscape(m.Version), xmlEscape(m.Release))
for _, f := range m.Files {
if f.Type != "" {
fmt.Fprintf(&xmlBuf, " <file type=\"%s\">%s</file>\n", f.Type, xmlEscape(f.Path))
} else {
fmt.Fprintf(&xmlBuf, " <file>%s</file>\n", xmlEscape(f.Path))
}
}
xmlBuf.WriteString("</package>\n")
}
xmlBuf.WriteString("</filelists>\n")
return gzipBytes(xmlBuf.Bytes())
}
func generateOtherXMLGZ(metas []provider.RPMMetadata) []byte {
var xmlBuf bytes.Buffer
xmlBuf.WriteString(xml.Header)
fmt.Fprintf(&xmlBuf, "<otherdata xmlns=\"http://linux.duke.edu/metadata/other\" packages=\"%d\">\n", len(metas))
for _, m := range metas {
pkgHash := strings.TrimPrefix(m.ContentHash, "sha256:")
fmt.Fprintf(&xmlBuf, "<package pkgid=\"%s\" name=\"%s\" arch=\"%s\">\n", pkgHash, xmlEscape(m.Name), xmlEscape(m.Arch))
fmt.Fprintf(&xmlBuf, " <version epoch=\"%d\" ver=\"%s\" rel=\"%s\"/>\n", m.Epoch, xmlEscape(m.Version), xmlEscape(m.Release))
for _, cl := range m.Changelogs {
fmt.Fprintf(&xmlBuf, " <changelog author=\"%s\" date=\"%d\">%s</changelog>\n",
xmlEscape(cl.Author), cl.Date, xmlEscape(cl.Text))
}
xmlBuf.WriteString("</package>\n")
}
xmlBuf.WriteString("</otherdata>\n")
return gzipBytes(xmlBuf.Bytes())
}
func writeRPMEntry(b *bytes.Buffer, d provider.RPMDep) {
if d.Flags != "" {
fmt.Fprintf(b, " <rpm:entry name=\"%s\" flags=\"%s\"", xmlEscape(d.Name), d.Flags)
if d.Epoch != "" {
fmt.Fprintf(b, " epoch=\"%s\"", d.Epoch)
}
if d.Version != "" {
fmt.Fprintf(b, " ver=\"%s\"", xmlEscape(d.Version))
}
if d.Release != "" {
fmt.Fprintf(b, " rel=\"%s\"", xmlEscape(d.Release))
}
b.WriteString("/>\n")
} else {
fmt.Fprintf(b, " <rpm:entry name=\"%s\"/>\n", xmlEscape(d.Name))
}
}
func xmlEscape(s string) string {
var b bytes.Buffer
xml.EscapeText(&b, []byte(s))
return b.String()
}
func gzipBytes(data []byte) []byte {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
gz.Write(data)
gz.Close()
return buf.Bytes()
}
func sha256Hex(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
-149
View File
@@ -3,7 +3,6 @@ package terraform
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"regexp"
@@ -20,12 +19,6 @@ func init() {
var versionsRe = regexp.MustCompile(`[^/]+/[^/]+/versions$`)
var providerZipRe = regexp.MustCompile(
`^terraform-provider-([a-zA-Z0-9_-]+)_([0-9]+\.[0-9]+\.[0-9]+(?:-[a-zA-Z0-9.]+)?)_([a-z0-9]+)_([a-z0-9]+)\.zip$`,
)
var semverRe = regexp.MustCompile(`^[0-9]+\.[0-9]+\.[0-9]+(?:-[a-zA-Z0-9.]+)?$`)
type Provider struct{}
func (p *Provider) Type() models.PackageType { return models.PackageTerraform }
@@ -93,145 +86,3 @@ func rewriteDownloadURL(originalURL, releasesRemote, proxyBaseURL string) string
func (p *Provider) AuthHeaders(_ context.Context, remote models.Remote) (http.Header, error) {
return auth.BasicHeaders(remote), nil
}
func (p *Provider) ValidateUpload(filePath string) (storagePath, contentType string, err error) {
parts := strings.Split(filePath, "/")
if len(parts) != 3 {
return "", "", fmt.Errorf("path must be {namespace}/{type}/{filename}.zip")
}
namespace, typeName, filename := parts[0], parts[1], parts[2]
m := providerZipRe.FindStringSubmatch(filename)
if m == nil {
return "", "", fmt.Errorf("filename %q does not match terraform-provider-{type}_{version}_{os}_{arch}.zip", filename)
}
if m[1] != typeName {
return "", "", fmt.Errorf("provider type in filename %q does not match path type %q", m[1], typeName)
}
return fmt.Sprintf("%s/%s/%s", namespace, typeName, filename), "application/zip", nil
}
func (p *Provider) UploadResponse(storagePath, contentHash string, sizeBytes int64) map[string]any {
parts := strings.Split(storagePath, "/")
if len(parts) != 3 {
return map[string]any{"path": storagePath, "content_hash": contentHash, "size_bytes": sizeBytes}
}
m := providerZipRe.FindStringSubmatch(parts[2])
if m == nil {
return map[string]any{"path": storagePath, "content_hash": contentHash, "size_bytes": sizeBytes}
}
return map[string]any{
"namespace": parts[0],
"type": parts[1],
"version": m[2],
"os": m[3],
"arch": m[4],
"content_hash": contentHash,
"size_bytes": sizeBytes,
}
}
type terraformIndex struct {
Versions map[string]json.RawMessage `json:"versions"`
}
type terraformVersionDoc struct {
Archives map[string]terraformArchive `json:"archives"`
}
type terraformArchive struct {
URL string `json:"url"`
Hashes []string `json:"hashes,omitempty"`
}
func (p *Provider) ServeLocalIndex(w http.ResponseWriter, r *http.Request, files provider.FileStore, repoName, path string) bool {
parts := strings.Split(path, "/")
if len(parts) < 3 {
return false
}
namespace, typeName := parts[0], parts[1]
tail := parts[2]
if tail == "index.json" {
p.serveIndex(w, r, files, repoName, namespace, typeName)
return true
}
if strings.HasSuffix(tail, ".json") {
version := strings.TrimSuffix(tail, ".json")
if semverRe.MatchString(version) {
p.serveVersionDoc(w, r, files, repoName, namespace, typeName, version)
return true
}
}
return false
}
func (p *Provider) GenerateLocalIndex(ctx context.Context, files provider.FileStore, repoName, path string) ([]byte, error) {
return nil, fmt.Errorf("terraform local index generation for virtual repos not supported")
}
func (p *Provider) serveIndex(w http.ResponseWriter, r *http.Request, files provider.FileStore, repoName, namespace, typeName string) {
prefix := fmt.Sprintf("%s/%s/", namespace, typeName)
entries, err := files.ListFilesByPrefix(r.Context(), repoName, prefix)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
versions := map[string]json.RawMessage{}
for _, f := range entries {
filename := strings.TrimPrefix(f.FilePath, prefix)
m := providerZipRe.FindStringSubmatch(filename)
if m == nil {
continue
}
versions[m[2]] = json.RawMessage(`{}`)
}
if len(versions) == 0 {
http.Error(w, "not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(terraformIndex{Versions: versions})
}
func (p *Provider) serveVersionDoc(w http.ResponseWriter, r *http.Request, files provider.FileStore, repoName, namespace, typeName, version string) {
prefix := fmt.Sprintf("%s/%s/terraform-provider-%s_%s_", namespace, typeName, typeName, version)
entries, err := files.ListFilesByPrefix(r.Context(), repoName, prefix)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
archives := map[string]terraformArchive{}
for _, f := range entries {
filename := strings.TrimPrefix(f.FilePath, fmt.Sprintf("%s/%s/", namespace, typeName))
m := providerZipRe.FindStringSubmatch(filename)
if m == nil || m[2] != version {
continue
}
platform := m[3] + "_" + m[4]
archive := terraformArchive{URL: filename}
if f.ContentHash != "" {
archive.Hashes = []string{"zh:" + strings.TrimPrefix(f.ContentHash, "sha256:")}
}
archives[platform] = archive
}
if len(archives) == 0 {
http.Error(w, "not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(terraformVersionDoc{Archives: archives})
}
+1 -21
View File
@@ -2,7 +2,6 @@ package proxy
import (
"regexp"
"sync"
"git.unkin.net/unkin/artifactapi/internal/provider"
"git.unkin.net/unkin/artifactapi/pkg/models"
@@ -61,29 +60,10 @@ func (c *Classifier) Classify(remote models.Remote, path string) Classification
return ClassImmutable
}
// patternCache memoises regex compilation. Classify runs on every proxied
// request and previously recompiled each remote's pattern lists every time;
// keying by the pattern string lets each distinct pattern compile once and
// then be reused, with no invalidation needed (the pattern text is the key).
// A pattern that fails to compile is cached as a typed nil so we don't retry.
var patternCache sync.Map // map[string]*regexp.Regexp
func compileCached(pattern string) *regexp.Regexp {
if v, ok := patternCache.Load(pattern); ok {
return v.(*regexp.Regexp)
}
re, err := regexp.Compile(pattern)
if err != nil {
re = nil
}
patternCache.Store(pattern, re)
return re
}
func compilePatterns(patterns []string) []*regexp.Regexp {
compiled := make([]*regexp.Regexp, 0, len(patterns))
for _, p := range patterns {
if re := compileCached(p); re != nil {
if re, err := regexp.Compile(p); err == nil {
compiled = append(compiled, re)
}
}
+86 -396
View File
@@ -4,13 +4,10 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
"git.unkin.net/unkin/artifactapi/internal/cache"
@@ -22,65 +19,19 @@ import (
const fetchLockTTL = 30 * time.Second
const (
accessLogBufferSize = 4096
accessLogBatchSize = 128
accessLogFlushEvery = 2 * time.Second
)
type Engine struct {
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
circuit *CircuitBreaker
accessLog chan database.AccessLogEntry
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
}
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
e := &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
circuit: NewCircuitBreaker(c),
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
}
go e.runAccessLogWriter()
return e
}
// runAccessLogWriter drains the access-log channel and writes rows in batches,
// replacing a goroutine-per-request insert. It runs for the process lifetime;
// access logs are best-effort telemetry, so a small tail may be lost on abrupt
// shutdown.
func (e *Engine) runAccessLogWriter() {
ticker := time.NewTicker(accessLogFlushEvery)
defer ticker.Stop()
batch := make([]database.AccessLogEntry, 0, accessLogBatchSize)
flush := func() {
if len(batch) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil {
slog.Warn("access log batch insert failed", "error", err, "count", len(batch))
}
cancel()
batch = batch[:0]
}
for {
select {
case entry := <-e.accessLog:
batch = append(batch, entry)
if len(batch) >= accessLogBatchSize {
flush()
}
case <-ticker.C:
flush()
}
return &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
}
}
@@ -91,7 +42,7 @@ type FetchResult struct {
Source string // "cache" or "remote"
}
func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, prov provider.Provider, clientHeaders ...http.Header) (*FetchResult, error) {
func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*FetchResult, error) {
classifier := NewClassifier(prov)
class := classifier.Classify(remote, path)
@@ -110,7 +61,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
@@ -122,12 +73,11 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
}
if !locked {
// Another request holds the fetch lock. Poll the store until the leader
// populates it rather than immediately racing to fetch upstream too; a
// cold-cache stampede otherwise hits upstream once per waiter.
if result := e.waitForStore(ctx, remote, path); result != nil {
time.Sleep(500 * time.Millisecond)
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -146,138 +96,35 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
}
}
var fwdHeaders http.Header
if len(clientHeaders) > 0 && clientHeaders[0] != nil {
fwdHeaders = clientHeaders[0]
}
// Short-circuit upstream calls when the remote's breaker is open: serve
// stale from the store if we have it, otherwise fail fast rather than
// hammering a known-bad upstream.
if e.circuit.IsOpen(ctx, remote.Name) {
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
}
start := time.Now()
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl)
upstreamMS := int(time.Since(start).Milliseconds())
if err != nil {
if isNetworkError(err) {
e.circuit.RecordFailure(ctx, remote.Name)
}
if remote.StaleOnError && isNetworkError(err) {
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
stale, serr := e.serveFromStore(ctx, remote, path)
if serr == nil {
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
go e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
}
return nil, err
}
e.circuit.RecordSuccess(ctx, remote.Name)
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil
}
// HeadResult carries artifact metadata for a HEAD request. There is no body.
type HeadResult struct {
ContentType string
Size int64
Source string // "cache" or "remote"
}
// Head resolves artifact metadata without fetching or streaming the body.
// Cached artifacts/indexes are answered from the store metadata; on a miss it
// issues an upstream HEAD. It never downloads or caches the body.
func (e *Engine) Head(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
class := NewClassifier(prov).Classify(remote, path)
if class == ClassDenied {
return nil, &ProxyError{Status: http.StatusForbidden, Message: "access denied"}
}
if artifact, err := e.db.GetArtifact(ctx, remote.Name, path); err == nil && artifact != nil {
return &HeadResult{ContentType: artifact.ContentType, Size: artifact.SizeBytes, Source: "cache"}, nil
}
if info, err := e.store.Stat(ctx, storage.IndexKey(remote.Name, path)); err == nil {
return &HeadResult{ContentType: info.ContentType, Size: info.Size, Source: "cache"}, nil
}
return e.headUpstream(ctx, remote, path, prov)
}
func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
url := prov.UpstreamURL(remote, path)
authHeaders, err := prov.AuthHeaders(ctx, remote)
if err != nil {
return nil, fmt.Errorf("auth headers: %w", err)
}
doHead := func(extra http.Header) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
for k, vv := range authHeaders {
for _, v := range vv {
req.Header.Add(k, v)
}
}
for k, vv := range extra {
for _, v := range vv {
req.Header.Set(k, v)
}
}
return http.DefaultClient.Do(req)
}
resp, err := doHead(nil)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, _, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if terr == nil && token != "" {
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
if err != nil {
return nil, &UpstreamError{Err: err}
}
} else {
return nil, &ProxyError{Status: http.StatusUnauthorized, Message: "upstream returned 401"}
}
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
contentType = ct
}
return &HeadResult{ContentType: contentType, Size: resp.ContentLength, Source: "remote"}, nil
}
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration, clientHeaders http.Header) (*FetchResult, error) {
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration) (*FetchResult, error) {
url := prov.UpstreamURL(remote, path)
authHeaders, err := prov.AuthHeaders(ctx, remote)
@@ -294,144 +141,94 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
req.Header.Add(k, v)
}
}
if clientHeaders != nil {
if accept := clientHeaders.Get("Accept"); accept != "" {
req.Header.Set("Accept", accept)
}
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, err := e.cachedBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if err == nil && token != "" {
req2, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
req2.Header.Set("Authorization", "Bearer "+token)
if clientHeaders != nil {
if accept := clientHeaders.Get("Accept"); accept != "" {
req2.Header.Set("Accept", accept)
}
}
resp, err = clientForRemote(remote).Do(req2)
if err != nil {
return nil, &UpstreamError{Err: err}
}
} else {
return nil, &ProxyError{Status: http.StatusUnauthorized, Message: "upstream returned 401"}
}
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
if ct := resp.Header.Get("Content-Type"); ct != "" && contentType == "application/octet-stream" {
contentType = ct
}
// Mutable indexes are small and may be rewritten, so buffer them in memory.
if class == ClassMutable {
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
s3Key := storage.IndexKey(remote.Name, path)
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload index: %w", err)
}
etag := resp.Header.Get("ETag")
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
} else {
hash := sha256Hash(body)
s3Key := storage.BlobKey(hash)
exists, _ := e.store.Exists(ctx, s3Key)
if !exists {
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload blob: %w", err)
}
}
contentHash := fmt.Sprintf("sha256:%s", hash)
if err := e.db.UpsertBlob(ctx, contentHash, s3Key, int64(len(body)), contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, contentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
return &FetchResult{
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// Immutable blobs are streamed through the content-addressable store
// (tempfile -> sha256 -> S3) so arbitrarily large artifacts never sit
// fully in memory. Immutable content is never rewritten in the proxy path.
casResult, err := e.cas.Store(ctx, resp.Body, contentType)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("store blob: %w", err)
}
if err := e.db.UpsertBlob(ctx, casResult.ContentHash, casResult.S3Key, casResult.SizeBytes, contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, casResult.ContentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
reader, info, err := e.store.Download(ctx, casResult.S3Key)
if err != nil {
return nil, fmt.Errorf("serve stored blob: %w", err)
}
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: casResult.SizeBytes,
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// waitForStore polls the store for an artifact populated by the request that
// holds the fetch lock, returning it once available or nil if it does not
// appear within the wait budget (after which the caller fetches upstream
// itself). It stops early if the request context is cancelled.
func (e *Engine) waitForStore(ctx context.Context, remote models.Remote, path string) *FetchResult {
const (
pollInterval = 100 * time.Millisecond
maxWait = 5 * time.Second
)
deadline := time.Now().Add(maxWait)
for {
if result, err := e.serveFromStore(ctx, remote, path); err == nil {
return result
}
if time.Now().After(deadline) {
return nil
}
select {
case <-ctx.Done():
return nil
case <-time.After(pollInterval):
}
}
}
func (e *Engine) serveFromStore(ctx context.Context, remote models.Remote, path string) (*FetchResult, error) {
artifact, err := e.db.GetArtifact(ctx, remote.Name, path)
if err == nil && artifact != nil {
reader, info, err := e.store.Download(ctx, artifact.ContentHash[len("sha256:"):])
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: info.Size,
}, nil
}
s3Key := storage.BlobKey(artifact.ContentHash[len("sha256:"):])
reader, info, err := e.store.Download(ctx, s3Key)
reader, info, err = e.store.Download(ctx, s3Key)
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
@@ -473,7 +270,7 @@ func (e *Engine) checkUpstream(ctx context.Context, remote models.Remote, path,
}
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, &UpstreamError{Err: err}
}
@@ -494,20 +291,15 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio
}
}
// logAccess enqueues an access-log entry for the batch writer. It never blocks
// the request path: if the buffer is full the entry is dropped.
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
select {
case e.accessLog <- database.AccessLogEntry{
RemoteName: remoteName,
Path: path,
CacheHit: cacheHit,
SizeBytes: size,
UpstreamMS: upstreamMS,
}:
default:
slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
}
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
func bytesReader(data []byte) io.Reader {
@@ -527,110 +319,6 @@ func (r readerAt) ReadAt(p []byte, off int64) (n int, err error) {
return
}
// bearerTokenTTLDefault/Margin bound how long a token is cached: the default
// is used when the token endpoint omits expires_in, and the margin is
// subtracted so a cached token is refreshed slightly before it actually expires.
const (
bearerTokenTTLDefault = 60 * time.Second
bearerTokenTTLMargin = 10 * time.Second
)
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
// cachedBearerToken returns a bearer token for the given challenge, reusing a
// Redis-cached token for the same remote+challenge while it is still valid.
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
key := remote.Name + ":" + sha256Hash([]byte(wwwAuth))
if tok, err := e.cache.GetToken(ctx, key); err == nil && tok != "" {
return tok, nil
}
tok, ttl, err := fetchBearerToken(ctx, wwwAuth, remote)
if err != nil {
return "", err
}
if tok != "" {
if ttl <= 0 {
ttl = bearerTokenTTLDefault
}
if ttl > bearerTokenTTLMargin {
ttl -= bearerTokenTTLMargin
}
_ = e.cache.SetToken(ctx, key, tok, ttl)
}
return tok, nil
}
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, time.Duration, error) {
if !strings.HasPrefix(wwwAuth, "Bearer ") {
return "", 0, fmt.Errorf("not a Bearer challenge")
}
params := map[string]string{}
for _, part := range strings.Split(wwwAuth[7:], ",") {
part = strings.TrimSpace(part)
eq := strings.Index(part, "=")
if eq < 0 {
continue
}
key := part[:eq]
val := strings.Trim(part[eq+1:], `"`)
params[key] = val
}
realm := params["realm"]
if realm == "" {
return "", 0, fmt.Errorf("no realm in Bearer challenge")
}
tokenURL := realm
sep := "?"
if s, ok := params["service"]; ok {
tokenURL += sep + "service=" + s
sep = "&"
}
if s, ok := params["scope"]; ok {
tokenURL += sep + "scope=" + s
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, tokenURL, nil)
if err != nil {
return "", 0, err
}
if remote.Username != "" && remote.Password != "" {
req.SetBasicAuth(remote.Username, remote.Password)
}
resp, err := clientForRemote(remote).Do(req)
if err != nil {
return "", 0, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", 0, fmt.Errorf("token endpoint returned %d", resp.StatusCode)
}
var tokenResp struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", 0, err
}
ttl := time.Duration(tokenResp.ExpiresIn) * time.Second
if tokenResp.Token != "" {
return tokenResp.Token, ttl, nil
}
return tokenResp.AccessToken, ttl, nil
}
type ProxyError struct {
Status int
Message string
@@ -646,6 +334,8 @@ func (e *UpstreamError) Error() string { return fmt.Sprintf("upstream error: %v"
func (e *UpstreamError) Unwrap() error { return e.Err }
func isNetworkError(err error) bool {
var ue *UpstreamError
return errors.As(err, &ue)
if _, ok := err.(*UpstreamError); ok {
return true
}
return false
}
-83
View File
@@ -1,83 +0,0 @@
package proxy
import (
"net"
"net/http"
"sync"
"time"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
// Default upstream timeouts. A remote may override any of these; a zero
// override falls back to the default here. There is deliberately no overall
// Client.Timeout: the proxy streams arbitrarily large artifacts and total time
// is bounded by the request context instead. We only constrain the phases that
// must never hang — connect, TLS handshake, and time-to-first-response-header —
// so a slow or wedged upstream cannot pin a goroutine and connection.
const (
defaultDialTimeout = 10 * time.Second
defaultTLSTimeout = 10 * time.Second
defaultResponseHeaderTimeout = 30 * time.Second
)
type clientKey struct {
dial time.Duration
tls time.Duration
respHeader time.Duration
}
var (
clientCacheMu sync.Mutex
clientCache = map[clientKey]*http.Client{}
)
// upstreamClientFor returns an HTTP client configured with the given timeouts,
// reusing a cached client (and its connection pool) for identical timeout sets.
// Zero values fall back to the defaults.
func upstreamClientFor(dial, tls, respHeader time.Duration) *http.Client {
if dial <= 0 {
dial = defaultDialTimeout
}
if tls <= 0 {
tls = defaultTLSTimeout
}
if respHeader <= 0 {
respHeader = defaultResponseHeaderTimeout
}
key := clientKey{dial: dial, tls: tls, respHeader: respHeader}
clientCacheMu.Lock()
defer clientCacheMu.Unlock()
if c, ok := clientCache[key]; ok {
return c
}
c := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: dial,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: tls,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: respHeader,
},
}
clientCache[key] = c
return c
}
// clientForRemote returns the upstream client for a remote, applying its
// per-remote timeout overrides (in seconds) on top of the defaults.
func clientForRemote(remote models.Remote) *http.Client {
return upstreamClientFor(
time.Duration(remote.UpstreamDialTimeout)*time.Second,
time.Duration(remote.UpstreamTLSTimeout)*time.Second,
time.Duration(remote.UpstreamResponseHeaderTimeout)*time.Second,
)
}
+23 -27
View File
@@ -34,19 +34,17 @@ import (
)
type Server struct {
cfg *config.Config
version string
router chi.Router
db *database.DB
cache *cache.Redis
store *storage.S3
engine *proxy.Engine
virtEngine *virtual.Engine
localHandler *v2.LocalHandler
gc *gc.Collector
cfg *config.Config
router chi.Router
db *database.DB
cache *cache.Redis
store *storage.S3
engine *proxy.Engine
virtEngine *virtual.Engine
gc *gc.Collector
}
func New(cfg *config.Config, version string) (*Server, error) {
func New(cfg *config.Config) (*Server, error) {
db, err := database.New(cfg.DatabaseDSN())
if err != nil {
return nil, fmt.Errorf("database: %w", err)
@@ -63,20 +61,17 @@ func New(cfg *config.Config, version string) (*Server, error) {
}
engine := proxy.NewEngine(db, redis, s3)
localHandler := v2.NewLocalHandler(db, s3)
virtEngine := virtual.NewEngine(db, engine)
collector := gc.New(db, s3, 1*time.Hour)
s := &Server{
cfg: cfg,
version: version,
db: db,
cache: redis,
store: s3,
engine: engine,
virtEngine: virtEngine,
localHandler: localHandler,
gc: collector,
cfg: cfg,
db: db,
cache: redis,
store: s3,
engine: engine,
virtEngine: virtEngine,
gc: collector,
}
s.router = s.routes()
@@ -96,9 +91,10 @@ func (s *Server) routes() chi.Router {
r.Get("/health", s.handleHealth)
r.Get("/", s.handleRoot)
proxyHandler := v1.NewProxyHandler(s.engine, s.virtEngine, s.db, s.store, s.localHandler)
localHandler := v2.NewLocalHandler(s.db, s.store)
proxyHandler := v1.NewProxyHandler(s.engine, s.virtEngine, s.db, s.store, localHandler)
r.Mount("/api/v1", proxyHandler.Routes())
r.Mount("/v2", proxyHandler.DockerV2Routes())
remotesHandler := v2.NewRemotesHandler(s.db)
virtualsHandler := v2.NewVirtualsHandler(s.db)
@@ -122,9 +118,9 @@ func (s *Server) routes() chi.Router {
})
r.Route("/remotes/{name}/files", func(r chi.Router) {
r.Put("/*", s.localHandler.Routes().ServeHTTP)
r.Get("/*", s.localHandler.Routes().ServeHTTP)
r.Delete("/*", s.localHandler.Routes().ServeHTTP)
r.Put("/*", localHandler.Routes().ServeHTTP)
r.Get("/*", localHandler.Routes().ServeHTTP)
r.Delete("/*", localHandler.Routes().ServeHTTP)
})
})
@@ -140,7 +136,7 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"name":"artifactapi","version":"%s"}`, s.version)
fmt.Fprint(w, `{"name":"artifactapi","version":"3.0.0-dev"}`)
}
func (s *Server) newHTTPServer() *http.Server {
+1 -25
View File
@@ -73,16 +73,6 @@ func (e *Engine) fetchMemberIndexes(ctx context.Context, virt models.Virtual, pa
return
}
if remote.RepoType == models.RepoTypeLocal {
body, err := e.fetchLocalIndex(ctx, *remote, path)
if err != nil {
results[idx] = result{err: fmt.Errorf("local index %q: %w", name, err)}
return
}
results[idx] = result{index: MemberIndex{RemoteName: name, RepoType: remote.RepoType, BaseURL: remote.BaseURL, Body: body}}
return
}
prov, err := provider.Get(remote.PackageType)
if err != nil {
results[idx] = result{err: fmt.Errorf("provider %q: %w", remote.PackageType, err)}
@@ -102,7 +92,7 @@ func (e *Engine) fetchMemberIndexes(ctx context.Context, virt models.Virtual, pa
return
}
results[idx] = result{index: MemberIndex{RemoteName: name, RepoType: remote.RepoType, BaseURL: remote.BaseURL, Body: body}}
results[idx] = result{index: MemberIndex{RemoteName: name, Body: body}}
}(i, memberName)
}
@@ -119,17 +109,3 @@ func (e *Engine) fetchMemberIndexes(ctx context.Context, virt models.Virtual, pa
return members, nil
}
func (e *Engine) fetchLocalIndex(ctx context.Context, remote models.Remote, path string) ([]byte, error) {
prov, err := provider.Get(remote.PackageType)
if err != nil {
return nil, fmt.Errorf("no provider for %q: %w", remote.PackageType, err)
}
indexer, ok := prov.(provider.LocalIndexer)
if !ok {
return nil, fmt.Errorf("provider %q does not support local index generation", remote.PackageType)
}
return indexer.GenerateLocalIndex(ctx, e.db, remote.Name, path)
}
+3 -40
View File
@@ -54,27 +54,15 @@ func (m *HelmMerger) MergeIndexes(members []MemberIndex, proxyBaseURL string) ([
seen[chart][ver.Version] = true
if proxyBaseURL != "" {
routePrefix := "remote"
if member.RepoType == "local" {
routePrefix = "local"
}
baseHost := extractHost(member.BaseURL)
for i, u := range ver.URLs {
if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") {
if baseHost != "" && extractHost(u) != baseHost {
continue
}
relPath := extractPathRelativeToBase(u, member.BaseURL)
ver.URLs[i] = fmt.Sprintf("%s/api/v1/%s/%s/%s",
ver.URLs[i] = fmt.Sprintf("%s/api/v1/remote/%s/%s",
strings.TrimRight(proxyBaseURL, "/"),
routePrefix,
member.RemoteName,
relPath)
extractPath(u))
} else {
ver.URLs[i] = fmt.Sprintf("%s/api/v1/%s/%s/%s",
ver.URLs[i] = fmt.Sprintf("%s/api/v1/remote/%s/%s",
strings.TrimRight(proxyBaseURL, "/"),
routePrefix,
member.RemoteName,
u)
}
@@ -90,31 +78,6 @@ func (m *HelmMerger) MergeIndexes(members []MemberIndex, proxyBaseURL string) ([
return yaml.Marshal(merged)
}
func extractHost(rawURL string) string {
idx := strings.Index(rawURL, "://")
if idx == -1 {
return ""
}
rest := rawURL[idx+3:]
slashIdx := strings.Index(rest, "/")
if slashIdx == -1 {
return rest
}
return rest[:slashIdx]
}
func extractPathRelativeToBase(rawURL, baseURL string) string {
fullPath := extractPath(rawURL)
basePath := extractPath(baseURL)
if basePath != "" {
basePath = strings.TrimRight(basePath, "/") + "/"
if strings.HasPrefix(fullPath, basePath) {
return fullPath[len(basePath):]
}
}
return fullPath
}
func extractPath(rawURL string) string {
idx := strings.Index(rawURL, "://")
if idx == -1 {
-2
View File
@@ -8,8 +8,6 @@ import (
type MemberIndex struct {
RemoteName string
RepoType models.RepoType
BaseURL string
Body []byte
}
+1 -6
View File
@@ -36,13 +36,8 @@ func (m *PyPIMerger) MergeIndexes(members []MemberIndex, proxyBaseURL string) ([
}
if proxyBaseURL != "" && href != "" {
routePrefix := "remote"
if member.RepoType == "local" {
routePrefix = "local"
}
href = fmt.Sprintf("%s/api/v1/%s/%s/%s",
href = fmt.Sprintf("%s/api/v1/remote/%s/%s",
strings.TrimRight(proxyBaseURL, "/"),
routePrefix,
member.RemoteName,
strings.TrimLeft(href, "/"))
}
-30
View File
@@ -2,7 +2,6 @@ package models
import (
"fmt"
"regexp"
"time"
)
@@ -47,11 +46,6 @@ type Remote struct {
MutableTTL int `json:"mutable_ttl"`
CheckMutable bool `json:"check_mutable"`
// Upstream HTTP timeouts in seconds. 0 means use the server default.
UpstreamDialTimeout int `json:"upstream_dial_timeout,omitempty"`
UpstreamTLSTimeout int `json:"upstream_tls_timeout,omitempty"`
UpstreamResponseHeaderTimeout int `json:"upstream_response_header_timeout,omitempty"`
Patterns []string `json:"patterns,omitempty"`
Blocklist []string `json:"blocklist,omitempty"`
MutablePatterns []string `json:"mutable_patterns,omitempty"`
@@ -72,30 +66,6 @@ type Remote struct {
UpdatedAt time.Time `json:"updated_at"`
}
// ValidatePatterns ensures every configured regex compiles. Storing an
// invalid pattern would otherwise be silently dropped at match time, which
// for the blocklist is a fail-open: a mistyped deny rule becomes a no-op.
func (r *Remote) ValidatePatterns() error {
groups := []struct {
field string
patterns []string
}{
{"patterns", r.Patterns},
{"blocklist", r.Blocklist},
{"mutable_patterns", r.MutablePatterns},
{"immutable_patterns", r.ImmutablePatterns},
{"ban_tags", r.BanTags},
}
for _, g := range groups {
for _, p := range g.patterns {
if _, err := regexp.Compile(p); err != nil {
return fmt.Errorf("invalid regex in %s: %q: %w", g.field, p, err)
}
}
}
return nil
}
type RemoteWithStats struct {
Remote
Stats RemoteStats `json:"stats"`
-19
View File
@@ -1,19 +0,0 @@
package models
import "testing"
func TestRemote_ValidatePatterns(t *testing.T) {
valid := &Remote{
Patterns: []string{`.*\.tar\.gz$`},
Blocklist: []string{`^secret/`},
ImmutablePatterns: []string{`\.rpm$`},
}
if err := valid.ValidatePatterns(); err != nil {
t.Fatalf("expected valid patterns, got %v", err)
}
bad := &Remote{Blocklist: []string{`[unterminated`}}
if err := bad.ValidatePatterns(); err == nil {
t.Fatal("expected error for invalid blocklist regex, got nil")
}
}
-40
View File
@@ -1,40 +0,0 @@
#!/usr/bin/env bash
# Build the artifactapi container, bring up the full stack (postgres, redis,
# minio, artifactapi) plus a static mock upstream, and run the dockerised e2e
# suite against the running product over HTTP. Tears everything down on exit.
set -euo pipefail
cd "$(dirname "$0")/.."
# Publish artifactapi on 8001 to avoid colliding with a local instance on 8000.
export ARTIFACTAPI_PORT="${ARTIFACTAPI_PORT:-8001}"
COMPOSE=(docker compose -f docker-compose.yml -f docker-compose.e2e.yml)
API_URL="${ARTIFACTAPI_URL:-http://localhost:${ARTIFACTAPI_PORT}}"
cleanup() {
echo "==> tearing down stack"
"${COMPOSE[@]}" down -v --remove-orphans >/dev/null 2>&1 || true
}
trap cleanup EXIT
echo "==> building and starting stack (postgres, redis, minio, mockupstream, artifactapi)"
"${COMPOSE[@]}" up -d --build postgres redis minio mockupstream artifactapi
echo "==> waiting for artifactapi health at ${API_URL}"
for i in $(seq 1 60); do
if curl -fsS "${API_URL}/health" >/dev/null 2>&1; then
echo " healthy after ${i}s"
break
fi
if [ "$i" -eq 60 ]; then
echo "!!! artifactapi did not become healthy in time; recent logs:"
"${COMPOSE[@]}" logs --tail=50 artifactapi
exit 1
fi
sleep 1
done
echo "==> running dockerised e2e suite"
ARTIFACTAPI_URL="${API_URL}" \
MOCK_UPSTREAM_INTERNAL="${MOCK_UPSTREAM_INTERNAL:-http://mockupstream}" \
go test -tags=dockere2e -count=1 -timeout=10m -v ./e2e-docker/...
-7
View File
@@ -6,20 +6,13 @@ COPY package.json package-lock.json* ./
RUN npm ci
COPY . .
ARG BASE_PATH=/
ENV BASE_PATH=${BASE_PATH}
RUN npm run build
FROM nginx:alpine
ARG BASE_PATH=/
COPY --from=builder /app/dist /usr/share/nginx/html
COPY nginx.conf /etc/nginx/conf.d/default.conf
RUN sed -i "s|\${BASE_PATH}|${BASE_PATH}|g" /etc/nginx/conf.d/default.conf
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
+26 -5
View File
@@ -5,12 +5,33 @@ server {
root /usr/share/nginx/html;
index index.html;
location ${BASE_PATH}/ {
rewrite ^${BASE_PATH}(/.*)$ $1 break;
try_files $uri $uri/ /index.html;
location /api/ {
proxy_pass http://artifactapi:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
}
location = ${BASE_PATH} {
return 301 ${BASE_PATH}/;
location /v2/ {
proxy_pass http://artifactapi:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
}
location /health {
proxy_pass http://artifactapi:8000;
}
location /metrics {
proxy_pass http://artifactapi:8000;
}
location / {
try_files $uri $uri/ /index.html;
}
}
-6
View File
@@ -2,8 +2,6 @@ import { Routes, Route, NavLink } from 'react-router-dom';
import { Dashboard } from './pages/Dashboard';
import { Remotes } from './pages/Remotes';
import { RemoteDetail } from './pages/RemoteDetail';
import { Locals } from './pages/Locals';
import { LocalDetail } from './pages/LocalDetail';
import { Virtuals } from './pages/Virtuals';
import { Objects } from './pages/Objects';
import { Probe } from './pages/Probe';
@@ -20,7 +18,6 @@ export function App() {
<div className="sidebar-nav">
<NavLink to="/" end>Dashboard</NavLink>
<NavLink to="/remotes">Remotes</NavLink>
<NavLink to="/locals">Locals</NavLink>
<NavLink to="/virtuals">Virtuals</NavLink>
<NavLink to="/probe">Test Remote</NavLink>
</div>
@@ -34,9 +31,6 @@ export function App() {
<Route path="/remotes" element={<Remotes />} />
<Route path="/remotes/:name" element={<RemoteDetail />} />
<Route path="/remotes/:name/objects" element={<Objects />} />
<Route path="/locals" element={<Locals />} />
<Route path="/locals/:name" element={<LocalDetail />} />
<Route path="/locals/:name/objects" element={<Objects />} />
<Route path="/virtuals" element={<Virtuals />} />
<Route path="/probe" element={<Probe />} />
</Routes>
+1 -5
View File
@@ -4,13 +4,9 @@ import { BrowserRouter } from 'react-router-dom';
import { App } from './App';
import './index.css';
declare const __BASE_PATH__: string;
const basename = __BASE_PATH__.replace(/\/+$/, '') || '/';
createRoot(document.getElementById('root')!).render(
<StrictMode>
<BrowserRouter basename={basename}>
<BrowserRouter>
<App />
</BrowserRouter>
</StrictMode>,
-5
View File
@@ -50,11 +50,6 @@ export function Dashboard() {
value={formatNumber(stats.total_blobs_deduped)}
sub="shared blobs"
/>
<StatsCard
label="Bandwidth Saved"
value={formatBytes(stats.bandwidth_saved_30d)}
sub="last 30 days"
/>
</div>
{health && (
-46
View File
@@ -1,46 +0,0 @@
import { useEffect, useState } from 'react';
import { useParams, Link } from 'react-router-dom';
import { api } from '../api/client';
import type { Remote } from '../api/types';
import { Badge } from '../components/Badge';
import './RemoteDetail.css';
export function LocalDetail() {
const { name } = useParams<{ name: string }>();
const [remote, setRemote] = useState<Remote | null>(null);
const [error, setError] = useState<string | null>(null);
useEffect(() => {
if (!name) return;
api.getRemote(name)
.then(setRemote)
.catch(e => setError(e.message));
}, [name]);
if (error) return <div className="error-banner">{error}</div>;
if (!remote) return <div className="loading">Loading...</div>;
return (
<div>
<div className="detail-header">
<Link to="/locals" className="back-link">&larr; Locals</Link>
<h1 className="page-title">{remote.name}</h1>
<div className="detail-badges">
<Badge variant="blue">{remote.package_type}</Badge>
<Badge variant="default">local</Badge>
{remote.managed_by && <Badge variant="green">managed by {remote.managed_by}</Badge>}
</div>
</div>
{remote.description && (
<p className="detail-description">{remote.description}</p>
)}
<div className="detail-actions">
<Link to={`/locals/${remote.name}/objects`} className="btn btn-primary">
Browse Files
</Link>
</div>
</div>
);
}
-93
View File
@@ -1,93 +0,0 @@
import { useEffect, useState } from 'react';
import { useNavigate } from 'react-router-dom';
import { api } from '../api/client';
import type { Remote } from '../api/types';
import { Badge } from '../components/Badge';
import { DataTable } from '../components/DataTable';
import './Remotes.css';
const typeColors: Record<string, 'blue' | 'green' | 'yellow' | 'red' | 'default'> = {
docker: 'blue',
helm: 'green',
rpm: 'yellow',
pypi: 'blue',
npm: 'red',
generic: 'default',
alpine: 'green',
puppet: 'yellow',
terraform: 'blue',
goproxy: 'green',
};
export function Locals() {
const navigate = useNavigate();
const [remotes, setRemotes] = useState<Remote[]>([]);
const [filter, setFilter] = useState('');
const [loading, setLoading] = useState(true);
useEffect(() => {
api.listRemotes()
.then(r => setRemotes((r || []).filter(x => x.repo_type === 'local')))
.finally(() => setLoading(false));
}, []);
const filtered = remotes.filter(r => {
if (filter && !r.name.toLowerCase().includes(filter.toLowerCase())) return false;
return true;
});
return (
<div>
<h1 className="page-title">Local Repositories</h1>
<div className="remotes-toolbar">
<input
className="search-input"
placeholder="Filter by name..."
value={filter}
onChange={e => setFilter(e.target.value)}
/>
<span className="result-count">{filtered.length} locals</span>
</div>
{loading ? (
<div className="loading">Loading...</div>
) : (
<DataTable
columns={[
{
key: 'name',
header: 'Name',
render: (r: Remote) => <span className="mono">{r.name}</span>,
},
{
key: 'type',
header: 'Type',
render: (r: Remote) => (
<Badge variant={typeColors[r.package_type] || 'default'}>
{r.package_type}
</Badge>
),
width: '110px',
},
{
key: 'description',
header: 'Description',
render: (r: Remote) => r.description || <span className="text-muted"></span>,
},
{
key: 'managed',
header: 'Managed',
render: (r: Remote) =>
r.managed_by ? <Badge variant="blue">{r.managed_by}</Badge> : <span className="text-muted"></span>,
width: '100px',
},
]}
data={filtered}
emptyMessage="No local repositories configured"
onRowClick={(r) => navigate(`/locals/${r.name}`)}
/>
)}
</div>
);
}
+2 -5
View File
@@ -1,5 +1,5 @@
import { useEffect, useState, useCallback, useMemo } from 'react';
import { useParams, useLocation, Link } from 'react-router-dom';
import { useParams, Link } from 'react-router-dom';
import { api } from '../api/client';
import type { Artifact } from '../api/types';
import { formatBytes, timeAgo, truncateHash } from '../components/format';
@@ -171,9 +171,6 @@ function TreeRow({ node, depth, expanded, onToggle, onEvict }: TreeRowProps) {
export function Objects() {
const { name } = useParams<{ name: string }>();
const location = useLocation();
const isLocal = location.pathname.startsWith('/locals/');
const backLink = isLocal ? `/locals/${name}` : `/remotes/${name}`;
const [artifacts, setArtifacts] = useState<Artifact[]>([]);
const [loading, setLoading] = useState(true);
const [filter, setFilter] = useState('');
@@ -236,7 +233,7 @@ export function Objects() {
return (
<div>
<div className="detail-header">
<Link to={backLink} className="back-link">&larr; {name}</Link>
<Link to={`/remotes/${name}`} className="back-link">&larr; {name}</Link>
<h1 className="page-title">Cached Objects</h1>
</div>
+2 -3
View File
@@ -32,10 +32,9 @@ export function Remotes() {
.finally(() => setLoading(false));
}, []);
const remoteOnly = remotes.filter(r => r.repo_type !== 'local');
const types = [...new Set(remoteOnly.map(r => r.package_type))].sort();
const types = [...new Set(remotes.map(r => r.package_type))].sort();
const filtered = remoteOnly.filter(r => {
const filtered = remotes.filter(r => {
if (typeFilter && r.package_type !== typeFilter) return false;
if (filter && !r.name.toLowerCase().includes(filter.toLowerCase())) return false;
return true;
+10 -32
View File
@@ -1,38 +1,21 @@
import { useEffect, useState } from 'react';
import { Link } from 'react-router-dom';
import { api } from '../api/client';
import type { Remote, Virtual } from '../api/types';
import type { Virtual } from '../api/types';
import { Badge } from '../components/Badge';
import { DataTable } from '../components/DataTable';
import './Virtuals.css';
export function Virtuals() {
const [virtuals, setVirtuals] = useState<Virtual[]>([]);
const [remoteMap, setRemoteMap] = useState<Record<string, Remote>>({});
const [loading, setLoading] = useState(true);
const [expanded, setExpanded] = useState<string | null>(null);
useEffect(() => {
Promise.all([api.listVirtuals(), api.listRemotes()])
.then(([v, r]) => {
setVirtuals(v || []);
const map: Record<string, Remote> = {};
for (const remote of r || []) {
map[remote.name] = remote;
}
setRemoteMap(map);
})
api.listVirtuals()
.then(v => setVirtuals(v || []))
.finally(() => setLoading(false));
}, []);
function memberLink(name: string) {
const remote = remoteMap[name];
if (remote?.repo_type === 'local') {
return `/locals/${name}`;
}
return `/remotes/${name}`;
}
return (
<div>
<h1 className="page-title">Virtual Repositories</h1>
@@ -57,7 +40,7 @@ export function Virtuals() {
key: 'members',
header: 'Members',
render: (v: Virtual) => (
<span className="member-count">{v.members?.length || 0} repos</span>
<span className="member-count">{v.members?.length || 0} remotes</span>
),
width: '110px',
},
@@ -86,17 +69,12 @@ export function Virtuals() {
<ul className="member-list">
{virtuals
.find(v => v.name === expanded)
?.members?.map((m, i) => {
const remote = remoteMap[m];
const typeLabel = remote?.repo_type === 'local' ? 'local' : 'remote';
return (
<li key={m}>
<span className="member-priority">{i + 1}</span>
<Link to={memberLink(m)} className="mono">{m}</Link>
<Badge variant={typeLabel === 'local' ? 'yellow' : 'default'}>{typeLabel}</Badge>
</li>
);
})}
?.members?.map((m, i) => (
<li key={m}>
<span className="member-priority">{i + 1}</span>
<a href={`/remotes/${m}`} className="mono">{m}</a>
</li>
))}
</ul>
</div>
)}
-6
View File
@@ -1,10 +1,7 @@
import { defineConfig } from 'vite'
import react from '@vitejs/plugin-react'
const basePath = process.env.BASE_PATH || '/'
export default defineConfig({
base: basePath,
plugins: [react()],
server: {
proxy: {
@@ -14,7 +11,4 @@ export default defineConfig({
'/metrics': 'http://localhost:8000',
},
},
define: {
'__BASE_PATH__': JSON.stringify(basePath),
},
})