Package pulp :: Package server :: Package auth :: Module cert_generator
[hide private]
[frames | no frames]

Source Code for Module pulp.server.auth.cert_generator

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # Copyright © 2010 Red Hat, Inc. 
  5  # 
  6  # This software is licensed to you under the GNU General Public License, 
  7  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  8  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  9  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
 10  # along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 12  # 
 13  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 14  # granted to use or replicate Red Hat trademarks that are incorporated 
 15  # in this software or its documentation. 
 16   
 17  import logging 
 18  from M2Crypto import X509, EVP, RSA, ASN1, util 
 19  import subprocess 
 20   
 21  from pulp.server.pexceptions import PulpException 
 22  from pulp.server import config 
 23   
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Marker placed at the front of an encoded admin identity; written by
# encode_admin_user, tested by is_admin_user and stripped by decode_admin_user.
ADMIN_PREFIX = 'admin:'
# Separator written between the login and the id in an encoded admin identity.
ADMIN_SPLITTER = ':'
 28   
def make_admin_user_cert(user):
    """
    Build the certificate material for an admin user.

    The user is first flattened into its encoded admin identity
    (encode_admin_user) and that identity is then handed to make_cert
    as the uid to embed in the certificate.

    @param user: admin user the certificate is created for; may not be None
    @type  user: pulp.server.db.model.User

    @return: the result of make_cert for the encoded identity, i.e. the
             pair (private key PEM, certificate PEM)
    @rtype:  (string, string)
    """
    admin_uid = encode_admin_user(user)
    return make_cert(admin_uid)
41
def make_cert(uid):
    """
    Generate a private key and an x509 certificate whose Subject is set to
    the uid passed into this method:
    Subject: CN=someconsumer.example.com

    Both the key generation and the signing step are delegated to the
    openssl command line tool. The certificate is signed with the CA
    certificate and CA key named by the 'cacert' and 'cakey' options of the
    'security' configuration section.

    @param uid: ID to be embedded in the certificate
    @type  uid: string

    @return: tuple of (private key PEM string, X509 certificate PEM string)
    @rtype:  (string, string)

    @raise Exception: if the openssl signing command exits with a non-zero
                      code; the message carries the command's combined output
    @raise ValueError: if the command succeeds but its output contains no
                       "-----BEGIN CERTIFICATE-----" marker
    """
    # Ensure we are dealing with a string and not unicode
    uid = str(uid)
    log.debug("make_cert: [%s]", uid)

    # Make a private key.
    # Don't use M2Crypto directly as it leads to segfaults when trying to
    # convert the key to a PEM string. Instead create the key with openssl
    # and load the resulting PEM string. Sorta hacky but necessary.
    private_key_pem = _make_priv_key()
    rsa = RSA.load_key_string(private_key_pem,
                              callback=util.no_passphrase_callback)

    # Make the cert request (pub_key is not used any further in this function)
    req, pub_key = _make_cert_request(uid, rsa)

    # Sign it with the Pulp server CA.
    # We can't do this in m2crypto either so we have to call out to openssl.
    ca_cert = config.config.get('security', 'cacert')
    ca_key = config.config.get('security', 'cakey')

    # TODO: We technically should be incrementing these serial numbers
    serial = '01'

    # Pass an argument list and skip the shell entirely, so that configured
    # paths containing spaces or shell metacharacters reach openssl verbatim.
    cmd = ['openssl', 'x509', '-req', '-sha1',
           '-CA', ca_cert, '-CAkey', ca_key,
           '-set_serial', serial, '-days', '3650']
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    # communicate() pipes the request to stdin, drains the (merged) output
    # and waits for the process to exit, so no separate wait() is needed.
    output = p.communicate(input=req.as_pem())[0]
    if p.returncode != 0:
        raise Exception("error signing cert request: %s" % output)

    # Discard anything openssl printed before the PEM block itself
    cert_pem_string = output[output.index("-----BEGIN CERTIFICATE-----"):]
    return private_key_pem, cert_pem_string
87
def verify_cert(cert_pem):
    """
    Ensures the given certificate can be verified against the server's CA.

    The check is done by piping the certificate to "openssl verify" with the
    CA file named by the 'cacert' option of the 'security' configuration
    section, and inspecting what the command prints on stdout.

    @param cert_pem: PEM encoded certificate to be verified
    @type  cert_pem: string

    @return: True if the certificate is successfully verified against the CA;
             False otherwise
    @rtype:  boolean
    """
    # M2Crypto doesn't support verifying a cert against a CA, so call out to
    # openssl. An argument list (no shell) keeps a CA path containing spaces
    # or shell metacharacters from being split or interpreted.
    ca_cert = config.config.get('security', 'cacert')
    cmd = ['openssl', 'verify', '-CAfile', ca_cert]
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Use communicate to pipe the certificate to the verify call; stderr is
    # captured by the pipe but plays no part in the decision.
    stdout = p.communicate(input=cert_pem)[0]

    # Successful result example:
    #   stdin: OK\n
    # Failed result example:
    #   stdin: C = US, ST = NC, L = Raleigh, O = Red Hat, CN = localhost
    #   error 20 at 0 depth lookup:unable to get local issuer certificate\n
    return stdout.rstrip().endswith('OK')
