- Replace the Makefile version/release file system with metadata.yaml only
- Add Python build automation (./tools/build) with Gitea API integration
- Add a GitHub release updater (./tools/update-gh) for version management
- Centralize Dockerfiles into a single parameterized Dockerfile
- Remove 54+ individual package Dockerfiles and version directories
- Update the Makefile to use the new Python tooling
- Add GITEA_API_TOKEN validation to prevent duplicate builds
- Support both explicit version/release args and metadata.yaml reading
462 lines
14 KiB
Plaintext
Executable File
462 lines
14 KiB
Plaintext
Executable File
#!/usr/bin/env -S uv run --script
|
|
# /// script
|
|
# dependencies = [
|
|
# "requests",
|
|
# "pyyaml",
|
|
# "hvac"
|
|
# ]
|
|
# ///
|
|
|
|
# vim: filetype=python
|
|
|
|
"""
|
|
GitHub Release Update Tool
|
|
|
|
Checks GitHub releases for packages and updates metadata.yaml and release files
|
|
when newer versions are available.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import logging
|
|
import requests
|
|
import yaml
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, List, Tuple
|
|
import re
|
|
import hvac
|
|
|
|
|
|
# ==================== VAULT FUNCTIONS ====================
|
|
|
|
def get_vault_client() -> hvac.Client:
    """
    Initialize and authenticate a Vault client using AppRole authentication.

    Reads configuration from the environment:
      * VAULT_ADDR    -- Vault server URL (default: https://vault.service.consul:8200)
      * VAULT_ROLE_ID -- AppRole role id (required)

    Returns:
        Authenticated HVAC client

    Raises:
        ValueError: if VAULT_ROLE_ID is not set.
        Exception: if authentication with Vault fails.
    """
    logger = logging.getLogger(__name__)

    # Get required environment variables
    vault_addr = os.getenv('VAULT_ADDR', 'https://vault.service.consul:8200')
    vault_role_id = os.getenv('VAULT_ROLE_ID')

    if not vault_role_id:
        logger.error("VAULT_ROLE_ID environment variable is required")
        raise ValueError("VAULT_ROLE_ID environment variable is required")

    # Initialize Vault client, verifying the server cert against the CA bundle.
    client = hvac.Client(
        url=vault_addr,
        verify='/etc/pki/tls/cert.pem'
    )

    # Authenticate using AppRole
    try:
        logger.debug("Authenticating to Vault at %s", vault_addr)
        # NOTE(review): no secret_id is passed -- this assumes the AppRole is
        # configured with bind_secret_id=false; confirm against Vault config.
        # The response is discarded; success is checked via is_authenticated().
        client.auth.approle.login(role_id=vault_role_id)

        if not client.is_authenticated():
            logger.error("Failed to authenticate with Vault")
            raise Exception("Failed to authenticate with Vault")

        logger.debug("Successfully authenticated with Vault")
        return client

    except Exception as e:
        logger.error(f"Vault authentication failed: {e}")
        raise
|
|
|
|
|
def _read_kv_token(client: 'hvac.Client', path: str, label: str) -> str:
    """
    Read a single API token from the KV v2 mount 'kv'.

    Args:
        client: Authenticated Vault client
        path: Secret path under the 'kv' mount
        label: Human-readable service name used in log/error messages
               ("GitHub" or "Gitea")

    Returns:
        The value stored under the secret's 'token' key

    Raises:
        Exception: if the secret cannot be read
    """
    logger = logging.getLogger(__name__)
    try:
        secret = client.secrets.kv.v2.read_secret_version(
            mount_point='kv',
            path=path
        )
        # KV v2 responses nest the payload under data.data.
        token = secret['data']['data']['token']
        logger.debug(f"Successfully retrieved {label} token from Vault")
        return token
    except Exception as e:
        logger.error(f"Failed to retrieve {label} token from Vault: {e}")
        raise Exception(f"Failed to retrieve {label} token from Vault: {e}") from e


def get_api_tokens() -> Tuple[str, str]:
    """
    Retrieve GitHub and Gitea API tokens from Vault.

    Returns:
        Tuple of (github_token, gitea_token)

    Raises:
        Exception: if Vault authentication fails or tokens cannot be retrieved
    """
    logger = logging.getLogger(__name__)

    client = get_vault_client()

    github_token = _read_kv_token(
        client, 'service/github/neoloc/tokens/read-only-token', 'GitHub')
    gitea_token = _read_kv_token(
        client, 'service/gitea/unkinben/tokens/read-only-packages', 'Gitea')

    if not github_token or not gitea_token:
        logger.error("One or both API tokens are empty")
        raise Exception("One or both API tokens are empty")

    return github_token, gitea_token
|
|
|
|
|
|
def setup_logging(verbose=False):
    """Configure root logging: DEBUG when verbose, INFO otherwise."""
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%H:%M:%S',
    )
|
|
|
|
|
|
def load_env_vars(env_file: Path) -> Dict[str, str]:
    """
    Parse simple KEY=VALUE assignments from an env file.

    Blank lines, '#' comments, and lines without '=' are skipped; a
    leading 'export ' on an assignment is tolerated. Keys and values are
    whitespace-stripped but quotes are kept verbatim.

    Args:
        env_file: Path to the env file

    Returns:
        Dictionary of environment variables (empty if the file is missing)
    """
    variables: Dict[str, str] = {}

    if not env_file.exists():
        return variables

    with open(env_file, 'r') as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith('#') or '=' not in entry:
                continue
            # Tolerate shell-style "export KEY=VALUE" lines.
            if entry.startswith('export '):
                entry = entry[len('export '):]
            name, _, value = entry.partition('=')
            variables[name.strip()] = value.strip()

    return variables
|
|
|
|
|
|
def get_github_latest_release(repo: str) -> Optional[Dict]:
    """
    Get the latest release from the GitHub API.

    Args:
        repo: GitHub repository in format "owner/repo"

    Returns:
        Latest release info (decoded JSON dict) or None if not found or
        on any HTTP error / network failure.
    """
    logger = logging.getLogger(__name__)

    try:
        # Get GitHub token from Vault.
        # NOTE: this re-authenticates to Vault on every call; fine for a
        # handful of packages, worth caching if the package list grows.
        github_token, _ = get_api_tokens()

        url = f"https://api.github.com/repos/{repo}/releases/latest"
        headers = {
            'Authorization': f'token {github_token}',
            'Accept': 'application/vnd.github.v3+json'
        }

        logger.debug(f"Checking GitHub releases: {url}")
        response = requests.get(url, headers=headers, timeout=30)

        if response.status_code == 200:
            release = response.json()
            logger.debug(f"Latest release for {repo}: {release.get('tag_name', 'unknown')}")
            return release
        elif response.status_code == 404:
            logger.warning(f"No releases found for {repo}")
            return None
        elif response.status_code == 401:
            # Fixed stale message: the token is read from Vault, not from a
            # GITHUB_API_TOKEN environment variable.
            logger.error("GitHub authentication failed. Check the GitHub token stored in Vault.")
            return None
        else:
            logger.warning(
                f"Unexpected response from GitHub API for {repo}: "
                f"{response.status_code} - {response.text}"
            )
            return None

    except requests.RequestException as e:
        logger.error(f"Failed to check GitHub releases for {repo}: {e}")
        return None
|
|
|
|
|
|
def normalize_version(version: str) -> str:
    """
    Normalize a version string by dropping a single leading 'v', if any.

    Args:
        version: Version string (e.g., "v1.2.3" or "1.2.3")

    Returns:
        Normalized version string (e.g., "1.2.3")
    """
    return version[1:] if version.startswith('v') else version
|
|
|
|
|
|
def compare_versions(current: str, latest: str) -> bool:
    """
    Determine whether *latest* is newer than *current*.

    Versions are compared component-wise after splitting on '.', with
    numeric components compared as integers (so "1.10" > "1.9").

    When the two versions mix numeric and non-numeric components in the
    same position (e.g. "1" vs "1-rc1"), tuple comparison raises
    TypeError in Python 3 and we fall back to a plain inequality check,
    i.e. any *different* version is treated as newer.

    Args:
        current: Current version string
        latest: Latest version string

    Returns:
        True if latest is considered newer than current
    """
    def version_tuple(v: str) -> tuple:
        # Numeric components become ints so "10" sorts after "9";
        # non-numeric components are kept as strings.
        parts = []
        for part in v.split('.'):
            try:
                parts.append(int(part))
            except ValueError:
                parts.append(part)
        return tuple(parts)

    try:
        return version_tuple(latest) > version_tuple(current)
    except TypeError:
        # Fixed: was a bare `except:`. Only mixed int/str ordering can
        # fail here; anything else should propagate rather than be hidden.
        return latest != current
|
|
|
|
|
|
def update_package_metadata(package_dir: Path, new_version: str, dry_run: bool = False) -> bool:
    """
    Write *new_version* into the package's metadata.yaml.

    The 'release' field is reset to 1, because a new upstream version
    restarts the package release counter.

    Args:
        package_dir: Path to package directory
        new_version: New version to update to
        dry_run: If True, only show what would be done

    Returns:
        True if update was successful
    """
    logger = logging.getLogger(__name__)
    metadata_file = package_dir / "metadata.yaml"

    try:
        # Load current metadata
        with open(metadata_file, 'r') as fh:
            metadata = yaml.safe_load(fh)

        previous = metadata.get('version', 'unknown')
        logger.info(f"Updating {metadata.get('name', 'unknown')} from {previous} to {new_version}")

        if dry_run:
            logger.info(f"[DRY RUN] Would update metadata.yaml version to {new_version}")
            return True

        # New upstream version -> release counter starts over at 1.
        metadata['version'] = new_version
        metadata['release'] = 1

        # Persist updated metadata, preserving the original key order.
        with open(metadata_file, 'w') as fh:
            yaml.dump(metadata, fh, default_flow_style=False, sort_keys=False)

        logger.info(f"Successfully updated {metadata.get('name')} to version {new_version}")
        return True

    except Exception as e:
        logger.error(f"Failed to update package metadata: {e}")
        return False
|
|
|
|
|
|
def check_package_updates(package_dir: Path, dry_run: bool = False) -> bool:
    """
    Check a single package for a newer upstream GitHub release and,
    when one exists, update its metadata.

    Args:
        package_dir: Path to package directory
        dry_run: If True, only show what would be done

    Returns:
        True if package was updated or no update needed
    """
    logger = logging.getLogger(__name__)
    metadata_file = package_dir / "metadata.yaml"

    if not metadata_file.exists():
        logger.warning(f"No metadata.yaml found in {package_dir}")
        return False

    try:
        with open(metadata_file, 'r') as fh:
            metadata = yaml.safe_load(fh)

        package_name = metadata.get('name', package_dir.name)
        current_version = metadata.get('version')
        github_repo = metadata.get('github')

        # A package with no upstream GitHub repo is simply skipped (success).
        if not github_repo:
            logger.debug(f"Package {package_name} has no GitHub repo configured")
            return True

        if not current_version:
            logger.warning(f"Package {package_name} has no version in metadata")
            return False

        logger.info(f"Checking {package_name} (current: {current_version}) from {github_repo}")

        # Ask GitHub for the newest published release.
        latest_release = get_github_latest_release(github_repo)
        if not latest_release:
            return False

        latest_version = normalize_version(latest_release.get('tag_name', ''))
        if not latest_version:
            logger.warning(f"Could not determine latest version for {package_name}")
            return False

        # Only touch metadata when upstream actually moved forward.
        if compare_versions(current_version, latest_version):
            logger.info(f"New version available: {current_version} -> {latest_version}")
            return update_package_metadata(package_dir, latest_version, dry_run)

        logger.info(f"Package {package_name} is up to date ({current_version})")
        return True

    except Exception as e:
        logger.error(f"Failed to check package {package_dir.name}: {e}")
        return False
|
|
|
|
|
|
def find_packages_with_github(rpms_dir: Path) -> List[Path]:
    """
    Collect package directories whose metadata declares a GitHub repo.

    Args:
        rpms_dir: Path to rpms directory

    Returns:
        List of package directories with GitHub repos
    """
    matches: List[Path] = []

    for candidate in rpms_dir.iterdir():
        # Skip plain files and hidden entries such as '.git'.
        if not candidate.is_dir() or candidate.name.startswith('.'):
            continue

        metadata_file = candidate / "metadata.yaml"
        if not metadata_file.exists():
            continue

        try:
            with open(metadata_file, 'r') as fh:
                metadata = yaml.safe_load(fh)
            if metadata.get('github'):
                matches.append(candidate)
        except Exception:
            # Unreadable or malformed metadata -- ignore this package.
            continue

    return matches
|
|
|
|
|
|
def main():
    """Main entry point.

    Exit codes:
        0   -- all requested packages processed successfully
        1   -- at least one package failed, or a fatal error occurred
        130 -- interrupted by the user (Ctrl-C)
    """
    parser = argparse.ArgumentParser(
        description='Check GitHub releases and update package metadata',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --package boilerplate
  %(prog)s --all
  %(prog)s --all --dry-run
"""
    )

    # Package selection arguments (exactly one is required)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--package', help='Package name to check')
    group.add_argument('--all', action='store_true', help='Check all packages with GitHub repos')

    # Optional arguments
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')

    args = parser.parse_args()

    setup_logging(args.verbose)
    logger = logging.getLogger(__name__)

    try:
        # Repository layout: this script lives in tools/, packages in rpms/.
        # (Removed dead local: an "env" path was computed but never used.)
        root_dir = Path(__file__).parent.parent
        rpms_dir = root_dir / "rpms"

        if not rpms_dir.exists():
            logger.error(f"RPMs directory not found: {rpms_dir}")
            sys.exit(1)

        success = True

        if args.package:
            # Check a single, explicitly named package.
            package_dir = rpms_dir / args.package
            if not package_dir.exists():
                logger.error(f"Package directory not found: {package_dir}")
                sys.exit(1)

            success = check_package_updates(package_dir, args.dry_run)
        else:
            # Check every package that declares an upstream GitHub repo.
            github_packages = find_packages_with_github(rpms_dir)

            if not github_packages:
                logger.info("No packages with GitHub repos found")
                sys.exit(0)

            logger.info(f"Found {len(github_packages)} packages with GitHub repos")

            updated_count = 0
            for package_dir in github_packages:
                if check_package_updates(package_dir, args.dry_run):
                    updated_count += 1

            logger.info(f"Successfully processed {updated_count}/{len(github_packages)} packages")
            success = updated_count == len(github_packages)

        sys.exit(0 if success else 1)

    except KeyboardInterrupt:
        # SystemExit from the sys.exit() calls above is a BaseException and
        # deliberately NOT caught by the Exception handler below.
        logger.info("Update check interrupted by user")
        sys.exit(130)
    except Exception as e:
        logger.error(f"Update check failed: {e}")
        if args.verbose:
            logger.exception("Full traceback:")
        sys.exit(1)


if __name__ == '__main__':
    main()