rpmbuilder/tools/build
Ben Vincent 182641132a
All checks were successful
Build / build-8 (pull_request) Successful in 8s
Build / build-9 (pull_request) Successful in 9s
refactor: modernise RPM builder with Python tooling v2
- Migrate from legacy shell-based build system to modern Python tooling
- Update all metadata.yaml files to new schema with per-distro builds
- Standardise build scripts with curl -L, envsubst, and error handling
- Convert nfpm.yaml templates to use environment variable substitution
- Update Dockerfile to accept all package metadata as build arguments
- Modernise Makefile to use new Python build tool commands
- Update CI workflow to use tools/build instead of make
2025-11-30 20:27:05 +11:00

1127 lines
38 KiB
Plaintext
Executable File

#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
# "typer",
# "requests",
# "pyyaml",
# "hvac",
# "cerberus"
# ]
# ///
# vim: filetype=python
"""
RPM Builder Tool
A modern Python console application for building RPM packages.
Builds RPM packages using Docker and checks for existing packages via Gitea API.
"""
import os
import sys
import logging
import subprocess
import requests
from pathlib import Path
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import hvac
import typer
import yaml
from cerberus import Validator
# ==================== VALIDATION SCHEMA ====================
# Cerberus schema for metadata.yaml validation based on PackageMetadata and Build dataclasses
# Cerberus rules for one entry of the top-level ``builds`` list.
_BUILD_ENTRY_SCHEMA = {
    # Repository identifiers the build targets; at least one is required.
    "repository": {
        "type": "list",
        "required": True,
        "minlength": 1,
        "schema": {
            "type": "string",
            "allowed": ["almalinux/el8", "almalinux/el9"],
            "empty": False,
        },
    },
    # Builder image reference (letters, digits and - _ . : / only).
    "image": {
        "type": "string",
        "required": True,
        "empty": False,
        "regex": r"^[a-zA-Z0-9\-_\.:\/]+$",
    },
    "release": {
        "type": "string",
        "required": True,
        "empty": False,
    },
    # Dotted numeric version with optional "-suffix" parts.
    "version": {
        "type": "string",
        "required": True,
        "empty": False,
        "regex": r"^[0-9]+(\.[0-9]+)*(\.[0-9]+)*(-[a-zA-Z0-9]+)*$",
    },
}

# Cerberus rules for a whole metadata.yaml document.
METADATA_SCHEMA = {
    "name": {
        "type": "string",
        "required": True,
        "empty": False,
        "regex": r"^[a-zA-Z0-9][a-zA-Z0-9\-_]*$",
    },
    # "<owner>/<repo>" slug.
    "github": {
        "type": "string",
        "required": True,
        "empty": False,
        "regex": r"^[a-zA-Z0-9\-_]+/[a-zA-Z0-9\-_\.]+$",
    },
    "description": {
        "type": "string",
        "required": True,
        "empty": False,
    },
    "arch": {
        "type": "string",
        "required": False,
        "default": "amd64",
        "allowed": ["amd64", "arm64", "x86_64"],
    },
    "platform": {
        "type": "string",
        "required": False,
        "default": "linux",
    },
    "maintainer": {
        "type": "string",
        "required": False,
        "empty": False,
    },
    "homepage": {
        "type": "string",
        "required": False,
        "empty": False,
        "regex": r"^https?://.+",
    },
    "license": {
        "type": "string",
        "required": False,
        "empty": False,
    },
    # At least one build entry must be present.
    "builds": {
        "type": "list",
        "required": True,
        "minlength": 1,
        "schema": {
            "type": "dict",
            "schema": _BUILD_ENTRY_SCHEMA,
        },
    },
}
# ==================== DATACLASSES ====================
@dataclass
class Build:
    """One entry of the ``builds`` list in a package's metadata.yaml."""

    # Repository identifiers this build targets (the metadata schema in this
    # file allows 'almalinux/el8' and 'almalinux/el9').
    repository: List[str]
    # Builder image reference for this build.
    image: str
    # Release string of the package for this build.
    release: str
    # Version string of the package for this build.
    version: str
@dataclass
class PackageMetadata:
    """Package-level metadata used to fill nfpm.yaml variables, plus its builds."""

    # Required identification fields.
    name: str
    github: str
    description: str
    # Optional fields with their defaults.
    arch: str = "amd64"
    platform: str = "linux"
    maintainer: str = ""
    homepage: str = ""
    license: str = ""
    # None is only a sentinel; __post_init__ swaps it for a fresh list so
    # instances never share one mutable default.
    builds: List[Build] = None

    def __post_init__(self):
        """Replace a missing ``builds`` value with a new empty list."""
        if self.builds is not None:
            return
        self.builds = []
