- created approle 'certmanager' using 'certmanager' policy - update certmanager script to generate token based on roleid
103 lines
4.7 KiB
Plaintext
103 lines
4.7 KiB
Plaintext
#!<%= @venv_path %>/bin/python
|
|
|
|
import argparse
|
|
import requests
|
|
import json
|
|
import os
|
|
import yaml
|
|
from zipfile import ZipFile
|
|
|
|
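# TODO: remove once valid certificates are deployed everywhere. Until then the
# requests below use verify=False, and this silences the urllib3 warnings.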
|
|
requests.packages.urllib3.disable_warnings()
|
|
|
|
def load_config(config_path):
    """Read the YAML file at ``config_path`` and return its ``vault`` section.

    The file is parsed with ``yaml.safe_load``; a missing ``vault`` key
    raises ``KeyError``.
    """
    with open(config_path, 'r') as handle:
        return yaml.safe_load(handle)['vault']
|
|
|
|
def authenticate_approle(vault_config):
    """Log in to Vault through an AppRole mount and return a client token.

    Args:
        vault_config: Mapping providing ``addr`` (Vault base URL),
            ``approle_path`` (path segment placed after ``/v1/auth/``) and
            ``role_id``. An optional ``timeout`` entry (seconds) overrides
            the default request timeout of 30 seconds.

    Returns:
        The ``client_token`` string taken from the login response, or
        ``None`` when the request cannot be completed, Vault answers with a
        status other than 200, or the response body carries no token.
    """
    url = f"{vault_config['addr']}/v1/auth/{vault_config['approle_path']}/login"
    # Only the role_id is submitted with the login request.
    payload = {
        "role_id": vault_config['role_id'],
    }
    try:
        # verify=False skips TLS certificate validation. The explicit timeout
        # keeps the script from blocking forever on an unreachable server.
        response = requests.post(
            url,
            json=payload,
            verify=False,
            timeout=vault_config.get('timeout', 30),
        )
    except requests.exceptions.RequestException as exc:
        print(f"Error authenticating with AppRole: {exc}")
        return None

    if response.status_code != 200:
        print(f"Error authenticating with AppRole: {response.text}")
        return None

    try:
        return response.json()['auth']['client_token']
    except (ValueError, KeyError, TypeError):
        # A 200 answer without a usable token is still a failed login.
        print(f"Error authenticating with AppRole: {response.text}")
        return None
|
|
|
|
def request_certificate(common_name, alt_names, ip_sans, expiry_days, vault_config):
    """Ask Vault to issue a certificate and return the decoded JSON response.

    Args:
        common_name: Common Name for the certificate.
        alt_names: Iterable of alternative names; sent comma-joined.
        ip_sans: Iterable of IP SANs; sent comma-joined.
        expiry_days: Requested validity in days, sent as a ``"<n>d"`` TTL.
        vault_config: Mapping providing ``addr``, ``mount_point`` and
            ``role_name`` plus whatever ``authenticate_approle`` reads.
            An optional ``timeout`` entry (seconds) overrides the default
            request timeout of 30 seconds.

    Returns:
        The parsed JSON body of the issue response, or ``None`` when
        authentication fails, the request cannot be completed, or Vault
        answers with a status other than 200.
    """
    # Authenticate using AppRole and get a token
    client_token = authenticate_approle(vault_config)
    if not client_token:
        print("Failed to authenticate with Vault using AppRole.")
        return None

    url = f"{vault_config['addr']}/v1/{vault_config['mount_point']}/issue/{vault_config['role_name']}"
    headers = {'X-Vault-Token': client_token}
    payload = {
        "common_name": common_name,
        "alt_names": ",".join(alt_names),
        "ip_sans": ",".join(ip_sans),
        "ttl": f"{expiry_days}d",
    }
    try:
        # verify=False skips TLS certificate validation. The explicit timeout
        # keeps the script from blocking forever on an unreachable server.
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            verify=False,
            timeout=vault_config.get('timeout', 30),
        )
    except requests.exceptions.RequestException as exc:
        print(f"Error requesting certificate: {exc}")
        return None

    if response.status_code != 200:
        print(f"Error requesting certificate: {response.text}")
        return None
    return response.json()
|
|
|
|
def _open_owner_only(path, mode):
    """Open ``path`` for writing with permissions restricted to the owner."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        # The mode given to os.open() only applies to newly created files;
        # tighten a pre-existing file as well.
        os.chmod(path, 0o600)
    except OSError:
        os.close(fd)
        raise
    return os.fdopen(fd, mode)


def save_cert_files(certificate_response, common_name, compress, config, json_output):
    """Emit the issued certificate, private key and chain.

    Args:
        certificate_response: Mapping whose ``data`` entry holds the
            ``certificate``, ``private_key`` and ``issuing_ca`` strings.
        common_name: Used as the output directory name, or as the zip file
            stem when ``compress`` is set.
        compress: When true (and ``json_output`` is false), write a single
            ``<common_name>.zip`` instead of separate files.
        config: Mapping; its optional ``output_path`` is the base directory
            (defaults to the current directory).
        json_output: When true, print a JSON document to stdout and write
            nothing to disk; this takes precedence over ``compress``.

    Files that contain the private key (``private.key`` and the zip archive)
    are written with owner-only permissions.
    """
    data = certificate_response['data']
    certificate = data['certificate']
    private_key = data['private_key']
    # NOTE(review): the issuing CA is placed before the leaf certificate.
    # Many TLS servers expect the leaf first - confirm with the consumers of
    # this file before changing the order.
    full_chain = data['issuing_ca'] + "\n" + certificate

    if json_output:
        print(json.dumps({
            'certificate': certificate,
            'private_key': private_key,
            'full_chain': full_chain,
        }))
        return

    base_path = config.get('output_path', '.')

    if not compress:
        cert_dir = os.path.join(base_path, common_name)
        os.makedirs(cert_dir, exist_ok=True)
        with open(os.path.join(cert_dir, "certificate.crt"), "w") as cert_file:
            cert_file.write(certificate)
        with _open_owner_only(os.path.join(cert_dir, "private.key"), "w") as key_file:
            key_file.write(private_key)
        with open(os.path.join(cert_dir, "full_chain.crt"), "w") as full_chain_file:
            full_chain_file.write(full_chain)
        return

    # The archive is written directly into base_path, so make sure it exists.
    os.makedirs(base_path, exist_ok=True)
    zip_name = f"{os.path.join(base_path, common_name)}.zip"
    with _open_owner_only(zip_name, "wb") as raw, ZipFile(raw, 'w') as zipf:
        zipf.writestr("certificate.crt", certificate)
        zipf.writestr("private.key", private_key)
        zipf.writestr("full_chain.crt", full_chain)
|
|
|
|
def main(config_file):
    """Parse the command line, request a certificate from Vault and save it.

    Args:
        config_file: Path of the YAML configuration file passed to
            ``load_config``.

    Prints "Failed to obtain certificate." when no certificate is returned.
    """
    parser = argparse.ArgumentParser(description='Request and retrieve a certificate from Vault.')
    parser.add_argument('common_name', type=str, help='Common Name for the certificate')
    parser.add_argument('-a', '--alt-names', type=str, default='', help='Comma-separated alternative names for the certificate')
    parser.add_argument('-i', '--ip-sans', type=str, default='', help='Comma-separated IP Subject Alternative Names for the certificate')
    parser.add_argument('-e', '--expiry-days', type=int, default=365, help='Validity of the certificate in days (default: 365)')
    parser.add_argument('-c', '--compress', action='store_true', help='Compress the certificate, key, and full chain into a zip file')
    parser.add_argument('--json', action='store_true', help='Output results in JSON format')
    args = parser.parse_args()

    # Load the configuration only after the arguments are parsed, so --help
    # and usage errors work even when the config file cannot be read.
    config = load_config(config_file)

    # Filter on the stripped value so whitespace-only items ("a, ,b") are
    # dropped instead of being sent as empty names.
    alt_names = [name.strip() for name in args.alt_names.split(',') if name.strip()]
    ip_sans = [ip.strip() for ip in args.ip_sans.split(',') if ip.strip()]

    certificate_response = request_certificate(args.common_name, alt_names, ip_sans, args.expiry_days, config)
    if not certificate_response:
        print("Failed to obtain certificate.")
        return

    save_cert_files(certificate_response, args.common_name, args.compress, config, args.json)
|
|
|
|
if __name__ == "__main__":
    # The path below is a template placeholder (`<%= ... %>`) that is expected
    # to be substituted before this script is run.
    main('<%= @config_path %>')
|