Compare commits


16 Commits

Author SHA1 Message Date
5733d52e51 Merge pull request 'feature/cache-flush-api-enhancement' (#7) from feature/cache-flush-api-enhancement into master
Reviewed-on: #7
2026-01-25 11:34:43 +11:00
bf8a176dda Bump version: 2.0.4 → 2.0.5 2026-01-25 11:09:29 +11:00
3e8e819ecf feat: enhance cache flush API for remote-specific S3 clearing
Add support for remote-specific S3 object deletion using hierarchical key prefixes.
When a remote parameter is provided, only objects under that remote's hierarchy
(e.g., 'fedora/*') are deleted, preserving cached files for other remotes.

Key improvements:
- Use S3 list_objects_v2 Prefix parameter for targeted deletion
- Enhanced logging to indicate scope of cache clearing operations
- Backward compatible - existing behavior preserved when no remote specified

Resolves artifactapi-u46
2026-01-25 11:08:46 +11:00
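[Editor's note] A minimal sketch of the prefix-scoped deletion this commit describes, written against boto3; the bucket name is a hypothetical, and unlike the diff further below this sketch paginates rather than processing only the first page of results:

# Hedged sketch: delete only objects under one remote's key hierarchy.
# Bucket name and use of a paginator are illustrative assumptions.
import boto3

s3 = boto3.client("s3")
bucket = "artifacts"  # hypothetical bucket name

def flush_remote(remote_name: str) -> int:
    """Delete every object whose key starts with '<remote>/', leaving other remotes intact."""
    deleted = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=f"{remote_name}/"):
        for obj in page.get("Contents", []):
            s3.delete_object(Bucket=bucket, Key=obj["Key"])
            deleted += 1
    return deleted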
de04e4d2b2 Merge pull request 'benvin/path-based-storage' (#6) from benvin/path-based-storage into master
Reviewed-on: #6
2026-01-25 00:00:53 +11:00
16c8bd60eb Bump version: 2.0.3 → 2.0.4 2026-01-24 23:59:48 +11:00
d39550c4e8 chore: migrate from bumpver to bump-my-version for better reliability 2026-01-24 23:59:05 +11:00
424de5cc13 fix: sync package version with bumpver version (2.0.3) 2026-01-24 23:54:42 +11:00
3f54874421 Bump version 2.0.2 → 2.0.3 2026-01-24 23:53:19 +11:00
6e8912eed1 chore: ignore local config overrides (docker-compose.yml, ca-bundle.pem) 2026-01-24 23:53:08 +11:00
5a0e8b4e0b feat: implement hierarchical S3 keys and automated version management
This commit introduces two major improvements:

1. **Hierarchical S3 Key Structure**:
   - Replace URL-based hashing with remote-name/hash(directory_path)/filename format
   - Enables remote-specific cache operations and intuitive S3 organization
   - Cache keys now independent of mirror URL changes
   - Example: fedora/886d215f6d1a0108/eccodes-2.44.0-1.fc42.x86_64.rpm

2. **Automated Version Management**:
   - Add bumpver for semantic version bumping
   - Single source of truth in pyproject.toml
   - FastAPI dynamically reads version from package metadata
   - Eliminates manual version synchronization between files

Changes:
- storage.py: New get_object_key(remote_name, path) method with directory hashing
- main.py: Dynamic version import and updated cache key generation calls
- cache.py: Updated to use new hierarchical key structure
- pyproject.toml: Added bumpver config and dev dependency

Breaking change: S3 key format changed, existing cache will need regeneration
2026-01-24 23:51:03 +11:00
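[Editor's note] A worked sketch of the key scheme described above, mirroring the get_object_key method shown in the storage.py diff below; the example path is hypothetical:

import hashlib
import os

def object_key(remote_name: str, path: str) -> str:
    # remote-name/hash(directory_path)/filename, per the scheme described above
    clean = path.lstrip("/")
    directory = os.path.dirname(clean)
    filename = os.path.basename(clean)
    if not directory:
        return f"{remote_name}/{filename}"
    return f"{remote_name}/{hashlib.sha256(directory.encode()).hexdigest()[:16]}/{filename}"

# object_key("fedora", "releases/42/x86_64/eccodes-2.44.0-1.fc42.x86_64.rpm")
# -> "fedora/<16 hex chars>/eccodes-2.44.0-1.fc42.x86_64.rpm"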
1a71a2c9fa Merge pull request 'feat: add cache flush API and fix cache key consistency' (#5) from benvin/cache_flush into master
Reviewed-on: #5
2026-01-13 19:02:52 +11:00
d2ecc6b1a0 feat: add cache flush API and fix cache key consistency
- Add PUT /cache/flush endpoint for selective cache management
- Fix critical cache key mismatch in cleanup_expired_index()
- Update index file detection to use full path instead of filename
- Bump version to 2.0.2

The key changes include adding a comprehensive cache flush API that supports selective flushing by cache type (index, files, metrics) and fixing a critical bug where cache keys were inconsistent between storage and cleanup operations, preventing proper cache invalidation.
2026-01-13 19:02:31 +11:00
e4013e6a2a Merge pull request 'feat: index caching' (#4) from benvin/index_caching into master
Reviewed-on: #4
2026-01-13 18:14:39 +11:00
9defc78e21 feat: index caching
- improve index detection for rpms
- improve logging
2026-01-13 18:13:47 +11:00
f40675f3d2 Merge pull request 'feat: add fedora index files' (#3) from benvin/fedora_indexes into master
Reviewed-on: #3
2026-01-10 17:02:58 +11:00
b54e6c3e0c feat: add fedora index files
- ensure files matching xml.zck and xml.zst are marked as index files
2026-01-10 17:01:39 +11:00
7 changed files with 169 additions and 124 deletions

.gitignore

@@ -45,3 +45,7 @@ uv.lock
 # Docker volumes
 minio_data/
+
+# Local configuration overrides
+docker-compose.yml
+ca-bundle.pem

Dockerfile

@@ -37,6 +37,7 @@ RUN uv sync --frozen
 # Copy application source
 COPY --chown=appuser:appuser src/ ./src/
 COPY --chown=appuser:appuser remotes.yaml ./
+COPY --chown=appuser:appuser ca-bundle.pem ./

 # Expose port
 EXPOSE 8000

docker-compose.yml (deleted)

@@ -1,86 +0,0 @@
-version: '3.8'
-
-services:
-  artifactapi:
-    build:
-      context: .
-      dockerfile: Dockerfile
-      no_cache: true
-    ports:
-      - "8000:8000"
-    environment:
-      - CONFIG_PATH=/app/remotes.yaml
-      - DBHOST=postgres
-      - DBPORT=5432
-      - DBUSER=artifacts
-      - DBPASS=artifacts123
-      - DBNAME=artifacts
-      - REDIS_URL=redis://redis:6379
-      - MINIO_ENDPOINT=minio:9000
-      - MINIO_ACCESS_KEY=minioadmin
-      - MINIO_SECRET_KEY=minioadmin
-      - MINIO_BUCKET=artifacts
-      - MINIO_SECURE=false
-    depends_on:
-      postgres:
-        condition: service_healthy
-      redis:
-        condition: service_healthy
-      minio:
-        condition: service_healthy
-    healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
-      interval: 30s
-      timeout: 10s
-      retries: 3
-
-  minio:
-    image: minio/minio:latest
-    ports:
-      - "9000:9000"
-      - "9001:9001"
-    environment:
-      MINIO_ROOT_USER: minioadmin
-      MINIO_ROOT_PASSWORD: minioadmin
-    command: server /data --console-address ":9001"
-    volumes:
-      - minio_data:/data
-    healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
-      interval: 30s
-      timeout: 20s
-      retries: 3
-
-  redis:
-    image: redis:7-alpine
-    ports:
-      - "6379:6379"
-    volumes:
-      - redis_data:/data
-    command: redis-server --save 20 1
-    healthcheck:
-      test: ["CMD", "redis-cli", "ping"]
-      interval: 30s
-      timeout: 10s
-      retries: 3
-
-  postgres:
-    image: postgres:15-alpine
-    ports:
-      - "5432:5432"
-    environment:
-      POSTGRES_DB: artifacts
-      POSTGRES_USER: artifacts
-      POSTGRES_PASSWORD: artifacts123
-    volumes:
-      - postgres_data:/var/lib/postgresql/data
-    healthcheck:
-      test: ["CMD-SHELL", "pg_isready -U artifacts -d artifacts"]
-      interval: 30s
-      timeout: 10s
-      retries: 3
-
-volumes:
-  minio_data:
-  redis_data:
-  postgres_data:

pyproject.toml

@@ -1,6 +1,6 @@
 [project]
 name = "artifactapi"
-version = "2.0.0"
+version = "2.0.5"
 description = "Generic artifact caching system with support for various package managers"
 dependencies = [
@@ -40,4 +40,11 @@ dev = [
     "isort>=5.12.0",
     "mypy>=1.6.0",
     "ruff>=0.1.0",
+    "bump-my-version>=1.2.0",
 ]
+
+[tool.bumpversion]
+current_version = "2.0.5"
+commit = true
+tag = true
+message = "Bump version: {current_version} → {new_version}"

cache.py

@@ -23,8 +23,13 @@ class RedisCache:
             file_path.endswith("APKINDEX.tar.gz")
             or file_path.endswith("Packages.gz")
             or file_path.endswith("repomd.xml")
-            or "repodata/" in file_path
-            and file_path.endswith((".xml", ".xml.gz", ".xml.bz2", ".xml.xz"))
+            or ("repodata/" in file_path
+                and file_path.endswith((
+                    ".xml", ".xml.gz", ".xml.bz2", ".xml.xz", ".xml.zck", ".xml.zst",
+                    ".sqlite", ".sqlite.gz", ".sqlite.bz2", ".sqlite.xz", ".sqlite.zck", ".sqlite.zst",
+                    ".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst",
+                    ".asc", ".txt"
+                )))
         )

     def get_index_cache_key(self, remote_name: str, path: str) -> str:
@@ -61,9 +66,19 @@
             return
         try:
-            # Get the S3 key and remove it
-            s3_key = storage.get_object_key_from_path(remote_name, path)
-            if storage.exists(s3_key):
-                storage.client.delete_object(Bucket=storage.bucket, Key=s3_key)
+            # Construct the URL the same way as in the main flow
+            from .config import ConfigManager
+            import os
+            config_path = os.environ.get("CONFIG_PATH")
+            if config_path:
+                config = ConfigManager(config_path)
+                remote_config = config.get_remote_config(remote_name)
+                if remote_config:
+                    base_url = remote_config.get("base_url")
+                    if base_url:
+                        # Use hierarchical path-based key (same as cache_single_artifact)
+                        s3_key = storage.get_object_key(remote_name, path)
+                        if storage.exists(s3_key):
+                            storage.client.delete_object(Bucket=storage.bucket, Key=s3_key)
         except Exception:
             pass
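[Editor's note] To see why full-path matching matters here, a self-contained copy of the predicate above with a few illustrative checks (the example paths are made up):

def is_index_path(file_path: str) -> bool:
    # Standalone copy of the detection logic above, for illustration only.
    return (
        file_path.endswith("APKINDEX.tar.gz")
        or file_path.endswith("Packages.gz")
        or file_path.endswith("repomd.xml")
        or ("repodata/" in file_path
            and file_path.endswith((
                ".xml", ".xml.gz", ".xml.bz2", ".xml.xz", ".xml.zck", ".xml.zst",
                ".sqlite", ".sqlite.gz", ".sqlite.bz2", ".sqlite.xz", ".sqlite.zck", ".sqlite.zst",
                ".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst",
                ".asc", ".txt",
            )))
    )

assert is_index_path("repodata/primary.xml.zst")                    # matched via the full path
assert not is_index_path("primary.xml.zst")                         # the bare filename alone misses
assert not is_index_path("pool/eccodes-2.44.0-1.fc42.x86_64.rpm")   # package file, not an index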

main.py

@@ -1,6 +1,7 @@
 import os
 import re
 import hashlib
+import logging
 from typing import Dict, Any, Optional
 import httpx
 from fastapi import FastAPI, HTTPException, Response, Query, File, UploadFile
@@ -8,6 +9,13 @@ from fastapi.responses import PlainTextResponse, JSONResponse
 from pydantic import BaseModel
 from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

+try:
+    from importlib.metadata import version
+    __version__ = version("artifactapi")
+except ImportError:
+    # Fallback for development when package isn't installed
+    __version__ = "dev"
+
 from .config import ConfigManager
 from .database import DatabaseManager
 from .storage import S3Storage
@@ -20,7 +28,14 @@ class ArtifactRequest(BaseModel):
     include_pattern: str

-app = FastAPI(title="Artifact Storage API", version="2.0.0")
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+app = FastAPI(title="Artifact Storage API", version=__version__)

 # Initialize components using config
 config_path = os.environ.get("CONFIG_PATH")
@@ -45,7 +60,7 @@ def read_root():
     config._check_reload()
     return {
         "message": "Artifact Storage API",
-        "version": "2.0.0",
+        "version": app.version,
         "remotes": list(config.config.get("remotes", {}).keys()),
     }
@@ -55,6 +70,87 @@ def health_check():
     return {"status": "healthy"}

+@app.put("/cache/flush")
+def flush_cache(
+    remote: str = Query(default=None, description="Specific remote to flush (optional)"),
+    cache_type: str = Query(default="all", description="Type to flush: 'all', 'index', 'files', 'metrics'")
+):
+    """Flush cache entries for specified remote or all remotes"""
+    try:
+        result = {
+            "remote": remote,
+            "cache_type": cache_type,
+            "flushed": {
+                "redis_keys": 0,
+                "s3_objects": 0,
+                "operations": []
+            }
+        }
+
+        # Flush Redis entries based on cache_type
+        if cache_type in ["all", "index", "metrics"] and cache.available and cache.client:
+            patterns = []
+            if cache_type in ["all", "index"]:
+                if remote:
+                    patterns.append(f"index:{remote}:*")
+                else:
+                    patterns.append("index:*")
+            if cache_type in ["all", "metrics"]:
+                if remote:
+                    patterns.append(f"metrics:*:{remote}")
+                else:
+                    patterns.append("metrics:*")
+
+            for pattern in patterns:
+                keys = cache.client.keys(pattern)
+                if keys:
+                    cache.client.delete(*keys)
+                    result["flushed"]["redis_keys"] += len(keys)
+                    logger.info(f"Cache flush: Deleted {len(keys)} Redis keys matching '{pattern}'")
+
+            if result["flushed"]["redis_keys"] > 0:
+                result["flushed"]["operations"].append(f"Deleted {result['flushed']['redis_keys']} Redis keys")
+
+        # Flush S3 objects if requested
+        if cache_type in ["all", "files"]:
+            try:
+                # Use prefix filtering for remote-specific deletion
+                list_params = {"Bucket": storage.bucket}
+                if remote:
+                    list_params["Prefix"] = f"{remote}/"
+
+                response = storage.client.list_objects_v2(**list_params)
+                if 'Contents' in response:
+                    objects_to_delete = [obj['Key'] for obj in response['Contents']]
+                    for key in objects_to_delete:
+                        try:
+                            storage.client.delete_object(Bucket=storage.bucket, Key=key)
+                            result["flushed"]["s3_objects"] += 1
+                        except Exception as e:
+                            logger.warning(f"Failed to delete S3 object {key}: {e}")
+
+                    if objects_to_delete:
+                        scope = f" for remote '{remote}'" if remote else ""
+                        result["flushed"]["operations"].append(f"Deleted {len(objects_to_delete)} S3 objects{scope}")
+                        logger.info(f"Cache flush: Deleted {len(objects_to_delete)} S3 objects{scope}")
+            except Exception as e:
+                result["flushed"]["operations"].append(f"S3 flush failed: {str(e)}")
+                logger.error(f"Cache flush S3 error: {e}")
+
+        if not result["flushed"]["operations"]:
+            result["flushed"]["operations"].append("No cache entries found to flush")
+
+        return result
+    except Exception as e:
+        logger.error(f"Cache flush error: {e}")
+        raise HTTPException(status_code=500, detail=f"Cache flush failed: {str(e)}")
+
 async def construct_remote_url(remote_name: str, path: str) -> str:
     remote_config = config.get_remote_config(remote_name)
     if not remote_config:
@@ -98,13 +194,11 @@ async def check_artifact_patterns(
 async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
-    # Check if using URL-based key or path-based key
-    if url.startswith("http"):
-        key = storage.get_object_key(url)
-    else:
-        key = storage.get_object_key_from_path(remote_name, path)
+    # Use hierarchical path-based key
+    key = storage.get_object_key(remote_name, path)

     if storage.exists(key):
+        logger.info(f"Cache ALREADY EXISTS: {url} (key: {key})")
         return {
             "url": url,
             "cached_url": storage.get_url(key),
@@ -118,6 +212,8 @@ async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
         storage_path = storage.upload(key, response.content)

+        logger.info(f"Cache ADD SUCCESS: {url} (size: {len(response.content)} bytes, key: {key})")
+
         return {
             "url": url,
             "cached_url": storage.get_url(key),
@@ -173,6 +269,7 @@ async def get_artifact(remote_name: str, path: str):
     # Check if artifact matches configured patterns
     if not await check_artifact_patterns(remote_name, repo_path, file_path, path):
+        logger.info(f"PATTERN BLOCKED: {remote_name}/{path} - not matching include patterns")
         raise HTTPException(
             status_code=403, detail="Artifact not allowed by configuration patterns"
         )
@@ -180,24 +277,20 @@ async def get_artifact(remote_name: str, path: str):
     # Construct the remote URL
     remote_url = await construct_remote_url(remote_name, path)

-    # Check if artifact is already cached (try both URL and path-based keys)
-    url_key = storage.get_object_key(remote_url)
-    path_key = storage.get_object_key_from_path(remote_name, path)
-
-    cached_key = None
-    if storage.exists(url_key):
-        cached_key = url_key
-    elif storage.exists(path_key):
-        cached_key = path_key
+    # Check if artifact is already cached
+    cached_key = storage.get_object_key(remote_name, path)
+    if not storage.exists(cached_key):
+        cached_key = None

     # For index files, check Redis TTL validity
     filename = os.path.basename(path)
-    is_index = cache.is_index_file(filename)
+    is_index = cache.is_index_file(path)  # Check full path, not just filename

     if cached_key and is_index:
         # Index file exists, but check if it's still valid
         if not cache.is_index_valid(remote_name, path):
             # Index has expired, remove it from S3
+            logger.info(f"Index EXPIRED: {remote_name}/{path} - removing from cache")
             cache.cleanup_expired_index(storage, remote_name, path)
             cached_key = None  # Force re-download
@@ -207,6 +300,9 @@ async def get_artifact(remote_name: str, path: str):
         artifact_data = storage.download_object(cached_key)
         filename = os.path.basename(path)

+        # Log cache hit
+        logger.info(f"Cache HIT: {remote_name}/{path} (size: {len(artifact_data)} bytes, key: {cached_key})")
+
         # Determine content type based on file extension
         content_type = "application/octet-stream"
         if filename.endswith(".tar.gz"):
@@ -245,9 +341,11 @@ async def get_artifact(remote_name: str, path: str):
         )

     # Artifact not cached, cache it first
+    logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
     result = await cache_single_artifact(remote_url, remote_name, path)

     if result["status"] == "error":
+        logger.error(f"Cache ADD FAILED: {remote_name}/{path} - {result['error']}")
         raise HTTPException(
             status_code=502, detail=f"Failed to fetch artifact: {result['error']}"
         )
@@ -258,10 +356,11 @@ async def get_artifact(remote_name: str, path: str):
         cache_config = config.get_cache_config(remote_name)
         index_ttl = cache_config.get("index_ttl", 300)  # Default 5 minutes
         cache.mark_index_cached(remote_name, path, index_ttl)
+        logger.info(f"Index file cached with TTL: {remote_name}/{path} (ttl: {index_ttl}s)")

     # Now return the cached artifact
     try:
-        cache_key = storage.get_object_key(remote_url)
+        cache_key = storage.get_object_key(remote_name, path)
         artifact_data = storage.download_object(cache_key)
         filename = os.path.basename(path)
@@ -283,7 +382,7 @@ async def get_artifact(remote_name: str, path: str):
         metrics.record_cache_miss(remote_name, len(artifact_data))

     # Record artifact mapping in database
-    cache_key = storage.get_object_key(remote_url)
+    cache_key = storage.get_object_key(remote_name, path)
     database.record_artifact_mapping(
         cache_key, remote_name, path, len(artifact_data)
     )
@@ -531,7 +630,11 @@ async def list_cached_artifacts(
     cached_artifacts = []
     for url in matching_urls:
-        key = storage.get_object_key(url)
+        # Extract path from URL for hierarchical key generation
+        from urllib.parse import urlparse
+        parsed = urlparse(url)
+        path = parsed.path
+        key = storage.get_object_key(remote, path)
         if storage.exists(key):
             cached_artifacts.append(
                 {"url": url, "cached_url": storage.get_url(key), "key": key}

storage.py

@@ -1,6 +1,5 @@
 import os
 import hashlib
-from urllib.parse import urlparse
 import boto3
 from botocore.config import Config
 from botocore.exceptions import ClientError
@@ -55,17 +54,19 @@ class S3Storage:
         except ClientError:
             self.client.create_bucket(Bucket=self.bucket)

-    def get_object_key(self, url: str) -> str:
-        url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
-        parsed = urlparse(url)
-        filename = os.path.basename(parsed.path)
-        return f"{parsed.netloc}/{url_hash}/{filename}"
-
-    def get_object_key_from_path(self, remote_name: str, path: str) -> str:
-        # Create a key based on the API path for direct access
-        path_hash = hashlib.sha256(path.encode()).hexdigest()[:16]
-        filename = os.path.basename(path)
-        return f"{remote_name}/{path_hash}/{filename}"
+    def get_object_key(self, remote_name: str, path: str) -> str:
+        # Extract directory path and filename
+        clean_path = path.lstrip('/')
+        filename = os.path.basename(clean_path)
+        directory_path = os.path.dirname(clean_path)
+
+        # Hash the directory path to keep keys manageable while preserving remote structure
+        if directory_path:
+            path_hash = hashlib.sha256(directory_path.encode()).hexdigest()[:16]
+            return f"{remote_name}/{path_hash}/{filename}"
+        else:
+            # If no directory, just use remote and filename
+            return f"{remote_name}/{filename}"

     def exists(self, key: str) -> bool:
         try:
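[Editor's note] To make the breaking change concrete, a hedged comparison of the removed URL-based key against the new hierarchical key for the same artifact; the URL and path are illustrative:

import hashlib
import os
from urllib.parse import urlparse

url = "https://mirror.example.org/fedora/releases/42/x86_64/repodata/repomd.xml"  # hypothetical mirror
path = "releases/42/x86_64/repodata/repomd.xml"                                   # API path

# Old scheme (removed above): netloc/hash(full URL)/filename -- changes with the mirror host
parsed = urlparse(url)
old_key = f"{parsed.netloc}/{hashlib.sha256(url.encode()).hexdigest()[:16]}/{os.path.basename(parsed.path)}"

# New scheme: remote/hash(directory path)/filename -- stable across mirror URL changes
new_key = f"fedora/{hashlib.sha256(os.path.dirname(path).encode()).hexdigest()[:16]}/{os.path.basename(path)}"

print(old_key)  # different layout from new_key: existing cache entries must be regenerated
print(new_key)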