rpmbuilder/tools/build
Ben Vincent 2954835dd0
All checks were successful
Build / build-8 (pull_request) Successful in 13s
Build / build-9 (pull_request) Successful in 13s
feat: add GitHub integration for automated package updates
- add GitHub API functions to fetch latest releases and compare versions
- implement package update checking against GitHub repositories
- add upgrade and upgrade-all CLI commands with dry-run support
- include .ruff_cache in .gitignore for cleaner repository
2025-12-29 22:08:15 +11:00

1469 lines
50 KiB
Plaintext
Executable File

#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
# "typer",
# "requests",
# "pyyaml",
# "hvac",
# "cerberus"
# ]
# ///
# vim: filetype=python
"""
RPM Builder Tool
A modern Python console application for building RPM packages.
Builds RPM packages using Docker and checks for existing packages via Gitea API.
"""
import os
import sys
import logging
import subprocess
import requests
from pathlib import Path
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import hvac
import typer
import yaml
from cerberus import Validator
# ==================== VALIDATION SCHEMA ====================
# Cerberus schema for metadata.yaml validation based on PackageMetadata and Build dataclasses
# Cerberus schema for metadata.yaml. The top-level keys correspond to the
# PackageMetadata fields and every item of 'builds' corresponds to a Build.
METADATA_SCHEMA = {
    # Package name: starts with an alphanumeric, then alphanumerics, '-' or '_'.
    'name': {
        'type': 'string',
        'required': True,
        'empty': False,
        'regex': r'^[a-zA-Z0-9][a-zA-Z0-9\-_]*$'
    },
    # Upstream GitHub repository in "owner/repo" form.
    'github': {
        'type': 'string',
        'required': True,
        'empty': False,
        'regex': r'^[a-zA-Z0-9\-_]+/[a-zA-Z0-9\-_\.]+$'
    },
    'description': {
        'type': 'string',
        'required': True,
        'empty': False
    },
    'arch': {
        'type': 'string',
        'required': False,
        'default': 'amd64',
        'allowed': ['amd64', 'arm64', 'x86_64']
    },
    'platform': {
        'type': 'string',
        'required': False,
        'default': 'linux'
    },
    # Optional descriptive fields: may be omitted, but must not be empty
    # strings when present.
    'maintainer': {
        'type': 'string',
        'required': False,
        'empty': False
    },
    'homepage': {
        'type': 'string',
        'required': False,
        'empty': False,
        'regex': r'^https?://.+'
    },
    'license': {
        'type': 'string',
        'required': False,
        'empty': False
    },
    # At least one build entry is required.
    'builds': {
        'type': 'list',
        'required': True,
        'minlength': 1,
        'schema': {
            'type': 'dict',
            'schema': {
                # Non-empty list of target repositories for this build.
                'repository': {
                    'type': 'list',
                    'required': True,
                    'minlength': 1,
                    'schema': {
                        'type': 'string',
                        'allowed': ['almalinux/el8', 'almalinux/el9'],
                        'empty': False
                    }
                },
                # Docker image reference used as the build base.
                'image': {
                    'type': 'string',
                    'required': True,
                    'empty': False,
                    'regex': r'^[a-zA-Z0-9\-_\.:\/]+$'
                },
                'release': {
                    'type': 'string',
                    'required': True,
                    'empty': False
                },
                # Dotted numeric version with optional '-suffix' parts.
                # A single '(\.[0-9]+)*' group is used: repeating that group
                # twice matched exactly the same strings while making the
                # pattern ambiguous for the regex engine.
                'version': {
                    'type': 'string',
                    'required': True,
                    'empty': False,
                    'regex': r'^[0-9]+(\.[0-9]+)*(-[a-zA-Z0-9]+)*$'
                }
            }
        }
    }
}
# ==================== DATACLASSES ====================
@dataclass
class Build:
    """A single managed build of a package (one entry of ``builds`` in metadata.yaml)."""

    # Repository identifiers this build targets; compared against the
    # requested distro when selecting a build.
    repository: List[str]
    # Docker image used as the base image for the build.
    image: str
    # Release string of the package for this build.
    release: str
    # Version string of the package for this build.
    version: str
@dataclass
class PackageMetadata:
    """
    Package-level data from metadata.yaml plus its list of Build objects.

    The scalar fields are the values used to fill the nfpm.yaml variables.
    ``builds`` defaults to ``None`` and is replaced by a fresh empty list in
    ``__post_init__`` so that instances never share one list object.
    """

    name: str
    github: str
    description: str
    arch: str = "amd64"
    platform: str = "linux"
    maintainer: str = ""
    homepage: str = ""
    license: str = ""
    # Optional because the declared default is None; normalised to a list below.
    builds: Optional[List["Build"]] = None

    def __post_init__(self):
        """Replace a missing ``builds`` value with a new empty list."""
        if self.builds is None:
            self.builds = []
# ==================== VAULT FUNCTIONS ====================
def get_vault_client() -> hvac.Client:
    """
    Create an hvac client and log in to Vault with the AppRole method.

    The Vault address is taken from VAULT_ADDR (default
    https://vault.service.consul:8200) and the role id from VAULT_ROLE_ID.
    The process exits with status 1 when VAULT_ROLE_ID is unset, when the
    login raises, or when the client is not authenticated after the login.

    Returns:
        Authenticated HVAC client
    """
    log = logging.getLogger(__name__)

    address = os.getenv('VAULT_ADDR', 'https://vault.service.consul:8200')
    role_id = os.getenv('VAULT_ROLE_ID')
    if not role_id:
        log.error("VAULT_ROLE_ID environment variable is required")
        sys.exit(1)

    # Server certificate verification uses this CA certificate file.
    vault = hvac.Client(url=address, verify='/etc/pki/tls/cert.pem')

    try:
        log.debug(f"Authenticating to Vault at {address}")
        vault.auth.approle.login(role_id=role_id)
        authenticated = vault.is_authenticated()
    except Exception as e:
        log.error(f"Vault authentication failed: {e}")
        sys.exit(1)

    if not authenticated:
        log.error("Failed to authenticate with Vault")
        sys.exit(1)

    log.debug("Successfully authenticated with Vault")
    return vault
