rpmbuilder/tools/build
Ben Vincent a6cbcb42af
refactor: modernise RPM builder with Python tooling
- Replace Makefile version/release file system with metadata.yaml only
- Add Python build automation (./tools/build) with Gitea API integration
- Add GitHub release updater (./tools/update-gh) for version management
- Centralize Dockerfiles into single parameterized Dockerfile
- Remove 54+ individual package Dockerfiles and version directories
- Update Makefile to use new Python tooling
- Add GITEA_API_TOKEN validation to prevent duplicate builds
- Support both explicit version/release args and metadata.yaml reading
2025-11-30 03:29:41 +11:00


#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "requests",
#     "pyyaml",
#     "hvac"
# ]
# ///
# vim: filetype=python
"""
RPM Builder Tool
A Python replacement for the Makefile-based build system.
Builds RPM packages using Docker and checks for existing packages via Gitea API.
"""
import os
import sys
import argparse
import logging
import subprocess
from pathlib import Path
from typing import List, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import hvac
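
# Environment variables consumed by this script:
#   VAULT_ADDR          Vault server URL (default: https://vault.service.consul:8200)
#   VAULT_ROLE_ID       AppRole role ID used to authenticate to Vault (required)
#   GITEA_URL           Base URL of the Gitea instance (default: https://git.unkin.net)
#   GITEA_OWNER         Owner of the package registry (default: unkin)
#   GITEA_PACKAGE_TYPE  Package type queried in the registry (default: rpm)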
# ==================== VAULT FUNCTIONS ====================
def get_vault_client() -> hvac.Client:
    """
    Initialize and authenticate Vault client using AppRole authentication.

    Returns:
        Authenticated HVAC client
    """
    logger = logging.getLogger(__name__)

    # Get required environment variables
    vault_addr = os.getenv('VAULT_ADDR', 'https://vault.service.consul:8200')
    vault_role_id = os.getenv('VAULT_ROLE_ID')
    if not vault_role_id:
        logger.error("VAULT_ROLE_ID environment variable is required")
        raise ValueError("VAULT_ROLE_ID environment variable is required")

    # Initialize Vault client with CA certificate
    client = hvac.Client(
        url=vault_addr,
        verify='/etc/pki/tls/cert.pem'
    )

    # Authenticate using AppRole
    try:
        logger.debug(f"Authenticating to Vault at {vault_addr}")
        auth_response = client.auth.approle.login(role_id=vault_role_id)
        if not client.is_authenticated():
            logger.error("Failed to authenticate with Vault")
            raise Exception("Failed to authenticate with Vault")
        logger.debug("Successfully authenticated with Vault")
        return client
    except Exception as e:
        logger.error(f"Vault authentication failed: {e}")
        raise


def get_api_tokens() -> Tuple[str, str]:
    """
    Retrieve GitHub and Gitea API tokens from Vault.

    Returns:
        Tuple of (github_token, gitea_token)

    Raises:
        Exception if Vault authentication fails or tokens cannot be retrieved
    """
    logger = logging.getLogger(__name__)
    client = get_vault_client()

    # Read GitHub token
    try:
        github_secret = client.secrets.kv.v2.read_secret_version(
            path='service/github/neoloc/tokens/read-only-token'
        )
        github_token = github_secret['data']['data']['token']
        logger.debug("Successfully retrieved GitHub token from Vault")
    except Exception as e:
        logger.error(f"Failed to retrieve GitHub token from Vault: {e}")
        raise Exception(f"Failed to retrieve GitHub token from Vault: {e}")

    # Read Gitea token
    try:
        gitea_secret = client.secrets.kv.v2.read_secret_version(
            path='service/gitea/unkinben/tokens/read-only-packages'
        )
        gitea_token = gitea_secret['data']['data']['token']
        logger.debug("Successfully retrieved Gitea token from Vault")
    except Exception as e:
        logger.error(f"Failed to retrieve Gitea token from Vault: {e}")
        raise Exception(f"Failed to retrieve Gitea token from Vault: {e}")

    if not github_token or not gitea_token:
        logger.error("One or both API tokens are empty")
        raise Exception("One or both API tokens are empty")

    return github_token, gitea_token
# ==================== GITEA API FUNCTIONS ====================
def normalize_version(version: str) -> str:
    """
    Normalize version string by removing leading zeros from numeric components.
    Gitea automatically does this normalization.

    Examples:
        "2025.08.03" -> "2025.8.3"
        "1.05.0" -> "1.5.0"
        "0.6.1" -> "0.6.1" (no change needed)

    Args:
        version: Original version string

    Returns:
        Normalized version string
    """
    import re

    # Split by common separators and normalize each numeric part
    # (the capturing group keeps the separators so they can be re-joined unchanged)
    parts = re.split(r'([.\-_])', version)
    normalized_parts = []
    for part in parts:
        # If this part is purely numeric and has leading zeros, remove them
        if part.isdigit() and len(part) > 1 and part.startswith('0'):
            # Remove leading zeros but keep at least one digit
            normalized_parts.append(str(int(part)))
        else:
            normalized_parts.append(part)
    return ''.join(normalized_parts)
def check_package_exists(package_name: str, version: str, release: str) -> bool:
    """
    Check if a package version exists in the Gitea package registry.

    Args:
        package_name: Name of the package
        version: Version string
        release: Release number

    Returns:
        True if package exists, False otherwise
    """
    logger = logging.getLogger(__name__)

    # Get configuration from environment
    base_url = os.getenv('GITEA_URL', 'https://git.unkin.net')
    owner = os.getenv('GITEA_OWNER', 'unkin')
    package_type = os.getenv('GITEA_PACKAGE_TYPE', 'rpm')

    # Get API tokens from Vault - fail hard if unavailable
    try:
        _, gitea_token = get_api_tokens()
    except Exception as e:
        logger.error(f"Failed to retrieve API tokens from Vault: {e}")
        raise Exception(f"Cannot check package existence without Gitea API token: {e}")

    try:
        # Normalize version by removing leading zeros (Gitea does this automatically)
        # e.g., "2025.08.03" becomes "2025.8.3"
        normalized_version = normalize_version(version)
        full_version = f"{normalized_version}-{release}"
        url = (
            f"{base_url}/api/v1/packages/{owner}/"
            f"{package_type}/{package_name}/{full_version}"
        )
        headers = {'Authorization': f'token {gitea_token}'}
        logger.debug(f"Checking package existence: {url}")
        response = requests.get(url, headers=headers, timeout=30)

        if response.status_code == 200:
            package_info = response.json()
            # Package exists if we get package info back
            exists = bool(package_info.get('id'))
            logger.debug(f"Package {package_name}:{full_version} {'exists' if exists else 'not found'}")
            return exists
        elif response.status_code == 404:
            logger.debug(f"Package {package_name}:{full_version} not found (404)")
            return False
        elif response.status_code == 401:
            logger.error("Authentication failed. Check the Gitea API token retrieved from Vault.")
            return False
        else:
            logger.warning(
                f"Unexpected response checking package {package_name}:{full_version}: "
                f"{response.status_code} - {response.text}"
            )
            return False
    except requests.RequestException as e:
        logger.error(f"Failed to check package {package_name}:{version}-{release}: {e}")
        return False


def get_package_full_name(package_name: str, version: str, release: str) -> str:
    """
    Generate the full package name as used in the registry.

    Args:
        package_name: Package name
        version: Version string
        release: Release number

    Returns:
        Full package name string
    """
    return f"{package_name}-{version}-{release}"
# ==================== DOCKER FUNCTIONS ====================
def check_docker_available() -> bool:
    """
    Check if Docker is available and running.

    Returns:
        True if Docker is available, False otherwise
    """
    try:
        result = subprocess.run(
            ['docker', 'version'],
            capture_output=True,
            text=True,
            timeout=10
        )
        return result.returncode == 0
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False


def cleanup_container(container_name: str) -> None:
    """
    Remove a Docker container.

    Args:
        container_name: Name of the container to remove
    """
    logger = logging.getLogger(__name__)
    try:
        remove_args = ['docker', 'rm', container_name]
        logger.debug(f"Running: {' '.join(remove_args)}")
        subprocess.run(remove_args, capture_output=True, text=True)
    except Exception as e:
        logger.warning(f"Failed to remove container {container_name}: {e}")
def get_base_image_from_metadata(package_dir: Path, distro: str = "el/9") -> str:
    """
    Get the base image from package metadata.yaml.

    Args:
        package_dir: Directory containing the package
        distro: Target distro (default: el/9)

    Returns:
        Base image URL or default if not found
    """
    metadata_file = package_dir / "metadata.yaml"
    default_image = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest"
    if not metadata_file.exists():
        return default_image
    try:
        import yaml
        with open(metadata_file, 'r') as f:
            metadata = yaml.safe_load(f)
        build_configs = metadata.get('build', [])
        for config in build_configs:
            if config.get('distro') == distro:
                return config.get('image', default_image)
        # If no matching distro found, return first image or default
        if build_configs:
            return build_configs[0].get('image', default_image)
        return default_image
    except Exception:
        return default_image
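
# build_package_docker() below passes BASE_IMAGE, PACKAGE_VERSION and PACKAGE_RELEASE
# as --build-arg values to the central Dockerfile at the repository root, then copies
# artifacts out of /app/dist in the finished container. A minimal sketch of the
# Dockerfile this implies (an assumption for illustration; the real file may differ):
#
#   ARG BASE_IMAGE
#   FROM ${BASE_IMAGE}
#   ARG PACKAGE_VERSION
#   ARG PACKAGE_RELEASE
#   COPY . /app
#   # ...build steps that leave the built RPMs under /app/dist...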
def build_package_docker(
    package_dir: Path,
    package_name: str,
    package_version: str,
    package_release: str,
    dist_dir: Path,
    base_image: str = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest",
    dry_run: bool = False
) -> bool:
    """
    Build a package using Docker with central Dockerfile.

    Args:
        package_dir: Directory containing the package resources
        package_name: Name of the package
        package_version: Package version
        package_release: Package release number
        dist_dir: Directory to store built packages
        base_image: Base Docker image to use for building
        dry_run: If True, only show what would be done

    Returns:
        True if build succeeded, False otherwise
    """
    logger = logging.getLogger(__name__)
    try:
        # Ensure dist directory exists
        package_dist_dir = dist_dir / package_name
        if not dry_run:
            package_dist_dir.mkdir(parents=True, exist_ok=True)

        # Generate Docker image name
        image_name = f"{package_name.lower()}-builder"
        container_name = f"{package_name}-{package_version}-builder"
        logger.info(f"Building RPM for {package_name} version {package_version}")

        if dry_run:
            logger.info(f"[DRY RUN] Would build Docker image: {image_name}")
            logger.info(f"[DRY RUN] Would use base image: {base_image}")
            logger.info(f"[DRY RUN] Would create container: {container_name}")
            logger.info(f"[DRY RUN] Would copy artifacts to: {package_dist_dir}")
            return True

        # Step 1: Build Docker image using central Dockerfile
        central_dockerfile = package_dir.parent.parent / "Dockerfile"
        build_args = [
            'docker', 'build',
            '-f', str(central_dockerfile),
            '--build-arg', f'BASE_IMAGE={base_image}',
            '--build-arg', f'PACKAGE_VERSION={package_version}',
            '--build-arg', f'PACKAGE_RELEASE={package_release}',
            '-t', image_name,
            str(package_dir)
        ]
        logger.debug(f"Running: {' '.join(build_args)}")
        result = subprocess.run(
            build_args,
            capture_output=True,
            text=True,
            cwd=package_dir
        )
        if result.returncode != 0:
            logger.error(f"Docker build failed for {package_name}")
            logger.error(f"stdout: {result.stdout}")
            logger.error(f"stderr: {result.stderr}")
            return False

        # Step 2: Create container
        create_args = [
            'docker', 'create',
            '--name', container_name,
            image_name
        ]
        logger.debug(f"Running: {' '.join(create_args)}")
        result = subprocess.run(create_args, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error(f"Container creation failed for {package_name}")
            logger.error(f"stderr: {result.stderr}")
            return False

        try:
            # Step 3: Start container
            start_args = ['docker', 'start', '-a', container_name]
            logger.debug(f"Running: {' '.join(start_args)}")
            result = subprocess.run(start_args, capture_output=True, text=True)
            if result.returncode != 0:
                logger.error(f"Container execution failed for {package_name}")
                logger.error(f"stdout: {result.stdout}")
                logger.error(f"stderr: {result.stderr}")
                return False

            # Step 4: Copy artifacts
            copy_args = [
                'docker', 'cp',
                f"{container_name}:/app/dist/.",
                str(package_dist_dir) + "/"
            ]
            logger.debug(f"Running: {' '.join(copy_args)}")
            result = subprocess.run(copy_args, capture_output=True, text=True)
            if result.returncode != 0:
                logger.error(f"Failed to copy artifacts for {package_name}")
                logger.error(f"stderr: {result.stderr}")
                return False

            logger.info(f"Successfully built {package_name}-{package_version}-{package_release}")
            return True
        finally:
            # Step 5: Clean up container
            cleanup_container(container_name)
    except Exception as e:
        logger.error(f"Unexpected error building {package_name}: {e}")
        return False
def cleanup_images(image_pattern: str = "*-builder") -> None:
    """
    Clean up Docker images matching a pattern.

    Args:
        image_pattern: Pattern to match image names
    """
    logger = logging.getLogger(__name__)
    try:
        # List images matching pattern
        list_args = ['docker', 'images', '--format', '{{.Repository}}', '--filter', f'reference={image_pattern}']
        result = subprocess.run(list_args, capture_output=True, text=True)
        if result.returncode == 0 and result.stdout.strip():
            images = result.stdout.strip().split('\n')
            if images:
                remove_args = ['docker', 'rmi'] + images
                subprocess.run(remove_args, capture_output=True, text=True)
                logger.info(f"Cleaned up {len(images)} Docker images")
    except Exception as e:
        logger.warning(f"Failed to clean up Docker images: {e}")
# ==================== PACKAGE INFO CLASS ====================
class PackageInfo:
    """Information about a package to build."""

    def __init__(self, name: str, version: str, release: str, directory: Path, distro: str = 'el/9', base_image: str = None):
        self.name = name
        self.version = version
        self.release = release
        self.directory = directory
        self.distro = distro
        self.base_image = base_image or "git.unkin.net/unkin/almalinux9-rpmbuilder:latest"

    def __str__(self):
        return f"{self.name}-{self.version}-{self.release} ({self.distro})"
# ==================== BUILDER CLASS ====================
class Builder:
    """Main builder class that orchestrates package building."""

    def __init__(self, root_dir: Path):
        """
        Initialize the builder.

        Args:
            root_dir: Root directory of the project
        """
        self.root_dir = root_dir
        self.rpms_dir = root_dir / "rpms"
        self.dist_dir = root_dir / "dist"
        self.logger = logging.getLogger(__name__)
        # Ensure dist directory exists
        self.dist_dir.mkdir(exist_ok=True)

    def discover_packages(self, distro: str = 'el/9') -> List[PackageInfo]:
        """
        Discover all packages and their versions from metadata.yaml files.

        Args:
            distro: Target distro (e.g., 'el/8', 'el/9', 'all')

        Returns:
            List of PackageInfo objects
        """
        packages = []
        if not self.rpms_dir.exists():
            self.logger.error(f"RPMs directory not found: {self.rpms_dir}")
            return packages

        for package_dir in self.rpms_dir.iterdir():
            if not package_dir.is_dir() or package_dir.name.startswith('.'):
                continue
            metadata_file = package_dir / "metadata.yaml"
            if not metadata_file.exists():
                self.logger.warning(f"No metadata.yaml found for {package_dir.name}")
                continue
            try:
                import yaml
                with open(metadata_file, 'r') as f:
                    metadata = yaml.safe_load(f)
                package_name = metadata.get('name', package_dir.name)
                version = metadata.get('version')
                release = metadata.get('release')
                build_configs = metadata.get('build', [])
                if not version:
                    self.logger.warning(f"No version in metadata.yaml for {package_name}")
                    continue
                if not release:
                    self.logger.warning(f"No release in metadata.yaml for {package_name}")
                    continue

                # Handle distro filtering
                if distro == 'all':
                    # Build for all configured distros
                    for build_config in build_configs:
                        if isinstance(build_config, dict):
                            build_distro = build_config.get('distro')
                            base_image = build_config.get('image')
                            if build_distro and base_image:
                                packages.append(PackageInfo(package_name, version, str(release), package_dir, build_distro, base_image))
                else:
                    # Build for specific distro
                    for build_config in build_configs:
                        if isinstance(build_config, dict) and build_config.get('distro') == distro:
                            base_image = build_config.get('image')
                            if base_image:
                                packages.append(PackageInfo(package_name, version, str(release), package_dir, distro, base_image))
                            break
                    else:
                        # If no matching distro found, log it at debug level
                        self.logger.debug(f"No build config for {distro} found for {package_name}")
            except Exception as e:
                self.logger.error(f"Error reading metadata.yaml for {package_dir.name}: {e}")
                continue

        return packages
    def build_single(
        self,
        package: str,
        version: str,
        release: str,
        dry_run: bool = False,
        force: bool = False,
        distro: str = 'el/9'
    ) -> bool:
        """
        Build a single package.

        Args:
            package: Package name
            version: Package version
            release: Package release
            dry_run: If True, only show what would be done
            force: If True, build even if package exists
            distro: Target distro (e.g., 'el/8', 'el/9', 'all')

        Returns:
            True if build succeeded, False otherwise
        """
        package_dir = self.rpms_dir / package
        if not package_dir.exists():
            self.logger.error(f"Package directory not found: {package_dir}")
            return False

        # Read metadata.yaml to validate version/release
        metadata_file = package_dir / "metadata.yaml"
        if not metadata_file.exists():
            self.logger.error(f"metadata.yaml not found: {metadata_file}")
            return False

        try:
            import yaml
            with open(metadata_file, 'r') as f:
                metadata = yaml.safe_load(f)
            metadata_version = metadata.get('version')
            metadata_release = metadata.get('release')
            if metadata_version != version:
                self.logger.error(
                    f"Version mismatch for {package}: "
                    f"provided {version} but metadata.yaml has {metadata_version}"
                )
                return False
            if str(metadata_release) != str(release):
                self.logger.error(
                    f"Release mismatch for {package}: "
                    f"provided {release} but metadata.yaml has {metadata_release}"
                )
                return False

            # Find base image for the specified distro
            build_configs = metadata.get('build', [])
            base_image = None
            if distro == 'all':
                # For a single package build, 'all' doesn't make sense; default to el/9
                distro = 'el/9'
            for build_config in build_configs:
                if isinstance(build_config, dict) and build_config.get('distro') == distro:
                    base_image = build_config.get('image')
                    break
            if not base_image:
                self.logger.error(f"No build configuration found for distro {distro} in {package}")
                return False
        except Exception as e:
            self.logger.error(f"Error reading metadata.yaml for {package}: {e}")
            return False

        package_info = PackageInfo(package, version, release, package_dir, distro, base_image)
        return self._build_package(package_info, dry_run, force)
    def build_all(self, dry_run: bool = False, force: bool = False, parallel: int = 4, distro: str = 'el/9') -> bool:
        """
        Build all packages.

        Args:
            dry_run: If True, only show what would be done
            force: If True, build even if packages exist
            parallel: Number of parallel builds
            distro: Target distro (e.g., 'el/8', 'el/9', 'all')

        Returns:
            True if all builds succeeded, False otherwise
        """
        packages = self.discover_packages(distro)
        if not packages:
            self.logger.warning("No packages found to build")
            return True
        self.logger.info(f"Found {len(packages)} packages to process")
        if parallel == 1:
            return self._build_sequential(packages, dry_run, force)
        else:
            return self._build_parallel(packages, dry_run, force, parallel)

    def _build_sequential(self, packages: List[PackageInfo], dry_run: bool, force: bool) -> bool:
        """Build packages sequentially."""
        success_count = 0
        for package_info in packages:
            if self._build_package(package_info, dry_run, force):
                success_count += 1
        self.logger.info(f"Built {success_count}/{len(packages)} packages successfully")
        return success_count == len(packages)

    def _build_parallel(self, packages: List[PackageInfo], dry_run: bool, force: bool, parallel: int) -> bool:
        """Build packages in parallel."""
        success_count = 0
        with ThreadPoolExecutor(max_workers=parallel) as executor:
            # Submit all build tasks
            future_to_package = {
                executor.submit(self._build_package, pkg, dry_run, force): pkg
                for pkg in packages
            }
            # Process completed builds
            for future in as_completed(future_to_package):
                package_info = future_to_package[future]
                try:
                    success = future.result()
                    if success:
                        success_count += 1
                except Exception as e:
                    self.logger.error(f"Build failed for {package_info}: {e}")
        self.logger.info(f"Built {success_count}/{len(packages)} packages successfully")
        return success_count == len(packages)
    def _build_package(self, package_info: PackageInfo, dry_run: bool, force: bool) -> bool:
        """
        Build a single package.

        Args:
            package_info: Package information
            dry_run: If True, only show what would be done
            force: If True, build even if package exists

        Returns:
            True if build succeeded, False otherwise
        """
        try:
            # Check if package already exists (unless forced)
            if not force:
                if check_package_exists(
                    package_info.name,
                    package_info.version,
                    package_info.release
                ):
                    self.logger.info(
                        f"Skipping {package_info} (already exists in repository)"
                    )
                    return True

            # Check Docker is available (unless dry run)
            if not dry_run and not check_docker_available():
                self.logger.error("Docker is not available or running")
                return False

            # Build the package
            return build_package_docker(
                package_dir=package_info.directory,
                package_name=package_info.name,
                package_version=package_info.version,
                package_release=package_info.release,
                dist_dir=self.dist_dir,
                base_image=package_info.base_image,
                dry_run=dry_run
            )
        except Exception as e:
            self.logger.error(f"Failed to build {package_info}: {e}")
            return False

    def list_packages(self) -> None:
        """List all available packages."""
        packages = self.discover_packages()
        if not packages:
            print("No packages found")
            return
        print("Available packages:")
        for package_info in sorted(packages, key=lambda p: (p.name, p.version)):
            print(f" {package_info}")

    def clean_dist(self) -> None:
        """Clean the dist directory."""
        if self.dist_dir.exists():
            import shutil
            shutil.rmtree(self.dist_dir)
            self.dist_dir.mkdir()
            self.logger.info("Cleaned dist directory")
# ==================== MAIN FUNCTIONS ====================
def setup_logging(verbose=False):
    """Set up logging configuration."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%H:%M:%S'
    )
def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description='Build RPM packages using Docker',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --package consul --version 1.21.1 --release 1
  %(prog)s --package consul (uses version/release from metadata.yaml)
  %(prog)s --package consul --distro el/8 (build for el/8)
  %(prog)s --all (builds all packages for el/9 by default)
  %(prog)s --all --distro el/8 (builds all packages for el/8)
  %(prog)s --all --distro all (builds all packages for all distros)
  %(prog)s --all --dry-run
"""
    )

    # Package selection arguments
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--package', help='Package name to build')
    group.add_argument('--all', action='store_true', help='Build all packages')

    # Version and release (optional for single package builds, read from metadata.yaml if not provided)
    parser.add_argument('--version', help='Package version (optional, read from metadata.yaml if not provided)')
    parser.add_argument('--release', help='Package release number (optional, read from metadata.yaml if not provided)')

    # Optional arguments
    parser.add_argument('--distro', default='el/9', help='Build for specific distro (default: el/9). Use "all" to build for all distros.')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be built without building')
    parser.add_argument('--force', action='store_true', help='Build even if package exists in registry')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
    parser.add_argument('--parallel', type=int, default=4, help='Number of parallel builds (default: 4)')
    args = parser.parse_args()

    # No validation needed - version/release will be read from metadata.yaml if not provided
    setup_logging(args.verbose)

    try:
        # Initialize components
        root_dir = Path(__file__).parent.parent
        builder = Builder(root_dir)

        # Execute build
        if args.all:
            success = builder.build_all(
                dry_run=args.dry_run,
                force=args.force,
                parallel=args.parallel,
                distro=args.distro
            )
        else:
            # Read version/release from metadata.yaml if not provided
            version = args.version
            release = args.release
            if not version or not release:
                package_dir = builder.rpms_dir / args.package
                metadata_file = package_dir / "metadata.yaml"
                if not metadata_file.exists():
                    logging.error(f"metadata.yaml not found for package {args.package}")
                    sys.exit(1)
                try:
                    import yaml
                    with open(metadata_file, 'r') as f:
                        metadata = yaml.safe_load(f)
                    if not version:
                        version = metadata.get('version')
                        if not version:
                            logging.error(f"No version in metadata.yaml for {args.package}")
                            sys.exit(1)
                    if not release:
                        release = metadata.get('release')
                        if not release:
                            logging.error(f"No release in metadata.yaml for {args.package}")
                            sys.exit(1)
                except Exception as e:
                    logging.error(f"Error reading metadata.yaml for {args.package}: {e}")
                    sys.exit(1)

            success = builder.build_single(
                package=args.package,
                version=version,
                release=str(release),
                dry_run=args.dry_run,
                force=args.force,
                distro=args.distro
            )

        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        logging.info("Build interrupted by user")
        sys.exit(130)
    except Exception as e:
        logging.error(f"Build failed: {e}")
        if args.verbose:
            logging.exception("Full traceback:")
        sys.exit(1)


if __name__ == '__main__':
    main()