fix: set timeouts on the upstream HTTP client #83
@@ -24,6 +24,30 @@ func TestRoot(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRemoteUpstreamTimeouts(t *testing.T) {
|
||||||
|
createRemote(t, `{
|
||||||
|
"name": "timeout-test",
|
||||||
|
"package_type": "generic",
|
||||||
|
"base_url": "https://example.com",
|
||||||
|
"stale_on_error": true,
|
||||||
|
"upstream_dial_timeout": 3,
|
||||||
|
"upstream_tls_timeout": 4,
|
||||||
|
"upstream_response_header_timeout": 5
|
||||||
|
}`)
|
||||||
|
defer deleteRemote(t, "timeout-test")
|
||||||
|
|
||||||
|
remote := getJSON(t, apiURL("/api/v2/remotes/timeout-test"))
|
||||||
|
for field, want := range map[string]float64{
|
||||||
|
"upstream_dial_timeout": 3,
|
||||||
|
"upstream_tls_timeout": 4,
|
||||||
|
"upstream_response_header_timeout": 5,
|
||||||
|
} {
|
||||||
|
if got, _ := remote[field].(float64); got != want {
|
||||||
|
t.Errorf("%s: got %v, want %v", field, remote[field], want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRemoteCRUD(t *testing.T) {
|
func TestRemoteCRUD(t *testing.T) {
|
||||||
createRemote(t, `{
|
createRemote(t, `{
|
||||||
"name": "test-generic",
|
"name": "test-generic",
|
||||||
|
|||||||
@@ -124,6 +124,9 @@ func (db *DB) migrate() error {
|
|||||||
CREATE INDEX IF NOT EXISTS idx_access_log_remote_time ON access_log(remote_name, created_at);
|
CREATE INDEX IF NOT EXISTS idx_access_log_remote_time ON access_log(remote_name, created_at);
|
||||||
|
|
||||||
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS repo_type TEXT DEFAULT 'remote';
|
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS repo_type TEXT DEFAULT 'remote';
|
||||||
|
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_dial_timeout INTEGER DEFAULT 0;
|
||||||
|
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_tls_timeout INTEGER DEFAULT 0;
|
||||||
|
ALTER TABLE remotes ADD COLUMN IF NOT EXISTS upstream_response_header_timeout INTEGER DEFAULT 0;
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS rpm_metadata (
|
CREATE TABLE IF NOT EXISTS rpm_metadata (
|
||||||
id BIGSERIAL PRIMARY KEY,
|
id BIGSERIAL PRIMARY KEY,
|
||||||
|
|||||||
@@ -11,7 +11,9 @@ const remoteCols = `name, package_type, repo_type, base_url, description, userna
|
|||||||
patterns, blocklist, mutable_patterns, immutable_patterns,
|
patterns, blocklist, mutable_patterns, immutable_patterns,
|
||||||
ban_tags_enabled, ban_tags,
|
ban_tags_enabled, ban_tags,
|
||||||
quarantine_enabled, quarantine_days, stale_on_error,
|
quarantine_enabled, quarantine_days, stale_on_error,
|
||||||
releases_remote, managed_by, created_at, updated_at`
|
releases_remote, managed_by,
|
||||||
|
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout,
|
||||||
|
created_at, updated_at`
|
||||||
|
|
||||||
func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error {
|
func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error {
|
||||||
return scanner.Scan(
|
return scanner.Scan(
|
||||||
@@ -20,7 +22,9 @@ func scanRemote(scanner interface{ Scan(...any) error }, r *models.Remote) error
|
|||||||
&r.Patterns, &r.Blocklist, &r.MutablePatterns, &r.ImmutablePatterns,
|
&r.Patterns, &r.Blocklist, &r.MutablePatterns, &r.ImmutablePatterns,
|
||||||
&r.BanTagsEnabled, &r.BanTags,
|
&r.BanTagsEnabled, &r.BanTags,
|
||||||
&r.QuarantineEnabled, &r.QuarantineDays, &r.StaleOnError,
|
&r.QuarantineEnabled, &r.QuarantineDays, &r.StaleOnError,
|
||||||
&r.ReleasesRemote, &r.ManagedBy, &r.CreatedAt, &r.UpdatedAt,
|
&r.ReleasesRemote, &r.ManagedBy,
|
||||||
|
&r.UpstreamDialTimeout, &r.UpstreamTLSTimeout, &r.UpstreamResponseHeaderTimeout,
|
||||||
|
&r.CreatedAt, &r.UpdatedAt,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -59,8 +63,9 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
|
|||||||
patterns, blocklist, mutable_patterns, immutable_patterns,
|
patterns, blocklist, mutable_patterns, immutable_patterns,
|
||||||
ban_tags_enabled, ban_tags,
|
ban_tags_enabled, ban_tags,
|
||||||
quarantine_enabled, quarantine_days, stale_on_error,
|
quarantine_enabled, quarantine_days, stale_on_error,
|
||||||
releases_remote, managed_by
|
releases_remote, managed_by,
|
||||||
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21)
|
upstream_dial_timeout, upstream_tls_timeout, upstream_response_header_timeout
|
||||||
|
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24)
|
||||||
`,
|
`,
|
||||||
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
|
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
|
||||||
r.ImmutableTTL, r.MutableTTL, r.CheckMutable,
|
r.ImmutableTTL, r.MutableTTL, r.CheckMutable,
|
||||||
@@ -68,6 +73,7 @@ func (db *DB) CreateRemote(ctx context.Context, r *models.Remote) error {
|
|||||||
r.BanTagsEnabled, r.BanTags,
|
r.BanTagsEnabled, r.BanTags,
|
||||||
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
|
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
|
||||||
r.ReleasesRemote, r.ManagedBy,
|
r.ReleasesRemote, r.ManagedBy,
|
||||||
|
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -80,7 +86,9 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
|
|||||||
patterns=$11, blocklist=$12, mutable_patterns=$13, immutable_patterns=$14,
|
patterns=$11, blocklist=$12, mutable_patterns=$13, immutable_patterns=$14,
|
||||||
ban_tags_enabled=$15, ban_tags=$16,
|
ban_tags_enabled=$15, ban_tags=$16,
|
||||||
quarantine_enabled=$17, quarantine_days=$18, stale_on_error=$19,
|
quarantine_enabled=$17, quarantine_days=$18, stale_on_error=$19,
|
||||||
releases_remote=$20, managed_by=$21, updated_at=NOW()
|
releases_remote=$20, managed_by=$21,
|
||||||
|
upstream_dial_timeout=$22, upstream_tls_timeout=$23, upstream_response_header_timeout=$24,
|
||||||
|
updated_at=NOW()
|
||||||
WHERE name=$1
|
WHERE name=$1
|
||||||
`,
|
`,
|
||||||
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
|
r.Name, r.PackageType, r.RepoType, r.BaseURL, r.Description, r.Username, r.Password,
|
||||||
@@ -89,6 +97,7 @@ func (db *DB) UpdateRemote(ctx context.Context, r *models.Remote) error {
|
|||||||
r.BanTagsEnabled, r.BanTags,
|
r.BanTagsEnabled, r.BanTags,
|
||||||
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
|
r.QuarantineEnabled, r.QuarantineDays, r.StaleOnError,
|
||||||
r.ReleasesRemote, r.ManagedBy,
|
r.ReleasesRemote, r.ManagedBy,
|
||||||
|
r.UpstreamDialTimeout, r.UpstreamTLSTimeout, r.UpstreamResponseHeaderTimeout,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -154,7 +154,7 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
resp, err := clientForRemote(remote).Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &UpstreamError{Err: err}
|
return nil, &UpstreamError{Err: err}
|
||||||
}
|
}
|
||||||
@@ -170,7 +170,7 @@ func (e *Engine) fetchFromUpstream(ctx context.Context, remote models.Remote, pa
|
|||||||
req2.Header.Set("Accept", accept)
|
req2.Header.Set("Accept", accept)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
resp, err = http.DefaultClient.Do(req2)
|
resp, err = clientForRemote(remote).Do(req2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &UpstreamError{Err: err}
|
return nil, &UpstreamError{Err: err}
|
||||||
}
|
}
|
||||||
@@ -302,7 +302,7 @@ func (e *Engine) checkUpstream(ctx context.Context, remote models.Remote, path,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
resp, err := clientForRemote(remote).Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, &UpstreamError{Err: err}
|
return false, &UpstreamError{Err: err}
|
||||||
}
|
}
|
||||||
@@ -392,7 +392,7 @@ func fetchBearerToken(ctx context.Context, wwwAuth string, remote models.Remote)
|
|||||||
req.SetBasicAuth(remote.Username, remote.Password)
|
req.SetBasicAuth(remote.Username, remote.Password)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
resp, err := clientForRemote(remote).Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,83 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.unkin.net/unkin/artifactapi/pkg/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Default upstream timeouts. A remote may override any of these; a zero
|
||||||
|
// override falls back to the default here. There is deliberately no overall
|
||||||
|
// Client.Timeout: the proxy streams arbitrarily large artifacts and total time
|
||||||
|
// is bounded by the request context instead. We only constrain the phases that
|
||||||
|
// must never hang — connect, TLS handshake, and time-to-first-response-header —
|
||||||
|
// so a slow or wedged upstream cannot pin a goroutine and connection.
|
||||||
|
const (
|
||||||
|
defaultDialTimeout = 10 * time.Second
|
||||||
|
defaultTLSTimeout = 10 * time.Second
|
||||||
|
defaultResponseHeaderTimeout = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
type clientKey struct {
|
||||||
|
dial time.Duration
|
||||||
|
tls time.Duration
|
||||||
|
respHeader time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
clientCacheMu sync.Mutex
|
||||||
|
clientCache = map[clientKey]*http.Client{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// upstreamClientFor returns an HTTP client configured with the given timeouts,
|
||||||
|
// reusing a cached client (and its connection pool) for identical timeout sets.
|
||||||
|
// Zero values fall back to the defaults.
|
||||||
|
func upstreamClientFor(dial, tls, respHeader time.Duration) *http.Client {
|
||||||
|
if dial <= 0 {
|
||||||
|
dial = defaultDialTimeout
|
||||||
|
}
|
||||||
|
if tls <= 0 {
|
||||||
|
tls = defaultTLSTimeout
|
||||||
|
}
|
||||||
|
if respHeader <= 0 {
|
||||||
|
respHeader = defaultResponseHeaderTimeout
|
||||||
|
}
|
||||||
|
key := clientKey{dial: dial, tls: tls, respHeader: respHeader}
|
||||||
|
|
||||||
|
clientCacheMu.Lock()
|
||||||
|
defer clientCacheMu.Unlock()
|
||||||
|
if c, ok := clientCache[key]; ok {
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: dial,
|
||||||
|
KeepAlive: 30 * time.Second,
|
||||||
|
}).DialContext,
|
||||||
|
MaxIdleConns: 100,
|
||||||
|
MaxIdleConnsPerHost: 10,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
TLSHandshakeTimeout: tls,
|
||||||
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
|
ResponseHeaderTimeout: respHeader,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
clientCache[key] = c
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// clientForRemote returns the upstream client for a remote, applying its
|
||||||
|
// per-remote timeout overrides (in seconds) on top of the defaults.
|
||||||
|
func clientForRemote(remote models.Remote) *http.Client {
|
||||||
|
return upstreamClientFor(
|
||||||
|
time.Duration(remote.UpstreamDialTimeout)*time.Second,
|
||||||
|
time.Duration(remote.UpstreamTLSTimeout)*time.Second,
|
||||||
|
time.Duration(remote.UpstreamResponseHeaderTimeout)*time.Second,
|
||||||
|
)
|
||||||
|
}
|
||||||
@@ -46,6 +46,11 @@ type Remote struct {
|
|||||||
MutableTTL int `json:"mutable_ttl"`
|
MutableTTL int `json:"mutable_ttl"`
|
||||||
CheckMutable bool `json:"check_mutable"`
|
CheckMutable bool `json:"check_mutable"`
|
||||||
|
|
||||||
|
// Upstream HTTP timeouts in seconds. 0 means use the server default.
|
||||||
|
UpstreamDialTimeout int `json:"upstream_dial_timeout,omitempty"`
|
||||||
|
UpstreamTLSTimeout int `json:"upstream_tls_timeout,omitempty"`
|
||||||
|
UpstreamResponseHeaderTimeout int `json:"upstream_response_header_timeout,omitempty"`
|
||||||
|
|
||||||
Patterns []string `json:"patterns,omitempty"`
|
Patterns []string `json:"patterns,omitempty"`
|
||||||
Blocklist []string `json:"blocklist,omitempty"`
|
Blocklist []string `json:"blocklist,omitempty"`
|
||||||
MutablePatterns []string `json:"mutable_patterns,omitempty"`
|
MutablePatterns []string `json:"mutable_patterns,omitempty"`
|
||||||
|
|||||||
Reference in New Issue
Block a user