def get_gitea_token() -> str:
    """
    Read the Gitea API token from Vault.

    The token is the ``token`` field of the KV v2 secret
    ``service/gitea/unkinben/tokens/read-only-packages`` on the ``kv`` mount.
    Failures are not raised to the caller: the process exits with status 1
    when the secret cannot be read or the token is empty.

    Returns:
        Gitea API token
    """
    log = logging.getLogger(__name__)
    vault = get_vault_client()

    try:
        secret = vault.secrets.kv.v2.read_secret_version(
            path='service/gitea/unkinben/tokens/read-only-packages',
            mount_point='kv',
            raise_on_deleted_version=True,
        )
        token = secret['data']['data']['token']
        log.debug("Successfully retrieved Gitea token from Vault")
    except Exception as e:
        log.error(f"Failed to retrieve Gitea token from Vault: {e}")
        sys.exit(1)

    if token:
        return token

    log.error("Gitea token is empty")
    sys.exit(1)
# ==================== GITHUB API FUNCTIONS ====================
def get_github_latest_release(repo: str) -> Optional[dict]:
    """
    Fetch the latest release of a GitHub repository.

    Calls ``/repos/<repo>/releases/latest`` on api.github.com with the token
    obtained from get_github_token().

    Args:
        repo: GitHub repository in format "owner/repo"

    Returns:
        The decoded release document on HTTP 200; None for 404, 401, any
        other status code, or a request error (each case is logged).
    """
    log = logging.getLogger(__name__)

    try:
        token = get_github_token()
        url = f"https://api.github.com/repos/{repo}/releases/latest"
        request_headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }

        log.debug(f"Checking GitHub releases: {url}")
        response = requests.get(url, headers=request_headers, timeout=30)
        status = response.status_code

        if status == 404:
            log.warning(f"No releases found for {repo}")
            return None
        if status == 401:
            log.error("GitHub authentication failed. Check GitHub token.")
            return None
        if status != 200:
            log.warning(
                f"Unexpected response from GitHub API for {repo}: "
                f"{status} - {response.text}"
            )
            return None

        release = response.json()
        log.debug(f"Latest release for {repo}: {release.get('tag_name', 'unknown')}")
        return release
    except requests.RequestException as e:
        log.error(f"Failed to check GitHub releases for {repo}: {e}")
        return None
def normalize_github_version(version: str) -> str:
    """
    Drop a single leading 'v' from a version/tag string.

    Args:
        version: Version string (e.g., "v1.2.3" or "1.2.3")

    Returns:
        The string without its first character when it starts with a
        lowercase 'v' (e.g., "1.2.3"); otherwise the string unchanged.
    """
    return version[1:] if version.startswith('v') else version
def compare_versions(current: str, latest: str) -> bool:
    """
    Decide whether ``latest`` is a newer version than ``current``.

    Both strings are split on '.'; numeric components are compared as
    integers, anything else as text. Trailing zero components are ignored so
    that "1.0" and "1.0.0" are treated as the same version instead of the
    longer one counting as newer.

    Args:
        current: Current version string
        latest: Latest version string

    Returns:
        True if latest is newer than current. When the two versions cannot be
        ordered (a numeric component meets a non-numeric one), returns True
        whenever the two strings differ.
    """
    def version_tuple(v):
        parts = []
        for part in v.split('.'):
            try:
                parts.append(int(part))
            except ValueError:
                # Non-numeric component (e.g. "3-rc1") is kept as text.
                parts.append(part)
        # Strip trailing zeros (keeping at least one component) so versions
        # that differ only by padding compare equal.
        while len(parts) > 1 and parts[-1] == 0:
            parts.pop()
        return tuple(parts)

    try:
        return version_tuple(latest) > version_tuple(current)
    except TypeError:
        # int vs str at the same position cannot be ordered.
        return latest != current
def get_github_token() -> str:
    """
    Read the GitHub API token from Vault.

    The token is the ``token`` field of the KV v2 secret
    ``service/github/neoloc/tokens/read-only-token`` on the ``kv`` mount.
    Failures are not raised to the caller: the process exits with status 1
    when the secret cannot be read or the token is empty.

    Returns:
        GitHub API token
    """
    log = logging.getLogger(__name__)
    vault = get_vault_client()

    try:
        secret = vault.secrets.kv.v2.read_secret_version(
            path='service/github/neoloc/tokens/read-only-token',
            mount_point='kv',
            raise_on_deleted_version=True,
        )
        token = secret['data']['data']['token']
        log.debug("Successfully retrieved GitHub token from Vault")
    except Exception as e:
        log.error(f"Failed to retrieve GitHub token from Vault: {e}")
        sys.exit(1)

    if token:
        return token

    log.error("GitHub token is empty")
    sys.exit(1)
# ==================== GITEA API FUNCTIONS ====================
def normalize_version(version: str) -> str:
    """
    Strip leading zeros from the numeric components of a version string.

    This mirrors the normalisation Gitea applies to package versions.
    Components are delimited by '.', '-' or '_'; the delimiters themselves
    are preserved.

    Examples:
        "2025.08.03" -> "2025.8.3"
        "1.05.0" -> "1.5.0"
        "0.6.1" -> "0.6.1" (no change needed)

    Args:
        version: Original version string

    Returns:
        Normalized version string
    """
    import re

    def without_leading_zeros(token: str) -> str:
        # Only multi-digit, all-digit tokens starting with '0' are rewritten;
        # int() keeps at least one digit ("00" -> "0").
        if token.isdigit() and len(token) > 1 and token.startswith('0'):
            return str(int(token))
        return token

    # The capturing group keeps the separators in the split result.
    tokens = re.split(r'([.\-_])', version)
    return ''.join(without_leading_zeros(token) for token in tokens)