# ==================== VAULT FUNCTIONS ====================
def get_vault_client() -> hvac.Client:
    """
    Build a Vault client and log it in through AppRole.

    Reads VAULT_ADDR (default: https://vault.service.consul:8200) and the
    mandatory VAULT_ROLE_ID from the environment. Any failure is logged and
    terminates the process with exit status 1.

    Returns:
        Authenticated HVAC client
    """
    log = logging.getLogger(__name__)

    address = os.getenv('VAULT_ADDR', 'https://vault.service.consul:8200')
    role_id = os.getenv('VAULT_ROLE_ID')
    if not role_id:
        log.error("VAULT_ROLE_ID environment variable is required")
        sys.exit(1)

    # TLS verification uses the system CA bundle at this fixed path.
    vault = hvac.Client(url=address, verify='/etc/pki/tls/cert.pem')

    try:
        log.debug(f"Authenticating to Vault at {address}")
        vault.auth.approle.login(role_id=role_id)
        if vault.is_authenticated():
            log.debug("Successfully authenticated with Vault")
            return vault
        log.error("Failed to authenticate with Vault")
        sys.exit(1)
    except Exception as exc:
        # SystemExit is not an Exception subclass, so the exit above passes through.
        log.error(f"Vault authentication failed: {exc}")
        sys.exit(1)
def get_gitea_token() -> str:
    """
    Fetch the Gitea API token from the Vault KV v2 store.

    Nothing is raised to the caller: if the secret cannot be read, or the
    stored token is empty, the error is logged and the process exits with
    status 1.

    Returns:
        Gitea API token
    """
    log = logging.getLogger(__name__)
    vault = get_vault_client()

    try:
        secret = vault.secrets.kv.v2.read_secret_version(
            path='service/gitea/unkinben/tokens/read-only-packages',
            mount_point='kv',
            raise_on_deleted_version=True,
        )
        # KV v2 responses nest the stored payload under data.data.
        token = secret['data']['data']['token']
        log.debug("Successfully retrieved Gitea token from Vault")
    except Exception as exc:
        log.error(f"Failed to retrieve Gitea token from Vault: {exc}")
        sys.exit(1)

    if token:
        return token
    log.error("Gitea token is empty")
    sys.exit(1)
# ==================== GITEA API FUNCTIONS ====================
def normalize_version(version: str) -> str:
    """
    Strip leading zeros from the numeric components of a version string.

    The string is cut at '.', '-' and '_' (the separators are kept); every
    all-digit piece that is longer than one character and starts with '0'
    is rewritten without its leading zeros. Everything else is left as is.
    This mirrors the normalization Gitea applies to versions.

    Examples:
        "2025.08.03" -> "2025.8.3"
        "1.05.0" -> "1.5.0"
        "0.6.1" -> "0.6.1" (no change needed)

    Args:
        version: Original version string

    Returns:
        Normalized version string
    """
    import re

    def _strip_zeros(token: str) -> str:
        # int() drops the zeros and still leaves a single '0' for "00".
        zero_padded = token.isdigit() and len(token) > 1 and token.startswith('0')
        return str(int(token)) if zero_padded else token

    # The capture group makes re.split return the separators too.
    return ''.join(_strip_zeros(token) for token in re.split(r'([.\-_])', version))
def check_package_exists(package_name: str, version: str, release: str) -> bool:
    """
    Check if a package version exists in the Gitea package registry.

    The registry location comes from the environment (GITEA_URL, GITEA_OWNER,
    GITEA_PACKAGE_TYPE, each with a default) and the API token is fetched
    from Vault; if the token cannot be obtained the process exits.

    Args:
        package_name: Name of the package
        version: Version string
        release: Release number

    Returns:
        True if the registry returned the package; False if it was not found
        or the lookup failed (auth error, unexpected status, network or
        decoding problem)
    """
    logger = logging.getLogger(__name__)

    # Get configuration from environment; a trailing slash on the base URL
    # would otherwise produce "//api/..." in the request path.
    base_url = os.getenv('GITEA_URL', 'https://git.unkin.net').rstrip('/')
    owner = os.getenv('GITEA_OWNER', 'unkin')
    package_type = os.getenv('GITEA_PACKAGE_TYPE', 'rpm')

    # Get Gitea token from Vault - fail hard if unavailable
    gitea_token = get_gitea_token()

    try:
        # Normalize version by removing leading zeros (Gitea does this automatically)
        # e.g., "2025.08.03" becomes "2025.8.3"
        normalized_version = normalize_version(version)
        full_version = f"{normalized_version}-{release}"
        url = (
            f"{base_url}/api/v1/packages/{owner}/"
            f"{package_type}/{package_name}/{full_version}"
        )
        headers = {'Authorization': f'token {gitea_token}'}

        logger.debug(f"Checking package existence: {url}")
        response = requests.get(url, headers=headers, timeout=30)

        if response.status_code == 200:
            package_info = response.json()
            # Package exists if we get package info back; guard against a
            # body that is valid JSON but not an object.
            exists = isinstance(package_info, dict) and bool(package_info.get('id'))
            logger.debug(f"Package {package_name}:{full_version} {'exists' if exists else 'not found'}")
            return exists
        elif response.status_code == 404:
            logger.debug(f"Package {package_name}:{full_version} not found (404)")
            return False
        elif response.status_code == 401:
            # The token is read from Vault, not from an environment variable.
            logger.error("Authentication failed. Check the Gitea token stored in Vault.")
            return False
        else:
            logger.warning(
                f"Unexpected response checking package {package_name}:{full_version}: "
                f"{response.status_code} - {response.text}"
            )
            return False
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors on requests versions whose
        # decode error is not a RequestException.
        logger.error(f"Failed to check package {package_name}:{version}-{release}: {e}")
        return False
