#!<%= @venv_path %>/bin/python import argparse import requests import json import yaml # remove this after certs are generated everywhere requests.packages.urllib3.disable_warnings() def load_config(config_path): with open(config_path, 'r') as file: config = yaml.safe_load(file) return config['vault'] def authenticate_approle(vault_config): url = f"{vault_config['addr']}/v1/auth/{vault_config['approle_path']}/login" payload = { "role_id": vault_config['role_id'], } response = requests.post(url, json=payload, verify=False) if response.status_code == 200: auth_response = response.json() return auth_response['auth']['client_token'] else: print(f"Error authenticating with AppRole: {response.text}") return None def sign_ssh_certificate(vault_config, public_key, valid_principals, ttl): # Authenticate using AppRole and get a token client_token = authenticate_approle(vault_config) if not client_token: print("Failed to authenticate with Vault using AppRole.") return None # Prepare the SSH certificate signing request url = f"{vault_config['addr']}/v1/{vault_config['mount_point']}/sign/{vault_config['role_name']}" headers = {'X-Vault-Token': client_token} payload = { "cert_type": "host", "public_key": public_key, "valid_principals": valid_principals, "ttl": ttl } # Request the SSH certificate signing response = requests.post(url, headers=headers, json=payload, verify=False) if response.status_code == 200: return response.json() else: print(f"Error requesting certificate: {response.text}") return None def main(config_file): config = load_config(config_file) parser = argparse.ArgumentParser(description='Sign SSH host certificate using Vault.') parser.add_argument('--public_key', required=True, help='SSH public key as a string') parser.add_argument('--valid_principals', required=True, help='Comma-separated list of valid principals') parser.add_argument('--ttl', default='87600h', help='Time-to-live for the certificate (default: 87600h)') parser.add_argument('--json', action='store_true', help='Output the resulting certificate as JSON') args = parser.parse_args() # Load configuration config = load_config(config_file) # Sign SSH certificate response = sign_ssh_certificate(config, args.public_key, args.valid_principals, args.ttl) if response and 'data' in response and 'signed_key' in response['data']: if args.json: output = { 'signed_key': response['data']['signed_key'], } print(json.dumps(output)) else: print(response['data']['signed_key']) else: print("Error: The response does not contain the expected data.") exit(1) if __name__ == "__main__": config_file = '<%= @config_path %>' main(config_file)