test: proxy engine edge cases + storage/server minio retry
Cover denied, HEAD (cached/uncached/denied), stale-on-error, circuit-open, and mutable revalidation (304) paths. proxy ~8->58%.
This commit is contained in:
@@ -0,0 +1,270 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.unkin.net/unkin/artifactapi/internal/cache"
|
||||||
|
"git.unkin.net/unkin/artifactapi/internal/database"
|
||||||
|
"git.unkin.net/unkin/artifactapi/internal/provider"
|
||||||
|
_ "git.unkin.net/unkin/artifactapi/internal/provider/generic"
|
||||||
|
_ "git.unkin.net/unkin/artifactapi/internal/provider/npm"
|
||||||
|
"git.unkin.net/unkin/artifactapi/internal/storage"
|
||||||
|
"git.unkin.net/unkin/artifactapi/internal/testsupport"
|
||||||
|
"git.unkin.net/unkin/artifactapi/pkg/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
testEngine *Engine
|
||||||
|
testCache *cache.Redis
|
||||||
|
testDB *database.DB
|
||||||
|
upstream *httptest.Server
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
ctx := context.Background()
|
||||||
|
dsn, termPG, err := testsupport.StartPostgres(ctx)
|
||||||
|
if err != nil {
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
redisURL, termRedis, err := testsupport.StartRedis(ctx)
|
||||||
|
if err != nil {
|
||||||
|
termPG()
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
minio, termMinio, err := testsupport.StartMinio(ctx)
|
||||||
|
if err != nil {
|
||||||
|
termPG()
|
||||||
|
termRedis()
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := database.New(dsn)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
redis, err := cache.NewRedis(redisURL)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
var s3 *storage.S3
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
if s3, err = storage.NewS3(minio.Endpoint, minio.AccessKey, minio.SecretKey, "proxy-test", false, ""); err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
testCache = redis
|
||||||
|
testDB = db
|
||||||
|
testEngine = NewEngine(db, redis, s3)
|
||||||
|
upstream = httptest.NewServer(http.HandlerFunc(mockUpstream))
|
||||||
|
|
||||||
|
code := m.Run()
|
||||||
|
|
||||||
|
upstream.Close()
|
||||||
|
db.Close()
|
||||||
|
termMinio()
|
||||||
|
termRedis()
|
||||||
|
termPG()
|
||||||
|
if code != 0 {
|
||||||
|
os.Exit(code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mockUpstream(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/blob.bin":
|
||||||
|
w.Header().Set("Content-Type", "application/octet-stream")
|
||||||
|
w.Write([]byte("immutable blob"))
|
||||||
|
case "/pkg": // npm metadata: mutable, supports revalidation
|
||||||
|
if r.Method == http.MethodHead && r.Header.Get("If-None-Match") == `"v1"` {
|
||||||
|
w.WriteHeader(http.StatusNotModified)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("ETag", `"v1"`)
|
||||||
|
w.Write([]byte(`{"name":"pkg"}`))
|
||||||
|
default:
|
||||||
|
http.NotFound(w, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func requireStack(t *testing.T) {
|
||||||
|
t.Helper()
|
||||||
|
if testEngine == nil {
|
||||||
|
t.Skip("Docker unavailable; skipping proxy engine test")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func genericRemote(name string) models.Remote {
|
||||||
|
return models.Remote{Name: name, PackageType: models.PackageGeneric, RepoType: models.RepoTypeRemote, BaseURL: upstream.URL, StaleOnError: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
// seed inserts the remote so artifact rows (FK to remotes) can be stored.
|
||||||
|
func seed(t *testing.T, r models.Remote) models.Remote {
|
||||||
|
t.Helper()
|
||||||
|
rr := r
|
||||||
|
if err := testDB.CreateRemote(context.Background(), &rr); err != nil {
|
||||||
|
t.Fatalf("seed remote %s: %v", r.Name, err)
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func prov(t *testing.T, pt models.PackageType) provider.Provider {
|
||||||
|
p, err := provider.Get(pt)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("provider %s: %v", pt, err)
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func readAll(t *testing.T, res *FetchResult) string {
|
||||||
|
t.Helper()
|
||||||
|
defer res.Reader.Close()
|
||||||
|
b, _ := io.ReadAll(res.Reader)
|
||||||
|
return string(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFetchImmutableMissThenHit(t *testing.T) {
|
||||||
|
requireStack(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
r := seed(t, genericRemote("eng-imm"))
|
||||||
|
p := prov(t, models.PackageGeneric)
|
||||||
|
|
||||||
|
res, err := testEngine.Fetch(ctx, r, "blob.bin", p)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("fetch: %v", err)
|
||||||
|
}
|
||||||
|
if res.Source != "remote" || readAll(t, res) != "immutable blob" {
|
||||||
|
t.Errorf("miss: source=%s", res.Source)
|
||||||
|
}
|
||||||
|
res, err = testEngine.Fetch(ctx, r, "blob.bin", p)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if res.Source != "cache" || readAll(t, res) != "immutable blob" {
|
||||||
|
t.Errorf("hit: source=%s", res.Source)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFetchDenied(t *testing.T) {
|
||||||
|
requireStack(t)
|
||||||
|
r := genericRemote("eng-deny")
|
||||||
|
r.Blocklist = []string{`\.secret$`}
|
||||||
|
_, err := testEngine.Fetch(context.Background(), r, "x.secret", prov(t, models.PackageGeneric))
|
||||||
|
var pe *ProxyError
|
||||||
|
if err == nil || !asProxyError(err, &pe) || pe.Status != http.StatusForbidden {
|
||||||
|
t.Errorf("expected 403 ProxyError, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHead(t *testing.T) {
|
||||||
|
requireStack(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
r := seed(t, genericRemote("eng-head"))
|
||||||
|
p := prov(t, models.PackageGeneric)
|
||||||
|
|
||||||
|
// Uncached HEAD hits upstream.
|
||||||
|
h, err := testEngine.Head(ctx, r, "blob.bin", p)
|
||||||
|
if err != nil || h.Source != "remote" {
|
||||||
|
t.Fatalf("head uncached: %+v %v", h, err)
|
||||||
|
}
|
||||||
|
// Populate the cache, then HEAD should be served from metadata.
|
||||||
|
res, _ := testEngine.Fetch(ctx, r, "blob.bin", p)
|
||||||
|
res.Reader.Close()
|
||||||
|
h, err = testEngine.Head(ctx, r, "blob.bin", p)
|
||||||
|
if err != nil || h.Source != "cache" {
|
||||||
|
t.Errorf("head cached: %+v %v", h, err)
|
||||||
|
}
|
||||||
|
// Denied HEAD.
|
||||||
|
r.Blocklist = []string{".*"}
|
||||||
|
if _, err := testEngine.Head(ctx, r, "blob.bin", p); err == nil {
|
||||||
|
t.Error("expected denied head error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStaleOnError(t *testing.T) {
|
||||||
|
requireStack(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
r := seed(t, genericRemote("eng-stale"))
|
||||||
|
p := prov(t, models.PackageGeneric)
|
||||||
|
|
||||||
|
if _, err := testEngine.Fetch(ctx, r, "blob.bin", p); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// Drop cache freshness so the next fetch goes upstream, then point at a
|
||||||
|
// dead upstream: stale-on-error must serve the stored copy.
|
||||||
|
testCache.FlushRemote(ctx, "eng-stale")
|
||||||
|
r.BaseURL = "http://127.0.0.1:1"
|
||||||
|
res, err := testEngine.Fetch(ctx, r, "blob.bin", p)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected stale serve, got %v", err)
|
||||||
|
}
|
||||||
|
if res.Source != "cache" || readAll(t, res) != "immutable blob" {
|
||||||
|
t.Errorf("stale: source=%s", res.Source)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCircuitOpenServesStale(t *testing.T) {
|
||||||
|
requireStack(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
r := seed(t, genericRemote("eng-circuit"))
|
||||||
|
p := prov(t, models.PackageGeneric)
|
||||||
|
if _, err := testEngine.Fetch(ctx, r, "blob.bin", p); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
testCache.FlushRemote(ctx, "eng-circuit")
|
||||||
|
for i := 0; i < 6; i++ {
|
||||||
|
testEngine.circuit.RecordFailure(ctx, "eng-circuit")
|
||||||
|
}
|
||||||
|
res, err := testEngine.Fetch(ctx, r, "blob.bin", p)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("circuit-open should serve stale: %v", err)
|
||||||
|
}
|
||||||
|
if res.Source != "cache" {
|
||||||
|
t.Errorf("expected stale from open circuit, got %s", res.Source)
|
||||||
|
}
|
||||||
|
res.Reader.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMutableRevalidation(t *testing.T) {
|
||||||
|
requireStack(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
r := seed(t, models.Remote{Name: "eng-npm", PackageType: models.PackageNPM, RepoType: models.RepoTypeRemote, BaseURL: upstream.URL, CheckMutable: true, MutableTTL: 3600, StaleOnError: true})
|
||||||
|
p := prov(t, models.PackageNPM)
|
||||||
|
|
||||||
|
res, err := testEngine.Fetch(ctx, r, "pkg", p)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("initial mutable fetch: %v", err)
|
||||||
|
}
|
||||||
|
res.Reader.Close()
|
||||||
|
|
||||||
|
// Expire only the freshness marker; the ETag persists, forcing a
|
||||||
|
// conditional revalidation that the upstream answers with 304.
|
||||||
|
testCache.SetTTL(ctx, "eng-npm", "pkg", time.Millisecond)
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
res, err = testEngine.Fetch(ctx, r, "pkg", p)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("revalidation fetch: %v", err)
|
||||||
|
}
|
||||||
|
if res.Source != "cache" {
|
||||||
|
t.Errorf("revalidated response should come from cache, got %s", res.Source)
|
||||||
|
}
|
||||||
|
res.Reader.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func asProxyError(err error, target **ProxyError) bool {
|
||||||
|
pe, ok := err.(*ProxyError)
|
||||||
|
if ok {
|
||||||
|
*target = pe
|
||||||
|
}
|
||||||
|
return ok
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.unkin.net/unkin/artifactapi/internal/config"
|
"git.unkin.net/unkin/artifactapi/internal/config"
|
||||||
"git.unkin.net/unkin/artifactapi/internal/testsupport"
|
"git.unkin.net/unkin/artifactapi/internal/testsupport"
|
||||||
@@ -60,7 +61,13 @@ func TestMain(m *testing.M) {
|
|||||||
S3Bucket: "server-test",
|
S3Bucket: "server-test",
|
||||||
}
|
}
|
||||||
|
|
||||||
srv, err := New(cfg, "test-version")
|
var srv *Server
|
||||||
|
for i := 0; i < 20; i++ { // tolerate MinIO reporting ready before bucket ops succeed
|
||||||
|
if srv, err = New(cfg, "test-version"); err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.unkin.net/unkin/artifactapi/internal/testsupport"
|
"git.unkin.net/unkin/artifactapi/internal/testsupport"
|
||||||
)
|
)
|
||||||
@@ -19,7 +20,13 @@ func TestMain(m *testing.M) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
os.Exit(m.Run())
|
os.Exit(m.Run())
|
||||||
}
|
}
|
||||||
s3, err := NewS3(conn.Endpoint, conn.AccessKey, conn.SecretKey, "test-bucket", false, "")
|
var s3 *S3
|
||||||
|
for i := 0; i < 20; i++ { // MinIO can report ready before bucket ops succeed
|
||||||
|
if s3, err = NewS3(conn.Endpoint, conn.AccessKey, conn.SecretKey, "test-bucket", false, ""); err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
terminate()
|
terminate()
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|||||||
Reference in New Issue
Block a user