def check_package_exists(package_name: str, version: str, release: str) -> bool:
    """
    Report whether a package version is already in the Gitea package registry.

    The registry is located through the GITEA_URL, GITEA_OWNER and
    GITEA_PACKAGE_TYPE environment variables (defaults: https://git.unkin.net,
    unkin, rpm). The API token comes from get_gitea_token(), which exits the
    process if no token can be obtained.

    Args:
        package_name: Name of the package
        version: Version string
        release: Release number

    Returns:
        True only when the API answers 200 with a document that has a truthy
        'id'. A 404, a 401, any other status code and request errors all
        return False.
    """
    log = logging.getLogger(__name__)

    base_url = os.getenv('GITEA_URL', 'https://git.unkin.net')
    owner = os.getenv('GITEA_OWNER', 'unkin')
    package_type = os.getenv('GITEA_PACKAGE_TYPE', 'rpm')

    # Fails hard (process exit) when the token is unavailable.
    gitea_token = get_gitea_token()

    try:
        # Gitea stores versions without leading zeros,
        # e.g. "2025.08.03" is looked up as "2025.8.3".
        full_version = f"{normalize_version(version)}-{release}"
        url = (
            f"{base_url}/api/v1/packages/{owner}/"
            f"{package_type}/{package_name}/{full_version}"
        )

        log.debug(f"Checking package existence: {url}")
        response = requests.get(
            url,
            headers={'Authorization': f'token {gitea_token}'},
            timeout=30,
        )
        status = response.status_code

        if status == 404:
            log.debug(f"Package {package_name}:{full_version} not found (404)")
            return False
        if status == 401:
            log.error("Authentication failed. Check GITEA_API_TOKEN.")
            return False
        if status != 200:
            log.warning(
                f"Unexpected response checking package {package_name}:{full_version}: "
                f"{status} - {response.text}"
            )
            return False

        # A returned record with an id means the package exists.
        exists = bool(response.json().get('id'))
        log.debug(f"Package {package_name}:{full_version} {'exists' if exists else 'not found'}")
        return exists
    except requests.RequestException as e:
        log.error(f"Failed to check package {package_name}:{version}-{release}: {e}")
        return False
def get_package_full_name(package_name: str, version: str, release: str) -> str:
    """
    Build the ``name-version-release`` string for a package.

    Args:
        package_name: Package name
        version: Version string
        release: Release number

    Returns:
        Full package name string
    """
    return "{}-{}-{}".format(package_name, version, release)
# ==================== DOCKER FUNCTIONS ====================
def check_docker_available() -> bool:
    """
    Probe for a usable Docker installation by running ``docker version``.

    Returns:
        True if the command exits with status 0; False if it exits non-zero,
        takes longer than 10 seconds, or the docker executable is not found.
    """
    probe = ['docker', 'version']
    try:
        completed = subprocess.run(probe, capture_output=True, text=True, timeout=10)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False
    return completed.returncode == 0
def cleanup_container(container_name: str) -> None:
    """
    Remove a Docker container with ``docker rm``.

    Best effort: the command's exit status is not inspected, and any
    exception raised while launching it is logged as a warning.

    Args:
        container_name: Name of the container to remove
    """
    log = logging.getLogger(__name__)
    command = ['docker', 'rm', container_name]
    try:
        log.debug(f"Running: {' '.join(command)}")
        subprocess.run(command, capture_output=True, text=True)
    except Exception as e:
        log.warning(f"Failed to remove container {container_name}: {e}")
def get_base_image_from_metadata(package_dir: Path, distro: str = "el/9") -> str:
    """
    Get the base image from package metadata.yaml.

    A build entry matches when its ``distro`` key equals ``distro`` or when
    ``distro`` is listed in its ``repository`` list (the key the other
    readers of metadata.yaml in this file use). If nothing matches, the image
    of the first build entry is used.

    Args:
        package_dir: Directory containing the package
        distro: Target distro (default: el/9)

    Returns:
        Base image URL, or the default image when metadata.yaml is missing,
        unreadable, or does not provide an image.
    """
    metadata_file = package_dir / "metadata.yaml"
    default_image = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest"

    if not metadata_file.exists():
        return default_image

    try:
        with open(metadata_file, 'r') as f:
            # An empty file loads as None; treat it as "no builds".
            metadata = yaml.safe_load(f) or {}

        # Ignore entries that are not mappings instead of failing on them.
        build_configs = [
            config for config in (metadata.get('builds') or [])
            if isinstance(config, dict)
        ]

        for config in build_configs:
            repositories = config.get('repository') or []
            if isinstance(repositories, str):
                # Avoid substring matching if a bare string was given.
                repositories = [repositories]
            if config.get('distro') == distro or distro in repositories:
                return config.get('image', default_image)

        # If no matching distro found, return first image or default
        if build_configs:
            return build_configs[0].get('image', default_image)
        return default_image
    except Exception as e:
        # Best effort: fall back to the default image, but leave a trace.
        logging.getLogger(__name__).debug(
            f"Could not determine base image from {metadata_file}: {e}"
        )
        return default_image
