Initial implementation of generic artifact storage system

- FastAPI-based caching proxy for remote file servers
- YAML configuration for multiple remotes (GitHub, Gitea, HashiCorp, etc.)
- Direct URL API: /api/v1/remote/{remote}/{path} with auto-download and caching
- Pattern-based access control with regex filtering
- S3/MinIO backend storage with predictable paths
- Docker Compose setup with MinIO for local development
This commit is contained in:
2025-12-31 20:07:32 +11:00
commit 46711eec6a
13 changed files with 2312 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
# Artifact API package
+69
View File
@@ -0,0 +1,69 @@
import time
import hashlib
import redis
class RedisCache:
def __init__(self, redis_url: str):
self.redis_url = redis_url
try:
self.client = redis.from_url(self.redis_url, decode_responses=True)
# Test connection
self.client.ping()
self.available = True
except Exception as e:
print(f"Redis not available: {e}")
self.client = None
self.available = False
def is_index_file(self, file_path: str) -> bool:
"""Check if the file is an index file that should have TTL"""
return (
file_path.endswith("APKINDEX.tar.gz")
or file_path.endswith("Packages.gz")
or file_path.endswith("repomd.xml")
or "repodata/" in file_path
and file_path.endswith((".xml", ".xml.gz", ".xml.bz2", ".xml.xz"))
)
def get_index_cache_key(self, remote_name: str, path: str) -> str:
"""Generate cache key for index files"""
return f"index:{remote_name}:{hashlib.sha256(path.encode()).hexdigest()[:16]}"
def is_index_valid(
self, remote_name: str, path: str, ttl_override: int = None
) -> bool:
"""Check if index file is still valid (not expired)"""
if not self.available:
return False
try:
key = self.get_index_cache_key(remote_name, path)
return self.client.exists(key) > 0
except Exception:
return False
def mark_index_cached(self, remote_name: str, path: str, ttl: int = 300) -> None:
"""Mark index file as cached with TTL"""
if not self.available:
return
try:
key = self.get_index_cache_key(remote_name, path)
self.client.setex(key, ttl, str(int(time.time())))
except Exception:
pass
def cleanup_expired_index(self, storage, remote_name: str, path: str) -> None:
"""Remove expired index from S3 storage"""
if not self.available:
return
try:
# Get the S3 key and remove it
s3_key = storage.get_object_key_from_path(remote_name, path)
if storage.exists(s3_key):
storage.client.delete_object(Bucket=storage.bucket, Key=s3_key)
except Exception:
pass
+120
View File
@@ -0,0 +1,120 @@
import os
import json
import yaml
from typing import Optional
class ConfigManager:
def __init__(self, config_file: str = "remotes.yaml"):
self.config_file = config_file
self._last_modified = 0
self.config = self._load_config()
def _load_config(self) -> dict:
try:
with open(self.config_file, "r") as f:
if self.config_file.endswith(".yaml") or self.config_file.endswith(
".yml"
):
return yaml.safe_load(f)
else:
return json.load(f)
except FileNotFoundError:
return {"remotes": {}}
def _check_reload(self) -> None:
"""Check if config file has been modified and reload if needed"""
try:
import os
current_modified = os.path.getmtime(self.config_file)
if current_modified > self._last_modified:
self._last_modified = current_modified
self.config = self._load_config()
print(f"Config reloaded from {self.config_file}")
except OSError:
pass
def get_remote_config(self, remote_name: str) -> Optional[dict]:
self._check_reload()
return self.config.get("remotes", {}).get(remote_name)
def get_repository_patterns(self, remote_name: str, repo_path: str) -> list:
remote_config = self.get_remote_config(remote_name)
if not remote_config:
return []
repositories = remote_config.get("repositories", {})
# Handle both dict (GitHub style) and list (Alpine style) repositories
if isinstance(repositories, dict):
repo_config = repositories.get(repo_path)
if repo_config:
patterns = repo_config.get("include_patterns", [])
else:
patterns = remote_config.get("include_patterns", [])
elif isinstance(repositories, list):
# For Alpine, repositories is just a list of allowed repo names
# Pattern matching is handled by the main include_patterns
patterns = remote_config.get("include_patterns", [])
else:
patterns = remote_config.get("include_patterns", [])
return patterns
def get_s3_config(self) -> dict:
"""Get S3 configuration from environment variables"""
endpoint = os.getenv("MINIO_ENDPOINT")
access_key = os.getenv("MINIO_ACCESS_KEY")
secret_key = os.getenv("MINIO_SECRET_KEY")
bucket = os.getenv("MINIO_BUCKET")
if not endpoint:
raise ValueError("MINIO_ENDPOINT environment variable is required")
if not access_key:
raise ValueError("MINIO_ACCESS_KEY environment variable is required")
if not secret_key:
raise ValueError("MINIO_SECRET_KEY environment variable is required")
if not bucket:
raise ValueError("MINIO_BUCKET environment variable is required")
return {
"endpoint": endpoint,
"access_key": access_key,
"secret_key": secret_key,
"bucket": bucket,
"secure": os.getenv("MINIO_SECURE", "false").lower() == "true",
}
def get_redis_config(self) -> dict:
"""Get Redis configuration from environment variables"""
redis_url = os.getenv("REDIS_URL")
if not redis_url:
raise ValueError("REDIS_URL environment variable is required")
return {
"url": redis_url
}
def get_database_config(self) -> dict:
"""Get database configuration from environment variables"""
db_host = os.getenv("DBHOST")
db_port = os.getenv("DBPORT")
db_user = os.getenv("DBUSER")
db_pass = os.getenv("DBPASS")
db_name = os.getenv("DBNAME")
if not all([db_host, db_port, db_user, db_pass, db_name]):
missing = [var for var, val in [("DBHOST", db_host), ("DBPORT", db_port), ("DBUSER", db_user), ("DBPASS", db_pass), ("DBNAME", db_name)] if not val]
raise ValueError(f"All database environment variables are required: {', '.join(missing)}")
db_url = f"postgresql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
return {"url": db_url}
def get_cache_config(self, remote_name: str) -> dict:
"""Get cache configuration for a specific remote"""
remote_config = self.get_remote_config(remote_name)
if not remote_config:
return {}
return remote_config.get("cache", {})
+282
View File
@@ -0,0 +1,282 @@
import os
from typing import Optional
import psycopg2
from psycopg2.extras import RealDictCursor
class DatabaseManager:
def __init__(self, db_url: str):
self.db_url = db_url
self.available = False
self._init_database()
def _init_database(self):
"""Initialize database connection and create schema if needed"""
try:
self.connection = psycopg2.connect(self.db_url)
self.connection.autocommit = True
self._create_schema()
self.available = True
print("Database connection established")
except Exception as e:
print(f"Database not available: {e}")
self.available = False
def _create_schema(self):
"""Create tables if they don't exist"""
try:
with self.connection.cursor() as cursor:
# Create table to map S3 keys to remote names
cursor.execute("""
CREATE TABLE IF NOT EXISTS artifact_mappings (
id SERIAL PRIMARY KEY,
s3_key VARCHAR(255) UNIQUE NOT NULL,
remote_name VARCHAR(100) NOT NULL,
file_path TEXT NOT NULL,
size_bytes BIGINT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS local_files (
id SERIAL PRIMARY KEY,
repository_name VARCHAR(100) NOT NULL,
file_path TEXT NOT NULL,
s3_key VARCHAR(255) UNIQUE NOT NULL,
size_bytes BIGINT NOT NULL,
sha256_sum VARCHAR(64) NOT NULL,
content_type VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
uploaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(repository_name, file_path)
)
""")
# Create indexes separately
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_s3_key ON artifact_mappings (s3_key)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_remote_name ON artifact_mappings (remote_name)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_local_repo_path ON local_files (repository_name, file_path)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_local_s3_key ON local_files (s3_key)"
)
print("Database schema initialized")
except Exception as e:
print(f"Error creating schema: {e}")
def record_artifact_mapping(
self, s3_key: str, remote_name: str, file_path: str, size_bytes: int
):
"""Record mapping between S3 key and remote"""
if not self.available:
return
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
INSERT INTO artifact_mappings (s3_key, remote_name, file_path, size_bytes)
VALUES (%s, %s, %s, %s)
ON CONFLICT (s3_key)
DO UPDATE SET
remote_name = EXCLUDED.remote_name,
file_path = EXCLUDED.file_path,
size_bytes = EXCLUDED.size_bytes
""",
(s3_key, remote_name, file_path, size_bytes),
)
except Exception as e:
print(f"Error recording artifact mapping: {e}")
def get_storage_by_remote(self) -> dict[str, int]:
"""Get storage size breakdown by remote from database"""
if not self.available:
return {}
try:
with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("""
SELECT remote_name, SUM(size_bytes) as total_size
FROM artifact_mappings
GROUP BY remote_name
""")
results = cursor.fetchall()
return {row["remote_name"]: row["total_size"] or 0 for row in results}
except Exception as e:
print(f"Error getting storage by remote: {e}")
return {}
def get_remote_for_s3_key(self, s3_key: str) -> Optional[str]:
"""Get remote name for given S3 key"""
if not self.available:
return None
try:
with self.connection.cursor() as cursor:
cursor.execute(
"SELECT remote_name FROM artifact_mappings WHERE s3_key = %s",
(s3_key,),
)
result = cursor.fetchone()
return result[0] if result else None
except Exception as e:
print(f"Error getting remote for S3 key: {e}")
return None
def add_local_file(
self,
repository_name: str,
file_path: str,
s3_key: str,
size_bytes: int,
sha256_sum: str,
content_type: str = None,
):
"""Add a file to local repository"""
if not self.available:
return False
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
INSERT INTO local_files (repository_name, file_path, s3_key, size_bytes, sha256_sum, content_type)
VALUES (%s, %s, %s, %s, %s, %s)
""",
(
repository_name,
file_path,
s3_key,
size_bytes,
sha256_sum,
content_type,
),
)
self.connection.commit()
return True
except Exception as e:
print(f"Error adding local file: {e}")
return False
def get_local_file_metadata(self, repository_name: str, file_path: str):
"""Get metadata for a local file"""
if not self.available:
return None
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT repository_name, file_path, s3_key, size_bytes, sha256_sum, content_type, created_at, uploaded_at
FROM local_files
WHERE repository_name = %s AND file_path = %s
""",
(repository_name, file_path),
)
result = cursor.fetchone()
if result:
return {
"repository_name": result[0],
"file_path": result[1],
"s3_key": result[2],
"size_bytes": result[3],
"sha256_sum": result[4],
"content_type": result[5],
"created_at": result[6],
"uploaded_at": result[7],
}
return None
except Exception as e:
print(f"Error getting local file metadata: {e}")
return None
def list_local_files(self, repository_name: str, prefix: str = ""):
"""List files in local repository with optional path prefix"""
if not self.available:
return []
try:
with self.connection.cursor() as cursor:
if prefix:
cursor.execute(
"""
SELECT file_path, size_bytes, sha256_sum, content_type, created_at, uploaded_at
FROM local_files
WHERE repository_name = %s AND file_path LIKE %s
ORDER BY file_path
""",
(repository_name, f"{prefix}%"),
)
else:
cursor.execute(
"""
SELECT file_path, size_bytes, sha256_sum, content_type, created_at, uploaded_at
FROM local_files
WHERE repository_name = %s
ORDER BY file_path
""",
(repository_name,),
)
results = cursor.fetchall()
return [
{
"file_path": result[0],
"size_bytes": result[1],
"sha256_sum": result[2],
"content_type": result[3],
"created_at": result[4],
"uploaded_at": result[5],
}
for result in results
]
except Exception as e:
print(f"Error listing local files: {e}")
return []
def delete_local_file(self, repository_name: str, file_path: str):
"""Delete a file from local repository"""
if not self.available:
return False
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
DELETE FROM local_files
WHERE repository_name = %s AND file_path = %s
RETURNING s3_key
""",
(repository_name, file_path),
)
result = cursor.fetchone()
self.connection.commit()
return result[0] if result else None
except Exception as e:
print(f"Error deleting local file: {e}")
return None
def file_exists(self, repository_name: str, file_path: str):
"""Check if file exists in local repository"""
if not self.available:
return False
try:
with self.connection.cursor() as cursor:
cursor.execute(
"""
SELECT 1 FROM local_files
WHERE repository_name = %s AND file_path = %s
""",
(repository_name, file_path),
)
return cursor.fetchone() is not None
except Exception as e:
print(f"Error checking file existence: {e}")
return False
+580
View File
@@ -0,0 +1,580 @@
import os
import re
import hashlib
from typing import Dict, Any, Optional
import httpx
from fastapi import FastAPI, HTTPException, Response, Query, File, UploadFile
from fastapi.responses import PlainTextResponse, JSONResponse
from pydantic import BaseModel
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from .config import ConfigManager
from .database import DatabaseManager
from .storage import S3Storage
from .cache import RedisCache
from .metrics import MetricsManager
class ArtifactRequest(BaseModel):
remote: str
include_pattern: str
app = FastAPI(title="Artifact Storage API", version="2.0.0")
# Initialize components using config
config = ConfigManager("remotes.yaml")
# Get configurations
s3_config = config.get_s3_config()
redis_config = config.get_redis_config()
db_config = config.get_database_config()
# Initialize services
storage = S3Storage(**s3_config)
cache = RedisCache(redis_config["url"])
database = DatabaseManager(db_config["url"])
metrics = MetricsManager(cache, database)
@app.get("/")
def read_root():
config._check_reload()
return {
"message": "Artifact Storage API",
"version": "2.0.0",
"remotes": list(config.config.get("remotes", {}).keys()),
}
@app.get("/health")
def health_check():
return {"status": "healthy"}
async def construct_remote_url(remote_name: str, path: str) -> str:
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
base_url = remote_config.get("base_url")
if not base_url:
raise HTTPException(
status_code=500, detail=f"No base_url configured for remote '{remote_name}'"
)
return f"{base_url}/{path}"
async def check_artifact_patterns(
remote_name: str, repo_path: str, file_path: str, full_path: str
) -> bool:
# First check if this is an index file - always allow index files
if cache.is_index_file(file_path) or cache.is_index_file(full_path):
return True
# Then check basic include patterns
patterns = config.get_repository_patterns(remote_name, repo_path)
if not patterns:
return True # Allow all if no patterns configured
pattern_matched = False
for pattern in patterns:
# Check both file_path and full_path to handle different pattern types
if re.search(pattern, file_path) or re.search(pattern, full_path):
pattern_matched = True
break
if not pattern_matched:
return False
# All remotes now use pattern-based filtering only - no additional checks needed
return True
async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
# Check if using URL-based key or path-based key
if url.startswith("http"):
key = storage.get_object_key(url)
else:
key = storage.get_object_key_from_path(remote_name, path)
if storage.exists(key):
return {
"url": url,
"cached_url": storage.get_url(key),
"status": "already_cached",
}
try:
async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(url)
response.raise_for_status()
storage_path = storage.upload(key, response.content)
return {
"url": url,
"cached_url": storage.get_url(key),
"storage_path": storage_path,
"size": len(response.content),
"status": "cached",
}
except Exception as e:
return {"url": url, "status": "error", "error": str(e)}
@app.get("/api/v1/remote/{remote_name}/{path:path}")
async def get_artifact(remote_name: str, path: str):
# Check if remote is configured
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
# Check if this is a local repository
if remote_config.get("type") == "local":
# Handle local repository download
metadata = database.get_local_file_metadata(remote_name, path)
if not metadata:
raise HTTPException(status_code=404, detail="File not found")
# Get file from S3
content = storage.download_object(metadata["s3_key"])
if content is None:
raise HTTPException(status_code=500, detail="File not accessible")
# Determine content type
content_type = metadata.get("content_type", "application/octet-stream")
return Response(
content=content,
media_type=content_type,
headers={
"Content-Disposition": f"attachment; filename={os.path.basename(path)}"
},
)
# Extract repository path for pattern checking
path_parts = path.split("/")
if len(path_parts) >= 2:
repo_path = f"{path_parts[0]}/{path_parts[1]}"
file_path = "/".join(path_parts[2:])
else:
repo_path = path
file_path = path
# Check if artifact matches configured patterns
if not await check_artifact_patterns(remote_name, repo_path, file_path, path):
raise HTTPException(
status_code=403, detail="Artifact not allowed by configuration patterns"
)
# Construct the remote URL
remote_url = await construct_remote_url(remote_name, path)
# Check if artifact is already cached (try both URL and path-based keys)
url_key = storage.get_object_key(remote_url)
path_key = storage.get_object_key_from_path(remote_name, path)
cached_key = None
if storage.exists(url_key):
cached_key = url_key
elif storage.exists(path_key):
cached_key = path_key
# For index files, check Redis TTL validity
filename = os.path.basename(path)
is_index = cache.is_index_file(filename)
if cached_key and is_index:
# Index file exists, but check if it's still valid
if not cache.is_index_valid(remote_name, path):
# Index has expired, remove it from S3
cache.cleanup_expired_index(storage, remote_name, path)
cached_key = None # Force re-download
if cached_key:
# Return cached artifact
try:
artifact_data = storage.download_object(cached_key)
filename = os.path.basename(path)
# Determine content type based on file extension
content_type = "application/octet-stream"
if filename.endswith(".tar.gz"):
content_type = "application/gzip"
elif filename.endswith(".zip"):
content_type = "application/zip"
elif filename.endswith(".exe"):
content_type = "application/x-msdownload"
elif filename.endswith(".rpm"):
content_type = "application/x-rpm"
elif filename.endswith(".xml"):
content_type = "application/xml"
elif filename.endswith((".xml.gz", ".xml.bz2", ".xml.xz")):
content_type = "application/gzip"
# Record cache hit metrics
metrics.record_cache_hit(remote_name, len(artifact_data))
# Record artifact mapping in database if not already recorded
database.record_artifact_mapping(
cached_key, remote_name, path, len(artifact_data)
)
return Response(
content=artifact_data,
media_type=content_type,
headers={
"Content-Disposition": f"attachment; filename={filename}",
"X-Artifact-Source": "cache",
"X-Artifact-Size": str(len(artifact_data)),
},
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error retrieving cached artifact: {str(e)}"
)
# Artifact not cached, cache it first
result = await cache_single_artifact(remote_url, remote_name, path)
if result["status"] == "error":
raise HTTPException(
status_code=502, detail=f"Failed to fetch artifact: {result['error']}"
)
# Mark index files as cached in Redis if this was a new download
if result["status"] == "cached" and is_index:
# Get TTL from remote config
cache_config = config.get_cache_config(remote_name)
index_ttl = cache_config.get("index_ttl", 300) # Default 5 minutes
cache.mark_index_cached(remote_name, path, index_ttl)
# Now return the cached artifact
try:
cache_key = storage.get_object_key(remote_url)
artifact_data = storage.download_object(cache_key)
filename = os.path.basename(path)
content_type = "application/octet-stream"
if filename.endswith(".tar.gz"):
content_type = "application/gzip"
elif filename.endswith(".zip"):
content_type = "application/zip"
elif filename.endswith(".exe"):
content_type = "application/x-msdownload"
elif filename.endswith(".rpm"):
content_type = "application/x-rpm"
elif filename.endswith(".xml"):
content_type = "application/xml"
elif filename.endswith((".xml.gz", ".xml.bz2", ".xml.xz")):
content_type = "application/gzip"
# Record cache miss metrics
metrics.record_cache_miss(remote_name, len(artifact_data))
# Record artifact mapping in database
cache_key = storage.get_object_key(remote_url)
database.record_artifact_mapping(
cache_key, remote_name, path, len(artifact_data)
)
return Response(
content=artifact_data,
media_type=content_type,
headers={
"Content-Disposition": f"attachment; filename={filename}",
"X-Artifact-Source": "remote",
"X-Artifact-Size": str(len(artifact_data)),
},
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error serving artifact: {str(e)}")
async def discover_artifacts(remote: str, include_pattern: str) -> list[str]:
if "github.com" in remote:
return await discover_github_releases(remote, include_pattern)
else:
raise HTTPException(status_code=400, detail=f"Unsupported remote: {remote}")
async def discover_github_releases(remote: str, include_pattern: str) -> list[str]:
match = re.match(r"github\.com/([^/]+)/([^/]+)", remote)
if not match:
raise HTTPException(status_code=400, detail="Invalid GitHub remote format")
owner, repo = match.groups()
async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.get(
f"https://api.github.com/repos/{owner}/{repo}/releases"
)
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to fetch releases: {response.text}",
)
releases = response.json()
matching_urls = []
pattern = include_pattern.replace("*", ".*")
regex = re.compile(pattern)
for release in releases:
for asset in release.get("assets", []):
download_url = asset["browser_download_url"]
if regex.search(download_url):
matching_urls.append(download_url)
return matching_urls
@app.put("/api/v1/remote/{remote_name}/{path:path}")
async def upload_file(remote_name: str, path: str, file: UploadFile = File(...)):
"""Upload a file to local repository"""
# Check if remote is configured and is local
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
if remote_config.get("type") != "local":
raise HTTPException(
status_code=400, detail="Upload only supported for local repositories"
)
try:
# Read file content
content = await file.read()
# Calculate SHA256
sha256_sum = hashlib.sha256(content).hexdigest()
# Check if file already exists (prevent overwrite)
if database.file_exists(remote_name, path):
raise HTTPException(status_code=409, detail="File already exists")
# Generate S3 key
s3_key = f"local/{remote_name}/{path}"
# Determine content type
content_type = file.content_type or "application/octet-stream"
# Upload to S3
try:
storage.upload(s3_key, content)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Upload failed: {e}")
# Add to database
success = database.add_local_file(
repository_name=remote_name,
file_path=path,
s3_key=s3_key,
size_bytes=len(content),
sha256_sum=sha256_sum,
content_type=content_type,
)
if not success:
# Clean up S3 if database insert failed
storage.delete_object(s3_key)
raise HTTPException(status_code=500, detail="Failed to save file metadata")
return JSONResponse(
{
"message": "File uploaded successfully",
"file_path": path,
"size_bytes": len(content),
"sha256_sum": sha256_sum,
"content_type": content_type,
}
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@app.head("/api/v1/remote/{remote_name}/{path:path}")
def check_file_exists(remote_name: str, path: str):
"""Check if file exists (for CI jobs) - supports local repositories only"""
# Check if remote is configured
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
# Handle local repository
if remote_config.get("type") == "local":
try:
metadata = database.get_local_file_metadata(remote_name, path)
if not metadata:
raise HTTPException(status_code=404, detail="File not found")
return Response(
headers={
"Content-Length": str(metadata["size_bytes"]),
"Content-Type": metadata.get(
"content_type", "application/octet-stream"
),
"X-SHA256": metadata["sha256_sum"],
"X-Created-At": metadata["created_at"].isoformat()
if metadata["created_at"]
else "",
"X-Uploaded-At": metadata["uploaded_at"].isoformat()
if metadata["uploaded_at"]
else "",
}
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Check failed: {str(e)}")
else:
# For remote repositories, just return 405 Method Not Allowed
raise HTTPException(
status_code=405, detail="HEAD method only supported for local repositories"
)
@app.delete("/api/v1/remote/{remote_name}/{path:path}")
def delete_file(remote_name: str, path: str):
"""Delete a file from local repository"""
# Check if remote is configured and is local
remote_config = config.get_remote_config(remote_name)
if not remote_config:
raise HTTPException(
status_code=404, detail=f"Remote '{remote_name}' not configured"
)
if remote_config.get("type") != "local":
raise HTTPException(
status_code=400, detail="Delete only supported for local repositories"
)
try:
# Get S3 key before deleting from database
s3_key = database.delete_local_file(remote_name, path)
if not s3_key:
raise HTTPException(status_code=404, detail="File not found")
# Delete from S3
if not storage.delete_object(s3_key):
# File was deleted from database but not from S3 - log warning but continue
print(f"Warning: Failed to delete S3 object {s3_key}")
return JSONResponse({"message": "File deleted successfully"})
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}")
@app.post("/api/v1/artifacts/cache")
async def cache_artifact(request: ArtifactRequest) -> Dict[str, Any]:
try:
matching_urls = await discover_artifacts(
request.remote, request.include_pattern
)
if not matching_urls:
return {
"message": "No matching artifacts found",
"cached_count": 0,
"artifacts": [],
}
cached_artifacts = []
for url in matching_urls:
result = await cache_single_artifact(url, "", "")
cached_artifacts.append(result)
cached_count = sum(
1
for artifact in cached_artifacts
if artifact["status"] in ["cached", "already_cached"]
)
return {
"message": f"Processed {len(matching_urls)} artifacts, {cached_count} successfully cached",
"cached_count": cached_count,
"artifacts": cached_artifacts,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/v1/artifacts/{remote:path}")
async def list_cached_artifacts(
remote: str, include_pattern: str = ".*"
) -> Dict[str, Any]:
try:
matching_urls = await discover_artifacts(remote, include_pattern)
cached_artifacts = []
for url in matching_urls:
key = storage.get_object_key(url)
if storage.exists(key):
cached_artifacts.append(
{"url": url, "cached_url": storage.get_url(key), "key": key}
)
return {
"remote": remote,
"pattern": include_pattern,
"total_found": len(matching_urls),
"cached_count": len(cached_artifacts),
"artifacts": cached_artifacts,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
def get_metrics(
json: Optional[bool] = Query(
False, description="Return JSON format instead of Prometheus"
),
):
"""Get comprehensive metrics about the artifact storage system"""
config._check_reload()
if json:
# Return JSON format
return metrics.get_metrics(storage, config)
else:
# Return Prometheus format
metrics.get_metrics(storage, config) # Update gauges
prometheus_data = generate_latest().decode("utf-8")
return PlainTextResponse(prometheus_data, media_type=CONTENT_TYPE_LATEST)
@app.get("/config")
def get_config():
return config.config
def main():
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
if __name__ == "__main__":
main()
+229
View File
@@ -0,0 +1,229 @@
from datetime import datetime
from typing import Dict, Any
from prometheus_client import Counter, Gauge
# Prometheus metrics
request_counter = Counter(
"artifact_requests_total", "Total artifact requests", ["remote", "status"]
)
cache_hit_counter = Counter("artifact_cache_hits_total", "Total cache hits", ["remote"])
cache_miss_counter = Counter(
"artifact_cache_misses_total", "Total cache misses", ["remote"]
)
bandwidth_saved_counter = Counter(
"artifact_bandwidth_saved_bytes_total", "Total bandwidth saved", ["remote"]
)
storage_size_gauge = Gauge(
"artifact_storage_size_bytes", "Storage size by remote", ["remote"]
)
redis_keys_gauge = Gauge("artifact_redis_keys_total", "Total Redis keys")
class MetricsManager:
def __init__(self, redis_client=None, database_manager=None):
self.redis_client = redis_client
self.database_manager = database_manager
self.start_time = datetime.now()
def record_cache_hit(self, remote_name: str, size_bytes: int):
"""Record a cache hit with size for bandwidth calculation"""
# Update Prometheus metrics
request_counter.labels(remote=remote_name, status="cache_hit").inc()
cache_hit_counter.labels(remote=remote_name).inc()
bandwidth_saved_counter.labels(remote=remote_name).inc(size_bytes)
# Update Redis for persistence across instances
if self.redis_client and self.redis_client.available:
try:
# Increment global counters
self.redis_client.client.incr("metrics:cache_hits")
self.redis_client.client.incr("metrics:total_requests")
self.redis_client.client.incrby("metrics:bandwidth_saved", size_bytes)
# Increment per-remote counters
self.redis_client.client.incr(f"metrics:cache_hits:{remote_name}")
self.redis_client.client.incr(f"metrics:total_requests:{remote_name}")
self.redis_client.client.incrby(
f"metrics:bandwidth_saved:{remote_name}", size_bytes
)
except Exception:
pass
def record_cache_miss(self, remote_name: str, size_bytes: int):
"""Record a cache miss (new download)"""
# Update Prometheus metrics
request_counter.labels(remote=remote_name, status="cache_miss").inc()
cache_miss_counter.labels(remote=remote_name).inc()
# Update Redis for persistence across instances
if self.redis_client and self.redis_client.available:
try:
# Increment global counters
self.redis_client.client.incr("metrics:cache_misses")
self.redis_client.client.incr("metrics:total_requests")
# Increment per-remote counters
self.redis_client.client.incr(f"metrics:cache_misses:{remote_name}")
self.redis_client.client.incr(f"metrics:total_requests:{remote_name}")
except Exception:
pass
def get_redis_key_count(self) -> int:
"""Get total number of keys in Redis"""
if self.redis_client and self.redis_client.available:
try:
return self.redis_client.client.dbsize()
except Exception:
return 0
return 0
def get_s3_total_size(self, storage) -> int:
"""Get total size of all objects in S3 bucket"""
try:
total_size = 0
paginator = storage.client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=storage.bucket):
if "Contents" in page:
for obj in page["Contents"]:
total_size += obj["Size"]
return total_size
except Exception:
return 0
def get_s3_size_by_remote(self, storage, config_manager) -> Dict[str, int]:
"""Get size of stored data per remote using database mappings"""
if self.database_manager and self.database_manager.available:
# Get from database if available
db_sizes = self.database_manager.get_storage_by_remote()
if db_sizes:
# Initialize all configured remotes to 0
remote_sizes = {}
for remote in config_manager.config.get("remotes", {}).keys():
remote_sizes[remote] = db_sizes.get(remote, 0)
# Update Prometheus gauges
for remote, size in remote_sizes.items():
storage_size_gauge.labels(remote=remote).set(size)
return remote_sizes
# Fallback to S3 scanning if database not available
try:
remote_sizes = {}
remotes = config_manager.config.get("remotes", {}).keys()
# Initialize all remotes to 0
for remote in remotes:
remote_sizes[remote] = 0
paginator = storage.client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=storage.bucket):
if "Contents" in page:
for obj in page["Contents"]:
key = obj["Key"]
# Try to map from database first
remote = None
if self.database_manager:
remote = self.database_manager.get_remote_for_s3_key(key)
# Fallback to key parsing
if not remote:
remote = key.split("/")[0] if "/" in key else "unknown"
if remote in remote_sizes:
remote_sizes[remote] += obj["Size"]
else:
remote_sizes.setdefault("unknown", 0)
remote_sizes["unknown"] += obj["Size"]
# Update Prometheus gauges
for remote, size in remote_sizes.items():
if remote != "unknown": # Don't set gauge for unknown
storage_size_gauge.labels(remote=remote).set(size)
return remote_sizes
except Exception:
return {}
def get_metrics(self, storage, config_manager) -> Dict[str, Any]:
"""Get comprehensive metrics"""
# Update Redis keys gauge
redis_key_count = self.get_redis_key_count()
redis_keys_gauge.set(redis_key_count)
metrics = {
"timestamp": datetime.now().isoformat(),
"uptime_seconds": int((datetime.now() - self.start_time).total_seconds()),
"redis": {"total_keys": redis_key_count},
"storage": {
"total_size_bytes": self.get_s3_total_size(storage),
"size_by_remote": self.get_s3_size_by_remote(storage, config_manager),
},
"requests": {
"cache_hits": 0,
"cache_misses": 0,
"total_requests": 0,
"cache_hit_ratio": 0.0,
},
"bandwidth": {"saved_bytes": 0},
"per_remote": {},
}
if self.redis_client and self.redis_client.available:
try:
# Get global metrics
cache_hits = int(
self.redis_client.client.get("metrics:cache_hits") or 0
)
cache_misses = int(
self.redis_client.client.get("metrics:cache_misses") or 0
)
total_requests = cache_hits + cache_misses
bandwidth_saved = int(
self.redis_client.client.get("metrics:bandwidth_saved") or 0
)
metrics["requests"]["cache_hits"] = cache_hits
metrics["requests"]["cache_misses"] = cache_misses
metrics["requests"]["total_requests"] = total_requests
metrics["requests"]["cache_hit_ratio"] = (
cache_hits / total_requests if total_requests > 0 else 0.0
)
metrics["bandwidth"]["saved_bytes"] = bandwidth_saved
# Get per-remote metrics
for remote in config_manager.config.get("remotes", {}).keys():
remote_cache_hits = int(
self.redis_client.client.get(f"metrics:cache_hits:{remote}")
or 0
)
remote_cache_misses = int(
self.redis_client.client.get(f"metrics:cache_misses:{remote}")
or 0
)
remote_total = remote_cache_hits + remote_cache_misses
remote_bandwidth_saved = int(
self.redis_client.client.get(
f"metrics:bandwidth_saved:{remote}"
)
or 0
)
metrics["per_remote"][remote] = {
"cache_hits": remote_cache_hits,
"cache_misses": remote_cache_misses,
"total_requests": remote_total,
"cache_hit_ratio": remote_cache_hits / remote_total
if remote_total > 0
else 0.0,
"bandwidth_saved_bytes": remote_bandwidth_saved,
"storage_size_bytes": metrics["storage"]["size_by_remote"].get(
remote, 0
),
}
except Exception:
pass
return metrics
+96
View File
@@ -0,0 +1,96 @@
import os
import hashlib
from urllib.parse import urlparse
import boto3
from botocore.exceptions import ClientError
from fastapi import HTTPException
class S3Storage:
def __init__(
self,
endpoint: str,
access_key: str,
secret_key: str,
bucket: str,
secure: bool = False,
):
self.endpoint = endpoint
self.access_key = access_key
self.secret_key = secret_key
self.bucket = bucket
self.secure = secure
self.client = boto3.client(
"s3",
endpoint_url=f"http{'s' if self.secure else ''}://{self.endpoint}",
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
)
# Try to ensure bucket exists, but don't fail if MinIO isn't ready yet
try:
self._ensure_bucket_exists()
except Exception as e:
print(f"Warning: Could not ensure bucket exists during initialization: {e}")
print("Bucket creation will be attempted on first use")
def _ensure_bucket_exists(self):
try:
self.client.head_bucket(Bucket=self.bucket)
except ClientError:
self.client.create_bucket(Bucket=self.bucket)
def get_object_key(self, url: str) -> str:
url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
parsed = urlparse(url)
filename = os.path.basename(parsed.path)
return f"{parsed.netloc}/{url_hash}/{filename}"
def get_object_key_from_path(self, remote_name: str, path: str) -> str:
# Create a key based on the API path for direct access
path_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
filename = os.path.basename(path)
return f"{remote_name}/{path_hash}/{filename}"
def exists(self, key: str) -> bool:
try:
self._ensure_bucket_exists()
self.client.head_object(Bucket=self.bucket, Key=key)
return True
except ClientError:
return False
def upload(self, key: str, data: bytes) -> str:
self._ensure_bucket_exists()
self.client.put_object(Bucket=self.bucket, Key=key, Body=data)
return f"s3://{self.bucket}/{key}"
def get_url(self, key: str) -> str:
return f"http://{self.endpoint}/{self.bucket}/{key}"
def get_presigned_url(self, key: str, expiration: int = 3600) -> str:
try:
return self.client.generate_presigned_url(
"get_object",
Params={"Bucket": self.bucket, "Key": key},
ExpiresIn=expiration,
)
except Exception:
return self.get_url(key)
def download_object(self, key: str) -> bytes:
try:
self._ensure_bucket_exists()
response = self.client.get_object(Bucket=self.bucket, Key=key)
return response["Body"].read()
except ClientError:
raise HTTPException(status_code=404, detail="Artifact not found")
def delete_object(self, key: str) -> bool:
try:
self._ensure_bucket_exists()
self.client.delete_object(Bucket=self.bucket, Key=key)
return True
except ClientError:
return False