from fastapi import FastAPI, HTTPException, Depends, Header from pydantic import BaseModel import subprocess import hvac import yaml import logging # Configure logging logger = logging.getLogger('uvicorn.error') # Load configuration from a YAML file with open('config.yaml', 'r') as config_file: config = yaml.safe_load(config_file) vault_addr = config.get("vault_addr") role_id = config.get("role_id") secret_path = config.get("vault_secret_path", "secret/data/puppetserver") mount_path = config.get("vault_secret_mount", "kv") # Initialize the FastAPI app app = FastAPI() # Function to fetch the valid token from the Vault def get_valid_token(): vault_client = hvac.Client(url=vault_addr) try: vault_client.auth.approle.login(role_id=role_id) logger.info("Successfully authenticated with Vault using AppRole.") try: secret = vault_client.secrets.kv.v2.read_secret_version( path=secret_path, mount_point=mount_path, raise_on_deleted_version=True, ) logger.info("Successfully retrieved token from Vault.") return secret['data']['data'].get('admin') except Exception as e: logger.error(f"Failed to retrieve token from Vault: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to retrieve token from Vault: {str(e)}") except Exception as e: logger.error(f"Failed to authenticate with Vault using AppRole: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to authenticate with Vault using AppRole: {str(e)}") # Dependency to verify token def verify_token(token: str = Header(None)) -> str: if not token: logger.warning("Token not provided in request headers.") raise HTTPException(status_code=401, detail="Token not provided") valid_token = get_valid_token() if token != valid_token: logger.warning("Invalid token provided.") raise HTTPException(status_code=401, detail="Invalid token") logger.info("Token verification successful.") return token # Return the token explicitly # Request model class CertRequest(BaseModel): certname: str @app.post("/puppetserver/ca/clean") async def clean_cert(cert_request: CertRequest, token: str = Depends(verify_token)): certname = cert_request.certname try: logger.info(f"Cleaning certificate for certname: {certname}") command = ["sudo", "puppetserver", "ca", "clean", "--certname", certname] result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode == 0: logger.info(f"Successfully cleaned certificate for certname: {certname}") return {"status": "success", "message": result.stdout.strip()} else: logger.error(f"Error cleaning certificate for certname: {certname}: {result.stderr.strip()}") return {"status": "error", "message": result.stderr.strip()} except Exception as e: logger.error(f"Internal server error while cleaning certificate for certname: {certname}: {str(e)}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @app.post("/puppetserver/g10k") async def run_g10k(token: str = Depends(verify_token)): try: logger.info("Running g10k command") command = ["sudo", "/opt/puppetlabs/bin/puppet-g10k"] result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode == 0: logger.info("Successfully ran g10k command.") return {"status": "success", "message": result.stdout.strip()} else: logger.error(f"Error running g10k command: {result.stderr.strip()}") return {"status": "error", "message": result.stderr.strip()} except Exception as e: logger.error(f"Internal server error while running g10k command: {str(e)}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)