def get_package_full_name(package_name: str, version: str, release: str) -> str:
    """
    Build the "<name>-<version>-<release>" identifier used in the registry.

    Args:
        package_name: Package name
        version: Version string
        release: Release number

    Returns:
        Full package name string
    """
    return "{}-{}-{}".format(package_name, version, release)
# ==================== DOCKER FUNCTIONS ====================
def check_docker_available() -> bool:
    """
    Check if Docker is available and running.

    Runs ``docker version`` with a 10 second timeout and reports whether it
    exited successfully.

    Returns:
        True if Docker is available, False otherwise
    """
    try:
        result = subprocess.run(
            ['docker', 'version'],
            capture_output=True,
            text=True,
            timeout=10
        )
        return result.returncode == 0
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers a missing binary (FileNotFoundError) as well as one
        # that exists but cannot be executed (PermissionError).
        return False
def cleanup_container(container_name: str) -> None:
    """
    Remove a Docker container (best effort).

    Failures never propagate: they are logged as warnings so that a leftover
    container, which would make a later ``docker create`` with the same name
    fail, is visible in the logs.

    Args:
        container_name: Name of the container to remove
    """
    logger = logging.getLogger(__name__)
    try:
        remove_args = ['docker', 'rm', container_name]
        logger.debug(f"Running: {' '.join(remove_args)}")
        result = subprocess.run(remove_args, capture_output=True, text=True)
        if result.returncode != 0:
            # Previously the exit status was ignored and the failure was silent.
            logger.warning(
                f"Failed to remove container {container_name} "
                f"(exit code {result.returncode}): {result.stderr.strip()}"
            )
    except Exception as e:
        logger.warning(f"Failed to remove container {container_name}: {e}")
def get_base_image_from_metadata(package_dir: Path, distro: str = "el/9") -> str:
    """
    Get the base image from package metadata.yaml.

    A build entry matches when ``distro`` appears in its ``repository`` list
    (the layout described by the metadata schema in this file) or equals a
    ``distro`` key on the entry. If nothing matches, the first build's image
    is used. Any problem reading the file yields the default image.

    NOTE(review): the default "el/9" is kept for interface compatibility but
    is not one of the repository values the schema allows, so a call that
    relies on the default always takes the "first build" fallback - confirm
    whether the default should become 'almalinux/el9'.

    Args:
        package_dir: Directory containing the package
        distro: Target distro (default: el/9)

    Returns:
        Base image URL or default if not found
    """
    metadata_file = package_dir / "metadata.yaml"
    default_image = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest"

    if not metadata_file.exists():
        return default_image

    try:
        with open(metadata_file, 'r') as f:
            metadata = yaml.safe_load(f)

        build_configs = metadata.get('builds', [])
        for config in build_configs:
            repositories = config.get('repository') or []
            if isinstance(repositories, str):
                # Tolerate a single repository given as a plain string and
                # avoid an accidental substring match.
                repositories = [repositories]
            if distro in repositories or config.get('distro') == distro:
                return config.get('image', default_image)

        # If no matching distro found, return first image or default
        if build_configs:
            return build_configs[0].get('image', default_image)
        return default_image
    except Exception:
        # Deliberate best effort: unreadable or malformed metadata falls
        # back to the default image.
        return default_image
