#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#   "requests",
#   "pyyaml",
#   "hvac"
# ]
# ///
# vim: filetype=python
"""
GitHub Release Update Tool

Checks GitHub releases for packages and updates metadata.yaml
and release files when newer versions are available.
"""

import argparse
import functools
import logging
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import hvac
import requests
import yaml


# ==================== VAULT FUNCTIONS ====================

def get_vault_client() -> hvac.Client:
    """
    Initialize and authenticate a Vault client using AppRole authentication.

    The Vault address is read from VAULT_ADDR (defaulting to
    https://vault.service.consul:8200) and the AppRole role id from
    VAULT_ROLE_ID.

    Returns:
        Authenticated HVAC client

    Raises:
        ValueError: if VAULT_ROLE_ID is not set
        Exception: if the login call fails or the client is not authenticated
            afterwards
    """
    logger = logging.getLogger(__name__)

    # Get required environment variables
    vault_addr = os.getenv('VAULT_ADDR', 'https://vault.service.consul:8200')
    vault_role_id = os.getenv('VAULT_ROLE_ID')

    if not vault_role_id:
        logger.error("VAULT_ROLE_ID environment variable is required")
        raise ValueError("VAULT_ROLE_ID environment variable is required")

    # Initialize Vault client with CA certificate
    client = hvac.Client(
        url=vault_addr,
        verify='/etc/pki/tls/cert.pem'
    )

    # Authenticate using AppRole.
    # NOTE(review): only role_id is supplied (no secret_id) -- presumably the
    # role is configured not to require one; confirm against the Vault setup.
    try:
        logger.debug(f"Authenticating to Vault at {vault_addr}")
        client.auth.approle.login(role_id=vault_role_id)

        if not client.is_authenticated():
            logger.error("Failed to authenticate with Vault")
            raise Exception("Failed to authenticate with Vault")

        logger.debug("Successfully authenticated with Vault")
        return client

    except Exception as e:
        logger.error(f"Vault authentication failed: {e}")
        raise


def _read_vault_token(client: hvac.Client, path: str, label: str) -> str:
    """Read the ``token`` field of a KV v2 secret at ``path`` on the ``kv`` mount.

    Args:
        client: Authenticated Vault client
        path: Secret path below the ``kv`` mount
        label: Human-readable service name used in log and error messages

    Returns:
        The value stored under the secret's ``token`` key

    Raises:
        Exception: if the secret cannot be read or has no ``token`` field
    """
    logger = logging.getLogger(__name__)
    try:
        secret = client.secrets.kv.v2.read_secret_version(
            mount_point='kv',
            path=path
        )
        token = secret['data']['data']['token']
    except Exception as e:
        logger.error(f"Failed to retrieve {label} token from Vault: {e}")
        raise Exception(f"Failed to retrieve {label} token from Vault: {e}") from e

    logger.debug(f"Successfully retrieved {label} token from Vault")
    return token


@functools.lru_cache(maxsize=1)
def get_api_tokens() -> Tuple[str, str]:
    """
    Retrieve GitHub and Gitea API tokens from Vault.

    The result is cached for the lifetime of the process so that checking many
    packages performs a single Vault login instead of one per package. Failed
    calls are not cached (lru_cache does not store raised exceptions), so a
    later call retries. Use ``get_api_tokens.cache_clear()`` to force a refresh.

    Returns:
        Tuple of (github_token, gitea_token)

    Raises:
        Exception if Vault authentication fails or tokens cannot be retrieved
    """
    logger = logging.getLogger(__name__)

    client = get_vault_client()

    github_token = _read_vault_token(
        client, 'service/github/neoloc/tokens/read-only-token', 'GitHub'
    )
    gitea_token = _read_vault_token(
        client, 'service/gitea/unkinben/tokens/read-only-packages', 'Gitea'
    )

    if not github_token or not gitea_token:
        logger.error("One or both API tokens are empty")
        raise Exception("One or both API tokens are empty")

    return github_token, gitea_token


def setup_logging(verbose=False):
    """Set up logging configuration.

    Args:
        verbose: If True, log at DEBUG level; otherwise at INFO level
    """
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%H:%M:%S'
    )