def build_package_docker(
    package_dir: Path,
    package_name: str,
    package_version: str,
    package_release: str,
    dist_dir: Path,
    base_image: str = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest",
    dry_run: bool = False
) -> bool:
    """
    Build one package with Docker using the central Dockerfile.

    Steps: build an image from ``<package_dir>/../../Dockerfile`` (passing the
    package fields as build arguments), create a container from it, run the
    container, copy ``/app/dist/.`` out of it into ``<dist_dir>/<package_name>``
    and finally remove the container.

    Args:
        package_dir: Directory containing the package resources
        package_name: Name of the package
        package_version: Package version
        package_release: Package release number
        dist_dir: Directory to store built packages
        base_image: Base Docker image to use for building
        dry_run: If True, only show what would be done

    Returns:
        True if build succeeded (or on a dry run), False otherwise. Unexpected
        exceptions are logged and reported as False.
    """
    log = logging.getLogger(__name__)

    def package_settings():
        # Lazily yields (build-arg name, value) pairs in the order they are
        # logged on a dry run and passed to `docker build`.
        yield 'PACKAGE_NAME', package_name
        yield 'PACKAGE_VERSION', package_version
        yield 'PACKAGE_RELEASE', package_release
        yield 'PACKAGE_DESCRIPTION', metadata.get('description', '')
        yield 'PACKAGE_MAINTAINER', metadata.get('maintainer', '')
        yield 'PACKAGE_HOMEPAGE', metadata.get('homepage', '')
        yield 'PACKAGE_LICENSE', metadata.get('license', '')
        yield 'PACKAGE_ARCH', metadata.get('arch', 'amd64')
        yield 'PACKAGE_PLATFORM', metadata.get('platform', 'linux')

    def run(command, cwd=None):
        # Log the command line, then run it capturing text output.
        log.debug(f"Running: {' '.join(command)}")
        return subprocess.run(command, capture_output=True, text=True, cwd=cwd)

    try:
        # Artifacts of this package are collected in their own sub-directory.
        package_dist_dir = dist_dir / package_name
        if not dry_run:
            package_dist_dir.mkdir(parents=True, exist_ok=True)

        image_name = f"{package_name.lower()}-builder"
        container_name = f"{package_name}-{package_version}-builder"

        # metadata.yaml supplies the descriptive package fields; a missing or
        # unreadable file leaves them at their defaults.
        metadata = {}
        metadata_file = package_dir / "metadata.yaml"
        if metadata_file.exists():
            try:
                with open(metadata_file, 'r') as f:
                    metadata = yaml.safe_load(f) or {}
            except Exception as e:
                log.warning(f"Could not read metadata.yaml: {e}")

        log.info(f"Building RPM for {package_name} version {package_version}")

        if dry_run:
            log.info(f"[DRY RUN] Would build Docker image: {image_name}")
            log.info(f"[DRY RUN] Would use base image: {base_image}")
            log.info("[DRY RUN] Would pass build arguments:")
            for arg_name, arg_value in package_settings():
                log.info(f"[DRY RUN] {arg_name}={arg_value}")
            log.info(f"[DRY RUN] Would create container: {container_name}")
            log.info(f"[DRY RUN] Would copy artifacts to: {package_dist_dir}")
            return True

        # Step 1: build the image from the central Dockerfile, which lives
        # two directory levels above the package directory.
        central_dockerfile = package_dir.parent.parent / "Dockerfile"
        build_args = [
            'docker', 'build',
            '-f', str(central_dockerfile),
            '--build-arg', f'BASE_IMAGE={base_image}',
        ]
        for arg_name, arg_value in package_settings():
            build_args += ['--build-arg', f'{arg_name}={arg_value}']
        build_args += ['-t', image_name, str(package_dir)]

        built = run(build_args, cwd=package_dir)
        if built.returncode != 0:
            log.error(f"Docker build failed for {package_name}")
            log.error(f"stdout: {built.stdout}")
            log.error(f"stderr: {built.stderr}")
            return False

        # Step 2: create the container
        created = run(['docker', 'create', '--name', container_name, image_name])
        if created.returncode != 0:
            log.error(f"Container creation failed for {package_name}")
            log.error(f"stderr: {created.stderr}")
            return False

        try:
            # Step 3: run the container attached so its output is captured
            started = run(['docker', 'start', '-a', container_name])
            if started.returncode != 0:
                log.error(f"Container execution failed for {package_name}")
                log.error(f"stdout: {started.stdout}")
                log.error(f"stderr: {started.stderr}")
                return False

            # Step 4: copy the artifacts out of the container
            copied = run([
                'docker', 'cp',
                f"{container_name}:/app/dist/.",
                str(package_dist_dir) + "/"
            ])
            if copied.returncode != 0:
                log.error(f"Failed to copy artifacts for {package_name}")
                log.error(f"stderr: {copied.stderr}")
                return False

            log.info(f"Successfully built {package_name}-{package_version}-{package_release}")
            return True
        finally:
            # Step 5: the container is removed whether or not the run succeeded
            cleanup_container(container_name)
    except Exception as e:
        log.error(f"Unexpected error building {package_name}: {e}")
        return False
def cleanup_images(image_pattern: str = "*-builder") -> None:
    """
    Remove the Docker images whose reference matches a pattern.

    Lists repositories with ``docker images --filter reference=<pattern>`` and
    passes every returned name to ``docker rmi``. Nothing is removed when the
    listing fails or is empty; any exception is logged as a warning.

    Args:
        image_pattern: Pattern to match image names
    """
    log = logging.getLogger(__name__)
    try:
        listing = subprocess.run(
            [
                'docker', 'images',
                '--format', '{{.Repository}}',
                '--filter', f'reference={image_pattern}',
            ],
            capture_output=True,
            text=True,
        )
        if listing.returncode != 0:
            return

        names = listing.stdout.strip()
        if not names:
            return

        # One repository name per output line.
        images = names.split('\n')
        subprocess.run(['docker', 'rmi', *images], capture_output=True, text=True)
        log.info(f"Cleaned up {len(images)} Docker images")
    except Exception as e:
        log.warning(f"Failed to clean up Docker images: {e}")
# ==================== PACKAGE INFO CLASS ====================
@dataclass
class PackageInfo:
    """Everything needed to build one package for one distro."""

    name: str
    version: str
    release: str
    # Directory holding the package's metadata.yaml and resources.
    directory: Path
    distro: str = 'almalinux/el9'
    # Docker image the build starts from.
    base_image: str = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest"

    def __str__(self):
        """Render as ``name-version-release (distro)``."""
        full_name = f"{self.name}-{self.version}-{self.release}"
        return f"{full_name} ({self.distro})"