def _package_build_fields(package_name, package_version, package_release, metadata):
    """Lazily yield the (BUILD_ARG_NAME, value) pairs describing the package."""
    yield 'PACKAGE_NAME', package_name
    yield 'PACKAGE_VERSION', package_version
    yield 'PACKAGE_RELEASE', package_release
    yield 'PACKAGE_DESCRIPTION', metadata.get('description', '')
    yield 'PACKAGE_MAINTAINER', metadata.get('maintainer', '')
    yield 'PACKAGE_HOMEPAGE', metadata.get('homepage', '')
    yield 'PACKAGE_LICENSE', metadata.get('license', '')
    yield 'PACKAGE_ARCH', metadata.get('arch', 'amd64')
    yield 'PACKAGE_PLATFORM', metadata.get('platform', 'linux')


def build_package_docker(
    package_dir: Path,
    package_name: str,
    package_version: str,
    package_release: str,
    dist_dir: Path,
    base_image: str = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest",
    dry_run: bool = False
) -> bool:
    """
    Build one package with Docker using the central Dockerfile.

    The Dockerfile is taken from two directories above ``package_dir``. The
    image is built with the package metadata as build arguments, a container
    is created and run, and the contents of /app/dist are copied into
    ``dist_dir/<package_name>``. The container is removed afterwards.

    NOTE(review): the image name depends only on the package name and the
    container name only on name and version, so two concurrent builds of the
    same package (e.g. for two distros) would share both names - confirm
    whether that can happen in practice.

    Args:
        package_dir: Directory containing the package resources
        package_name: Name of the package
        package_version: Package version
        package_release: Package release number
        dist_dir: Directory to store built packages
        base_image: Base Docker image to use for building
        dry_run: If True, only log what would be done

    Returns:
        True if build succeeded (or dry run completed), False otherwise
    """
    log = logging.getLogger(__name__)

    def _run(argv, **kwargs):
        # Log the command line, then run it with captured text output.
        log.debug(f"Running: {' '.join(argv)}")
        return subprocess.run(argv, capture_output=True, text=True, **kwargs)

    try:
        # Artifacts land in a per-package subdirectory of dist.
        target_dir = dist_dir / package_name
        if not dry_run:
            target_dir.mkdir(parents=True, exist_ok=True)

        image_tag = f"{package_name.lower()}-builder"
        container = f"{package_name}-{package_version}-builder"

        # metadata.yaml is optional here; a read failure only costs the
        # descriptive build arguments.
        metadata = {}
        metadata_path = package_dir / "metadata.yaml"
        if metadata_path.exists():
            try:
                with open(metadata_path, 'r') as handle:
                    metadata = yaml.safe_load(handle) or {}
            except Exception as exc:
                log.warning(f"Could not read metadata.yaml: {exc}")

        log.info(f"Building RPM for {package_name} version {package_version}")
        fields = _package_build_fields(
            package_name, package_version, package_release, metadata
        )

        if dry_run:
            log.info(f"[DRY RUN] Would build Docker image: {image_tag}")
            log.info(f"[DRY RUN] Would use base image: {base_image}")
            log.info(f"[DRY RUN] Would pass build arguments:")
            for key, value in fields:
                log.info(f"[DRY RUN] {key}={value}")
            log.info(f"[DRY RUN] Would create container: {container}")
            log.info(f"[DRY RUN] Would copy artifacts to: {target_dir}")
            return True

        # Step 1: build the image from the central Dockerfile.
        dockerfile = package_dir.parent.parent / "Dockerfile"
        build_cmd = [
            'docker', 'build',
            '-f', str(dockerfile),
            '--build-arg', f'BASE_IMAGE={base_image}',
        ]
        for key, value in fields:
            build_cmd += ['--build-arg', f'{key}={value}']
        build_cmd += ['-t', image_tag, str(package_dir)]

        built = _run(build_cmd, cwd=package_dir)
        if built.returncode != 0:
            log.error(f"Docker build failed for {package_name}")
            log.error(f"stdout: {built.stdout}")
            log.error(f"stderr: {built.stderr}")
            return False

        # Step 2: create the container.
        created = _run(['docker', 'create', '--name', container, image_tag])
        if created.returncode != 0:
            log.error(f"Container creation failed for {package_name}")
            log.error(f"stderr: {created.stderr}")
            return False

        try:
            # Step 3: run the container attached so we wait for the build.
            started = _run(['docker', 'start', '-a', container])
            if started.returncode != 0:
                log.error(f"Container execution failed for {package_name}")
                log.error(f"stdout: {started.stdout}")
                log.error(f"stderr: {started.stderr}")
                return False

            # Step 4: copy the artifacts out of the container.
            copied = _run([
                'docker', 'cp',
                f"{container}:/app/dist/.",
                str(target_dir) + "/",
            ])
            if copied.returncode != 0:
                log.error(f"Failed to copy artifacts for {package_name}")
                log.error(f"stderr: {copied.stderr}")
                return False

            log.info(f"Successfully built {package_name}-{package_version}-{package_release}")
            return True
        finally:
            # Step 5: the container is removed whatever the outcome.
            cleanup_container(container)
    except Exception as exc:
        log.error(f"Unexpected error building {package_name}: {exc}")
        return False
