Compare commits

..

1 Commits

Author SHA1 Message Date
unkinben b698d1bdc0 perf: memoise regex compilation in the classifier
ci/woodpecker/pr/test Pipeline was successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful
Classify runs on every proxied request and recompiled each remote's
pattern lists every time. Cache compiled regexes keyed by pattern text so
each distinct pattern compiles once and is reused thereafter.

Refs #73
2026-07-02 00:33:46 +10:00
38 changed files with 108 additions and 1206 deletions
-4
View File
@@ -1,6 +1,2 @@
bin/
terraform/
# e2e-docker fixtures are real package files (.rpm, .tgz, .whl, .zip, ...) that
# are intentionally tracked, overriding any global ignore of those extensions.
!e2e-docker/fixtures/**
+1 -6
View File
@@ -1,4 +1,4 @@
.PHONY: build test lint fmt e2e docker-e2e docker docker-ui compose clean tidy check-go
.PHONY: build test lint fmt e2e docker docker-ui compose clean tidy check-go
BINARY := bin/artifactapi
MODULE := git.unkin.net/unkin/artifactapi
@@ -28,11 +28,6 @@ fmt: check-go
e2e: check-go
TESTCONTAINERS_RYUK_DISABLED=true go test -tags=e2e -race -count=1 -timeout=5m ./e2e/...
# Build the container, bring up the full docker-compose stack + a mock upstream,
# and run the black-box suite against the running product.
docker-e2e: check-go
./scripts/docker-e2e.sh
docker:
docker build -t artifactapi:$(VERSION) .
-18
View File
@@ -1,18 +0,0 @@
# Overlay for the dockerised end-to-end suite (scripts/docker-e2e.sh).
# Adds a static mock upstream that the artifactapi container proxies, so the
# caching tests are hermetic and need no internet access.
services:
mockupstream:
image: nginx:alpine
volumes:
- ./e2e-docker/fixtures:/usr/share/nginx/html:ro,z
# No host port needed: only the artifactapi container talks to it, and the
# tests compare served bytes against the on-disk fixtures.
artifactapi:
# The host port is set via ARTIFACTAPI_PORT (see scripts/docker-e2e.sh),
# defaulting to 8000; the e2e run uses 8001 to avoid colliding with a
# locally-running instance.
depends_on:
mockupstream:
condition: service_started
+1 -1
View File
@@ -2,7 +2,7 @@ services:
artifactapi:
build: .
ports:
- "${ARTIFACTAPI_PORT:-8000}:8000"
- "8000:8000"
environment:
LISTEN_ADDR: ":8000"
DBHOST: postgres
-39
View File
@@ -1,39 +0,0 @@
# Dockerised end-to-end suite
Black-box tests that run against a fully **containerised** artifactapi stack
(built image + Postgres + Redis + MinIO) plus a static mock upstream. Unlike the
in-process `e2e/` suite (testcontainers, server run in-process), these only speak
HTTP to the running product, so they exercise the shipped container image.
## Run
```bash
make docker-e2e # build image, compose up, run suite, compose down
```
`scripts/docker-e2e.sh` builds and starts `docker-compose.yml` +
`docker-compose.e2e.yml`, waits for `/health`, then runs
`go test -tags=dockere2e ./e2e-docker/...` and tears everything down.
The stack publishes artifactapi on host port **8001** (to avoid colliding with a
local instance on 8000). Override with `ARTIFACTAPI_URL` to point the tests at an
already-running stack.
## Coverage
- **Repository lifecycle** — add / change / delete for remote, local and virtual repos.
- **Caching** — one immutable artifact per remote package type (generic, docker,
helm, pypi, npm, rpm, alpine, puppet, terraform, goproxy) proxied through the
mock upstream: first fetch `X-Artifact-Source: remote`, second `cache`, bytes
verified against the origin fixture.
- **Local uploads** — generic (upload/download), pypi (wheel + generated `simple/`
index), rpm (real package + **automatic repodata** generation).
- **Virtual repositories** — pypi simple-index merge and helm `index.yaml` merge
across two members.
## Fixtures
`fixtures/` is served by the mock upstream at its web root. Paths mirror each
provider's upstream URL layout (e.g. `v2/...` for docker, `v1/providers/...` for
terraform). The RPM under `fixtures/rpmrepo/Packages/` is a real package so the
rpm provider can parse its metadata for repodata generation.
-76
View File
@@ -1,76 +0,0 @@
//go:build dockere2e
package e2edocker
import (
"bytes"
"fmt"
"net/http"
"testing"
)
// TestCachingPerProvider proxies one immutable artifact for every remote
// package type through the mock upstream and asserts: first fetch is served
// from the remote, the second from cache, and the bytes match the origin.
func TestCachingPerProvider(t *testing.T) {
cases := []struct {
pkgType string
// path is the request path under /api/v1/remote/<name>/. The provider
// derives the upstream URL from it (docker prepends /v2/, terraform
// prepends /v1/providers/), and the fixture lives at that resolved path.
path string
fixture string
}{
{"generic", "blobs/hello.bin", "blobs/hello.bin"},
{"npm", "mypkg/-/mypkg-1.0.0.tgz", "mypkg/-/mypkg-1.0.0.tgz"},
{"helm", "charts/mychart-1.0.0.tgz", "charts/mychart-1.0.0.tgz"},
{"pypi", "packages/foo-1.0-py3-none-any.whl", "packages/foo-1.0-py3-none-any.whl"},
{"rpm", "rpmrepo/Packages/e2e-testpkg-1.0-1.noarch.rpm", "rpmrepo/Packages/e2e-testpkg-1.0-1.noarch.rpm"},
{"alpine", "alpine/x86_64/testpkg-1.0-r0.apk", "alpine/x86_64/testpkg-1.0-r0.apk"},
{"puppet", "puppet-releases/author-mod-1.0.0.tar.gz", "puppet-releases/author-mod-1.0.0.tar.gz"},
{"goproxy", "goproxy/example.com/mod/@v/v1.0.0.zip", "goproxy/example.com/mod/@v/v1.0.0.zip"},
{"terraform", "hashicorp/aws/download/pkg.zip", "v1/providers/hashicorp/aws/download/pkg.zip"},
{"docker", "library/testimg/blobs/blobdata", "v2/library/testimg/blobs/blobdata"},
}
for _, tc := range cases {
t.Run(tc.pkgType, func(t *testing.T) {
name := "cache-" + tc.pkgType
createRepo(t, fmt.Sprintf(`{
"name": %q,
"package_type": %q,
"repo_type": "remote",
"base_url": %q,
"stale_on_error": true
}`, name, tc.pkgType, mockUpstream()))
defer deleteRepo(t, name)
want := fixtureBytes(t, tc.fixture)
url := api("/api/v1/remote/" + name + "/" + tc.path)
// First fetch: from remote.
resp, body := doRequest(t, http.MethodGet, url, nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("first fetch: status %d: %s", resp.StatusCode, body)
}
if src := resp.Header.Get("X-Artifact-Source"); src != "remote" {
t.Fatalf("first fetch source = %q, want remote", src)
}
if !bytes.Equal(body, want) {
t.Fatalf("first fetch body mismatch: got %d bytes, want %d", len(body), len(want))
}
// Second fetch: from cache, identical bytes.
resp, body = doRequest(t, http.MethodGet, url, nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("second fetch: status %d: %s", resp.StatusCode, body)
}
if src := resp.Header.Get("X-Artifact-Source"); src != "cache" {
t.Fatalf("second fetch source = %q, want cache", src)
}
if !bytes.Equal(body, want) {
t.Fatalf("cached body mismatch: got %d bytes, want %d", len(body), len(want))
}
})
}
}
Binary file not shown.
-1
View File
@@ -1 +0,0 @@
hello artifactapi generic blob
Binary file not shown.
-8
View File
@@ -1,8 +0,0 @@
apiVersion: v1
entries:
alpha:
- name: alpha
version: 1.0.0
urls:
- charts/alpha-1.0.0.tgz
generated: "2026-01-01T00:00:00Z"
-8
View File
@@ -1,8 +0,0 @@
apiVersion: v1
entries:
beta:
- name: beta
version: 2.0.0
urls:
- charts/beta-2.0.0.tgz
generated: "2026-01-01T00:00:00Z"
Binary file not shown.
-108
View File
@@ -1,108 +0,0 @@
//go:build dockere2e
// Package e2edocker holds the black-box end-to-end suite that runs against a
// fully dockerised artifactapi stack (see scripts/docker-e2e.sh). Unlike the
// in-process e2e suite, these tests only speak HTTP to the running container.
package e2edocker
import (
"bytes"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"time"
)
func baseURL() string {
if v := os.Getenv("ARTIFACTAPI_URL"); v != "" {
return strings.TrimRight(v, "/")
}
return "http://localhost:8000"
}
// mockUpstream is the base URL the artifactapi *container* uses to reach the
// static mock upstream. It is resolved on the compose network, not the host.
func mockUpstream() string {
if v := os.Getenv("MOCK_UPSTREAM_INTERNAL"); v != "" {
return strings.TrimRight(v, "/")
}
return "http://mockupstream"
}
func api(path string) string { return baseURL() + path }
func fixtureBytes(t *testing.T, rel string) []byte {
t.Helper()
b, err := os.ReadFile(filepath.Join("fixtures", rel))
if err != nil {
t.Fatalf("read fixture %s: %v", rel, err)
}
return b
}
func doRequest(t *testing.T, method, url string, body []byte, contentType string) (*http.Response, []byte) {
t.Helper()
var r io.Reader
if body != nil {
r = bytes.NewReader(body)
}
req, err := http.NewRequest(method, url, r)
if err != nil {
t.Fatalf("%s %s: %v", method, url, err)
}
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("%s %s: %v", method, url, err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
return resp, respBody
}
func createRepo(t *testing.T, jsonBody string) {
t.Helper()
resp, body := doRequest(t, http.MethodPost, api("/api/v2/remotes"), []byte(jsonBody), "application/json")
if resp.StatusCode != http.StatusCreated {
t.Fatalf("create repo: status %d: %s", resp.StatusCode, body)
}
}
func deleteRepo(t *testing.T, name string) {
t.Helper()
doRequest(t, http.MethodDelete, api("/api/v2/remotes/"+name), nil, "")
}
func createVirtual(t *testing.T, jsonBody string) {
t.Helper()
resp, body := doRequest(t, http.MethodPost, api("/api/v2/virtuals"), []byte(jsonBody), "application/json")
if resp.StatusCode != http.StatusCreated {
t.Fatalf("create virtual: status %d: %s", resp.StatusCode, body)
}
}
func deleteVirtual(t *testing.T, name string) {
t.Helper()
doRequest(t, http.MethodDelete, api("/api/v2/virtuals/"+name), nil, "")
}
// getEventually retries a GET until it returns 200 or the deadline passes. Used
// for asynchronously-generated artifacts (e.g. rpm repodata after upload).
func getEventually(t *testing.T, url string, timeout time.Duration) (*http.Response, []byte) {
t.Helper()
deadline := time.Now().Add(timeout)
var resp *http.Response
var body []byte
for {
resp, body = doRequest(t, http.MethodGet, url, nil, "")
if resp.StatusCode == http.StatusOK || time.Now().After(deadline) {
return resp, body
}
time.Sleep(250 * time.Millisecond)
}
}
-93
View File
@@ -1,93 +0,0 @@
//go:build dockere2e
package e2edocker
import (
"bytes"
"net/http"
"strings"
"testing"
"time"
)
func uploadFile(t *testing.T, repo, filePath string, body []byte, contentType string) {
t.Helper()
url := api("/api/v2/remotes/" + repo + "/files/" + filePath)
resp, respBody := doRequest(t, http.MethodPut, url, body, contentType)
if resp.StatusCode != http.StatusCreated {
t.Fatalf("upload %s: status %d: %s", filePath, resp.StatusCode, respBody)
}
}
// TestLocalGenericUpload uploads a generic file and downloads it back.
func TestLocalGenericUpload(t *testing.T) {
createRepo(t, `{"name":"local-generic","package_type":"generic","repo_type":"local"}`)
defer deleteRepo(t, "local-generic")
content := []byte("artifactapi local generic upload payload")
uploadFile(t, "local-generic", "data/hello.bin", content, "application/octet-stream")
resp, body := doRequest(t, http.MethodGet, api("/api/v1/local/local-generic/data/hello.bin"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("download: status %d: %s", resp.StatusCode, body)
}
if !bytes.Equal(body, content) {
t.Fatalf("downloaded content mismatch")
}
}
// TestLocalPyPIUpload uploads a wheel and validates the generated simple index.
func TestLocalPyPIUpload(t *testing.T) {
createRepo(t, `{"name":"local-pypi","package_type":"pypi","repo_type":"local"}`)
defer deleteRepo(t, "local-pypi")
wheel := fixtureBytes(t, "packages/foo-1.0-py3-none-any.whl")
uploadFile(t, "local-pypi", "foo-1.0-py3-none-any.whl", wheel, "application/zip")
// Root index lists the package.
resp, body := doRequest(t, http.MethodGet, api("/api/v1/local/local-pypi/simple/"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("simple index: status %d: %s", resp.StatusCode, body)
}
if !strings.Contains(string(body), "foo") {
t.Fatalf("simple index missing package 'foo': %s", body)
}
// Per-package index lists the wheel file.
resp, body = doRequest(t, http.MethodGet, api("/api/v1/local/local-pypi/simple/foo/"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("package index: status %d: %s", resp.StatusCode, body)
}
if !strings.Contains(string(body), "foo-1.0-py3-none-any.whl") {
t.Fatalf("package index missing wheel: %s", body)
}
// The wheel downloads back byte-identical.
resp, body = doRequest(t, http.MethodGet, api("/api/v1/local/local-pypi/foo/foo-1.0-py3-none-any.whl"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("download wheel: status %d: %s", resp.StatusCode, body)
}
if !bytes.Equal(body, wheel) {
t.Fatalf("wheel content mismatch")
}
}
// TestLocalRPMRepodata uploads a real RPM and validates that repodata is
// generated automatically (the special rpm-local feature).
func TestLocalRPMRepodata(t *testing.T) {
createRepo(t, `{"name":"local-rpm","package_type":"rpm","repo_type":"local"}`)
defer deleteRepo(t, "local-rpm")
rpm := fixtureBytes(t, "rpmrepo/Packages/e2e-testpkg-1.0-1.noarch.rpm")
uploadFile(t, "local-rpm", "e2e-testpkg-1.0-1.noarch.rpm", rpm, "application/x-rpm")
// repodata is generated asynchronously after upload; poll for it.
resp, body := getEventually(t, api("/api/v1/local/local-rpm/repodata/repomd.xml"), 15*time.Second)
if resp.StatusCode != http.StatusOK {
t.Fatalf("repomd.xml: status %d: %s", resp.StatusCode, body)
}
s := string(body)
if !strings.Contains(s, "<repomd") || !strings.Contains(s, "primary") {
t.Fatalf("repomd.xml not a valid repodata document: %s", s)
}
}
-134
View File
@@ -1,134 +0,0 @@
//go:build dockere2e
package e2edocker
import (
"encoding/json"
"net/http"
"testing"
)
func TestHealth(t *testing.T) {
resp, body := doRequest(t, http.MethodGet, api("/health"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("health: status %d: %s", resp.StatusCode, body)
}
}
// TestRemoteLifecycle covers add/change/delete for a remote repository.
func TestRemoteLifecycle(t *testing.T) {
createRepo(t, `{
"name": "crud-remote",
"package_type": "generic",
"repo_type": "remote",
"base_url": "https://example.com",
"mutable_ttl": 600,
"stale_on_error": true
}`)
defer deleteRepo(t, "crud-remote")
got := getRepo(t, "crud-remote")
if got["base_url"] != "https://example.com" || got["mutable_ttl"].(float64) != 600 {
t.Fatalf("unexpected created remote: %v", got)
}
// change
resp, body := doRequest(t, http.MethodPut, api("/api/v2/remotes/crud-remote"), []byte(`{
"package_type": "generic",
"base_url": "https://updated.example.com",
"mutable_ttl": 120,
"stale_on_error": true
}`), "application/json")
if resp.StatusCode != http.StatusOK {
t.Fatalf("update remote: status %d: %s", resp.StatusCode, body)
}
got = getRepo(t, "crud-remote")
if got["base_url"] != "https://updated.example.com" || got["mutable_ttl"].(float64) != 120 {
t.Fatalf("update not applied: %v", got)
}
// delete
resp, _ = doRequest(t, http.MethodDelete, api("/api/v2/remotes/crud-remote"), nil, "")
if resp.StatusCode != http.StatusNoContent {
t.Fatalf("delete remote: status %d", resp.StatusCode)
}
resp, _ = doRequest(t, http.MethodGet, api("/api/v2/remotes/crud-remote"), nil, "")
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("expected 404 after delete, got %d", resp.StatusCode)
}
}
// TestLocalLifecycle covers add/delete for a local repository.
func TestLocalLifecycle(t *testing.T) {
createRepo(t, `{
"name": "crud-local",
"package_type": "generic",
"repo_type": "local"
}`)
defer deleteRepo(t, "crud-local")
got := getRepo(t, "crud-local")
if got["repo_type"] != "local" {
t.Fatalf("expected repo_type local, got %v", got["repo_type"])
}
resp, _ := doRequest(t, http.MethodDelete, api("/api/v2/remotes/crud-local"), nil, "")
if resp.StatusCode != http.StatusNoContent {
t.Fatalf("delete local: status %d", resp.StatusCode)
}
}
// TestVirtualLifecycle covers add/change/delete for a virtual repository.
func TestVirtualLifecycle(t *testing.T) {
createRepo(t, `{"name":"vmem-a","package_type":"helm","repo_type":"remote","base_url":"https://a.example.com","stale_on_error":true}`)
createRepo(t, `{"name":"vmem-b","package_type":"helm","repo_type":"remote","base_url":"https://b.example.com","stale_on_error":true}`)
defer deleteRepo(t, "vmem-a")
defer deleteRepo(t, "vmem-b")
createVirtual(t, `{
"name": "crud-virtual",
"package_type": "helm",
"members": ["vmem-a"]
}`)
defer deleteVirtual(t, "crud-virtual")
// change members
resp, body := doRequest(t, http.MethodPut, api("/api/v2/virtuals/crud-virtual"), []byte(`{
"package_type": "helm",
"members": ["vmem-a", "vmem-b"]
}`), "application/json")
if resp.StatusCode != http.StatusOK {
t.Fatalf("update virtual: status %d: %s", resp.StatusCode, body)
}
resp, body = doRequest(t, http.MethodGet, api("/api/v2/virtuals/crud-virtual"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("get virtual: status %d: %s", resp.StatusCode, body)
}
var v map[string]any
if err := json.Unmarshal(body, &v); err != nil {
t.Fatalf("decode virtual: %v", err)
}
members, _ := v["members"].([]any)
if len(members) != 2 {
t.Fatalf("expected 2 members after update, got %v", v["members"])
}
resp, _ = doRequest(t, http.MethodDelete, api("/api/v2/virtuals/crud-virtual"), nil, "")
if resp.StatusCode != http.StatusNoContent {
t.Fatalf("delete virtual: status %d", resp.StatusCode)
}
}
func getRepo(t *testing.T, name string) map[string]any {
t.Helper()
resp, body := doRequest(t, http.MethodGet, api("/api/v2/remotes/"+name), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("get remote %s: status %d: %s", name, resp.StatusCode, body)
}
var m map[string]any
if err := json.Unmarshal(body, &m); err != nil {
t.Fatalf("decode remote %s: %v", name, err)
}
return m
}
-54
View File
@@ -1,54 +0,0 @@
//go:build dockere2e
package e2edocker
import (
"net/http"
"strings"
"testing"
)
// TestVirtualPyPIMerge uploads different packages to two pypi locals and
// checks that a virtual over them serves a merged simple index.
func TestVirtualPyPIMerge(t *testing.T) {
createRepo(t, `{"name":"pmerge-a","package_type":"pypi","repo_type":"local"}`)
createRepo(t, `{"name":"pmerge-b","package_type":"pypi","repo_type":"local"}`)
defer deleteRepo(t, "pmerge-a")
defer deleteRepo(t, "pmerge-b")
uploadFile(t, "pmerge-a", "foo-1.0-py3-none-any.whl", fixtureBytes(t, "packages/foo-1.0-py3-none-any.whl"), "application/zip")
uploadFile(t, "pmerge-b", "bar-2.0-py3-none-any.whl", []byte("bar wheel payload"), "application/zip")
createVirtual(t, `{"name":"pmerge-v","package_type":"pypi","members":["pmerge-a","pmerge-b"]}`)
defer deleteVirtual(t, "pmerge-v")
resp, body := doRequest(t, http.MethodGet, api("/api/v1/virtual/pmerge-v/simple/"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("virtual simple index: status %d: %s", resp.StatusCode, body)
}
s := string(body)
if !strings.Contains(s, "foo") || !strings.Contains(s, "bar") {
t.Fatalf("merged index missing a member package (want foo and bar): %s", s)
}
}
// TestVirtualHelmMerge points two helm remotes at mock index.yaml documents
// with distinct charts and checks the virtual merges both into one index.
func TestVirtualHelmMerge(t *testing.T) {
createRepo(t, `{"name":"hmerge-a","package_type":"helm","repo_type":"remote","base_url":"`+mockUpstream()+`/helm-a","stale_on_error":true}`)
createRepo(t, `{"name":"hmerge-b","package_type":"helm","repo_type":"remote","base_url":"`+mockUpstream()+`/helm-b","stale_on_error":true}`)
defer deleteRepo(t, "hmerge-a")
defer deleteRepo(t, "hmerge-b")
createVirtual(t, `{"name":"hmerge-v","package_type":"helm","members":["hmerge-a","hmerge-b"]}`)
defer deleteVirtual(t, "hmerge-v")
resp, body := doRequest(t, http.MethodGet, api("/api/v1/virtual/hmerge-v/index.yaml"), nil, "")
if resp.StatusCode != http.StatusOK {
t.Fatalf("virtual index.yaml: status %d: %s", resp.StatusCode, body)
}
s := string(body)
if !strings.Contains(s, "alpha") || !strings.Contains(s, "beta") {
t.Fatalf("merged helm index missing a member chart (want alpha and beta): %s", s)
}
}
+1 -1
View File
@@ -95,7 +95,7 @@ func TestMain(m *testing.M) {
}
cfg.ListenAddr = "127.0.0.1:0"
srv, err := server.New(cfg, "e2e-test")
srv, err := server.New(cfg)
if err != nil {
log.Fatalf("server: %v", err)
}
-24
View File
@@ -24,30 +24,6 @@ func TestRoot(t *testing.T) {
}
}
func TestRemoteUpstreamTimeouts(t *testing.T) {
createRemote(t, `{
"name": "timeout-test",
"package_type": "generic",
"base_url": "https://example.com",
"stale_on_error": true,
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5
}`)
defer deleteRemote(t, "timeout-test")
remote := getJSON(t, apiURL("/api/v2/remotes/timeout-test"))
for field, want := range map[string]float64{
"upstream_dial_timeout": 3,
"upstream_tls_timeout": 4,
"upstream_response_header_timeout": 5,
} {
if got, _ := remote[field].(float64); got != want {
t.Errorf("%s: got %v, want %v", field, remote[field], want)
}
}
}
func TestRemoteCRUD(t *testing.T) {
createRemote(t, `{
"name": "test-generic",
-33
View File
@@ -24,39 +24,6 @@ func TestProxyBlocklist(t *testing.T) {
assertStatus(t, apiURL("/api/v1/remote/blocklist-test/malware.exe"), http.StatusForbidden)
}
func TestProxyHeadBlocklist(t *testing.T) {
createRemote(t, `{
"name": "head-block-test",
"package_type": "generic",
"base_url": "https://example.com",
"blocklist": ["\\.exe$"],
"stale_on_error": true
}`)
defer deleteRemote(t, "head-block-test")
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/head-block-test/malware.exe"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("HEAD blocklisted path: got %d, want 403", resp.StatusCode)
}
}
func TestProxyHeadUnknownRemote(t *testing.T) {
req, _ := http.NewRequest(http.MethodHead, apiURL("/v2/nonexistent/some/path"), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("HEAD: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("HEAD unknown remote: got %d, want 404", resp.StatusCode)
}
}
func TestProxyPatterns(t *testing.T) {
createRemote(t, `{
"name": "patterns-test",
+1 -37
View File
@@ -42,7 +42,7 @@ func (h *ProxyHandler) DockerV2Routes() chi.Router {
r.Get("/", h.handleDockerPing)
r.Head("/", h.handleDockerPing)
r.Get("/{remoteName}/*", h.handleProxy)
r.Head("/{remoteName}/*", h.handleProxyHead)
r.Head("/{remoteName}/*", h.handleProxy)
return r
}
@@ -89,42 +89,6 @@ func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
io.Copy(w, result.Reader)
}
func (h *ProxyHandler) handleProxyHead(w http.ResponseWriter, r *http.Request) {
remoteName := chi.URLParam(r, "remoteName")
path := chi.URLParam(r, "*")
remote, err := h.db.GetRemote(r.Context(), remoteName)
if err != nil {
http.Error(w, fmt.Sprintf("remote %q not found", remoteName), http.StatusNotFound)
return
}
prov, err := provider.Get(remote.PackageType)
if err != nil {
http.Error(w, fmt.Sprintf("no provider for %q", remote.PackageType), http.StatusInternalServerError)
return
}
result, err := h.engine.Head(r.Context(), *remote, path, prov)
if err != nil {
var proxyErr *proxy.ProxyError
if errors.As(err, &proxyErr) {
http.Error(w, proxyErr.Message, proxyErr.Status)
return
}
slog.Error("proxy head failed", "remote", remoteName, "path", path, "error", err)
http.Error(w, "bad gateway", http.StatusBadGateway)
return
}
w.Header().Set("Content-Type", result.ContentType)
w.Header().Set("X-Artifact-Source", result.Source)
if result.Size > 0 {
w.Header().Set("Content-Length", fmt.Sprintf("%d", result.Size))
}
w.WriteHeader(http.StatusOK)
}
func (h *ProxyHandler) handleVirtual(w http.ResponseWriter, r *http.Request) {
virtualName := chi.URLParam(r, "virtualName")
path := chi.URLParam(r, "*")
-8
View File
@@ -69,10 +69,6 @@ func (h *RemotesHandler) create(w http.ResponseWriter, r *http.Request) {
http.Error(w, "base_url is required for remote repositories", http.StatusBadRequest)
return
}
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.CreateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -88,10 +84,6 @@ func (h *RemotesHandler) update(w http.ResponseWriter, r *http.Request) {
return
}
remote.Name = name
if err := remote.ValidatePatterns(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.db.UpdateRemote(r.Context(), &remote); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
-12
View File
@@ -70,18 +70,6 @@ func (r *Redis) GetETag(ctx context.Context, remote, path string) (string, error
return val, err
}
func (r *Redis) GetToken(ctx context.Context, key string) (string, error) {
val, err := r.client.Get(ctx, "token:"+key).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
func (r *Redis) SetToken(ctx context.Context, key, token string, ttl time.Duration) error {
return r.client.Set(ctx, "token:"+key, token, ttl).Err()
}
func (r *Redis) IncrCircuitFailure(ctx context.Context, remote string, cooldown time.Duration) (int64, error) {
key := fmt.Sprintf("circuit:%s", remote)
pipe := r.client.Pipeline()
+1 -1
View File
@@ -65,7 +65,7 @@ func Load() (*Config, error) {
}
func getenv(key, fallback string) string {
if v, ok := os.LookupEnv(key); ok {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
+3 -38
View File
@@ -4,8 +4,6 @@ import (
"context"
"time"
"github.com/jackc/pgx/v5"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
@@ -111,49 +109,16 @@ func (db *DB) InsertAccessLog(ctx context.Context, remoteName, path string, cach
return err
}
// AccessLogEntry is one buffered access-log record.
type AccessLogEntry struct {
RemoteName string
Path string
CacheHit bool
SizeBytes int64
UpstreamMS int
ClientIP string
}
// InsertAccessLogBatch bulk-inserts access-log rows with a single COPY.
func (db *DB) InsertAccessLogBatch(ctx context.Context, entries []AccessLogEntry) error {
if len(entries) == 0 {
return nil
}
rows := make([][]any, len(entries))
for i, e := range entries {
rows[i] = []any{e.RemoteName, e.Path, e.CacheHit, e.SizeBytes, e.UpstreamMS, e.ClientIP}
}
_, err := db.Pool.CopyFrom(ctx,
pgx.Identifier{"access_log"},
[]string{"remote_name", "path", "cache_hit", "size_bytes", "upstream_ms", "client_ip"},
pgx.CopyFromRows(rows),
)
return err
}
// FindOrphanedBlobs returns blobs no longer referenced by any artifact or
// local file, restricted to those created before now()-minAge. The age cutoff
// is a grace period that avoids a TOCTOU race with in-flight dedup uploads,
// which insert the blob row before the referencing artifact/local_files row.
func (db *DB) FindOrphanedBlobs(ctx context.Context, minAge time.Duration) ([]models.Blob, error) {
cutoff := time.Now().Add(-minAge)
func (db *DB) FindOrphanedBlobs(ctx context.Context) ([]models.Blob, error) {
rows, err := db.Pool.Query(ctx, `
SELECT b.content_hash, b.s3_key, b.size_bytes, b.content_type, b.created_at
FROM blobs b
WHERE b.created_at < $1
AND b.content_hash NOT IN (
WHERE b.content_hash NOT IN (
SELECT content_hash FROM artifacts
UNION
SELECT content_hash FROM local_files
)
`, cutoff)
`)
if err != nil {
return nil, err
}
-3
View File
@@ -124,9 +124,6 @@ func (db *DB) migrate() error {
CREATE INDEX IF NOT EXISTS idx_access_log_remote_time ON access_log(remote_name, created_at);
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS repo_type TEXT DEFAULT 'remote';
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_dial_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_tls_timeout INTEGER DEFAULT 0;
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_response_header_timeout INTEGER DEFAULT 0;
CREATE TABLE IF NOT EXISTS rpm_metadata (
id BIGSERIAL PRIMARY KEY,
+5 -14
View File
@@ -11,9 +11,7 @@ const remoteCols = `name, package_type, repo_type, base_url, description, userna
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
releases_remote, managed_by,
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout,
created_at, updated_at`
releases_remote, managed_by, created_at, updated_at`
func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error {
return scanner.Scan(
@@ -22,9 +20,7 @@ func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error
&r.Patterns, &r.Blocklist, &r.MutablePatterns, &r.ImmutablePatterns,
&r.BanTagsEnabled, &r.BanTags,
&r.QuarantineEnabled, &r.QuarantineDays, &r.StaleOnError,
&r.ReleasesRemote, &r.ManagedBy,
&r.UpstreamDialTimeout, &r.UpstreamTLSTimeout, &r.UpstreamResponseHeaderTimeout,
&r.CreatedAt, &r.UpdatedAt,
&r.ReleasesRemote, &r.ManagedBy, &r.CreatedAt, &r.UpdatedAt,
)
}
@@ -63,9 +59,8 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
patterns, blocklist, mutable_patterns, immutable_patterns,
ban_tags_enabled, ban_tags,
quarantine_enabled, quarantine_days, stale_on_error,
releases_remote, managed_by,
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)
releases_remote, managed_by
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21)
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
r.ImmutableTTL, r.MutableTTL, r.CheckMutable,
@@ -73,7 +68,6 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
@@ -86,9 +80,7 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
patterns=$11, blocklist=$12, mutable_patterns=$13, immutable_patterns=$14,
ban_tags_enabled=$15, ban_tags=$16,
quarantine_enabled=$17, quarantine_days=$18, stale_on_error=$19,
releases_remote=$20, managed_by=$21,
upstream_dial_timeout=$22, upstream_tls_timeout=$23, upstream_response_header_timeout=$24,
updated_at=NOW()
releases_remote=$20, managed_by=$21, updated_at=NOW()
WHERE name=$1
`,
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
@@ -97,7 +89,6 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
r.BanTagsEnabled, r.BanTags,
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
r.ReleasesRemote, r.ManagedBy,
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
)
return err
}
+1 -6
View File
@@ -9,11 +9,6 @@ import (
"git.unkin.net/unkin/artifactapi/internal/storage"
)
// blobGracePeriod is how old an orphaned blob must be before GC will delete
// it. This avoids racing in-flight dedup uploads that insert the blob row
// before the referencing artifact/local_files row exists.
const blobGracePeriod = 1 * time.Hour
type Collector struct {
db *database.DB
store *storage.S3
@@ -43,7 +38,7 @@ func (c *Collector) Run(ctx context.Context) {
func (c *Collector) sweep(ctx context.Context) {
start := time.Now()
orphaned, err := c.db.FindOrphanedBlobs(ctx, blobGracePeriod)
orphaned, err := c.db.FindOrphanedBlobs(ctx)
if err != nil {
slog.Error("gc: find orphaned blobs", "error", err)
return
+94 -307
View File
@@ -5,7 +5,6 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
@@ -22,65 +21,19 @@ import (
const fetchLockTTL = 30 * time.Second
const (
accessLogBufferSize = 4096
accessLogBatchSize = 128
accessLogFlushEvery = 2 * time.Second
)
type Engine struct {
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
circuit *CircuitBreaker
accessLog chan database.AccessLogEntry
db *database.DB
cache *cache.Redis
store *storage.S3
cas *storage.CAS
}
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
e := &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
circuit: NewCircuitBreaker(c),
accessLog: make(chan database.AccessLogEntry, accessLogBufferSize),
}
go e.runAccessLogWriter()
return e
}
// runAccessLogWriter drains the access-log channel and writes rows in batches,
// replacing a goroutine-per-request insert. It runs for the process lifetime;
// access logs are best-effort telemetry, so a small tail may be lost on abrupt
// shutdown.
func (e *Engine) runAccessLogWriter() {
ticker := time.NewTicker(accessLogFlushEvery)
defer ticker.Stop()
batch := make([]database.AccessLogEntry, 0, accessLogBatchSize)
flush := func() {
if len(batch) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := e.db.InsertAccessLogBatch(ctx, batch); err != nil {
slog.Warn("access log batch insert failed", "error", err, "count", len(batch))
}
cancel()
batch = batch[:0]
}
for {
select {
case entry := <-e.accessLog:
batch = append(batch, entry)
if len(batch) >= accessLogBatchSize {
flush()
}
case <-ticker.C:
flush()
}
return &Engine{
db: db,
cache: c,
store: s,
cas: storage.NewCAS(s),
}
}
@@ -110,7 +63,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
slog.Warn("cache hit but S3 miss, re-fetching", "remote", remote.Name, "path", path)
@@ -122,12 +75,11 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
}
if !locked {
// Another request holds the fetch lock. Poll the store until the leader
// populates it rather than immediately racing to fetch upstream too; a
// cold-cache stampede otherwise hits upstream once per waiter.
if result := e.waitForStore(ctx, remote, path); result != nil {
time.Sleep(500 * time.Millisecond)
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -146,7 +98,7 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
result, err := e.serveFromStore(ctx, remote, path)
if err == nil {
result.Source = "cache"
e.logAccess(remote.Name, path, true, result.Size, 0)
go e.logAccess(remote.Name, path, true, result.Size, 0)
return result, nil
}
}
@@ -158,125 +110,27 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
fwdHeaders = clientHeaders[0]
}
// Short-circuit upstream calls when the remote's breaker is open: serve
// stale from the store if we have it, otherwise fail fast rather than
// hammering a known-bad upstream.
if e.circuit.IsOpen(ctx, remote.Name) {
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
}
start := time.Now()
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
upstreamMS := int(time.Since(start).Milliseconds())
if err != nil {
if isNetworkError(err) {
e.circuit.RecordFailure(ctx, remote.Name)
}
if remote.StaleOnError && isNetworkError(err) {
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
stale, serr := e.serveFromStore(ctx, remote, path)
if serr == nil {
slog.Warn("serving stale on upstream error", "remote", remote.Name, "path", path, "error", err)
stale.Source = "cache"
e.logAccess(remote.Name, path, true, stale.Size, 0)
go e.logAccess(remote.Name, path, true, stale.Size, 0)
return stale, nil
}
}
return nil, err
}
e.circuit.RecordSuccess(ctx, remote.Name)
e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
return result, nil
}
// HeadResult carries artifact metadata for a HEAD request. There is no body.
type HeadResult struct {
ContentType string
Size int64
Source string // "cache" or "remote"
}
// Head resolves artifact metadata without fetching or streaming the body.
// Cached artifacts/indexes are answered from the store metadata; on a miss it
// issues an upstream HEAD. It never downloads or caches the body.
func (e *Engine) Head(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
class := NewClassifier(prov).Classify(remote, path)
if class == ClassDenied {
return nil, &ProxyError{Status: http.StatusForbidden, Message: "access denied"}
}
if artifact, err := e.db.GetArtifact(ctx, remote.Name, path); err == nil && artifact != nil {
return &HeadResult{ContentType: artifact.ContentType, Size: artifact.SizeBytes, Source: "cache"}, nil
}
if info, err := e.store.Stat(ctx, storage.IndexKey(remote.Name, path)); err == nil {
return &HeadResult{ContentType: info.ContentType, Size: info.Size, Source: "cache"}, nil
}
return e.headUpstream(ctx, remote, path, prov)
}
func (e *Engine) headUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*HeadResult, error) {
url := prov.UpstreamURL(remote, path)
authHeaders, err := prov.AuthHeaders(ctx, remote)
if err != nil {
return nil, fmt.Errorf("auth headers: %w", err)
}
doHead := func(extra http.Header) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
for k, vv := range authHeaders {
for _, v := range vv {
req.Header.Add(k, v)
}
}
for k, vv := range extra {
for _, v := range vv {
req.Header.Set(k, v)
}
}
return http.DefaultClient.Do(req)
}
resp, err := doHead(nil)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, _, terr := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if terr == nil && token != "" {
resp, err = doHead(http.Header{"Authorization": []string{"Bearer " + token}})
if err != nil {
return nil, &UpstreamError{Err: err}
}
} else {
return nil, &ProxyError{Status: http.StatusUnauthorized, Message: "upstream returned 401"}
}
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
contentType = ct
}
return &HeadResult{ContentType: contentType, Size: resp.ContentLength, Source: "remote"}, nil
}
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration, clientHeaders http.Header) (*FetchResult, error) {
url := prov.UpstreamURL(remote, path)
@@ -300,14 +154,14 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
}
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, &UpstreamError{Err: err}
}
if resp.StatusCode == http.StatusUnauthorized {
resp.Body.Close()
token, err := e.cachedBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
token, err := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
if err == nil && token != "" {
req2, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
req2.Header.Set("Authorization", "Bearer "+token)
@@ -316,7 +170,7 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
req2.Header.Set("Accept", accept)
}
}
resp, err = clientForRemote(remote).Do(req2)
resp, err = http.DefaultClient.Do(req2)
if err != nil {
return nil, &UpstreamError{Err: err}
}
@@ -330,108 +184,83 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
contentType := prov.ContentType(path)
if ct := resp.Header.Get("Content-Type"); ct != "" {
contentType = ct
}
// Mutable indexes are small and may be rewritten, so buffer them in memory.
if class == ClassMutable {
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read upstream body: %w", err)
}
rewritten, err := prov.RewriteResponse(body, remote, "")
if err != nil {
return nil, fmt.Errorf("rewrite response: %w", err)
}
if rewritten != nil {
body = rewritten
}
s3Key := storage.IndexKey(remote.Name, path)
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload index: %w", err)
}
etag := resp.Header.Get("ETag")
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
} else {
hash := sha256Hash(body)
s3Key := storage.BlobKey(hash)
exists, _ := e.store.Exists(ctx, s3Key)
if !exists {
if err := e.store.Upload(ctx, s3Key, bytesReader(body), int64(len(body)), contentType); err != nil {
return nil, fmt.Errorf("upload blob: %w", err)
}
}
contentHash := fmt.Sprintf("sha256:%s", hash)
if err := e.db.UpsertBlob(ctx, contentHash, s3Key, int64(len(body)), contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, contentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
return &FetchResult{
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// Immutable blobs are streamed through the content-addressable store
// (tempfile -> sha256 -> S3) so arbitrarily large artifacts never sit
// fully in memory. Immutable content is never rewritten in the proxy path.
casResult, err := e.cas.Store(ctx, resp.Body, contentType)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("store blob: %w", err)
}
if err := e.db.UpsertBlob(ctx, casResult.ContentHash, casResult.S3Key, casResult.SizeBytes, contentType); err != nil {
slog.Warn("upsert blob failed", "error", err)
}
if err := e.db.UpsertArtifact(ctx, remote.Name, path, casResult.ContentHash, resp.Header.Get("ETag")); err != nil {
slog.Warn("upsert artifact failed", "error", err)
}
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
if etag := resp.Header.Get("ETag"); etag != "" {
_ = e.cache.SetETag(ctx, remote.Name, path, etag, ttl)
}
reader, info, err := e.store.Download(ctx, casResult.S3Key)
if err != nil {
return nil, fmt.Errorf("serve stored blob: %w", err)
}
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: casResult.SizeBytes,
Reader: io.NopCloser(bytesReader(body)),
ContentType: contentType,
Size: int64(len(body)),
Source: "remote",
}, nil
}
// waitForStore polls the store for an artifact populated by the request that
// holds the fetch lock, returning it once available or nil if it does not
// appear within the wait budget (after which the caller fetches upstream
// itself). It stops early if the request context is cancelled.
func (e *Engine) waitForStore(ctx context.Context, remote models.Remote, path string) *FetchResult {
const (
pollInterval = 100 * time.Millisecond
maxWait = 5 * time.Second
)
deadline := time.Now().Add(maxWait)
for {
if result, err := e.serveFromStore(ctx, remote, path); err == nil {
return result
}
if time.Now().After(deadline) {
return nil
}
select {
case <-ctx.Done():
return nil
case <-time.After(pollInterval):
}
}
}
func (e *Engine) serveFromStore(ctx context.Context, remote models.Remote, path string) (*FetchResult, error) {
artifact, err := e.db.GetArtifact(ctx, remote.Name, path)
if err == nil && artifact != nil {
reader, info, err := e.store.Download(ctx, artifact.ContentHash[len("sha256:"):])
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
Reader: reader,
ContentType: info.ContentType,
Size: info.Size,
}, nil
}
s3Key := storage.BlobKey(artifact.ContentHash[len("sha256:"):])
reader, info, err := e.store.Download(ctx, s3Key)
reader, info, err = e.store.Download(ctx, s3Key)
if err == nil {
_ = e.db.TouchArtifactAccess(ctx, remote.Name, path)
return &FetchResult{
@@ -473,7 +302,7 @@ func (e *Engine) checkUpstream(ctx context.Context, remote models.Remote, path,
}
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, &UpstreamError{Err: err}
}
@@ -494,20 +323,15 @@ func (e *Engine) ttlFor(remote models.Remote, class Classification) time.Duratio
}
}
// logAccess enqueues an access-log entry for the batch writer. It never blocks
// the request path: if the buffer is full the entry is dropped.
func (e *Engine) logAccess(remoteName, path string, cacheHit bool, size int64, upstreamMS int) {
select {
case e.accessLog <- database.AccessLogEntry{
RemoteName: remoteName,
Path: path,
CacheHit: cacheHit,
SizeBytes: size,
UpstreamMS: upstreamMS,
}:
default:
slog.Warn("access log buffer full, dropping entry", "remote", remoteName, "path", path)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = e.db.InsertAccessLog(ctx, remoteName, path, cacheHit, size, upstreamMS, "")
}
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
func bytesReader(data []byte) io.Reader {
@@ -527,46 +351,9 @@ func (r readerAt) ReadAt(p []byte, off int64) (n int, err error) {
return
}
// bearerTokenTTLDefault/Margin bound how long a token is cached: the default
// is used when the token endpoint omits expires_in, and the margin is
// subtracted so a cached token is refreshed slightly before it actually expires.
const (
bearerTokenTTLDefault = 60 * time.Second
bearerTokenTTLMargin = 10 * time.Second
)
func sha256Hash(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
// cachedBearerToken returns a bearer token for the given challenge, reusing a
// Redis-cached token for the same remote+challenge while it is still valid.
func (e *Engine) cachedBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
key := remote.Name + ":" + sha256Hash([]byte(wwwAuth))
if tok, err := e.cache.GetToken(ctx, key); err == nil && tok != "" {
return tok, nil
}
tok, ttl, err := fetchBearerToken(ctx, wwwAuth, remote)
if err != nil {
return "", err
}
if tok != "" {
if ttl <= 0 {
ttl = bearerTokenTTLDefault
}
if ttl > bearerTokenTTLMargin {
ttl -= bearerTokenTTLMargin
}
_ = e.cache.SetToken(ctx, key, tok, ttl)
}
return tok, nil
}
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, time.Duration, error) {
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
if !strings.HasPrefix(wwwAuth, "Bearer ") {
return "", 0, fmt.Errorf("not a Bearer challenge")
return "", fmt.Errorf("not a Bearer challenge")
}
params := map[string]string{}
@@ -583,7 +370,7 @@ func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote)
realm := params["realm"]
if realm == "" {
return "", 0, fmt.Errorf("no realm in Bearer challenge")
return "", fmt.Errorf("no realm in Bearer challenge")
}
tokenURL := realm
@@ -598,37 +385,35 @@ func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, tokenURL, nil)
if err != nil {
return "", 0, err
return "", err
}
if remote.Username != "" && remote.Password != "" {
req.SetBasicAuth(remote.Username, remote.Password)
}
resp, err := clientForRemote(remote).Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", 0, err
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", 0, fmt.Errorf("token endpoint returned %d", resp.StatusCode)
return "", fmt.Errorf("token endpoint returned %d", resp.StatusCode)
}
var tokenResp struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", 0, err
return "", err
}
ttl := time.Duration(tokenResp.ExpiresIn) * time.Second
if tokenResp.Token != "" {
return tokenResp.Token, ttl, nil
return tokenResp.Token, nil
}
return tokenResp.AccessToken, ttl, nil
return tokenResp.AccessToken, nil
}
type ProxyError struct {
@@ -646,6 +431,8 @@ func (e *UpstreamError) Error() string { return fmt.Sprintf("upstream error: %v"
func (e *UpstreamError) Unwrap() error { return e.Err }
func isNetworkError(err error) bool {
var ue *UpstreamError
return errors.As(err, &ue)
if _, ok := err.(*UpstreamError); ok {
return true
}
return false
}
-83
View File
@@ -1,83 +0,0 @@
package proxy
import (
"net"
"net/http"
"sync"
"time"
"git.unkin.net/unkin/artifactapi/pkg/models"
)
// Default upstream timeouts. A remote may override any of these; a zero
// override falls back to the default here. There is deliberately no overall
// Client.Timeout: the proxy streams arbitrarily large artifacts and total time
// is bounded by the request context instead. We only constrain the phases that
// must never hang — connect, TLS handshake, and time-to-first-response-header —
// so a slow or wedged upstream cannot pin a goroutine and connection.
const (
defaultDialTimeout = 10 * time.Second
defaultTLSTimeout = 10 * time.Second
defaultResponseHeaderTimeout = 30 * time.Second
)
type clientKey struct {
dial time.Duration
tls time.Duration
respHeader time.Duration
}
var (
clientCacheMu sync.Mutex
clientCache = map[clientKey]*http.Client{}
)
// upstreamClientFor returns an HTTP client configured with the given timeouts,
// reusing a cached client (and its connection pool) for identical timeout sets.
// Zero values fall back to the defaults.
func upstreamClientFor(dial, tls, respHeader time.Duration) *http.Client {
if dial <= 0 {
dial = defaultDialTimeout
}
if tls <= 0 {
tls = defaultTLSTimeout
}
if respHeader <= 0 {
respHeader = defaultResponseHeaderTimeout
}
key := clientKey{dial: dial, tls: tls, respHeader: respHeader}
clientCacheMu.Lock()
defer clientCacheMu.Unlock()
if c, ok := clientCache[key]; ok {
return c
}
c := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: dial,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: tls,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: respHeader,
},
}
clientCache[key] = c
return c
}
// clientForRemote returns the upstream client for a remote, applying its
// per-remote timeout overrides (in seconds) on top of the defaults.
func clientForRemote(remote models.Remote) *http.Client {
return upstreamClientFor(
time.Duration(remote.UpstreamDialTimeout)*time.Second,
time.Duration(remote.UpstreamTLSTimeout)*time.Second,
time.Duration(remote.UpstreamResponseHeaderTimeout)*time.Second,
)
}
-30
View File
@@ -2,7 +2,6 @@ package models
import (
"fmt"
"regexp"
"time"
)
@@ -47,11 +46,6 @@ type Remote struct {
MutableTTL int `json:"mutable_ttl"`
CheckMutable bool `json:"check_mutable"`
// Upstream HTTP timeouts in seconds. 0 means use the server default.
UpstreamDialTimeout int `json:"upstream_dial_timeout,omitempty"`
UpstreamTLSTimeout int `json:"upstream_tls_timeout,omitempty"`
UpstreamResponseHeaderTimeout int `json:"upstream_response_header_timeout,omitempty"`
Patterns []string `json:"patterns,omitempty"`
Blocklist []string `json:"blocklist,omitempty"`
MutablePatterns []string `json:"mutable_patterns,omitempty"`
@@ -72,30 +66,6 @@ type Remote struct {
UpdatedAt time.Time `json:"updated_at"`
}
// ValidatePatterns ensures every configured regex compiles. Storing an
// invalid pattern would otherwise be silently dropped at match time, which
// for the blocklist is a fail-open: a mistyped deny rule becomes a no-op.
func (r *Remote) ValidatePatterns() error {
groups := []struct {
field string
patterns []string
}{
{"patterns", r.Patterns},
{"blocklist", r.Blocklist},
{"mutable_patterns", r.MutablePatterns},
{"immutable_patterns", r.ImmutablePatterns},
{"ban_tags", r.BanTags},
}
for _, g := range groups {
for _, p := range g.patterns {
if _, err := regexp.Compile(p); err != nil {
return fmt.Errorf("invalid regex in %s: %q: %w", g.field, p, err)
}
}
}
return nil
}
type RemoteWithStats struct {
Remote
Stats RemoteStats `json:"stats"`
-19
View File
@@ -1,19 +0,0 @@
package models
import "testing"
func TestRemote_ValidatePatterns(t *testing.T) {
valid := &Remote{
Patterns: []string{`.*\.tar\.gz$`},
Blocklist: []string{`^secret/`},
ImmutablePatterns: []string{`\.rpm$`},
}
if err := valid.ValidatePatterns(); err != nil {
t.Fatalf("expected valid patterns, got %v", err)
}
bad := &Remote{Blocklist: []string{`[unterminated`}}
if err := bad.ValidatePatterns(); err == nil {
t.Fatal("expected error for invalid blocklist regex, got nil")
}
}
-40
View File
@@ -1,40 +0,0 @@
#!/usr/bin/env bash
# Build the artifactapi container, bring up the full stack (postgres, redis,
# minio, artifactapi) plus a static mock upstream, and run the dockerised e2e
# suite against the running product over HTTP. Tears everything down on exit.
set -euo pipefail
cd "$(dirname "$0")/.."
# Publish artifactapi on 8001 to avoid colliding with a local instance on 8000.
export ARTIFACTAPI_PORT="${ARTIFACTAPI_PORT:-8001}"
COMPOSE=(docker compose -f docker-compose.yml -f docker-compose.e2e.yml)
API_URL="${ARTIFACTAPI_URL:-http://localhost:${ARTIFACTAPI_PORT}}"
cleanup() {
echo "==> tearing down stack"
"${COMPOSE[@]}" down -v --remove-orphans >/dev/null 2>&1 || true
}
trap cleanup EXIT
echo "==> building and starting stack (postgres, redis, minio, mockupstream, artifactapi)"
"${COMPOSE[@]}" up -d --build postgres redis minio mockupstream artifactapi
echo "==> waiting for artifactapi health at ${API_URL}"
for i in $(seq 1 60); do
if curl -fsS "${API_URL}/health" >/dev/null 2>&1; then
echo " healthy after ${i}s"
break
fi
if [ "$i" -eq 60 ]; then
echo "!!! artifactapi did not become healthy in time; recent logs:"
"${COMPOSE[@]}" logs --tail=50 artifactapi
exit 1
fi
sleep 1
done
echo "==> running dockerised e2e suite"
ARTIFACTAPI_URL="${API_URL}" \
MOCK_UPSTREAM_INTERNAL="${MOCK_UPSTREAM_INTERNAL:-http://mockupstream}" \
go test -tags=dockere2e -count=1 -timeout=10m -v ./e2e-docker/...