#!/usr/bin/env <%= @venv_path %>/bin/python
"""Request a certificate from a Vault PKI mount and save or print it.

The certificate, private key and a "full chain" file are either written to a
directory named after the common name, packed into a zip archive, or printed
to stdout as a JSON document.

NOTE(review): the ``<%= ... %>`` tags in the shebang and in the config path
look like ERB placeholders filled in by a templating step -- confirm before
running this file directly.
"""
import argparse
import json
import os
import sys
from zipfile import ZipFile

import requests
import yaml

# remove this after certs are generated everywhere
requests.packages.urllib3.disable_warnings()

# Seconds to wait for Vault when the config does not set ``timeout``.
# Without a timeout ``requests`` may block forever on an unresponsive server.
DEFAULT_TIMEOUT_SECONDS = 30


def load_config(config_path):
    """Return the ``vault`` section of the YAML file at *config_path*.

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if the document has no top-level ``vault`` key.
    """
    with open(config_path, 'r') as file:
        config = yaml.safe_load(file)
    return config['vault']


def request_certificate(common_name, alt_names, ip_sans, expiry_days, vault_config):
    """Ask Vault to issue a certificate and return the decoded JSON response.

    Args:
        common_name: Common Name for the certificate.
        alt_names: list of alternative names (joined with commas for Vault).
        ip_sans: list of IP SANs (joined with commas for Vault).
        expiry_days: requested validity, sent as ``"<n>d"``.
        vault_config: mapping with ``addr``, ``mount_point``, ``role_name``
            and ``token``. Two optional keys are honoured: ``verify`` (passed
            to ``requests`` as the TLS ``verify`` argument, default ``False``
            to keep the historical behaviour) and ``timeout`` (seconds,
            default ``DEFAULT_TIMEOUT_SECONDS``).

    Returns:
        The parsed response body on HTTP 200, otherwise ``None``. Errors are
        reported on stderr so they never mix with ``--json`` output on stdout.
    """
    url = (
        f"{vault_config['addr']}/v1/{vault_config['mount_point']}"
        f"/issue/{vault_config['role_name']}"
    )
    headers = {'X-Vault-Token': vault_config['token']}
    payload = {
        "common_name": common_name,
        "alt_names": ",".join(alt_names),
        "ip_sans": ",".join(ip_sans),
        "ttl": f"{expiry_days}d",
    }

    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            # SECURITY: verification stays off unless the config opts in;
            # set ``verify`` to true or to a CA bundle path to enable it.
            verify=vault_config.get('verify', False),
            timeout=vault_config.get('timeout', DEFAULT_TIMEOUT_SECONDS),
        )
    except requests.RequestException as exc:
        # Connection failures and timeouts are reported like HTTP errors
        # instead of surfacing as a traceback.
        print(f"Error requesting certificate: {exc}", file=sys.stderr)
        return None

    if response.status_code != 200:
        print(f"Error requesting certificate: {response.text}", file=sys.stderr)
        return None
    return response.json()


def _open_private(path, mode="w"):
    """Open *path* for writing, creating it readable by the owner only.

    The 0o600 mode applies when the file is created; a file that already
    exists is truncated but keeps its current permissions.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    return os.fdopen(os.open(path, flags, 0o600), mode)


def save_cert_files(certificate_response, common_name, compress, config, json_output):
    """Emit the issued certificate material in the requested form.

    Args:
        certificate_response: Vault response whose ``data`` mapping holds
            ``certificate``, ``private_key`` and ``issuing_ca``.
        common_name: used as the output directory name or zip base name.
        compress: when true (and *json_output* is false) write a zip archive
            instead of loose files.
        config: mapping; ``output_path`` selects the base directory
            (default ``'.'``).
        json_output: when true, print a JSON document to stdout and write
            nothing to disk. Takes precedence over *compress*.
    """
    data = certificate_response['data']
    certificate = data['certificate']
    private_key = data['private_key']
    # NOTE(review): the issuing CA is placed before the leaf certificate;
    # many consumers expect leaf-first order -- confirm this is intended.
    full_chain = data['issuing_ca'] + "\n" + certificate

    if json_output:
        print(json.dumps({
            'certificate': certificate,
            'private_key': private_key,
            'full_chain': full_chain,
        }))
        return

    base_path = config.get('output_path', '.')

    if compress:
        zip_name = f"{os.path.join(base_path, common_name)}.zip"
        # The archive contains the private key, so create it owner-only.
        with _open_private(zip_name, "wb") as raw, ZipFile(raw, 'w') as zipf:
            zipf.writestr("certificate.crt", certificate)
            zipf.writestr("private.key", private_key)
            zipf.writestr("full_chain.crt", full_chain)
        return

    cert_dir = os.path.join(base_path, common_name)
    os.makedirs(cert_dir, exist_ok=True)
    with open(os.path.join(cert_dir, "certificate.crt"), "w") as cert_file:
        cert_file.write(certificate)
    with _open_private(os.path.join(cert_dir, "private.key")) as key_file:
        key_file.write(private_key)
    with open(os.path.join(cert_dir, "full_chain.crt"), "w") as full_chain_file:
        full_chain_file.write(full_chain)


def _split_csv(value):
    """Split a comma-separated string, dropping blank or whitespace-only items."""
    stripped = (item.strip() for item in value.split(','))
    return [item for item in stripped if item]


def main(config_file):
    """Parse the command line, request a certificate and output it.

    Args:
        config_file: path of the YAML configuration passed to ``load_config``.

    Returns:
        ``0`` on success, ``1`` when no certificate could be obtained.
    """
    parser = argparse.ArgumentParser(description='Request and retrieve a certificate from Vault.')
    parser.add_argument('common_name', type=str, help='Common Name for the certificate')
    parser.add_argument('-a', '--alt-names', type=str, default='', help='Comma-separated alternative names for the certificate')
    parser.add_argument('-i', '--ip-sans', type=str, default='', help='Comma-separated IP Subject Alternative Names for the certificate')
    parser.add_argument('-e', '--expiry-days', type=int, default=365, help='Validity of the certificate in days (default: 365)')
    parser.add_argument('-c', '--compress', action='store_true', help='Compress the certificate, key, and full chain into a zip file')
    parser.add_argument('--json', action='store_true', help='Output results in JSON format')
    # Parse before touching the config so ``--help`` and usage errors work
    # even when the config file is missing or unreadable.
    args = parser.parse_args()

    config = load_config(config_file)

    certificate_response = request_certificate(
        args.common_name,
        _split_csv(args.alt_names),
        _split_csv(args.ip_sans),
        args.expiry_days,
        config,
    )
    if not certificate_response:
        print("Failed to obtain certificate.", file=sys.stderr)
        return 1

    save_cert_files(certificate_response, args.common_name, args.compress, config, args.json)
    return 0


if __name__ == "__main__":
    config_file = '<%= @config_path %>'
    sys.exit(main(config_file))