def cleanup_images(image_pattern: str = "*-builder") -> None:
    """
    Clean up Docker images matching a pattern (best effort).

    Lists the repositories matching ``image_pattern`` and passes them to
    ``docker rmi``. Success is logged only when ``docker rmi`` actually
    succeeded; every failure is logged as a warning and never raised.

    Args:
        image_pattern: Pattern to match image names
    """
    logger = logging.getLogger(__name__)
    try:
        # List images matching pattern
        list_args = ['docker', 'images', '--format', '{{.Repository}}', '--filter', f'reference={image_pattern}']
        listed = subprocess.run(list_args, capture_output=True, text=True)
        if listed.returncode != 0:
            logger.warning(f"Failed to list Docker images: {listed.stderr.strip()}")
            return

        # A repository is printed once per tag; de-duplicate while keeping order.
        images = list(dict.fromkeys(
            line.strip() for line in listed.stdout.splitlines() if line.strip()
        ))
        if not images:
            return

        removed = subprocess.run(['docker', 'rmi'] + images, capture_output=True, text=True)
        if removed.returncode == 0:
            logger.info(f"Cleaned up {len(images)} Docker images")
        else:
            logger.warning(f"Failed to remove Docker images: {removed.stderr.strip()}")
    except Exception as e:
        logger.warning(f"Failed to clean up Docker images: {e}")
# ==================== PACKAGE INFO CLASS ====================
@dataclass
class PackageInfo:
    """A single package build job: what to build, from where, and for which distro."""

    name: str
    version: str
    release: str
    # Directory holding the package's resources and metadata.yaml.
    directory: Path
    # Target distro identifier.
    distro: str = 'almalinux/el9'
    # Builder image used for this job.
    base_image: str = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest"

    def __str__(self):
        """Render as "<name>-<version>-<release> (<distro>)"."""
        return "{0.name}-{0.version}-{0.release} ({0.distro})".format(self)
