puppet-prod/site/profiles/templates/helpers/sshsignhost.erb
Ben Vincent b468f67103 feat: sign ssh host keys
- manage python script/venv to sign ssh host certificates
- add approle_id to puppetmaster eyaml files
- add class to sign ssh-rsa host keys
- add facts to check if the current principals match the desired principals
2024-06-01 22:51:42 +10:00

84 lines
2.9 KiB
Plaintext

#!<%= @venv_path %>/bin/python
import argparse
import requests
import json
import yaml
# remove this after certs are generated everywhere
requests.packages.urllib3.disable_warnings()
def load_config(config_path):
with open(config_path, 'r') as file:
config = yaml.safe_load(file)
return config['vault']
def authenticate_approle(vault_config):
url = f"{vault_config['addr']}/v1/auth/{vault_config['approle_path']}/login"
payload = {
"role_id": vault_config['role_id'],
}
response = requests.post(url, json=payload, verify=False)
if response.status_code == 200:
auth_response = response.json()
return auth_response['auth']['client_token']
else:
print(f"Error authenticating with AppRole: {response.text}")
return None
def sign_ssh_certificate(vault_config, public_key, valid_principals, ttl):
# Authenticate using AppRole and get a token
client_token = authenticate_approle(vault_config)
if not client_token:
print("Failed to authenticate with Vault using AppRole.")
return None
# Prepare the SSH certificate signing request
url = f"{vault_config['addr']}/v1/{vault_config['mount_point']}/sign/{vault_config['role_name']}"
headers = {'X-Vault-Token': client_token}
payload = {
"cert_type": "host",
"public_key": public_key,
"valid_principals": valid_principals,
"ttl": ttl
}
# Request the SSH certificate signing
response = requests.post(url, headers=headers, json=payload, verify=False)
if response.status_code == 200:
return response.json()
else:
print(f"Error requesting certificate: {response.text}")
return None
def main(config_file):
config = load_config(config_file)
parser = argparse.ArgumentParser(description='Sign SSH host certificate using Vault.')
parser.add_argument('--public_key', required=True, help='SSH public key as a string')
parser.add_argument('--valid_principals', required=True, help='Comma-separated list of valid principals')
parser.add_argument('--ttl', default='87600h', help='Time-to-live for the certificate (default: 87600h)')
parser.add_argument('--json', action='store_true', help='Output the resulting certificate as JSON')
args = parser.parse_args()
# Load configuration
config = load_config(config_file)
# Sign SSH certificate
response = sign_ssh_certificate(config, args.public_key, args.valid_principals, args.ttl)
if response and 'data' in response and 'signed_key' in response['data']:
if args.json:
output = {
'signed_key': response['data']['signed_key'],
}
print(json.dumps(output))
else:
print(response['data']['signed_key'])
else:
print("Error: The response does not contain the expected data.")
exit(1)
if __name__ == "__main__":
config_file = '<%= @config_path %>'
main(config_file)