1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import base64
18 import logging
19 try:
20 import json
21 except ImportError:
22 import simplejson as json
23
24 import pymongo.json_util
25 import web
26
27 import pulp.server.auth.auth as principal
28 from pulp.server.api.user import UserApi
29 from pulp.server.api.consumer import ConsumerApi
30 from pulp.server.auth.certificate import Certificate
31 import pulp.server.auth.password_util as password_util
32 import pulp.server.auth.cert_generator as cert_generator
33 from pulp.server.pexceptions import PulpException
34 from pulp.server.webservices import http
35
36
37
38 LOG = logging.getLogger(__name__)
39 USER_API = UserApi()
40 CONSUMER_API = ConsumerApi()
41
42
43
45 '''
46 Decorator class to check roles of web service caller.
47
48 Copied and modified from:
49 http://wiki.python.org/moin/PythonDecoratorLibrary#DifferentDecoratorForms
50 '''
51
52 - def __init__(self, *dec_args, **dec_kw):
53 '''The decorator arguments are passed here. Save them for runtime.'''
54 self.dec_args = dec_args
55 self.dec_kw = dec_kw
56
58 def check_roles(*fargs, **kw):
59 '''
60 Strip off the decorator arguments so we can use those to check the
61 roles of the current caller.
62
63 Note: the first argument cannot be "self" because we get a parse error
64 "takes at least 1 argument" unless the instance is actually included in
65 the argument list, which is redundant. If this wraps a class instance,
66 the "self" will be the first argument.
67 '''
68
69
70 roles = {'consumer': None, 'admin': None, 'consumer_id': None}
71 for key in self.dec_kw.keys():
72 roles[key] = self.dec_kw[key]
73
74 user = None
75
76
77 if roles['admin']:
78
79 try:
80 user = self.check_admin(*fargs)
81 except PulpException, pe:
82
83 http.status_unauthorized()
84 http.header('Content-Type', 'application/json')
85 return json.dumps(pe.value, default=pymongo.json_util.default)
86
87
88 if not user and (roles['consumer'] or roles['consumer_id']):
89 user = self.check_consumer(roles['consumer_id'], *fargs)
90
91
92 if not user:
93 http.status_unauthorized()
94 http.header('Content-Type', 'application/json')
95 return json.dumps("Authorization failed. Check your username and password or your certificate",
96 default=pymongo.json_util.default)
97
98
99
100 principal.set_principal(user)
101
102
103
104 if fargs and getattr(fargs[0], '__class__', None):
105 instance, fargs = fargs[0], fargs[1:]
106 result = f(instance, *fargs, **kw)
107 else:
108 result = f(*(fargs), **kw)
109
110 return result
111
112
113 self.f = f
114 check_roles.__name__ = f.__name__
115 check_roles.__dict__.update(f.__dict__)
116 check_roles.__doc__ = f.__doc__
117 return check_roles
118
120 '''
121 Checks the request to see if it contains a valid admin authentication.
122
123 @return: user instance of the authenticated user if valid
124 credentials were specified; None otherwise
125 @rtype: L{pulp.server.db.model.User}
126 '''
127
128 return self.check_admin_cert(*fargs) or self.check_username_pass(*fargs)
129
131 '''
132 Determines if the certificate in the request represents a valid admin certificate.
133
134 @return: user instance of the authenticated user if valid
135 credentials were specified; None otherwise
136 @rtype: L{pulp.server.db.model.User}
137 '''
138
139
140 environment = web.ctx.environ
141 cert_pem = environment.get('SSL_CLIENT_CERT', None)
142
143
144 if cert_pem is None:
145 return None
146
147
148 idcert = Certificate(content=cert_pem)
149 subject = idcert.subject()
150 encoded_user = subject.get('CN', None)
151
152
153 valid = cert_generator.verify_cert(cert_pem)
154 if not valid:
155 LOG.error('Admin certificate with CN [%s] is signed by a foreign CA' % encoded_user)
156 return None
157
158
159 if not encoded_user:
160 return None
161
162
163 if not cert_generator.is_admin_user(encoded_user):
164 return None
165
166
167 username, id = cert_generator.decode_admin_user(encoded_user)
168
169
170 user = USER_API.user(username)
171 if user is None:
172 LOG.error('User [%s] specified in certificate was not found in the system' % username)
173 return None
174
175
176 if id != user['id']:
177 LOG.error('ID in admin certificate for user [%s] was incorrect' % username)
178 return None
179
180 return user
181
183 '''
184 If the request uses HTTP authorization, verify the credentials identify a
185 valid user in the system.
186
187 @return: user instance of the authenticated user if valid
188 credentials were specified; None otherwise
189 @rtype: L{pulp.server.db.model.User}
190 '''
191
192
193 environment = web.ctx.environ
194 auth_string = environment.get('HTTP_AUTHORIZATION', None)
195
196
197 if auth_string is not None and auth_string.startswith("Basic"):
198
199
200 encoded_auth = auth_string.split(" ")[1]
201 auth_string = base64.decodestring(encoded_auth)
202 uname_pass = auth_string.split(":")
203 username = uname_pass[0]
204 password = uname_pass[1]
205
206
207 user = USER_API.user(username)
208 if user is None:
209 LOG.error('User [%s] specified in certificate was not found in the system' %
210 username)
211 return None
212
213
214 good_password = password_util.check_password(user['password'], password)
215 if not good_password:
216 LOG.error('Password for user [%s] was incorrect' % username)
217 return None
218
219 return user
220
221 return None
222
224 '''
225 Determines if the certificate in the request represents a valid consumer certificate.
226
227 @param check_id: if True, the consumer UID will be checked to make sure it
228 is present in the fargs argument; if False the only validation
229 will be that the UID exists in the DB (default = False)
230 @type check_id: boolean
231
232 @return: user instance of the authenticated user if valid
233 credentials were specified; None otherwise
234 @rtype: L{pulp.server.db.model.User}
235 '''
236
237
238 environment = web.ctx.environ
239 cert_pem = environment.get('SSL_CLIENT_CERT', None)
240
241
242 if cert_pem is None:
243 return None
244
245
246 idcert = Certificate(content=cert_pem)
247 subject = idcert.subject()
248 consumer_cert_uid = subject.get('CN', None)
249
250
251 valid = cert_generator.verify_cert(cert_pem)
252 if not valid:
253 LOG.error('Consumer certificate with CN [%s] is signed by a foreign CA' % consumer_cert_uid)
254 return None
255
256
257 if consumer_cert_uid is None:
258 LOG.error("Consumer UID not found in certificate")
259 return None
260
261
262 consumer = CONSUMER_API.consumer(consumer_cert_uid)
263 if not consumer:
264 LOG.error("Consumer with id [%s] does not exist" % consumer_cert_uid)
265 return None
266
267
268
269 good_certificate = False
270 if check_id:
271 for arg in fargs:
272 LOG.error("Checking ID in cert [%s] against expected ID [%s]" %
273 (consumer_cert_uid, arg))
274 if arg == consumer_cert_uid:
275 good_certificate = True
276 break
277 else:
278 good_certificate = True
279
280 if good_certificate:
281 return consumer
282 else:
283 return None
284