# ==================== BUILDER CLASS ====================
class Builder:
    """Main builder class that orchestrates package building.

    Packages live under ``<root_dir>/rpms/<package>/metadata.yaml`` and build
    artifacts are written below ``<root_dir>/dist``.
    """

    def __init__(self, root_dir: Path):
        """
        Initialize the builder.

        Args:
            root_dir: Root directory of the project
        """
        self.root_dir = root_dir
        self.rpms_dir = root_dir / "rpms"
        self.dist_dir = root_dir / "dist"
        self.logger = logging.getLogger(__name__)
        # Ensure dist directory exists
        self.dist_dir.mkdir(exist_ok=True)

    def discover_packages(self, distro: str = 'almalinux/el9') -> List["PackageInfo"]:
        """
        Discover all packages and their versions from metadata.yaml files.

        With a specific distro, at most one PackageInfo is produced per
        package (the first build whose repository list contains the distro).
        With 'all', every complete build entry produces one PackageInfo whose
        distro is the first repository of that entry.

        Args:
            distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9', 'all')

        Returns:
            List of PackageInfo objects
        """
        packages = []
        if not self.rpms_dir.exists():
            self.logger.error(f"RPMs directory not found: {self.rpms_dir}")
            return packages

        for package_dir in self.rpms_dir.iterdir():
            if not package_dir.is_dir() or package_dir.name.startswith('.'):
                continue

            metadata_file = package_dir / "metadata.yaml"
            if not metadata_file.exists():
                self.logger.warning(f"No metadata.yaml found for {package_dir.name}")
                continue

            try:
                with open(metadata_file, 'r') as f:
                    # An empty file parses to None; treat it as "no builds".
                    metadata = yaml.safe_load(f) or {}

                package_name = metadata.get('name', package_dir.name)
                build_configs = metadata.get('builds') or []
                if not build_configs:
                    self.logger.warning(f"No builds in metadata.yaml for {package_name}")
                    continue

                matched = False
                for build_config in build_configs:
                    if not isinstance(build_config, dict):
                        continue
                    repositories = build_config.get('repository') or []
                    base_image = build_config.get('image')
                    version = build_config.get('version')
                    release = build_config.get('release')
                    # Incomplete build entries are skipped.
                    if not (repositories and base_image and version and release):
                        continue

                    if distro == 'all':
                        # Use the first repository as the distro identifier
                        packages.append(PackageInfo(
                            package_name, version, str(release),
                            package_dir, repositories[0], base_image
                        ))
                        matched = True
                    elif distro in repositories:
                        packages.append(PackageInfo(
                            package_name, version, str(release),
                            package_dir, distro, base_image
                        ))
                        matched = True
                        break

                if distro != 'all' and not matched:
                    self.logger.debug(f"No build config for {distro} found for {package_name}")
            except Exception as e:
                self.logger.error(f"Error reading metadata.yaml for {package_dir.name}: {e}")
                continue

        return packages

    def build_single(
        self,
        package: str,
        version: str,
        release: str,
        dry_run: bool = False,
        force: bool = False,
        distro: str = 'almalinux/el9'
    ) -> bool:
        """
        Build a single package.

        The requested version and release must match the build entry for the
        distro in the package's metadata.yaml, otherwise nothing is built.

        Args:
            package: Package name
            version: Package version
            release: Package release
            dry_run: If True, only show what would be done
            force: If True, build even if package exists
            distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9');
                'all' is treated as 'almalinux/el9'

        Returns:
            True if build succeeded, False otherwise
        """
        package_dir = self.rpms_dir / package
        if not package_dir.exists():
            self.logger.error(f"Package directory not found: {package_dir}")
            return False

        # Read metadata.yaml to validate version/release
        metadata_file = package_dir / "metadata.yaml"
        if not metadata_file.exists():
            self.logger.error(f"metadata.yaml not found: {metadata_file}")
            return False

        try:
            with open(metadata_file, 'r') as f:
                metadata = yaml.safe_load(f) or {}

            # Find base image and validate version/release for the specified distro
            build_configs = metadata.get('builds') or []
            base_image = None
            found_build = None

            if distro == 'all':
                # For single package build, 'all' doesn't make sense, default to almalinux/el9
                distro = 'almalinux/el9'

            for build_config in build_configs:
                if isinstance(build_config, dict):
                    repositories = build_config.get('repository') or []
                    if distro in repositories:
                        found_build = build_config
                        base_image = build_config.get('image')
                        break

            if not found_build or not base_image:
                self.logger.error(f"No build configuration found for distro {distro} in {package}")
                return False

            # Check version and release match
            build_version = found_build.get('version')
            build_release = found_build.get('release')

            if build_version != version:
                self.logger.error(
                    f"Version mismatch for {package} on {distro}: "
                    f"provided {version} but metadata.yaml has {build_version}"
                )
                return False

            # Release may be parsed from YAML as a number, so compare as text.
            if str(build_release) != str(release):
                self.logger.error(
                    f"Release mismatch for {package} on {distro}: "
                    f"provided {release} but metadata.yaml has {build_release}"
                )
                return False
        except Exception as e:
            self.logger.error(f"Error reading metadata.yaml for {package}: {e}")
            return False

        package_info = PackageInfo(package, version, release, package_dir, distro, base_image)
        return self._build_package(package_info, dry_run, force)

    def build_all(
        self,
        dry_run: bool = False,
        force: bool = False,
        parallel: int = 4,
        distro: str = 'almalinux/el9'
    ) -> bool:
        """
        Build all packages.

        Args:
            dry_run: If True, only show what would be done
            force: If True, build even if packages exist
            parallel: Number of parallel builds; values of 1 or less build
                sequentially
            distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9', 'all')

        Returns:
            True if all builds succeeded (or nothing was found), False otherwise
        """
        packages = self.discover_packages(distro)
        if not packages:
            self.logger.warning("No packages found to build")
            return True

        self.logger.info(f"Found {len(packages)} packages to process")

        # ThreadPoolExecutor rejects max_workers <= 0, so anything below two
        # workers takes the sequential path.
        if parallel <= 1:
            return self._build_sequential(packages, dry_run, force)
        return self._build_parallel(packages, dry_run, force, parallel)

    def _build_sequential(self, packages: List["PackageInfo"], dry_run: bool, force: bool) -> bool:
        """Build packages one after another; True only if every build succeeded."""
        success_count = 0
        for package_info in packages:
            if self._build_package(package_info, dry_run, force):
                success_count += 1

        self.logger.info(f"Built {success_count}/{len(packages)} packages successfully")
        return success_count == len(packages)

    def _build_parallel(
        self,
        packages: List["PackageInfo"],
        dry_run: bool,
        force: bool,
        parallel: int
    ) -> bool:
        """Build packages on a thread pool; True only if every build succeeded."""
        success_count = 0
        with ThreadPoolExecutor(max_workers=parallel) as executor:
            # Submit all build tasks
            future_to_package = {
                executor.submit(self._build_package, pkg, dry_run, force): pkg
                for pkg in packages
            }

            # Process completed builds
            for future in as_completed(future_to_package):
                package_info = future_to_package[future]
                try:
                    if future.result():
                        success_count += 1
                except Exception as e:
                    self.logger.error(f"Build failed for {package_info}: {e}")

        self.logger.info(f"Built {success_count}/{len(packages)} packages successfully")
        return success_count == len(packages)

    def _build_package(self, package_info: "PackageInfo", dry_run: bool, force: bool) -> bool:
        """
        Build a single package.

        Args:
            package_info: Package information
            dry_run: If True, only show what would be done
            force: If True, build even if package exists

        Returns:
            True if build succeeded or was skipped because the package
            already exists, False otherwise
        """
        try:
            # Check if package already exists (unless forced)
            if not force:
                if check_package_exists(
                    package_info.name,
                    package_info.version,
                    package_info.release
                ):
                    self.logger.info(
                        f"Skipping {package_info} (already exists in repository)"
                    )
                    return True

            # Check Docker is available (unless dry run)
            if not dry_run and not check_docker_available():
                self.logger.error("Docker is not available or running")
                return False

            # Build the package
            return build_package_docker(
                package_dir=package_info.directory,
                package_name=package_info.name,
                package_version=package_info.version,
                package_release=package_info.release,
                dist_dir=self.dist_dir,
                base_image=package_info.base_image,
                dry_run=dry_run
            )
        except Exception as e:
            self.logger.error(f"Failed to build {package_info}: {e}")
            return False

    def clean_dist(self) -> None:
        """Clean the dist directory by deleting and recreating it."""
        if self.dist_dir.exists():
            import shutil
            shutil.rmtree(self.dist_dir)
        self.dist_dir.mkdir()
        self.logger.info("Cleaned dist directory")