def load_env_vars(env_file: Path) -> Dict[str, str]:
    """
    Load environment variables from env file.

    Blank lines, lines starting with '#', and lines without '=' are ignored.
    A leading 'export ' is dropped. Values are returned as written apart from
    surrounding whitespace (quotes are NOT removed).

    Args:
        env_file: Path to the env file

    Returns:
        Dictionary of environment variables; empty if the file does not exist
    """
    env_vars: Dict[str, str] = {}

    if not env_file.exists():
        return env_vars

    export_prefix = 'export '
    with open(env_file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            # Handle export statements
            if line.startswith(export_prefix):
                line = line[len(export_prefix):]
            # Split on the first '=' only so values may themselves contain '='
            key, _, value = line.partition('=')
            env_vars[key.strip()] = value.strip()

    return env_vars


def get_github_latest_release(repo: str) -> Optional[Dict]:
    """
    Get the latest release from GitHub API.

    Args:
        repo: GitHub repository in format "owner/repo"

    Returns:
        Latest release info (the decoded JSON response), or None if the
        release could not be fetched (no release, authentication failure,
        unexpected status, network error, or an invalid JSON body)

    Raises:
        Exception: if the GitHub token cannot be obtained from Vault (errors
            that are not requests exceptions propagate to the caller)
    """
    logger = logging.getLogger(__name__)

    try:
        # Get GitHub token from Vault (cached after the first successful call)
        github_token, _ = get_api_tokens()

        url = f"https://api.github.com/repos/{repo}/releases/latest"
        headers = {
            'Authorization': f'token {github_token}',
            'Accept': 'application/vnd.github.v3+json'
        }

        logger.debug(f"Checking GitHub releases: {url}")
        response = requests.get(url, headers=headers, timeout=30)

        if response.status_code == 200:
            release = response.json()
            logger.debug(f"Latest release for {repo}: {release.get('tag_name', 'unknown')}")
            return release
        elif response.status_code == 404:
            logger.warning(f"No releases found for {repo}")
            return None
        elif response.status_code == 401:
            # The token is read from Vault, not from an environment variable.
            logger.error("GitHub authentication failed. Check the GitHub token stored in Vault.")
            return None
        else:
            logger.warning(
                f"Unexpected response from GitHub API for {repo}: "
                f"{response.status_code} - {response.text}"
            )
            return None

    except (requests.RequestException, ValueError) as e:
        # ValueError covers a 200 response whose body is not valid JSON on
        # requests versions where the JSON error is not a RequestException.
        logger.error(f"Failed to check GitHub releases for {repo}: {e}")
        return None


def normalize_version(version: str) -> str:
    """
    Normalize version string by removing 'v' prefix if present.

    Args:
        version: Version string (e.g., "v1.2.3" or "1.2.3")

    Returns:
        Normalized version string (e.g., "1.2.3")
    """
    if version.startswith('v'):
        return version[1:]
    return version


def compare_versions(current: str, latest: str) -> bool:
    """
    Compare version strings to determine if latest is newer.

    Each version is split on '.'; components that parse as integers are
    compared numerically and the rest as strings, position by position.
    Non-string inputs (e.g. a YAML ``version: 1.2`` loaded as a float) are
    converted with ``str()`` first.

    Args:
        current: Current version string
        latest: Latest version string

    Returns:
        True if latest is newer than current. If the two versions cannot be
        ordered (a numeric component faces a non-numeric one), True is
        returned whenever they differ.
    """
    def version_tuple(v):
        # Split by dots and convert to integers where possible
        parts = []
        for part in v.split('.'):
            try:
                parts.append(int(part))
            except ValueError:
                # Handle non-numeric parts
                parts.append(part)
        return tuple(parts)

    current = str(current).strip()
    latest = str(latest).strip()

    try:
        return version_tuple(latest) > version_tuple(current)
    except TypeError:
        # int vs str at the same position cannot be ordered. Fall back to
        # "different means newer", which is the original best-effort policy.
        return latest != current


def _load_metadata(metadata_file: Path) -> Dict:
    """Parse a metadata.yaml file and return its top-level mapping.

    Raises:
        ValueError: if the file is empty or its top level is not a mapping
        OSError / yaml.YAMLError: if the file cannot be read or parsed
    """
    with open(metadata_file, 'r') as f:
        metadata = yaml.safe_load(f)
    if not isinstance(metadata, dict):
        raise ValueError(f"{metadata_file} does not contain a YAML mapping")
    return metadata


def update_package_metadata(package_dir: Path, new_version: str, dry_run: bool = False) -> bool:
    """
    Update package metadata.yaml with new version.

    Sets ``version`` to ``new_version`` and resets ``release`` to 1. The file
    is rewritten from the parsed data, so YAML comments in it are not kept.

    Args:
        package_dir: Path to package directory
        new_version: New version to update to
        dry_run: If True, only show what would be done

    Returns:
        True if update was successful (or would be, in dry-run mode);
        False if the metadata could not be read or written
    """
    logger = logging.getLogger(__name__)
    metadata_file = package_dir / "metadata.yaml"

    try:
        # Load current metadata
        metadata = _load_metadata(metadata_file)

        old_version = metadata.get('version', 'unknown')
        logger.info(f"Updating {metadata.get('name', 'unknown')} from {old_version} to {new_version}")

        if dry_run:
            logger.info(f"[DRY RUN] Would update metadata.yaml version to {new_version}")
            return True

        # Update version in metadata and reset release to 1
        metadata['version'] = new_version
        metadata['release'] = 1

        # Serialize before opening the file for writing: opening with 'w'
        # truncates immediately, so a dump failure would otherwise leave an
        # empty or partial metadata.yaml behind.
        serialized = yaml.dump(metadata, default_flow_style=False, sort_keys=False)
        with open(metadata_file, 'w') as f:
            f.write(serialized)

        logger.info(f"Successfully updated {metadata.get('name')} to version {new_version}")
        return True

    except Exception as e:
        logger.error(f"Failed to update package metadata: {e}")
        return False


def check_package_updates(package_dir: Path, dry_run: bool = False) -> bool:
    """
    Check for updates for a single package.

    Args:
        package_dir: Path to package directory
        dry_run: If True, only show what would be done

    Returns:
        True if package was updated or no update needed (including packages
        with no ``github`` entry); False on any failure
    """
    logger = logging.getLogger(__name__)

    metadata_file = package_dir / "metadata.yaml"
    if not metadata_file.exists():
        logger.warning(f"No metadata.yaml found in {package_dir}")
        return False

    try:
        # Load metadata
        metadata = _load_metadata(metadata_file)

        package_name = metadata.get('name', package_dir.name)
        raw_version = metadata.get('version')
        github_repo = metadata.get('github')

        if not github_repo:
            logger.debug(f"Package {package_name} has no GitHub repo configured")
            return True

        if raw_version is None or raw_version == '':
            logger.warning(f"Package {package_name} has no version in metadata")
            return False

        # YAML loads unquoted values such as `1.2` or `3` as numbers.
        current_version = str(raw_version)

        logger.info(f"Checking {package_name} (current: {current_version}) from {github_repo}")

        # Get latest release from GitHub
        latest_release = get_github_latest_release(github_repo)
        if not latest_release:
            return False

        # `or ''` also covers a tag_name that is present but null.
        latest_version = normalize_version(latest_release.get('tag_name') or '')
        if not latest_version:
            logger.warning(f"Could not determine latest version for {package_name}")
            return False

        # Compare versions
        if compare_versions(current_version, latest_version):
            logger.info(f"New version available: {current_version} -> {latest_version}")
            return update_package_metadata(package_dir, latest_version, dry_run)
        else:
            logger.info(f"Package {package_name} is up to date ({current_version})")
            return True

    except Exception as e:
        logger.error(f"Failed to check package {package_dir.name}: {e}")
        return False


def find_packages_with_github(rpms_dir: Path) -> List[Path]:
    """
    Find all packages that have GitHub repo configured.

    Hidden directories, non-directories and directories without a
    metadata.yaml are skipped. Directories whose metadata.yaml cannot be read
    are skipped with a warning.

    Args:
        rpms_dir: Path to rpms directory

    Returns:
        List of package directories with GitHub repos, sorted by path
    """
    logger = logging.getLogger(__name__)
    github_packages = []

    # Sorted so that processing order and log output are deterministic.
    for package_dir in sorted(rpms_dir.iterdir()):
        if not package_dir.is_dir() or package_dir.name.startswith('.'):
            continue

        metadata_file = package_dir / "metadata.yaml"
        if not metadata_file.exists():
            continue

        try:
            metadata = _load_metadata(metadata_file)
        except Exception as e:
            # Best effort: one broken package must not stop the scan, but the
            # skip should be visible rather than silent.
            logger.warning(f"Skipping {package_dir.name}: cannot read metadata.yaml: {e}")
            continue

        if metadata.get('github'):
            github_packages.append(package_dir)

    return github_packages


def main():
    """Main entry point.

    Exits with status 0 when every checked package succeeded, 1 on any
    failure, and 130 when interrupted by the user.
    """
    parser = argparse.ArgumentParser(
        description='Check GitHub releases and update package metadata',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --package boilerplate
  %(prog)s --all
  %(prog)s --all --dry-run
        """
    )

    # Package selection arguments
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--package', help='Package name to check')
    group.add_argument('--all', action='store_true', help='Check all packages with GitHub repos')

    # Optional arguments
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')

    args = parser.parse_args()

    setup_logging(args.verbose)
    logger = logging.getLogger(__name__)

    try:
        # Locate the rpms directory relative to this script's parent directory
        root_dir = Path(__file__).parent.parent
        rpms_dir = root_dir / "rpms"

        if not rpms_dir.exists():
            logger.error(f"RPMs directory not found: {rpms_dir}")
            sys.exit(1)

        success = True

        if args.package:
            # Check single package
            package_dir = rpms_dir / args.package
            if not package_dir.exists():
                logger.error(f"Package directory not found: {package_dir}")
                sys.exit(1)

            success = check_package_updates(package_dir, args.dry_run)
        else:
            # Check all packages with GitHub repos
            github_packages = find_packages_with_github(rpms_dir)

            if not github_packages:
                logger.info("No packages with GitHub repos found")
                sys.exit(0)

            logger.info(f"Found {len(github_packages)} packages with GitHub repos")

            updated_count = 0
            for package_dir in github_packages:
                if check_package_updates(package_dir, args.dry_run):
                    updated_count += 1

            logger.info(f"Successfully processed {updated_count}/{len(github_packages)} packages")
            success = updated_count == len(github_packages)

        sys.exit(0 if success else 1)

    except KeyboardInterrupt:
        logger.info("Update check interrupted by user")
        sys.exit(130)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the exit calls above pass through this handler untouched.
        logger.error(f"Update check failed: {e}")
        if args.verbose:
            logger.exception("Full traceback:")
        sys.exit(1)


if __name__ == '__main__':
    main()