119
def encode_admin_user(user):
    """
    Flatten an admin user's identity into a single line of the form
    ADMIN_PREFIX + login + ADMIN_SPLITTER + id.

    This is intended to be the identity used in admin certificates and is
    the format decode_admin_user takes apart again.

    @param user: admin user; may not be None. Its 'login' and 'id' entries
                 are read.
    @type  user: pulp.server.db.model.User

    @return: single line identification of the admin user
    @rtype:  string
    """
    pieces = (ADMIN_PREFIX, user['login'], ADMIN_SPLITTER, user['id'])
    return '%s%s%s%s' % pieces
133
def is_admin_user(encoded_string):
    """
    Tell whether an encoded user string denotes an admin user, i.e. whether
    it begins with ADMIN_PREFIX. Strings for which this is True can be taken
    apart with decode_admin_user.

    @param encoded_string: encoded user identity to inspect
    @type  encoded_string: string

    @return: True if the user string represents an admin user; False otherwise
    @rtype:  boolean
    """
    has_admin_prefix = encoded_string.startswith(ADMIN_PREFIX)
    return has_admin_prefix
143
def decode_admin_user(encoded_string):
    """
    Take apart the single line admin identification produced by
    encode_admin_user and hand back its components.

    The first len(ADMIN_PREFIX) characters are dropped without being
    checked; callers can use is_admin_user beforehand to test for the prefix.

    @param encoded_string: string representation of the user provided by
                           encode_admin_user
    @type  encoded_string: string

    @return: tuple of information describing the admin user; (username, id)
    @rtype:  (string, string)

    @raise PulpException: if what follows the prefix does not split into
                          exactly two parts on ADMIN_SPLITTER
    """
    # Drop the leading "admin:" prefix
    remainder = encoded_string[len(ADMIN_PREFIX):]

    # Exactly two fields are expected: the login and the id
    fields = remainder.split(ADMIN_SPLITTER)
    if len(fields) == 2:
        username, user_id = fields
        return username, user_id

    raise PulpException('Invalid encoded admin user information [%s]' % remainder)
169
def _make_priv_key():
    """
    Generate an RSA private key by running "openssl genrsa 1024" and return
    it as a PEM string.

    @return: PEM encoded private key, starting at its
             "-----BEGIN RSA PRIVATE KEY-----" line
    @rtype:  string

    @raise Exception: if openssl exits with a non-zero code; the message
                      carries whatever the command wrote to stderr
    @raise ValueError: if the command succeeds but its output contains no
                       "-----BEGIN RSA PRIVATE KEY-----" marker
    """
    # NOTE(review): 1024-bit RSA keys are widely considered too short today;
    # the size is left untouched here to preserve existing behavior.
    cmd = ['openssl', 'genrsa', '1024']
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # communicate() drains both pipes while waiting for the process. Calling
    # wait() first and reading afterwards can deadlock if the child fills a
    # pipe buffer before it exits.
    output, error = p.communicate()
    if p.returncode != 0:
        raise Exception("error generating private key: %s" % error)

    # Discard anything printed before the PEM block itself
    pem_str = output[output.index("-----BEGIN RSA PRIVATE KEY-----"):]
    return pem_str
181 182
def _make_cert_request(uid, rsa):
    """
    Build and self-sign an X509 certificate request for the given uid.

    The request's subject CN is set to the uid, an 'nsComment' extension
    naming the uid is attached, and the request is signed with sha1 using a
    key wrapper built around the supplied RSA key.

    @param uid: ID placed in the subject CN and in the nsComment extension
    @type  uid: string

    @param rsa: RSA key the request is created for and signed with
    @type  rsa: M2Crypto.RSA.RSA

    @return: tuple of (certificate request, key wrapper holding the RSA key)
    @rtype:  (M2Crypto.X509.Request, M2Crypto.EVP.PKey)
    """
    pkey = EVP.PKey()
    request = X509.Request()

    # Wrap the RSA key so it can be attached to the request and used to sign
    pkey.assign_rsa(rsa)
    rsa = None  # should not be freed here
    request.set_pubkey(pkey)

    subject = request.get_subject()
    subject.CN = "%s" % uid

    comment_ext = X509.new_extension(
        'nsComment',
        'Pulp Generated Identity Certificate for Consumer: [%s]' % uid)
    extensions = X509.X509_Extension_Stack()
    extensions.push(comment_ext)
    request.add_extensions(extensions)

    request.sign(pkey, 'sha1')

    # The original code reads the public key back without using it; the call
    # is kept so the sequence of M2Crypto calls stays exactly the same.
    _read_back = request.get_pubkey()
    return request, pkey
199