Compare commits
8 Commits
v3.6.1
..
6d6cc4b78c
| Author | SHA1 | Date | |
|---|---|---|---|
| 6d6cc4b78c | |||
| 8d9bc1c422 | |||
| 30b7cef026 | |||
| 603be5b989 | |||
| 9eba49500c | |||
| 0083d67272 | |||
| 8ec7de50e3 | |||
| 9c465cbd4c |
@@ -8,6 +8,8 @@ steps:
|
||||
settings:
|
||||
registry: git.unkin.net
|
||||
repo: git.unkin.net/unkin/artifactapi
|
||||
build_args:
|
||||
VERSION: ${CI_COMMIT_TAG}
|
||||
username: droneci
|
||||
password:
|
||||
from_secret: DRONECI_PASSWORD
|
||||
@@ -23,7 +25,7 @@ steps:
|
||||
dockerfile: ui/Dockerfile.ui
|
||||
context: ui
|
||||
build_args:
|
||||
- BASE_PATH=/ui
|
||||
BASE_PATH: /ui
|
||||
username: droneci
|
||||
password:
|
||||
from_secret: DRONECI_PASSWORD
|
||||
|
||||
+2
-1
@@ -9,7 +9,8 @@ RUN go mod download
|
||||
|
||||
COPY . .
|
||||
|
||||
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o artifactapi ./cmd/artifactapi
|
||||
ARG VERSION=dev
|
||||
RUN CGO_ENABLED=0 go build -ldflags="-s -w -X main.version=${VERSION}" -o artifactapi ./cmd/artifactapi
|
||||
|
||||
FROM gcr.io/distroless/static-debian12:nonroot
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ check-go:
|
||||
fi
|
||||
|
||||
build: check-go tidy
|
||||
go build -ldflags="-s -w" -o $(BINARY) ./cmd/artifactapi
|
||||
go build -ldflags="-s -w -X main.version=$(VERSION)" -o $(BINARY) ./cmd/artifactapi
|
||||
|
||||
test: check-go
|
||||
go test -race -count=1 ./pkg/... ./internal/...
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"git.unkin.net/unkin/artifactapi/internal/tui"
|
||||
)
|
||||
|
||||
var version = "dev"
|
||||
|
||||
func main() {
|
||||
if len(os.Args) > 1 && os.Args[1] == "tui" {
|
||||
endpoint := os.Getenv("ARTIFACTAPI_ENDPOINT")
|
||||
@@ -42,7 +44,7 @@ func main() {
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
srv, err := server.New(cfg)
|
||||
srv, err := server.New(cfg, version)
|
||||
if err != nil {
|
||||
slog.Error("failed to create server", "error", err)
|
||||
os.Exit(1)
|
||||
|
||||
@@ -67,7 +67,7 @@ func (h *ProxyHandler) handleProxy(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.engine.Fetch(r.Context(), *remote, path, prov)
|
||||
result, err := h.engine.Fetch(r.Context(), *remote, path, prov, r.Header)
|
||||
if err != nil {
|
||||
var proxyErr *proxy.ProxyError
|
||||
if errors.As(err, &proxyErr) {
|
||||
|
||||
@@ -30,6 +30,15 @@ func (db *DB) GetOverviewStats(ctx context.Context) (*models.OverviewStats, erro
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = db.Pool.QueryRow(ctx, `
|
||||
SELECT COALESCE(SUM(size_bytes), 0)
|
||||
FROM access_log
|
||||
WHERE cache_hit = TRUE AND created_at > NOW() - INTERVAL '30 days'
|
||||
`).Scan(&stats.BandwidthSaved30d)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &stats, nil
|
||||
}
|
||||
|
||||
|
||||
+128
-12
@@ -4,10 +4,12 @@ import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.unkin.net/unkin/artifactapi/internal/cache"
|
||||
@@ -20,18 +22,20 @@ import (
|
||||
const fetchLockTTL = 30 * time.Second
|
||||
|
||||
type Engine struct {
|
||||
db *database.DB
|
||||
cache *cache.Redis
|
||||
store *storage.S3
|
||||
cas *storage.CAS
|
||||
db *database.DB
|
||||
cache *cache.Redis
|
||||
store *storage.S3
|
||||
cas *storage.CAS
|
||||
circuit *CircuitBreaker
|
||||
}
|
||||
|
||||
func NewEngine(db *database.DB, c *cache.Redis, s *storage.S3) *Engine {
|
||||
return &Engine{
|
||||
db: db,
|
||||
cache: c,
|
||||
store: s,
|
||||
cas: storage.NewCAS(s),
|
||||
db: db,
|
||||
cache: c,
|
||||
store: s,
|
||||
cas: storage.NewCAS(s),
|
||||
circuit: NewCircuitBreaker(c),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,7 +46,7 @@ type FetchResult struct {
|
||||
Source string // "cache" or "remote"
|
||||
}
|
||||
|
||||
func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, prov provider.Provider) (*FetchResult, error) {
|
||||
func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, prov provider.Provider, clientHeaders ...http.Header) (*FetchResult, error) {
|
||||
classifier := NewClassifier(prov)
|
||||
class := classifier.Classify(remote, path)
|
||||
|
||||
@@ -103,10 +107,31 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
}
|
||||
}
|
||||
|
||||
var fwdHeaders http.Header
|
||||
if len(clientHeaders) > 0 && clientHeaders[0] != nil {
|
||||
fwdHeaders = clientHeaders[0]
|
||||
}
|
||||
|
||||
// Short-circuit upstream calls when the remote's breaker is open: serve
|
||||
// stale from the store if we have it, otherwise fail fast rather than
|
||||
// hammering a known-bad upstream.
|
||||
if e.circuit.IsOpen(ctx, remote.Name) {
|
||||
if stale, serr := e.serveFromStore(ctx, remote, path); serr == nil {
|
||||
slog.Warn("circuit open, serving stale", "remote", remote.Name, "path", path)
|
||||
stale.Source = "cache"
|
||||
go e.logAccess(remote.Name, path, true, stale.Size, 0)
|
||||
return stale, nil
|
||||
}
|
||||
return nil, &ProxyError{Status: http.StatusServiceUnavailable, Message: "upstream circuit open"}
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl)
|
||||
result, err := e.fetchFromUpstream(ctx, remote, path, prov, class, ttl, fwdHeaders)
|
||||
upstreamMS := int(time.Since(start).Milliseconds())
|
||||
if err != nil {
|
||||
if isNetworkError(err) {
|
||||
e.circuit.RecordFailure(ctx, remote.Name)
|
||||
}
|
||||
if remote.StaleOnError && isNetworkError(err) {
|
||||
_ = e.cache.SetTTL(ctx, remote.Name, path, ttl)
|
||||
stale, serr := e.serveFromStore(ctx, remote, path)
|
||||
@@ -120,11 +145,12 @@ func (e *Engine) Fetch(ctx context.Context, remote models.Remote, path string, p
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.circuit.RecordSuccess(ctx, remote.Name)
|
||||
go e.logAccess(remote.Name, path, false, result.Size, upstreamMS)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration) (*FetchResult, error) {
|
||||
func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, path string, prov provider.Provider, class Classification, ttl time.Duration, clientHeaders http.Header) (*FetchResult, error) {
|
||||
url := prov.UpstreamURL(remote, path)
|
||||
|
||||
authHeaders, err := prov.AuthHeaders(ctx, remote)
|
||||
@@ -141,12 +167,37 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
|
||||
req.Header.Add(k, v)
|
||||
}
|
||||
}
|
||||
if clientHeaders != nil {
|
||||
if accept := clientHeaders.Get("Accept"); accept != "" {
|
||||
req.Header.Set("Accept", accept)
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, &UpstreamError{Err: err}
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusUnauthorized {
|
||||
resp.Body.Close()
|
||||
token, err := fetchBearerToken(ctx, resp.Header.Get("Www-Authenticate"), remote)
|
||||
if err == nil && token != "" {
|
||||
req2, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
req2.Header.Set("Authorization", "Bearer "+token)
|
||||
if clientHeaders != nil {
|
||||
if accept := clientHeaders.Get("Accept"); accept != "" {
|
||||
req2.Header.Set("Accept", accept)
|
||||
}
|
||||
}
|
||||
resp, err = http.DefaultClient.Do(req2)
|
||||
if err != nil {
|
||||
return nil, &UpstreamError{Err: err}
|
||||
}
|
||||
} else {
|
||||
return nil, &ProxyError{Status: http.StatusUnauthorized, Message: "upstream returned 401"}
|
||||
}
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
resp.Body.Close()
|
||||
return nil, &ProxyError{Status: resp.StatusCode, Message: fmt.Sprintf("upstream returned %d", resp.StatusCode)}
|
||||
@@ -167,7 +218,7 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
|
||||
}
|
||||
|
||||
contentType := prov.ContentType(path)
|
||||
if ct := resp.Header.Get("Content-Type"); ct != "" && contentType == "application/octet-stream" {
|
||||
if ct := resp.Header.Get("Content-Type"); ct != "" {
|
||||
contentType = ct
|
||||
}
|
||||
|
||||
@@ -319,6 +370,71 @@ func (r readerAt) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote) (string, error) {
|
||||
if !strings.HasPrefix(wwwAuth, "Bearer ") {
|
||||
return "", fmt.Errorf("not a Bearer challenge")
|
||||
}
|
||||
|
||||
params := map[string]string{}
|
||||
for _, part := range strings.Split(wwwAuth[7:], ",") {
|
||||
part = strings.TrimSpace(part)
|
||||
eq := strings.Index(part, "=")
|
||||
if eq < 0 {
|
||||
continue
|
||||
}
|
||||
key := part[:eq]
|
||||
val := strings.Trim(part[eq+1:], `"`)
|
||||
params[key] = val
|
||||
}
|
||||
|
||||
realm := params["realm"]
|
||||
if realm == "" {
|
||||
return "", fmt.Errorf("no realm in Bearer challenge")
|
||||
}
|
||||
|
||||
tokenURL := realm
|
||||
sep := "?"
|
||||
if s, ok := params["service"]; ok {
|
||||
tokenURL += sep + "service=" + s
|
||||
sep = "&"
|
||||
}
|
||||
if s, ok := params["scope"]; ok {
|
||||
tokenURL += sep + "scope=" + s
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, tokenURL, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if remote.Username != "" && remote.Password != "" {
|
||||
req.SetBasicAuth(remote.Username, remote.Password)
|
||||
}
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return "", fmt.Errorf("token endpoint returned %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var tokenResp struct {
|
||||
Token string `json:"token"`
|
||||
AccessToken string `json:"access_token"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if tokenResp.Token != "" {
|
||||
return tokenResp.Token, nil
|
||||
}
|
||||
return tokenResp.AccessToken, nil
|
||||
}
|
||||
|
||||
type ProxyError struct {
|
||||
Status int
|
||||
Message string
|
||||
|
||||
@@ -35,6 +35,7 @@ import (
|
||||
|
||||
type Server struct {
|
||||
cfg *config.Config
|
||||
version string
|
||||
router chi.Router
|
||||
db *database.DB
|
||||
cache *cache.Redis
|
||||
@@ -45,7 +46,7 @@ type Server struct {
|
||||
gc *gc.Collector
|
||||
}
|
||||
|
||||
func New(cfg *config.Config) (*Server, error) {
|
||||
func New(cfg *config.Config, version string) (*Server, error) {
|
||||
db, err := database.New(cfg.DatabaseDSN())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("database: %w", err)
|
||||
@@ -68,6 +69,7 @@ func New(cfg *config.Config) (*Server, error) {
|
||||
|
||||
s := &Server{
|
||||
cfg: cfg,
|
||||
version: version,
|
||||
db: db,
|
||||
cache: redis,
|
||||
store: s3,
|
||||
@@ -138,7 +140,7 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprint(w, `{"name":"artifactapi","version":"3.0.0-dev"}`)
|
||||
fmt.Fprintf(w, `{"name":"artifactapi","version":"%s"}`, s.version)
|
||||
}
|
||||
|
||||
func (s *Server) newHTTPServer() *http.Server {
|
||||
|
||||
@@ -65,11 +65,12 @@ func (m *HelmMerger) MergeIndexes(members []MemberIndex, proxyBaseURL string) ([
|
||||
if baseHost != "" && extractHost(u) != baseHost {
|
||||
continue
|
||||
}
|
||||
relPath := extractPathRelativeToBase(u, member.BaseURL)
|
||||
ver.URLs[i] = fmt.Sprintf("%s/api/v1/%s/%s/%s",
|
||||
strings.TrimRight(proxyBaseURL, "/"),
|
||||
routePrefix,
|
||||
member.RemoteName,
|
||||
extractPath(u))
|
||||
relPath)
|
||||
} else {
|
||||
ver.URLs[i] = fmt.Sprintf("%s/api/v1/%s/%s/%s",
|
||||
strings.TrimRight(proxyBaseURL, "/"),
|
||||
@@ -102,6 +103,18 @@ func extractHost(rawURL string) string {
|
||||
return rest[:slashIdx]
|
||||
}
|
||||
|
||||
func extractPathRelativeToBase(rawURL, baseURL string) string {
|
||||
fullPath := extractPath(rawURL)
|
||||
basePath := extractPath(baseURL)
|
||||
if basePath != "" {
|
||||
basePath = strings.TrimRight(basePath, "/") + "/"
|
||||
if strings.HasPrefix(fullPath, basePath) {
|
||||
return fullPath[len(basePath):]
|
||||
}
|
||||
}
|
||||
return fullPath
|
||||
}
|
||||
|
||||
func extractPath(rawURL string) string {
|
||||
idx := strings.Index(rawURL, "://")
|
||||
if idx == -1 {
|
||||
|
||||
+6
-1
@@ -5,7 +5,12 @@ server {
|
||||
root /usr/share/nginx/html;
|
||||
index index.html;
|
||||
|
||||
location ${BASE_PATH} {
|
||||
location ${BASE_PATH}/ {
|
||||
rewrite ^${BASE_PATH}(/.*)$ $1 break;
|
||||
try_files $uri $uri/ /index.html;
|
||||
}
|
||||
|
||||
location = ${BASE_PATH} {
|
||||
return 301 ${BASE_PATH}/;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,11 @@ export function Dashboard() {
|
||||
value={formatNumber(stats.total_blobs_deduped)}
|
||||
sub="shared blobs"
|
||||
/>
|
||||
<StatsCard
|
||||
label="Bandwidth Saved"
|
||||
value={formatBytes(stats.bandwidth_saved_30d)}
|
||||
sub="last 30 days"
|
||||
/>
|
||||
</div>
|
||||
|
||||
{health && (
|
||||
|
||||
Reference in New Issue
Block a user