1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import logging
18 from M2Crypto import X509, EVP, RSA, ASN1, util
19 import subprocess
20
21 from pulp.server.pexceptions import PulpException
22 from pulp.server import config
23
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Marker placed at the front of an encoded admin identity; also what
# identifies a string as belonging to an admin user.
ADMIN_PREFIX = 'admin:'
# Separator between the login and the id inside an encoded admin identity.
ADMIN_SPLITTER = ':'
28
def make_admin_user_cert(user):
    '''
    Generates a x509 certificate for an admin user.

    The user is first reduced to its single line identity by encode_admin_user
    and that identity is then handed to make_cert.

    @param user: identification the certificate will be created for; may not be None
    @type  user: pulp.server.db.model.User

    @return: the result of make_cert for the encoded identity: a tuple of
             (PEM encoded private key, PEM encoded certificate)
    @rtype:  (string, string)
    '''
    return make_cert(encode_admin_user(user))
41
def make_cert(uid):
    """
    Generate an x509 certificate with the Subject set to the uid passed into this method:
    Subject: CN=someconsumer.example.com

    A new private key is generated, a certificate request carrying the uid is
    built from it, and the request is signed with the CA certificate and key
    named by the 'cacert' and 'cakey' options of the 'security' config section,
    using the openssl command line tool.

    @param uid: ID to be embedded in the certificate
    @type  uid: string

    @return: tuple of (PEM encoded private key, PEM encoded X509 certificate)
    @rtype:  (string, string)

    @raise Exception: if openssl exits with a non-zero status while signing
    """
    uid = str(uid)
    log.debug("make_cert: [%s]", uid)

    # Create the private key and load it so the request can be built from it.
    private_key_pem = _make_priv_key()
    rsa = RSA.load_key_string(private_key_pem,
                              callback=util.no_passphrase_callback)

    # Only the request is used below; the key wrapper returned with it is not.
    req, _pub_key = _make_cert_request(uid, rsa)

    ca_cert = config.config.get('security', 'cacert')
    ca_key = config.config.get('security', 'cakey')

    # NOTE(review): the serial is fixed, so every certificate signed here
    # carries serial 01 -- confirm whether unique serials are required.
    serial = '01'

    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact.
    # NOTE(review): -sha1 is kept so issued certificates are unchanged; SHA-1
    # signatures are widely deprecated.
    cmd = ['openssl', 'x509', '-req', '-sha1',
           '-CA', ca_cert, '-CAkey', ca_key,
           '-set_serial', serial, '-days', '3650']

    # stderr is folded into stdout so a failure message includes openssl's
    # diagnostics.
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    # communicate() feeds the request, drains the output and waits for exit.
    output = p.communicate(input=req.as_pem())[0]
    if p.returncode != 0:
        raise Exception("error signing cert request: %s" % output)

    # Drop anything openssl printed ahead of the PEM block.
    cert_pem_string = output[output.index("-----BEGIN CERTIFICATE-----"):]
    return private_key_pem, cert_pem_string
87
def verify_cert(cert_pem):
    '''
    Ensures the given certificate can be verified against the server's CA.

    The check is delegated to "openssl verify", using the CA certificate named
    by the 'cacert' option of the 'security' config section.

    @param cert_pem: PEM encoded certificate to be verified
    @type  cert_pem: string

    @return: True if the certificate is successfully verified against the CA; False otherwise
    @rtype:  boolean
    '''
    ca_cert = config.config.get('security', 'cacert')

    # An argument list (no shell) keeps a CA path containing spaces or shell
    # metacharacters intact.
    cmd = ['openssl', 'verify', '-CAfile', ca_cert]
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # The certificate is supplied on stdin; stderr is captured but not consulted.
    stdout, _stderr = p.communicate(input=cert_pem)

    # The verdict is read from stdout: output ending in 'OK' (ignoring trailing
    # whitespace) counts as verified; the exit status is not inspected.
    return stdout.rstrip().endswith('OK')
119
def encode_admin_user(user):
    '''
    Encodes an admin user's identity into a single line suitable for identification.
    This is intended to be the identity used in admin certificates.

    The result is ADMIN_PREFIX followed by the user's login, ADMIN_SPLITTER and
    the user's id. Both values are embedded as-is; nothing is hashed here.

    NOTE(review): a login containing ADMIN_SPLITTER would produce a string that
    decode_admin_user rejects -- confirm logins cannot contain it.

    @param user: admin user; may not be None. Must support item access for the
                 'login' and 'id' keys.
    @type  user: pulp.server.db.model.User

    @return: single line identification of the admin user
    @rtype:  string
    '''
    return '%s%s%s%s' % (ADMIN_PREFIX, user['login'], ADMIN_SPLITTER, user['id'])