# ==================== BUILDER CLASS ====================
class Builder:
    """
    Main builder class that orchestrates package building.

    Operates on a project tree in which ``<root>/rpms/<package>/metadata.yaml``
    describes each package and ``<root>/dist`` receives the build artifacts.
    """

    def __init__(self, root_dir: Path):
        """
        Initialize the builder.

        Args:
            root_dir: Root directory of the project
        """
        self.root_dir = root_dir
        self.rpms_dir = root_dir / "rpms"
        self.dist_dir = root_dir / "dist"
        self.logger = logging.getLogger(__name__)

        # Ensure dist directory exists
        self.dist_dir.mkdir(exist_ok=True)

    @staticmethod
    def _build_fields(build_config):
        """
        Extract the fields of one ``builds`` entry.

        Returns:
            ``(repositories, image, version, release)`` or None when the entry
            is not a mapping or any of the four values is missing or empty.
        """
        if not isinstance(build_config, dict):
            return None
        fields = (
            build_config.get('repository', []),
            build_config.get('image'),
            build_config.get('version'),
            build_config.get('release'),
        )
        return fields if all(fields) else None

    @staticmethod
    def _metadata_to_dict(package_metadata) -> dict:
        """
        Convert package metadata back to the mapping stored in metadata.yaml.

        The optional descriptive fields (maintainer, homepage, license) are
        left out when empty: writing them as empty strings would produce a
        file that METADATA_SCHEMA rejects ('empty': False).
        """
        data = {
            'name': package_metadata.name,
            'github': package_metadata.github,
            'description': package_metadata.description,
            'arch': package_metadata.arch,
            'platform': package_metadata.platform,
        }
        for key in ('maintainer', 'homepage', 'license'):
            value = getattr(package_metadata, key)
            if value:
                data[key] = value
        data['builds'] = [
            {
                'repository': build.repository,
                'image': build.image,
                'release': build.release,
                'version': build.version
            }
            for build in package_metadata.builds
        ]
        return data

    def discover_packages(self, distro: str = 'almalinux/el9') -> List["PackageInfo"]:
        """
        Discover all packages and their versions from metadata.yaml files.

        With ``distro == 'all'`` every complete build entry yields a package,
        labelled with the first repository of that entry. Otherwise only the
        first complete entry whose repository list contains ``distro`` is used.

        Args:
            distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9', 'all')

        Returns:
            List of PackageInfo objects (empty when the rpms directory is missing)
        """
        packages = []

        if not self.rpms_dir.exists():
            self.logger.error(f"RPMs directory not found: {self.rpms_dir}")
            return packages

        for package_dir in self.rpms_dir.iterdir():
            if not package_dir.is_dir() or package_dir.name.startswith('.'):
                continue

            metadata_file = package_dir / "metadata.yaml"
            if not metadata_file.exists():
                self.logger.warning(f"No metadata.yaml found for {package_dir.name}")
                continue

            try:
                with open(metadata_file, 'r') as f:
                    metadata = yaml.safe_load(f)

                package_name = metadata.get('name', package_dir.name)
                build_configs = metadata.get('builds', [])

                if not build_configs:
                    self.logger.warning(f"No builds in metadata.yaml for {package_name}")
                    continue

                # Only entries with repository, image, version and release set
                # can be built.
                candidates = [
                    fields
                    for fields in map(self._build_fields, build_configs)
                    if fields is not None
                ]

                if distro == 'all':
                    # Build for all configured distros; the first repository
                    # of each entry serves as its distro identifier.
                    selected = [
                        (repositories[0], image, version, release)
                        for repositories, image, version, release in candidates
                    ]
                else:
                    # Build for a specific distro: first matching entry only.
                    selected = [
                        (distro, image, version, release)
                        for repositories, image, version, release in candidates
                        if distro in repositories
                    ][:1]
                    if not selected:
                        self.logger.debug(f"No build config for {distro} found for {package_name}")

                for build_distro, image, version, release in selected:
                    packages.append(
                        PackageInfo(package_name, version, str(release), package_dir, build_distro, image)
                    )
            except Exception as e:
                self.logger.error(f"Error reading metadata.yaml for {package_dir.name}: {e}")
                continue

        return packages

    def build_single(
        self,
        package: str,
        version: str,
        release: str,
        dry_run: bool = False,
        force: bool = False,
        distro: str = 'almalinux/el9'
    ) -> bool:
        """
        Build a single package.

        The requested version and release must match the build entry for
        ``distro`` in the package's metadata.yaml.

        Args:
            package: Package name
            version: Package version
            release: Package release
            dry_run: If True, only show what would be done
            force: If True, build even if package exists
            distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9', 'all')

        Returns:
            True if build succeeded, False otherwise
        """
        package_dir = self.rpms_dir / package

        if not package_dir.exists():
            self.logger.error(f"Package directory not found: {package_dir}")
            return False

        # Read metadata.yaml to validate version/release
        metadata_file = package_dir / "metadata.yaml"
        if not metadata_file.exists():
            self.logger.error(f"metadata.yaml not found: {metadata_file}")
            return False

        try:
            with open(metadata_file, 'r') as f:
                metadata = yaml.safe_load(f)

            build_configs = metadata.get('builds', [])

            if distro == 'all':
                # For single package build, 'all' doesn't make sense, default to almalinux/el9
                distro = 'almalinux/el9'

            # Find the first build entry that targets the requested distro.
            found_build = None
            base_image = None
            for build_config in build_configs:
                if not isinstance(build_config, dict):
                    continue
                if distro in build_config.get('repository', []):
                    found_build = build_config
                    base_image = build_config.get('image')
                    break

            if not found_build or not base_image:
                self.logger.error(f"No build configuration found for distro {distro} in {package}")
                return False

            # Check version and release match
            build_version = found_build.get('version')
            build_release = found_build.get('release')

            if build_version != version:
                self.logger.error(
                    f"Version mismatch for {package} on {distro}: "
                    f"provided {version} but metadata.yaml has {build_version}"
                )
                return False

            # Release may be parsed from YAML as a number; compare as text.
            if str(build_release) != str(release):
                self.logger.error(
                    f"Release mismatch for {package} on {distro}: "
                    f"provided {release} but metadata.yaml has {build_release}"
                )
                return False
        except Exception as e:
            self.logger.error(f"Error reading metadata.yaml for {package}: {e}")
            return False

        package_info = PackageInfo(package, version, release, package_dir, distro, base_image)
        return self._build_package(package_info, dry_run, force)

    # NOTE(review): the default 'el/9' differs from the 'almalinux/el9' default
    # used by the other methods and is not among the repository values allowed
    # by METADATA_SCHEMA, so calling build_all() without `distro` discovers no
    # packages. Left unchanged to keep the signature stable - confirm intent.
    def build_all(self, dry_run: bool = False, force: bool = False, parallel: int = 4, distro: str = 'el/9') -> bool:
        """
        Build all packages.

        Args:
            dry_run: If True, only show what would be done
            force: If True, build even if packages exist
            parallel: Number of parallel builds (1 builds sequentially)
            distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9', 'all')

        Returns:
            True if all builds succeeded (or nothing was found to build),
            False otherwise
        """
        packages = self.discover_packages(distro)

        if not packages:
            self.logger.warning("No packages found to build")
            return True

        self.logger.info(f"Found {len(packages)} packages to process")

        if parallel == 1:
            return self._build_sequential(packages, dry_run, force)
        return self._build_parallel(packages, dry_run, force, parallel)

    def _build_sequential(self, packages: List["PackageInfo"], dry_run: bool, force: bool) -> bool:
        """Build packages one after another; True when every build succeeded."""
        success_count = sum(
            1 for package_info in packages
            if self._build_package(package_info, dry_run, force)
        )

        self.logger.info(f"Built {success_count}/{len(packages)} packages successfully")
        return success_count == len(packages)

    def _build_parallel(self, packages: List["PackageInfo"], dry_run: bool, force: bool, parallel: int) -> bool:
        """Build packages on a thread pool of ``parallel`` workers; True when every build succeeded."""
        success_count = 0

        with ThreadPoolExecutor(max_workers=parallel) as executor:
            # Submit all build tasks
            future_to_package = {
                executor.submit(self._build_package, pkg, dry_run, force): pkg
                for pkg in packages
            }

            # Process completed builds
            for future in as_completed(future_to_package):
                package_info = future_to_package[future]
                try:
                    if future.result():
                        success_count += 1
                except Exception as e:
                    self.logger.error(f"Build failed for {package_info}: {e}")

        self.logger.info(f"Built {success_count}/{len(packages)} packages successfully")
        return success_count == len(packages)

    def _build_package(self, package_info: "PackageInfo", dry_run: bool, force: bool) -> bool:
        """
        Build a single package.

        Args:
            package_info: Package information
            dry_run: If True, only show what would be done
            force: If True, build even if package exists

        Returns:
            True if build succeeded or the package was skipped because it
            already exists in the registry, False otherwise
        """
        try:
            # Check if package already exists (unless forced)
            if not force and check_package_exists(
                package_info.name,
                package_info.version,
                package_info.release
            ):
                self.logger.info(
                    f"Skipping {package_info} (already exists in repository)"
                )
                return True

            # Check Docker is available (unless dry run)
            if not dry_run and not check_docker_available():
                self.logger.error("Docker is not available or running")
                return False

            # Build the package
            return build_package_docker(
                package_dir=package_info.directory,
                package_name=package_info.name,
                package_version=package_info.version,
                package_release=package_info.release,
                dist_dir=self.dist_dir,
                base_image=package_info.base_image,
                dry_run=dry_run
            )
        except Exception as e:
            self.logger.error(f"Failed to build {package_info}: {e}")
            return False

    def clean_dist(self) -> None:
        """Clean the dist directory by deleting it (if present) and recreating it empty."""
        if self.dist_dir.exists():
            import shutil
            shutil.rmtree(self.dist_dir)
        self.dist_dir.mkdir()
        self.logger.info("Cleaned dist directory")

    def check_package_updates(self, package_name: str, dry_run: bool = False) -> bool:
        """
        Check for updates for a single package from GitHub releases.

        Every build whose version is older than the latest GitHub release is
        set to that version with its release reset to ``1-<distro suffix>``
        (or ``1``), and metadata.yaml is rewritten unless ``dry_run`` is set.

        Args:
            package_name: Name of the package to check
            dry_run: If True, only show what would be done

        Returns:
            True if package was updated or no update needed, False if error
        """
        package_dir = self.rpms_dir / package_name
        metadata_file = package_dir / "metadata.yaml"

        if not metadata_file.exists():
            self.logger.warning(f"No metadata.yaml found for {package_name}")
            return False

        try:
            with open(metadata_file, 'r') as f:
                metadata_data = yaml.safe_load(f)

            # Convert to dataclass for easier handling
            builds = [Build(**build) for build in metadata_data.get('builds', [])]
            package_metadata = PackageMetadata(
                name=metadata_data.get('name', package_name),
                github=metadata_data.get('github', ''),
                description=metadata_data.get('description', ''),
                arch=metadata_data.get('arch', 'amd64'),
                platform=metadata_data.get('platform', 'linux'),
                maintainer=metadata_data.get('maintainer', ''),
                homepage=metadata_data.get('homepage', ''),
                license=metadata_data.get('license', ''),
                builds=builds
            )

            if not package_metadata.github:
                self.logger.debug(f"Package {package_name} has no GitHub repo configured")
                return True

            self.logger.info(f"Checking {package_name} from {package_metadata.github}")

            # Get latest release from GitHub
            latest_release = get_github_latest_release(package_metadata.github)
            if not latest_release:
                return False

            latest_version = normalize_github_version(latest_release.get('tag_name', ''))
            if not latest_version:
                self.logger.warning(f"Could not determine latest version for {package_name}")
                return False

            # Check if any build needs updating
            updated = False
            for build in package_metadata.builds:
                if not compare_versions(build.version, latest_version):
                    continue

                # Determine distro suffix based on repository configuration
                distro_suffix = self._get_distro_suffix(build.repository)
                new_release = f"1-{distro_suffix}" if distro_suffix else "1"

                self.logger.info(f"New version available for {package_name}: {build.version} -> {latest_version}")
                updated = True

                if dry_run:
                    self.logger.info(f"[DRY RUN] Would update {package_name} to {latest_version} with release {new_release}")
                else:
                    build.version = latest_version
                    build.release = new_release

            if updated and not dry_run:
                # Convert back to dict and save
                updated_data = self._metadata_to_dict(package_metadata)
                with open(metadata_file, 'w') as f:
                    yaml.dump(updated_data, f, default_flow_style=False, sort_keys=False)
                self.logger.info(f"Successfully updated {package_name} to version {latest_version}")
            elif not updated:
                self.logger.info(f"Package {package_name} is up to date")

            return True
        except Exception as e:
            self.logger.error(f"Failed to check package {package_name}: {e}")
            return False

    def upgrade_all(self, dry_run: bool = False) -> bool:
        """
        Check for updates for all packages with GitHub repos configured.

        Args:
            dry_run: If True, only show what would be done

        Returns:
            True if all packages were processed successfully; False when the
            rpms directory is missing or any package check failed
        """
        if not self.rpms_dir.exists():
            self.logger.error(f"RPMs directory not found: {self.rpms_dir}")
            return False

        github_packages = []

        # Find packages with GitHub repos
        for package_dir in self.rpms_dir.iterdir():
            if not package_dir.is_dir() or package_dir.name.startswith('.'):
                continue

            metadata_file = package_dir / "metadata.yaml"
            if not metadata_file.exists():
                continue

            try:
                with open(metadata_file, 'r') as f:
                    metadata = yaml.safe_load(f)
                if metadata.get('github'):
                    github_packages.append(package_dir.name)
            except Exception as e:
                # Still skipped, but no longer silently.
                self.logger.warning(
                    f"Skipping {package_dir.name}: could not read metadata.yaml: {e}"
                )
                continue

        if not github_packages:
            self.logger.info("No packages with GitHub repos found")
            return True

        self.logger.info(f"Found {len(github_packages)} packages with GitHub repos")

        success_count = sum(
            1 for package_name in github_packages
            if self.check_package_updates(package_name, dry_run)
        )

        self.logger.info(f"Successfully processed {success_count}/{len(github_packages)} packages")
        return success_count == len(github_packages)

    def _get_distro_suffix(self, repositories: List[str]) -> str:
        """
        Determine the distro suffix based on repository configuration.

        Args:
            repositories: List of repositories (e.g., ['almalinux/el9', 'fedora/f33'])

        Returns:
            Distro suffix string (e.g., 'el9', 'f33') or empty string if not determinable
        """
        if not repositories:
            return ""

        # Use the first repository and extract the part after the first '/'
        repo = repositories[0]
        if '/' in repo:
            return repo.split('/', 1)[1]
        return ""
