refactor: modernise RPM builder with Python tooling v2
Build / build-8 (pull_request) Successful in 8s
Build / build-9 (pull_request) Successful in 9s

- Migrate from legacy shell-based build system to modern Python tooling
- Update all metadata.yaml files to new schema with per-distro builds
- Standardise build scripts with curl -L, envsubst, and error handling
- Convert nfpm.yaml templates to use environment variable substitution
- Update Dockerfile to accept all package metadata as build arguments
- Modernise Makefile to use new Python build tool commands
- Update CI workflow to use tools/build instead of make
This commit is contained in:
2025-11-30 20:27:05 +11:00
parent b3ba980f9f
commit 182641132a
160 changed files with 2013 additions and 1089 deletions
+425 -190
View File
@@ -1,9 +1,11 @@
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
# "typer",
# "requests",
# "pyyaml",
# "hvac"
# "hvac",
# "cerberus"
# ]
# ///
@@ -12,22 +14,139 @@
"""
RPM Builder Tool
A Python replacement for the Makefile-based build system.
A modern Python console application for building RPM packages.
Builds RPM packages using Docker and checks for existing packages via Gitea API.
"""
import os
import sys
import argparse
import logging
import subprocess
import requests
from pathlib import Path
from typing import List, Tuple
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import hvac
import typer
import yaml
from cerberus import Validator
# ==================== VALIDATION SCHEMA ====================
# Cerberus schema for metadata.yaml validation based on PackageMetadata and Build dataclasses
METADATA_SCHEMA = {
'name': {
'type': 'string',
'required': True,
'empty': False,
'regex': r'^[a-zA-Z0-9][a-zA-Z0-9\-_]*$'
},
'github': {
'type': 'string',
'required': True,
'empty': False,
'regex': r'^[a-zA-Z0-9\-_]+/[a-zA-Z0-9\-_\.]+$'
},
'description': {
'type': 'string',
'required': True,
'empty': False
},
'arch': {
'type': 'string',
'required': False,
'default': 'amd64',
'allowed': ['amd64', 'arm64', 'x86_64']
},
'platform': {
'type': 'string',
'required': False,
'default': 'linux'
},
'maintainer': {
'type': 'string',
'required': False,
'empty': False
},
'homepage': {
'type': 'string',
'required': False,
'empty': False,
'regex': r'^https?://.+'
},
'license': {
'type': 'string',
'required': False,
'empty': False
},
'builds': {
'type': 'list',
'required': True,
'minlength': 1,
'schema': {
'type': 'dict',
'schema': {
'repository': {
'type': 'list',
'required': True,
'minlength': 1,
'schema': {
'type': 'string',
'allowed': ['almalinux/el8', 'almalinux/el9'],
'empty': False
}
},
'image': {
'type': 'string',
'required': True,
'empty': False,
'regex': r'^[a-zA-Z0-9\-_\.:\/]+$'
},
'release': {
'type': 'string',
'required': True,
'empty': False
},
'version': {
'type': 'string',
'required': True,
'empty': False,
'regex': r'^[0-9]+(\.[0-9]+)*(\.[0-9]+)*(-[a-zA-Z0-9]+)*$'
}
}
}
}
}
# ==================== DATACLASSES ===================="
@dataclass
class Build:
"""Describes each build of a package that should be managed."""
repository: List[str]
image: str
release: str
version: str
@dataclass
class PackageMetadata:
"""Defines data that will be used to fill nfpm.yaml variables and contains build objects."""
name: str
github: str
description: str
arch: str = "amd64"
platform: str = "linux"
maintainer: str = ""
homepage: str = ""
license: str = ""
builds: List[Build] = None
def __post_init__(self):
if self.builds is None:
self.builds = []
# ==================== VAULT FUNCTIONS ====================
def get_vault_client() -> hvac.Client:
@@ -45,7 +164,7 @@ def get_vault_client() -> hvac.Client:
if not vault_role_id:
logger.error("VAULT_ROLE_ID environment variable is required")
raise ValueError("VAULT_ROLE_ID environment variable is required")
sys.exit(1)
# Initialize Vault client with CA certificate
client = hvac.Client(
@@ -56,49 +175,37 @@ def get_vault_client() -> hvac.Client:
# Authenticate using AppRole
try:
logger.debug(f"Authenticating to Vault at {vault_addr}")
auth_response = client.auth.approle.login(role_id=vault_role_id)
client.auth.approle.login(role_id=vault_role_id)
if not client.is_authenticated():
logger.error("Failed to authenticate with Vault")
raise Exception("Failed to authenticate with Vault")
sys.exit(1)
logger.debug("Successfully authenticated with Vault")
return client
except Exception as e:
logger.error(f"Vault authentication failed: {e}")
raise
sys.exit(1)
def get_api_tokens() -> Tuple[str, str]:
def get_gitea_token() -> str:
"""
Retrieve GitHub and Gitea API tokens from Vault.
Retrieve Gitea API token from Vault.
Returns:
Tuple of (github_token, gitea_token)
Gitea API token
Raises:
Exception if Vault authentication fails or tokens cannot be retrieved
Exception if Vault authentication fails or token cannot be retrieved
"""
logger = logging.getLogger(__name__)
client = get_vault_client()
# Read GitHub token
try:
github_secret = client.secrets.kv.v2.read_secret_version(
mount_point='kv',
path='service/github/neoloc/tokens/read-only-token'
)
github_token = github_secret['data']['data']['token']
logger.debug("Successfully retrieved GitHub token from Vault")
except Exception as e:
logger.error(f"Failed to retrieve GitHub token from Vault: {e}")
raise Exception(f"Failed to retrieve GitHub token from Vault: {e}")
# Read Gitea token
try:
gitea_secret = client.secrets.kv.v2.read_secret_version(
raise_on_deleted_version=True,
mount_point='kv',
path='service/gitea/unkinben/tokens/read-only-packages'
)
@@ -106,13 +213,13 @@ def get_api_tokens() -> Tuple[str, str]:
logger.debug("Successfully retrieved Gitea token from Vault")
except Exception as e:
logger.error(f"Failed to retrieve Gitea token from Vault: {e}")
raise Exception(f"Failed to retrieve Gitea token from Vault: {e}")
sys.exit(1)
if not github_token or not gitea_token:
logger.error("One or both API tokens are empty")
raise Exception("One or both API tokens are empty")
if not gitea_token:
logger.error("Gitea token is empty")
sys.exit(1)
return github_token, gitea_token
return gitea_token
# ==================== GITEA API FUNCTIONS ====================
@@ -169,12 +276,8 @@ def check_package_exists(package_name: str, version: str, release: str) -> bool:
owner = os.getenv('GITEA_OWNER', 'unkin')
package_type = os.getenv('GITEA_PACKAGE_TYPE', 'rpm')
# Get API tokens from Vault - fail hard if unavailable
try:
_, gitea_token = get_api_tokens()
except Exception as e:
logger.error(f"Failed to retrieve API tokens from Vault: {e}")
raise Exception(f"Cannot check package existence without Gitea API token: {e}")
# Get Gitea token from Vault - fail hard if unavailable
gitea_token = get_gitea_token()
try:
# Normalize version by removing leading zeros (Gitea does this automatically)
@@ -285,11 +388,10 @@ def get_base_image_from_metadata(package_dir: Path, distro: str = "el/9") -> str
return default_image
try:
import yaml
with open(metadata_file, 'r') as f:
metadata = yaml.safe_load(f)
build_configs = metadata.get('build', [])
build_configs = metadata.get('builds', [])
for config in build_configs:
if config.get('distro') == distro:
return config.get('image', default_image)
@@ -340,23 +442,50 @@ def build_package_docker(
image_name = f"{package_name.lower()}-builder"
container_name = f"{package_name}-{package_version}-builder"
# Read metadata.yaml to get all package fields
metadata_file = package_dir / "metadata.yaml"
metadata = {}
if metadata_file.exists():
try:
with open(metadata_file, 'r') as f:
metadata = yaml.safe_load(f) or {}
except Exception as e:
logger.warning(f"Could not read metadata.yaml: {e}")
logger.info(f"Building RPM for {package_name} version {package_version}")
if dry_run:
logger.info(f"[DRY RUN] Would build Docker image: {image_name}")
logger.info(f"[DRY RUN] Would use base image: {base_image}")
logger.info(f"[DRY RUN] Would pass build arguments:")
logger.info(f"[DRY RUN] PACKAGE_NAME={package_name}")
logger.info(f"[DRY RUN] PACKAGE_VERSION={package_version}")
logger.info(f"[DRY RUN] PACKAGE_RELEASE={package_release}")
logger.info(f"[DRY RUN] PACKAGE_DESCRIPTION={metadata.get('description', '')}")
logger.info(f"[DRY RUN] PACKAGE_MAINTAINER={metadata.get('maintainer', '')}")
logger.info(f"[DRY RUN] PACKAGE_HOMEPAGE={metadata.get('homepage', '')}")
logger.info(f"[DRY RUN] PACKAGE_LICENSE={metadata.get('license', '')}")
logger.info(f"[DRY RUN] PACKAGE_ARCH={metadata.get('arch', 'amd64')}")
logger.info(f"[DRY RUN] PACKAGE_PLATFORM={metadata.get('platform', 'linux')}")
logger.info(f"[DRY RUN] Would create container: {container_name}")
logger.info(f"[DRY RUN] Would copy artifacts to: {package_dist_dir}")
return True
# Step 1: Build Docker image using central Dockerfile
# Build Docker image using central Dockerfile with all metadata
central_dockerfile = package_dir.parent.parent / "Dockerfile"
build_args = [
'docker', 'build',
'-f', str(central_dockerfile),
'--build-arg', f'BASE_IMAGE={base_image}',
'--build-arg', f'PACKAGE_NAME={package_name}',
'--build-arg', f'PACKAGE_VERSION={package_version}',
'--build-arg', f'PACKAGE_RELEASE={package_release}',
'--build-arg', f'PACKAGE_DESCRIPTION={metadata.get("description", "")}',
'--build-arg', f'PACKAGE_MAINTAINER={metadata.get("maintainer", "")}',
'--build-arg', f'PACKAGE_HOMEPAGE={metadata.get("homepage", "")}',
'--build-arg', f'PACKAGE_LICENSE={metadata.get("license", "")}',
'--build-arg', f'PACKAGE_ARCH={metadata.get("arch", "amd64")}',
'--build-arg', f'PACKAGE_PLATFORM={metadata.get("platform", "linux")}',
'-t', image_name,
str(package_dir)
]
@@ -454,16 +583,15 @@ def cleanup_images(image_pattern: str = "*-builder") -> None:
# ==================== PACKAGE INFO CLASS ====================
@dataclass
class PackageInfo:
"""Information about a package to build."""
def __init__(self, name: str, version: str, release: str, directory: Path, distro: str = 'el/9', base_image: str = None):
self.name = name
self.version = version
self.release = release
self.directory = directory
self.distro = distro
self.base_image = base_image or "git.unkin.net/unkin/almalinux9-rpmbuilder:latest"
name: str
version: str
release: str
directory: Path
distro: str = 'almalinux/el9'
base_image: str = "git.unkin.net/unkin/almalinux9-rpmbuilder:latest"
def __str__(self):
return f"{self.name}-{self.version}-{self.release} ({self.distro})"
@@ -490,12 +618,12 @@ class Builder:
# Ensure dist directory exists
self.dist_dir.mkdir(exist_ok=True)
def discover_packages(self, distro: str = 'el/9') -> List[PackageInfo]:
def discover_packages(self, distro: str = 'almalinux/el9') -> List[PackageInfo]:
"""
Discover all packages and their versions from metadata.yaml files.
Args:
distro: Target distro (e.g., 'el/8', 'el/9', 'all')
distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9', 'all')
Returns:
List of PackageInfo objects
@@ -516,21 +644,14 @@ class Builder:
continue
try:
import yaml
with open(metadata_file, 'r') as f:
metadata = yaml.safe_load(f)
package_name = metadata.get('name', package_dir.name)
version = metadata.get('version')
release = metadata.get('release')
build_configs = metadata.get('build', [])
build_configs = metadata.get('builds', [])
if not version:
self.logger.warning(f"No version in metadata.yaml for {package_name}")
continue
if not release:
self.logger.warning(f"No release in metadata.yaml for {package_name}")
if not build_configs:
self.logger.warning(f"No builds in metadata.yaml for {package_name}")
continue
# Handle distro filtering
@@ -538,16 +659,26 @@ class Builder:
# Build for all configured distros
for build_config in build_configs:
if isinstance(build_config, dict):
build_distro = build_config.get('distro')
repositories = build_config.get('repository', [])
base_image = build_config.get('image')
if build_distro and base_image:
version = build_config.get('version')
release = build_config.get('release')
if repositories and base_image and version and release:
# Use the first repository as the distro identifier
build_distro = repositories[0] if repositories else 'unknown'
packages.append(PackageInfo(package_name, version, str(release), package_dir, build_distro, base_image))
else:
# Build for specific distro
for build_config in build_configs:
if isinstance(build_config, dict) and build_config.get('distro') == distro:
if isinstance(build_config, dict):
repositories = build_config.get('repository', [])
base_image = build_config.get('image')
if base_image:
version = build_config.get('version')
release = build_config.get('release')
# Check if the target distro matches any repository
if distro in repositories and base_image and version and release:
packages.append(PackageInfo(package_name, version, str(release), package_dir, distro, base_image))
break
else:
@@ -567,7 +698,7 @@ class Builder:
release: str,
dry_run: bool = False,
force: bool = False,
distro: str = 'el/9'
distro: str = 'almalinux/el9'
) -> bool:
"""
Build a single package.
@@ -578,7 +709,7 @@ class Builder:
release: Package release
dry_run: If True, only show what would be done
force: If True, build even if package exists
distro: Target distro (e.g., 'el/8', 'el/9', 'all')
distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9', 'all')
Returns:
True if build succeeded, False otherwise
@@ -596,44 +727,48 @@ class Builder:
return False
try:
import yaml
with open(metadata_file, 'r') as f:
metadata = yaml.safe_load(f)
metadata_version = metadata.get('version')
metadata_release = metadata.get('release')
if metadata_version != version:
self.logger.error(
f"Version mismatch for {package}: "
f"provided {version} but metadata.yaml has {metadata_version}"
)
return False
if str(metadata_release) != str(release):
self.logger.error(
f"Release mismatch for {package}: "
f"provided {release} but metadata.yaml has {metadata_release}"
)
return False
# Find base image for the specified distro
build_configs = metadata.get('build', [])
# Find base image and validate version/release for the specified distro
build_configs = metadata.get('builds', [])
base_image = None
found_build = None
if distro == 'all':
# For single package build, 'all' doesn't make sense, default to el/9
distro = 'el/9'
# For single package build, 'all' doesn't make sense, default to almalinux/el9
distro = 'almalinux/el9'
for build_config in build_configs:
if isinstance(build_config, dict) and build_config.get('distro') == distro:
base_image = build_config.get('image')
break
if isinstance(build_config, dict):
repositories = build_config.get('repository', [])
if distro in repositories:
found_build = build_config
base_image = build_config.get('image')
break
if not base_image:
if not found_build or not base_image:
self.logger.error(f"No build configuration found for distro {distro} in {package}")
return False
# Check version and release match
build_version = found_build.get('version')
build_release = found_build.get('release')
if build_version != version:
self.logger.error(
f"Version mismatch for {package} on {distro}: "
f"provided {version} but metadata.yaml has {build_version}"
)
return False
if str(build_release) != str(release):
self.logger.error(
f"Release mismatch for {package} on {distro}: "
f"provided {release} but metadata.yaml has {build_release}"
)
return False
except Exception as e:
self.logger.error(f"Error reading metadata.yaml for {package}: {e}")
return False
@@ -649,7 +784,7 @@ class Builder:
dry_run: If True, only show what would be done
force: If True, build even if packages exist
parallel: Number of parallel builds
distro: Target distro (e.g., 'el/8', 'el/9', 'all')
distro: Target distro (e.g., 'almalinux/el8', 'almalinux/el9', 'all')
Returns:
True if all builds succeeded, False otherwise
@@ -747,17 +882,6 @@ class Builder:
self.logger.error(f"Failed to build {package_info}: {e}")
return False
def list_packages(self) -> None:
"""List all available packages."""
packages = self.discover_packages()
if not packages:
print("No packages found")
return
print("Available packages:")
for package_info in sorted(packages, key=lambda p: (p.name, p.version)):
print(f" {package_info}")
def clean_dist(self) -> None:
"""Clean the dist directory."""
@@ -768,9 +892,11 @@ class Builder:
self.logger.info("Cleaned dist directory")
# ==================== MAIN FUNCTIONS ====================
# ==================== TYPER APPLICATION ====================
def setup_logging(verbose=False):
app = typer.Typer(help="Build RPM packages using Docker", no_args_is_help=True)
def setup_logging(verbose: bool = False):
"""Set up logging configuration."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
@@ -779,113 +905,222 @@ def setup_logging(verbose=False):
datefmt='%H:%M:%S'
)
@app.command()
def list_packages(
distro: str = typer.Option("almalinux/el9", help="Target distro to list packages for (default: almalinux/el9)")
):
"""List all available packages."""
setup_logging()
root_dir = Path(__file__).parent.parent
builder = Builder(root_dir)
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description='Build RPM packages using Docker',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --package consul --version 1.21.1 --release 1
%(prog)s --package consul (uses version/release from metadata.yaml)
%(prog)s --package consul --distro el/8 (build for el/8)
%(prog)s --all (builds all packages for el/9 by default)
%(prog)s --all --distro el/8 (builds all packages for el/8)
%(prog)s --all --distro all (builds all packages for all distros)
%(prog)s --all --dry-run
"""
)
packages = builder.discover_packages(distro)
if not packages:
typer.echo("No packages found")
return
# Package selection arguments
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--package', help='Package name to build')
group.add_argument('--all', action='store_true', help='Build all packages')
typer.echo("Available packages:")
for package_info in sorted(packages, key=lambda p: (p.name, p.version)):
typer.echo(f" {package_info}")
# Version and release (optional for single package builds, read from metadata.yaml if not provided)
parser.add_argument('--version', help='Package version (optional, read from metadata.yaml if not provided)')
parser.add_argument('--release', help='Package release number (optional, read from metadata.yaml if not provided)')
# Optional arguments
parser.add_argument('--distro', default='el/9', help='Build for specific distro (default: el/9). Use "all" to build for all distros.')
parser.add_argument('--dry-run', action='store_true', help='Show what would be built without building')
parser.add_argument('--force', action='store_true', help='Build even if package exists in registry')
parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
parser.add_argument('--parallel', type=int, default=4, help='Number of parallel builds (default: 4)')
args = parser.parse_args()
# No validation needed - version/release will be read from metadata.yaml if not provided
setup_logging(args.verbose)
@app.command()
def build(
package_name: str = typer.Argument(..., help="Package name to build"),
version: Optional[str] = typer.Option(None, help="Package version (read from metadata.yaml if not provided)"),
release: Optional[str] = typer.Option(None, help="Package release number (read from metadata.yaml if not provided)"),
distro: str = typer.Option("almalinux/el9", help="Target distro (default: almalinux/el9)"),
dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be built without building"),
force: bool = typer.Option(False, "--force", help="Build even if package exists in registry"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
"""Build a specific package."""
setup_logging(verbose)
try:
# Initialize components
root_dir = Path(__file__).parent.parent
builder = Builder(root_dir)
# Execute build
if args.all:
success = builder.build_all(
dry_run=args.dry_run,
force=args.force,
parallel=args.parallel,
distro=args.distro
)
else:
# Read version/release from metadata.yaml if not provided
version = args.version
release = args.release
# Read version/release from metadata.yaml if not provided
if not version or not release:
package_dir = builder.rpms_dir / package_name
metadata_file = package_dir / "metadata.yaml"
if not version or not release:
package_dir = builder.rpms_dir / args.package
metadata_file = package_dir / "metadata.yaml"
if not metadata_file.exists():
typer.echo(f"Error: metadata.yaml not found for package {package_name}", err=True)
raise typer.Exit(1)
if not metadata_file.exists():
logging.error(f"metadata.yaml not found for package {args.package}")
sys.exit(1)
try:
with open(metadata_file, 'r') as f:
metadata = yaml.safe_load(f)
try:
import yaml
with open(metadata_file, 'r') as f:
metadata = yaml.safe_load(f)
# Version and release are now per-build, so we need to find a build for this distro
if not version or not release:
build_configs = metadata.get('builds', [])
# Find the build for the current distro
found_build = None
for build_config in build_configs:
if isinstance(build_config, dict):
repositories = build_config.get('repository', [])
if distro in repositories:
found_build = build_config
break
if not found_build:
typer.echo(f"Error: No build configuration found for {distro} in {package_name}", err=True)
raise typer.Exit(1)
if not version:
version = metadata.get('version')
version = found_build.get('version')
if not version:
logging.error(f"No version in metadata.yaml for {args.package}")
sys.exit(1)
typer.echo(f"Error: No version in build config for {distro} in {package_name}", err=True)
raise typer.Exit(1)
if not release:
release = metadata.get('release')
release = found_build.get('release')
if not release:
logging.error(f"No release in metadata.yaml for {args.package}")
sys.exit(1)
typer.echo(f"Error: No release in build config for {distro} in {package_name}", err=True)
raise typer.Exit(1)
except Exception as e:
logging.error(f"Error reading metadata.yaml for {args.package}: {e}")
sys.exit(1)
except Exception as e:
typer.echo(f"Error reading metadata.yaml for {package_name}: {e}", err=True)
raise typer.Exit(1)
success = builder.build_single(
package=args.package,
version=version,
release=str(release),
dry_run=args.dry_run,
force=args.force,
distro=args.distro
)
success = builder.build_single(
package=package_name,
version=version,
release=str(release),
dry_run=dry_run,
force=force,
distro=distro
)
sys.exit(0 if success else 1)
if not success:
raise typer.Exit(1)
except KeyboardInterrupt:
logging.info("Build interrupted by user")
sys.exit(130)
except Exception as e:
logging.error(f"Build failed: {e}")
if args.verbose:
logging.exception("Full traceback:")
sys.exit(1)
typer.echo("Build interrupted by user")
raise typer.Exit(130)
@app.command("build-all")
def build_all(
dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be built without building"),
force: bool = typer.Option(False, "--force", help="Build even if packages exist in registry"),
parallel: int = typer.Option(4, help="Number of parallel builds"),
distro: str = typer.Option("almalinux/el9", help="Target distro (almalinux/el8, almalinux/el9, or 'all' for all distros)"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
"""Build all packages."""
setup_logging(verbose)
try:
root_dir = Path(__file__).parent.parent
builder = Builder(root_dir)
success = builder.build_all(
dry_run=dry_run,
force=force,
parallel=parallel,
distro=distro
)
if not success:
raise typer.Exit(1)
except KeyboardInterrupt:
typer.echo("Build interrupted by user")
raise typer.Exit(130)
@app.command()
def verify(
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging"),
fix_mode: bool = typer.Option(False, "--fix", help="Attempt to fix common issues automatically")
):
"""Verify all metadata.yaml files against the schema."""
setup_logging(verbose)
root_dir = Path(__file__).parent.parent
rpms_dir = root_dir / "rpms"
if not rpms_dir.exists():
typer.echo(f"Error: RPMs directory not found: {rpms_dir}", err=True)
raise typer.Exit(1)
validator = Validator(METADATA_SCHEMA)
total_files = 0
valid_files = 0
errors_found = []
typer.echo("🔍 Validating metadata.yaml files...")
typer.echo()
# Find all metadata.yaml files
for package_dir in rpms_dir.iterdir():
if not package_dir.is_dir() or package_dir.name.startswith('.'):
continue
metadata_file = package_dir / "metadata.yaml"
if not metadata_file.exists():
errors_found.append(f"{package_dir.name}: metadata.yaml not found")
total_files += 1
continue
total_files += 1
try:
with open(metadata_file, 'r') as f:
metadata = yaml.safe_load(f)
if metadata is None:
errors_found.append(f"{package_dir.name}: Empty or invalid YAML")
continue
# Validate against schema
if validator.validate(metadata):
if verbose:
typer.echo(f"{package_dir.name}: Valid")
valid_files += 1
else:
errors_found.append(f"{package_dir.name}: Schema validation failed")
if verbose:
for field, error in validator.errors.items():
typer.echo(f" └─ {field}: {error}", err=True)
else:
# Show summary of errors
error_summary = []
for field, error in validator.errors.items():
if isinstance(error, list):
error_summary.append(f"{field}: {error[0] if error else 'validation error'}")
else:
error_summary.append(f"{field}: {error}")
errors_found[-1] += f"\n Issues: {'; '.join(error_summary)}"
except yaml.YAMLError as e:
errors_found.append(f"{package_dir.name}: YAML parsing error - {e}")
except Exception as e:
errors_found.append(f"{package_dir.name}: Unexpected error - {e}")
# Print results
typer.echo()
typer.echo("📊 Validation Results:")
typer.echo(f" Total files: {total_files}")
typer.echo(f" Valid files: {valid_files}")
typer.echo(f" Invalid files: {total_files - valid_files}")
if errors_found:
typer.echo()
typer.echo("🚨 Validation Errors:")
for error in errors_found:
typer.echo(error)
if not verbose:
typer.echo()
typer.echo("💡 Run with --verbose for detailed error information")
raise typer.Exit(1)
else:
typer.echo()
typer.echo("🎉 All metadata.yaml files are valid!")
if __name__ == '__main__':
main()
app()