133
def is_admin_user(encoded_string):
    '''
    Indicates if the encoded user string represents an admin user. If the string is
    identified as an admin user, it can be parsed with decode_admin_user.

    Only the leading ADMIN_PREFIX is checked; the rest of the string is not validated.

    @param encoded_string: user identification string to inspect
    @type  encoded_string: string

    @return: True if the user string represents an admin user; False otherwise
    @rtype:  boolean
    '''
    return encoded_string.startswith(ADMIN_PREFIX)
143
def decode_admin_user(encoded_string):
    '''
    Decodes the single line admin user identification produced by encode_admin_user
    into all of the parts that make up that identification.

    @param encoded_string: string representation of the user provided by encode_admin_user
    @type  encoded_string: string

    @return: tuple of information describing the admin user; (username, id)
    @rtype:  (string, string)

    @raise PulpException: if the string does not start with ADMIN_PREFIX, or if
                          what follows the prefix does not split into exactly
                          a username and an id
    '''
    # Refuse input without the prefix rather than blindly chopping off its
    # first len(ADMIN_PREFIX) characters.
    if not encoded_string.startswith(ADMIN_PREFIX):
        raise PulpException('Invalid encoded admin user information [%s]' % encoded_string)

    # Strip the prefix, leaving "<username><splitter><id>".
    remainder = encoded_string[len(ADMIN_PREFIX):]
    parsed = remainder.split(ADMIN_SPLITTER)

    if len(parsed) != 2:
        raise PulpException('Invalid encoded admin user information [%s]' % remainder)

    # Named user_id so the builtin id() is not shadowed.
    username, user_id = parsed
    return username, user_id
169
def _make_priv_key(bits=1024):
    '''
    Generates a new RSA private key by running "openssl genrsa".

    @param bits: key size in bits handed to openssl; defaults to 1024, the size
                 previously hard-coded here.
                 NOTE(review): 1024-bit RSA is generally considered weak --
                 consider raising the default.
    @type  bits: int

    @return: PEM encoded private key, beginning at its BEGIN marker
    @rtype:  string

    @raise Exception: if openssl exits with a non-zero status
    '''
    # An argument list avoids spawning a shell just to run a fixed command.
    cmd = ['openssl', 'genrsa', str(bits)]
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # communicate() drains both pipes while waiting for exit; calling wait()
    # before reading them can block forever once a pipe buffer fills up.
    output, error = p.communicate()
    if p.returncode != 0:
        raise Exception("error generating private key: %s" % error)

    # Drop anything openssl printed ahead of the PEM block.
    pem_str = output[output.index("-----BEGIN RSA PRIVATE KEY-----"):]
    return pem_str
181
182
def _make_cert_request(uid, rsa):
    '''
    Builds an X509 certificate request for the given key with the uid as the
    subject CN, and signs the request with that same key.

    @param uid: value stored in the subject CN and quoted in the nsComment extension
    @type  uid: string

    @param rsa: RSA key the request is built around; the caller in this file
                passes the result of M2Crypto's RSA.load_key_string
    @type  rsa: M2Crypto RSA key object

    @return: tuple of (certificate request, key wrapper holding the RSA key)
    @rtype:  (M2Crypto.X509.Request, M2Crypto.EVP.PKey)
    '''
    pub_key = EVP.PKey()
    x = X509.Request()
    pub_key.assign_rsa(rsa)
    # Drop this function's own reference now that pub_key has been given the key.
    # NOTE(review): the caller still holds its reference, so this only affects
    # the local name -- presumably carried over from M2Crypto sample code.
    rsa = None
    x.set_pubkey(pub_key)

    name = x.get_subject()
    name.CN = "%s" % uid

    # Attach a human readable comment extension to the request.
    ext2 = X509.new_extension('nsComment',
        'Pulp Generated Identity Certificate for Consumer: [%s]' % uid)
    extstack = X509.X509_Extension_Stack()
    extstack.push(ext2)
    x.add_extensions(extstack)

    # NOTE(review): SHA-1 is kept so generated requests are unchanged; it is
    # widely deprecated as a signature digest.
    x.sign(pub_key, 'sha1')
    return x, pub_key
199