# ==================== TYPER APPLICATION ====================
# Root Typer application; the commands below register themselves on it.
# Invoking the tool without arguments prints the help text.
app = typer.Typer(
    help="Build RPM packages using Docker",
    no_args_is_help=True,
)
def setup_logging(verbose: bool = False):
    """
    Configure root logging for the command-line tool.

    Args:
        verbose: Log at DEBUG level when True, otherwise at INFO level
    """
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%H:%M:%S'
    )
@app.command()
def list_packages(
    distro: str = typer.Option("almalinux/el9", help="Target distro to list packages for (default: almalinux/el9)")
):
    """List all available packages."""
    setup_logging()

    # The project root is the parent of the directory containing this script.
    builder = Builder(Path(__file__).parent.parent)
    discovered = builder.discover_packages(distro)

    if discovered:
        typer.echo("Available packages:")
        # Stable, readable output: order by name, then version string.
        for entry in sorted(discovered, key=lambda p: (p.name, p.version)):
            typer.echo(f" {entry}")
    else:
        typer.echo("No packages found")
@app.command()
def build(
    package_name: str = typer.Argument(..., help="Package name to build"),
    version: Optional[str] = typer.Option(None, help="Package version (read from metadata.yaml if not provided)"),
    release: Optional[str] = typer.Option(None, help="Package release number (read from metadata.yaml if not provided)"),
    distro: str = typer.Option("almalinux/el9", help="Target distro (default: almalinux/el9)"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be built without building"),
    force: bool = typer.Option(False, "--force", help="Build even if package exists in registry"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
    """Build a specific package."""
    # The docstring above doubles as the command's --help text.
    #
    # Flow: resolve any version/release not given on the command line from the
    # package's metadata.yaml (using the build entry whose 'repository' list
    # contains `distro`), then delegate to Builder.build_single().
    # Exits with status 1 on any resolution or build failure and 130 on Ctrl-C.
    setup_logging(verbose)
    try:
        root_dir = Path(__file__).parent.parent
        builder = Builder(root_dir)

        # Read version/release from metadata.yaml if not provided
        if not version or not release:
            metadata_file = builder.rpms_dir / package_name / "metadata.yaml"
            if not metadata_file.exists():
                typer.echo(f"Error: metadata.yaml not found for package {package_name}", err=True)
                raise typer.Exit(1)

            try:
                with open(metadata_file, 'r') as f:
                    metadata = yaml.safe_load(f)

                # An empty file loads as None; report that clearly instead of
                # surfacing an AttributeError from the lookups below.
                if not isinstance(metadata, dict):
                    typer.echo(
                        f"Error reading metadata.yaml for {package_name}: file is empty or not a mapping",
                        err=True,
                    )
                    raise typer.Exit(1)

                # Version and release are per-build, so find the build entry
                # that targets the requested distro.
                found_build = next(
                    (
                        build_config
                        for build_config in metadata.get('builds', [])
                        if isinstance(build_config, dict)
                        and distro in build_config.get('repository', [])
                    ),
                    None,
                )
                if not found_build:
                    typer.echo(f"Error: No build configuration found for {distro} in {package_name}", err=True)
                    raise typer.Exit(1)

                if not version:
                    version = found_build.get('version')
                    if not version:
                        typer.echo(f"Error: No version in build config for {distro} in {package_name}", err=True)
                        raise typer.Exit(1)

                if not release:
                    release = found_build.get('release')
                    if not release:
                        typer.echo(f"Error: No release in build config for {distro} in {package_name}", err=True)
                        raise typer.Exit(1)
            except typer.Exit:
                # typer.Exit derives from RuntimeError, so the generic handler
                # below would otherwise swallow our own deliberate exits and
                # print a second, misleading "Error reading metadata.yaml" line.
                raise
            except Exception as e:
                typer.echo(f"Error reading metadata.yaml for {package_name}: {e}", err=True)
                raise typer.Exit(1)

        success = builder.build_single(
            package=package_name,
            version=version,
            release=str(release),
            dry_run=dry_run,
            force=force,
            distro=distro
        )
        if not success:
            raise typer.Exit(1)
    except KeyboardInterrupt:
        typer.echo("Build interrupted by user")
        raise typer.Exit(130)
@app.command("build-all")
def build_all(
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be built without building"),
    force: bool = typer.Option(False, "--force", help="Build even if packages exist in registry"),
    parallel: int = typer.Option(4, help="Number of parallel builds"),
    distro: str = typer.Option("almalinux/el9", help="Target distro (almalinux/el8, almalinux/el9, or 'all' for all distros)"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
    """Build all packages."""
    # The docstring above doubles as the command's --help text.
    setup_logging(verbose)
    try:
        # The repository root is the parent of the directory holding this script.
        builder = Builder(Path(__file__).parent.parent)
        all_built = builder.build_all(
            distro=distro,
            parallel=parallel,
            force=force,
            dry_run=dry_run,
        )
        # A falsy result from the builder becomes a non-zero exit status.
        if not all_built:
            raise typer.Exit(1)
    except KeyboardInterrupt:
        typer.echo("Build interrupted by user")
        raise typer.Exit(130)
@app.command()
def verify(
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging"),
    fix_mode: bool = typer.Option(False, "--fix", help="Attempt to fix common issues automatically")
):
    """Verify all metadata.yaml files against the schema."""
    # The docstring above doubles as the command's --help text.
    #
    # Walks every non-hidden directory under <root>/rpms, validates its
    # metadata.yaml against METADATA_SCHEMA, prints a summary, and exits with
    # status 1 if any package is missing the file or fails validation.
    setup_logging(verbose)

    if fix_mode:
        # No automatic repair exists in this command; say so rather than
        # silently ignoring the flag.
        typer.echo("⚠️ --fix is not implemented yet; no files will be modified", err=True)

    root_dir = Path(__file__).parent.parent
    rpms_dir = root_dir / "rpms"
    if not rpms_dir.exists():
        typer.echo(f"Error: RPMs directory not found: {rpms_dir}", err=True)
        raise typer.Exit(1)

    validator = Validator(METADATA_SCHEMA)
    total_files = 0
    valid_files = 0
    errors_found = []

    typer.echo("🔍 Validating metadata.yaml files...")
    typer.echo()

    # Sorted so the report order is the same on every run (iterdir() order
    # is arbitrary).
    for package_dir in sorted(rpms_dir.iterdir()):
        if not package_dir.is_dir() or package_dir.name.startswith('.'):
            continue

        # Counted even when metadata.yaml is missing, so such packages show
        # up as invalid in the totals.
        total_files += 1
        metadata_file = package_dir / "metadata.yaml"
        if not metadata_file.exists():
            errors_found.append(f"❌ {package_dir.name}: metadata.yaml not found")
            continue

        try:
            with open(metadata_file, 'r') as f:
                metadata = yaml.safe_load(f)

            if metadata is None:
                errors_found.append(f"❌ {package_dir.name}: Empty or invalid YAML")
                continue

            # Validate against schema
            if validator.validate(metadata):
                if verbose:
                    typer.echo(f"✅ {package_dir.name}: Valid")
                valid_files += 1
                continue

            if verbose:
                # Keep every field error attached to its package so the
                # details are never shown without the package name.
                details = "".join(
                    f"\n └─ {field}: {error}"
                    for field, error in validator.errors.items()
                )
            else:
                # Compact form: first error per field only.
                error_summary = []
                for field, error in validator.errors.items():
                    if isinstance(error, list):
                        error_summary.append(f"{field}: {error[0] if error else 'validation error'}")
                    else:
                        error_summary.append(f"{field}: {error}")
                details = f"\n Issues: {'; '.join(error_summary)}"
            errors_found.append(f"❌ {package_dir.name}: Schema validation failed{details}")
        except yaml.YAMLError as e:
            errors_found.append(f"❌ {package_dir.name}: YAML parsing error - {e}")
        except Exception as e:
            # Best-effort: one unreadable package must not abort the whole run.
            errors_found.append(f"❌ {package_dir.name}: Unexpected error - {e}")

    # Print results
    typer.echo()
    typer.echo("📊 Validation Results:")
    typer.echo(f" Total files: {total_files}")
    typer.echo(f" Valid files: {valid_files}")
    typer.echo(f" Invalid files: {total_files - valid_files}")

    if errors_found:
        typer.echo()
        typer.echo("🚨 Validation Errors:")
        for error in errors_found:
            typer.echo(error)
        if not verbose:
            typer.echo()
            typer.echo("💡 Run with --verbose for detailed error information")
        raise typer.Exit(1)
    else:
        typer.echo()
        typer.echo("🎉 All metadata.yaml files are valid!")
@app.command()
def upgrade(
    package_name: Optional[str] = typer.Argument(None, help="Package name to upgrade (optional, upgrades all if not provided)"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be done without making changes"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
    """Check for package upgrades from GitHub releases and update metadata.yaml files."""
    # The docstring above doubles as the command's --help text.
    setup_logging(verbose)
    try:
        builder = Builder(Path(__file__).parent.parent)
        # A named package is checked on its own; with no name every package
        # is processed.
        if package_name:
            succeeded = builder.check_package_updates(package_name, dry_run)
        else:
            succeeded = builder.upgrade_all(dry_run)
        # Either path reports failure the same way: exit status 1.
        if not succeeded:
            raise typer.Exit(1)
    except KeyboardInterrupt:
        typer.echo("Upgrade check interrupted by user")
        raise typer.Exit(130)
@app.command("upgrade-all")
def upgrade_all(
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be done without making changes"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
    """Check for upgrades for all packages with GitHub repos configured."""
    # The docstring above doubles as the command's --help text.
    setup_logging(verbose)
    try:
        # The repository root is the parent of the directory holding this script.
        builder = Builder(Path(__file__).parent.parent)
        all_processed = builder.upgrade_all(dry_run)
        # A falsy result from the builder becomes a non-zero exit status.
        if not all_processed:
            raise typer.Exit(1)
    except KeyboardInterrupt:
        typer.echo("Upgrade check interrupted by user")
        raise typer.Exit(130)
# Script entry point: run the Typer CLI only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    app()