Compare commits
fix/boto3-... → master (9 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 1a71a2c9fa | |
| | d2ecc6b1a0 | |
| | e4013e6a2a | |
| | 9defc78e21 | |
| | f40675f3d2 | |
| | b54e6c3e0c | |
| | 79a8553e9c | |
| | b7205e09a3 | |
| | 1fb6b89a5f | |
@@ -37,6 +37,7 @@ RUN uv sync --frozen
 # Copy application source
 COPY --chown=appuser:appuser src/ ./src/
 COPY --chown=appuser:appuser remotes.yaml ./
+COPY --chown=appuser:appuser ca-bundle.pem ./

 # Expose port
 EXPOSE 8000

@@ -1,6 +1,6 @@
 [project]
 name = "artifactapi"
-version = "2.0.0"
+version = "2.0.2"
 description = "Generic artifact caching system with support for various package managers"

 dependencies = [

@@ -23,8 +23,13 @@ class RedisCache:
             file_path.endswith("APKINDEX.tar.gz")
             or file_path.endswith("Packages.gz")
             or file_path.endswith("repomd.xml")
-            or "repodata/" in file_path
-            and file_path.endswith((".xml", ".xml.gz", ".xml.bz2", ".xml.xz"))
+            or ("repodata/" in file_path
+                and file_path.endswith((
+                    ".xml", ".xml.gz", ".xml.bz2", ".xml.xz", ".xml.zck", ".xml.zst",
+                    ".sqlite", ".sqlite.gz", ".sqlite.bz2", ".sqlite.xz", ".sqlite.zck", ".sqlite.zst",
+                    ".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst",
+                    ".asc", ".txt"
+                )))
         )

     def get_index_cache_key(self, remote_name: str, path: str) -> str:
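For reference, a minimal standalone sketch of the broadened index check (the real predicate is `RedisCache.is_index_file`; the helper name and the sample paths below are illustrative only):

```python
# Illustrative stand-in for the broadened RedisCache.is_index_file check.
def looks_like_index(file_path: str) -> bool:
    return (
        file_path.endswith("APKINDEX.tar.gz")
        or file_path.endswith("Packages.gz")
        or file_path.endswith("repomd.xml")
        or ("repodata/" in file_path
            and file_path.endswith((
                ".xml", ".xml.gz", ".xml.bz2", ".xml.xz", ".xml.zck", ".xml.zst",
                ".sqlite", ".sqlite.gz", ".sqlite.bz2", ".sqlite.xz", ".sqlite.zck", ".sqlite.zst",
                ".yaml.xz", ".yaml.gz", ".yaml.bz2", ".yaml.zst",
                ".asc", ".txt"
            )))
    )

# Hypothetical paths: both returned False with the previous suffix list.
print(looks_like_index("repodata/0b12-primary.sqlite.gz"))  # True with the new suffixes
print(looks_like_index("repodata/repomd.xml.asc"))          # True with the new suffixes
print(looks_like_index("packages/foo-1.0-1.x86_64.rpm"))    # False either way
```

The `repodata/` prefix only appears in the full request path, never in the basename, which is why the later `get_artifact` hunk switches from passing the filename to passing the full path.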
@@ -61,8 +66,20 @@ class RedisCache:
            return

        try:
-            # Get the S3 key and remove it
-            s3_key = storage.get_object_key_from_path(remote_name, path)
+            # Construct the URL the same way as in the main flow
+            from .config import ConfigManager
+            import os
+            config_path = os.environ.get("CONFIG_PATH")
+            if config_path:
+                config = ConfigManager(config_path)
+                remote_config = config.get_remote_config(remote_name)
+                if remote_config:
+                    base_url = remote_config.get("base_url")
+                    if base_url:
+                        # Construct URL the same way as construct_remote_url
+                        remote_url = f"{base_url.rstrip('/')}/{path}"
+                        # Use URL-based key (same as cache_single_artifact)
+                        s3_key = storage.get_object_key(remote_url)
            if storage.exists(s3_key):
                storage.client.delete_object(Bucket=storage.bucket, Key=s3_key)
        except Exception:
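A small sketch of the key alignment this hunk is after: the expired-index cleanup now reconstructs the original remote URL and derives the S3 key from it, per the in-code comments above. Values below are hypothetical.

```python
# Hypothetical remote configuration and request path, for illustration only.
base_url = "https://mirror.example.org/alpine"
path = "v3.19/main/x86_64/APKINDEX.tar.gz"

# Same construction as construct_remote_url in this PR:
remote_url = f"{base_url.rstrip('/')}/{path}"
print(remote_url)
# -> https://mirror.example.org/alpine/v3.19/main/x86_64/APKINDEX.tar.gz
# The cleanup then calls storage.get_object_key(remote_url) on this URL,
# instead of deriving the key from the raw remote_name/path pair.
```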
@@ -1,6 +1,7 @@
 import os
 import re
 import hashlib
+import logging
 from typing import Dict, Any, Optional
 import httpx
 from fastapi import FastAPI, HTTPException, Response, Query, File, UploadFile
@@ -20,7 +21,14 @@ class ArtifactRequest(BaseModel):
     include_pattern: str


-app = FastAPI(title="Artifact Storage API", version="2.0.0")
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+app = FastAPI(title="Artifact Storage API", version="2.0.2")

 # Initialize components using config
 config_path = os.environ.get("CONFIG_PATH")
@@ -45,7 +53,7 @@ def read_root():
     config._check_reload()
     return {
         "message": "Artifact Storage API",
-        "version": "2.0.0",
+        "version": app.version,
         "remotes": list(config.config.get("remotes", {}).keys()),
     }

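Since the root handler now reports `app.version`, the value returned by `GET /` tracks the single version string passed to the FastAPI constructor. A quick check, assuming a local instance on port 8000 (the port exposed in the Dockerfile):

```python
import httpx

# Assumes the service is running locally; the base URL is illustrative.
resp = httpx.get("http://localhost:8000/")
print(resp.json()["version"])  # expected: "2.0.2", taken from app.version
```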
@@ -55,6 +63,81 @@ def health_check():
     return {"status": "healthy"}


+@app.put("/cache/flush")
+def flush_cache(
+    remote: str = Query(default=None, description="Specific remote to flush (optional)"),
+    cache_type: str = Query(default="all", description="Type to flush: 'all', 'index', 'files', 'metrics'")
+):
+    """Flush cache entries for specified remote or all remotes"""
+    try:
+        result = {
+            "remote": remote,
+            "cache_type": cache_type,
+            "flushed": {
+                "redis_keys": 0,
+                "s3_objects": 0,
+                "operations": []
+            }
+        }
+
+        # Flush Redis entries based on cache_type
+        if cache_type in ["all", "index", "metrics"] and cache.available and cache.client:
+            patterns = []
+
+            if cache_type in ["all", "index"]:
+                if remote:
+                    patterns.append(f"index:{remote}:*")
+                else:
+                    patterns.append("index:*")
+
+            if cache_type in ["all", "metrics"]:
+                if remote:
+                    patterns.append(f"metrics:*:{remote}")
+                else:
+                    patterns.append("metrics:*")
+
+            for pattern in patterns:
+                keys = cache.client.keys(pattern)
+                if keys:
+                    cache.client.delete(*keys)
+                    result["flushed"]["redis_keys"] += len(keys)
+                    logger.info(f"Cache flush: Deleted {len(keys)} Redis keys matching '{pattern}'")
+
+        if result["flushed"]["redis_keys"] > 0:
+            result["flushed"]["operations"].append(f"Deleted {result['flushed']['redis_keys']} Redis keys")
+
+        # Flush S3 objects if requested
+        if cache_type in ["all", "files"]:
+            try:
+                response = storage.client.list_objects_v2(Bucket=storage.bucket)
+                if 'Contents' in response:
+                    objects_to_delete = [obj['Key'] for obj in response['Contents']]
+
+                    for key in objects_to_delete:
+                        try:
+                            storage.client.delete_object(Bucket=storage.bucket, Key=key)
+                            result["flushed"]["s3_objects"] += 1
+                        except Exception as e:
+                            logger.warning(f"Failed to delete S3 object {key}: {e}")
+
+                    if objects_to_delete:
+                        result["flushed"]["operations"].append(f"Deleted {len(objects_to_delete)} S3 objects")
+                        logger.info(f"Cache flush: Deleted {len(objects_to_delete)} S3 objects")
+
+            except Exception as e:
+                result["flushed"]["operations"].append(f"S3 flush failed: {str(e)}")
+                logger.error(f"Cache flush S3 error: {e}")
+
+        if not result["flushed"]["operations"]:
+            result["flushed"]["operations"].append("No cache entries found to flush")
+
+        return result
+
+    except Exception as e:
+        logger.error(f"Cache flush error: {e}")
+        raise HTTPException(status_code=500, detail=f"Cache flush failed: {str(e)}")
+
+
 async def construct_remote_url(remote_name: str, path: str) -> str:
     remote_config = config.get_remote_config(remote_name)
     if not remote_config:
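One possible way to exercise the new endpoint, assuming the service is reachable on port 8000 (the port exposed in the Dockerfile) and that a remote named `alpine` exists in remotes.yaml; both are illustrative assumptions:

```python
import httpx

BASE = "http://localhost:8000"  # assumed local deployment

# Flush only the Redis index entries for one remote.
r = httpx.put(f"{BASE}/cache/flush", params={"remote": "alpine", "cache_type": "index"})
print(r.json()["flushed"])

# Flush everything: index keys, metrics keys and the cached S3 objects.
r = httpx.put(f"{BASE}/cache/flush", params={"cache_type": "all"})
print(r.json()["flushed"]["operations"])
```

Note that the `files` flush, as written, lists and deletes every object in the bucket regardless of the `remote` parameter; only the Redis patterns are scoped per remote.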
@@ -105,6 +188,7 @@ async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:
     key = storage.get_object_key_from_path(remote_name, path)

     if storage.exists(key):
+        logger.info(f"Cache ALREADY EXISTS: {url} (key: {key})")
         return {
             "url": url,
             "cached_url": storage.get_url(key),
@@ -118,6 +202,8 @@ async def cache_single_artifact(url: str, remote_name: str, path: str) -> dict:

     storage_path = storage.upload(key, response.content)

+    logger.info(f"Cache ADD SUCCESS: {url} (size: {len(response.content)} bytes, key: {key})")
+
     return {
         "url": url,
         "cached_url": storage.get_url(key),
@@ -173,6 +259,7 @@ async def get_artifact(remote_name: str, path: str):

     # Check if artifact matches configured patterns
     if not await check_artifact_patterns(remote_name, repo_path, file_path, path):
+        logger.info(f"PATTERN BLOCKED: {remote_name}/{path} - not matching include patterns")
         raise HTTPException(
             status_code=403, detail="Artifact not allowed by configuration patterns"
         )
@@ -192,12 +279,13 @@ async def get_artifact(remote_name: str, path: str):

     # For index files, check Redis TTL validity
     filename = os.path.basename(path)
-    is_index = cache.is_index_file(filename)
+    is_index = cache.is_index_file(path)  # Check full path, not just filename

     if cached_key and is_index:
         # Index file exists, but check if it's still valid
         if not cache.is_index_valid(remote_name, path):
             # Index has expired, remove it from S3
+            logger.info(f"Index EXPIRED: {remote_name}/{path} - removing from cache")
             cache.cleanup_expired_index(storage, remote_name, path)
             cached_key = None  # Force re-download

@@ -207,6 +295,9 @@ async def get_artifact(remote_name: str, path: str):
         artifact_data = storage.download_object(cached_key)
         filename = os.path.basename(path)

+        # Log cache hit
+        logger.info(f"Cache HIT: {remote_name}/{path} (size: {len(artifact_data)} bytes, key: {cached_key})")
+
         # Determine content type based on file extension
         content_type = "application/octet-stream"
         if filename.endswith(".tar.gz"):
@@ -245,9 +336,11 @@ async def get_artifact(remote_name: str, path: str):
         )

     # Artifact not cached, cache it first
+    logger.info(f"Cache MISS: {remote_name}/{path} - fetching from remote: {remote_url}")
     result = await cache_single_artifact(remote_url, remote_name, path)

     if result["status"] == "error":
+        logger.error(f"Cache ADD FAILED: {remote_name}/{path} - {result['error']}")
         raise HTTPException(
             status_code=502, detail=f"Failed to fetch artifact: {result['error']}"
         )
@@ -258,6 +351,7 @@ async def get_artifact(remote_name: str, path: str):
         cache_config = config.get_cache_config(remote_name)
         index_ttl = cache_config.get("index_ttl", 300)  # Default 5 minutes
         cache.mark_index_cached(remote_name, path, index_ttl)
+        logger.info(f"Index file cached with TTL: {remote_name}/{path} (ttl: {index_ttl}s)")

     # Now return the cached artifact
     try:
@@ -22,16 +22,25 @@ class S3Storage:
         self.bucket = bucket
         self.secure = secure

-        self.client = boto3.client(
-            "s3",
-            endpoint_url=f"http{'s' if self.secure else ''}://{self.endpoint}",
-            aws_access_key_id=self.access_key,
-            aws_secret_access_key=self.secret_key,
-            config=Config(
-                request_checksum_calculation="when_required",
-                response_checksum_validation="when_required"
-            )
-        )
+        ca_bundle = os.environ.get('REQUESTS_CA_BUNDLE') or os.environ.get('SSL_CERT_FILE')
+        config_kwargs = {
+            "request_checksum_calculation": "when_required",
+            "response_checksum_validation": "when_required"
+        }
+        client_kwargs = {
+            "endpoint_url": f"http{'s' if self.secure else ''}://{self.endpoint}",
+            "aws_access_key_id": self.access_key,
+            "aws_secret_access_key": self.secret_key,
+            "config": Config(**config_kwargs)
+        }
+
+        if ca_bundle and os.path.exists(ca_bundle):
+            client_kwargs["verify"] = ca_bundle
+            print(f"Debug: Using CA bundle: {ca_bundle}")
+        else:
+            print(f"Debug: No CA bundle found. REQUESTS_CA_BUNDLE={os.environ.get('REQUESTS_CA_BUNDLE')}, SSL_CERT_FILE={os.environ.get('SSL_CERT_FILE')}")
+
+        self.client = boto3.client("s3", **client_kwargs)

         # Try to ensure bucket exists, but don't fail if MinIO isn't ready yet
         try:
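A minimal sketch of how the copied bundle might be wired up at runtime. The in-container path is an assumption: the Dockerfile hunk copies `ca-bundle.pem` into the application directory, but the WORKDIR is not shown in this comparison.

```python
import os

# Assumption: ca-bundle.pem ends up in the application working directory.
os.environ.setdefault("REQUESTS_CA_BUNDLE", "/app/ca-bundle.pem")

# Same resolution order as the new S3Storage code: REQUESTS_CA_BUNDLE first, then SSL_CERT_FILE.
ca_bundle = os.environ.get("REQUESTS_CA_BUNDLE") or os.environ.get("SSL_CERT_FILE")
if ca_bundle and os.path.exists(ca_bundle):
    print(f"boto3 client would be created with verify={ca_bundle}")
else:
    print("No CA bundle found; boto3 falls back to its default certificate store")
```

Passing `verify` only when the file actually exists keeps boto3's default certificate handling in place for endpoints that don't need the custom bundle.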