# ==================== TYPER APPLICATION ====================
# CLI application object; the command functions in this file register
# themselves on it through the @app.command decorator.
app = typer.Typer(help="Build RPM packages using Docker", no_args_is_help=True)
def setup_logging(verbose: bool = False):
    """Configure root logging: DEBUG when ``verbose`` is set, INFO otherwise."""
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%H:%M:%S',
    )
@app.command()
def list_packages(
    distro: str = typer.Option("almalinux/el9", help="Target distro to list packages for (default: almalinux/el9)")
):
    """List all available packages."""
    # The docstring above doubles as the CLI help text, so it is kept short.
    setup_logging()

    # This script sits one directory below the project root.
    project_root = Path(__file__).parent.parent
    found = Builder(project_root).discover_packages(distro)

    if not found:
        typer.echo("No packages found")
        return

    typer.echo("Available packages:")
    # Stable, readable order: by name, then version.
    for info in sorted(found, key=lambda p: (p.name, p.version)):
        typer.echo(f" {info}")
@app.command()
def build(
    package_name: str = typer.Argument(..., help="Package name to build"),
    version: Optional[str] = typer.Option(None, help="Package version (read from metadata.yaml if not provided)"),
    release: Optional[str] = typer.Option(None, help="Package release number (read from metadata.yaml if not provided)"),
    distro: str = typer.Option("almalinux/el9", help="Target distro (default: almalinux/el9)"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be built without building"),
    force: bool = typer.Option(False, "--force", help="Build even if package exists in registry"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
    """Build a specific package."""
    # The docstring above doubles as the CLI help text, so details live here:
    # a missing version/release is filled in from the build entry for
    # `distro` in the package's metadata.yaml; any failure exits with
    # status 1, Ctrl-C with 130.
    setup_logging(verbose)
    try:
        root_dir = Path(__file__).parent.parent
        builder = Builder(root_dir)

        # Read version/release from metadata.yaml if not provided
        if not version or not release:
            metadata_file = builder.rpms_dir / package_name / "metadata.yaml"
            if not metadata_file.exists():
                typer.echo(f"Error: metadata.yaml not found for package {package_name}", err=True)
                raise typer.Exit(1)

            try:
                with open(metadata_file, 'r') as f:
                    metadata = yaml.safe_load(f) or {}

                # Version and release are per-build, so find the build for this distro
                found_build = next(
                    (
                        build_config
                        for build_config in (metadata.get('builds') or [])
                        if isinstance(build_config, dict)
                        and distro in (build_config.get('repository') or [])
                    ),
                    None,
                )
                if not found_build:
                    typer.echo(f"Error: No build configuration found for {distro} in {package_name}", err=True)
                    raise typer.Exit(1)

                if not version:
                    version = found_build.get('version')
                    if not version:
                        typer.echo(f"Error: No version in build config for {distro} in {package_name}", err=True)
                        raise typer.Exit(1)

                if not release:
                    release = found_build.get('release')
                    if not release:
                        typer.echo(f"Error: No release in build config for {distro} in {package_name}", err=True)
                        raise typer.Exit(1)
            except typer.Exit:
                # typer.Exit derives from RuntimeError; without this clause the
                # generic handler below would catch the deliberate exits above
                # and print a second, misleading error message.
                raise
            except Exception as e:
                typer.echo(f"Error reading metadata.yaml for {package_name}: {e}", err=True)
                raise typer.Exit(1)

        success = builder.build_single(
            package=package_name,
            version=version,
            release=str(release),
            dry_run=dry_run,
            force=force,
            distro=distro
        )
        if not success:
            raise typer.Exit(1)
    except KeyboardInterrupt:
        typer.echo("Build interrupted by user")
        raise typer.Exit(130)
@app.command("build-all")
def build_all(
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be built without building"),
    force: bool = typer.Option(False, "--force", help="Build even if packages exist in registry"),
    parallel: int = typer.Option(4, help="Number of parallel builds"),
    distro: str = typer.Option("almalinux/el9", help="Target distro (almalinux/el8, almalinux/el9, or 'all' for all distros)"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
    """Build all packages."""
    # The docstring above doubles as the CLI help text, so it is kept short.
    # Exit status: 1 if any build failed, 130 on Ctrl-C.
    setup_logging(verbose)
    try:
        project_root = Path(__file__).parent.parent
        all_ok = Builder(project_root).build_all(
            dry_run=dry_run,
            force=force,
            parallel=parallel,
            distro=distro,
        )
        if all_ok:
            return
        raise typer.Exit(1)
    except KeyboardInterrupt:
        typer.echo("Build interrupted by user")
        raise typer.Exit(130)
@app.command()
def verify(
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging"),
    fix_mode: bool = typer.Option(False, "--fix", help="Attempt to fix common issues automatically")
):
    """Verify all metadata.yaml files against the schema."""
    # The docstring above doubles as the CLI help text, so it is kept short.
    # NOTE: fix_mode is accepted but not used anywhere in this function.
    setup_logging(verbose)

    rpms_dir = Path(__file__).parent.parent / "rpms"
    if not rpms_dir.exists():
        typer.echo(f"Error: RPMs directory not found: {rpms_dir}", err=True)
        raise typer.Exit(1)

    validator = Validator(METADATA_SCHEMA)
    total_files = valid_files = 0
    errors_found = []

    typer.echo("🔍 Validating metadata.yaml files...")
    typer.echo()

    # Each non-hidden directory under rpms/ counts as one package.
    for package_dir in rpms_dir.iterdir():
        if not package_dir.is_dir() or package_dir.name.startswith('.'):
            continue

        pkg = package_dir.name
        metadata_file = package_dir / "metadata.yaml"
        # A missing metadata.yaml still counts towards the total.
        total_files += 1
        if not metadata_file.exists():
            errors_found.append(f"❌ {pkg}: metadata.yaml not found")
            continue

        try:
            with open(metadata_file, 'r') as f:
                metadata = yaml.safe_load(f)

            if metadata is None:
                errors_found.append(f"❌ {pkg}: Empty or invalid YAML")
                continue

            # Validate against schema
            if validator.validate(metadata):
                if verbose:
                    typer.echo(f"✅ {pkg}: Valid")
                valid_files += 1
                continue

            errors_found.append(f"❌ {pkg}: Schema validation failed")
            if verbose:
                # Verbose mode prints every field error right away.
                for field, error in validator.errors.items():
                    typer.echo(f" └─ {field}: {error}", err=True)
                continue

            # Otherwise attach a one-line summary (first error per field).
            issues = [
                f"{field}: {(error[0] if error else 'validation error') if isinstance(error, list) else error}"
                for field, error in validator.errors.items()
            ]
            errors_found[-1] += f"\n Issues: {'; '.join(issues)}"
        except yaml.YAMLError as e:
            errors_found.append(f"❌ {pkg}: YAML parsing error - {e}")
        except Exception as e:
            errors_found.append(f"❌ {pkg}: Unexpected error - {e}")

    # Print results
    typer.echo()
    typer.echo("📊 Validation Results:")
    typer.echo(f" Total files: {total_files}")
    typer.echo(f" Valid files: {valid_files}")
    typer.echo(f" Invalid files: {total_files - valid_files}")

    if not errors_found:
        typer.echo()
        typer.echo("🎉 All metadata.yaml files are valid!")
        return

    typer.echo()
    typer.echo("🚨 Validation Errors:")
    for error in errors_found:
        typer.echo(error)
    if not verbose:
        typer.echo()
        typer.echo("💡 Run with --verbose for detailed error information")
    raise typer.Exit(1)
# Run the Typer CLI only when this file is executed as a script.
if __name__ == '__main__':
    app()