Package pulp :: Package server :: Package webservices :: Module role_check
[hide private]
[frames] | no frames]

Source Code for Module pulp.server.webservices.role_check

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # Copyright © 2010 Red Hat, Inc. 
  5  # 
  6  # This software is licensed to you under the GNU General Public License, 
  7  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  8  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  9  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
 10  # along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 12  # 
 13  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 14  # granted to use or replicate Red Hat trademarks that are incorporated 
 15  # in this software or its documentation. 
 16   
 17  import base64 
 18  import logging 
 19  try: 
 20      import json 
 21  except ImportError: 
 22      import simplejson as json 
 23   
 24  import pymongo.json_util  
 25  import web 
 26   
 27  import pulp.server.auth.auth as principal 
 28  from pulp.server.api.user import UserApi 
 29  from pulp.server.api.consumer import ConsumerApi 
 30  from pulp.server.auth.certificate import Certificate 
 31  import pulp.server.auth.password_util as password_util 
 32  import pulp.server.auth.cert_generator as cert_generator 
 33  from pulp.server.pexceptions import PulpException 
 34  from pulp.server.webservices import http 
 35   
 36  # globals --------------------------------------------------------------------- 
 37   
 38  LOG = logging.getLogger(__name__) 
 39  USER_API = UserApi() 
 40  CONSUMER_API = ConsumerApi() 
 41   
 42  # decorator-------------------------------------------------------------------- 
 43   
44 -class RoleCheck(object):
45 ''' 46 Decorator class to check roles of web service caller. 47 48 Copied and modified from: 49 http://wiki.python.org/moin/PythonDecoratorLibrary#DifferentDecoratorForms 50 ''' 51
52 - def __init__(self, *dec_args, **dec_kw):
53 '''The decorator arguments are passed here. Save them for runtime.''' 54 self.dec_args = dec_args 55 self.dec_kw = dec_kw
56
57 - def __call__(self, f):
58 def check_roles(*fargs, **kw): 59 ''' 60 Strip off the decorator arguments so we can use those to check the 61 roles of the current caller. 62 63 Note: the first argument cannot be "self" because we get a parse error 64 "takes at least 1 argument" unless the instance is actually included in 65 the argument list, which is redundant. If this wraps a class instance, 66 the "self" will be the first argument. 67 ''' 68 69 # Determine which roles will be checked by this instance of the decorator 70 roles = {'consumer': None, 'admin': None, 'consumer_id': None} 71 for key in self.dec_kw.keys(): 72 roles[key] = self.dec_kw[key] 73 74 user = None 75 76 # Admin role trumps any other checking 77 if roles['admin']: 78 # If not using cert check uname and password 79 try: 80 user = self.check_admin(*fargs) 81 except PulpException, pe: 82 # TODO: Figure out how to re-use the same return function in base.py 83 http.status_unauthorized() 84 http.header('Content-Type', 'application/json') 85 return json.dumps(pe.value, default=pymongo.json_util.default) 86 87 # Consumer role checking 88 if not user and (roles['consumer'] or roles['consumer_id']): 89 user = self.check_consumer(roles['consumer_id'], *fargs) 90 91 # Process the results of the auth checks 92 if not user: 93 http.status_unauthorized() 94 http.header('Content-Type', 'application/json') 95 return json.dumps("Authorization failed. Check your username and password or your certificate", 96 default=pymongo.json_util.default) 97 98 # If we get this far, access has been granted (in access failed the method 99 # would have returned by now) 100 principal.set_principal(user) 101 102 # If it wraps a class instance, call the function on the instance; 103 # otherwise just call it directly 104 if fargs and getattr(fargs[0], '__class__', None): 105 instance, fargs = fargs[0], fargs[1:] 106 result = f(instance, *fargs, **kw) 107 else: 108 result = f(*(fargs), **kw) 109 110 return result
111 112 # Save wrapped function reference 113 self.f = f 114 check_roles.__name__ = f.__name__ 115 check_roles.__dict__.update(f.__dict__) 116 check_roles.__doc__ = f.__doc__ 117 return check_roles
118
119 - def check_admin(self, *fargs):
120 ''' 121 Checks the request to see if it contains a valid admin authentication. 122 123 @return: user instance of the authenticated user if valid 124 credentials were specified; None otherwise 125 @rtype: L{pulp.server.db.model.User} 126 ''' 127 128 return self.check_admin_cert(*fargs) or self.check_username_pass(*fargs)
129
130 - def check_admin_cert(self, *fargs):
131 ''' 132 Determines if the certificate in the request represents a valid admin certificate. 133 134 @return: user instance of the authenticated user if valid 135 credentials were specified; None otherwise 136 @rtype: L{pulp.server.db.model.User} 137 ''' 138 139 # Extract the certificate from the request 140 environment = web.ctx.environ 141 cert_pem = environment.get('SSL_CLIENT_CERT', None) 142 143 # If there's no certificate, punch out early 144 if cert_pem is None: 145 return None 146 147 # Parse the certificate and extract the consumer UUID 148 idcert = Certificate(content=cert_pem) 149 subject = idcert.subject() 150 encoded_user = subject.get('CN', None) 151 152 # Verify the certificate has been signed by the pulp CA 153 valid = cert_generator.verify_cert(cert_pem) 154 if not valid: 155 LOG.error('Admin certificate with CN [%s] is signed by a foreign CA' % encoded_user) 156 return None 157 158 # If there is no user/pass combo, this is not a valid admin certificate 159 if not encoded_user: 160 return None 161 162 # Make sure the certificate is an admin certificate 163 if not cert_generator.is_admin_user(encoded_user): 164 return None 165 166 # Parse out the username and id from the certificate information 167 username, id = cert_generator.decode_admin_user(encoded_user) 168 169 # Verify a user exists with the given name 170 user = USER_API.user(username) 171 if user is None: 172 LOG.error('User [%s] specified in certificate was not found in the system' % username) 173 return None 174 175 # Verify the correct user ID 176 if id != user['id']: 177 LOG.error('ID in admin certificate for user [%s] was incorrect' % username) 178 return None 179 180 return user
181
182 - def check_username_pass(self, *fargs):
183 ''' 184 If the request uses HTTP authorization, verify the credentials identify a 185 valid user in the system. 186 187 @return: user instance of the authenticated user if valid 188 credentials were specified; None otherwise 189 @rtype: L{pulp.server.db.model.User} 190 ''' 191 192 # Get the credentials from the request 193 environment = web.ctx.environ 194 auth_string = environment.get('HTTP_AUTHORIZATION', None) 195 196 # If credentials were passed in, verify them 197 if auth_string is not None and auth_string.startswith("Basic"): 198 199 # Parse out the credentials 200 encoded_auth = auth_string.split(" ")[1] 201 auth_string = base64.decodestring(encoded_auth) 202 uname_pass = auth_string.split(":") 203 username = uname_pass[0] 204 password = uname_pass[1] 205 206 # Verify a user exists with the given name 207 user = USER_API.user(username) 208 if user is None: 209 LOG.error('User [%s] specified in certificate was not found in the system' % 210 username) 211 return None 212 213 # Verify the correct password was specified 214 good_password = password_util.check_password(user['password'], password) 215 if not good_password: 216 LOG.error('Password for user [%s] was incorrect' % username) 217 return None 218 219 return user 220 221 return None
222
223 - def check_consumer(self, check_id=False, *fargs):
224 ''' 225 Determines if the certificate in the request represents a valid consumer certificate. 226 227 @param check_id: if True, the consumer UID will be checked to make sure it 228 is present in the fargs argument; if False the only validation 229 will be that the UID exists in the DB (default = False) 230 @type check_id: boolean 231 232 @return: user instance of the authenticated user if valid 233 credentials were specified; None otherwise 234 @rtype: L{pulp.server.db.model.User} 235 ''' 236 237 # Extract the certificate from the request 238 environment = web.ctx.environ 239 cert_pem = environment.get('SSL_CLIENT_CERT', None) 240 241 # If there's no certificate, punch out early 242 if cert_pem is None: 243 return None 244 245 # Parse the certificate and extract the consumer UUID 246 idcert = Certificate(content=cert_pem) 247 subject = idcert.subject() 248 consumer_cert_uid = subject.get('CN', None) 249 250 # Verify the certificate has been signed by the pulp CA 251 valid = cert_generator.verify_cert(cert_pem) 252 if not valid: 253 LOG.error('Consumer certificate with CN [%s] is signed by a foreign CA' % consumer_cert_uid) 254 return None 255 256 # If there is no consumer ID, this is not a valid consumer certificate 257 if consumer_cert_uid is None: 258 LOG.error("Consumer UID not found in certificate") 259 return None 260 261 # Check that it is a valid consumer in our DB 262 consumer = CONSUMER_API.consumer(consumer_cert_uid) 263 if not consumer: 264 LOG.error("Consumer with id [%s] does not exist" % consumer_cert_uid) 265 return None 266 267 # If the ID should be explicitly verified against a provided list, the ID in the 268 # certificate will be verified against the ID found in the certificate 269 good_certificate = False 270 if check_id: 271 for arg in fargs: 272 LOG.error("Checking ID in cert [%s] against expected ID [%s]" % 273 (consumer_cert_uid, arg)) 274 if arg == consumer_cert_uid: 275 good_certificate = True 276 break 277 else: 278 good_certificate = True 279 280 if good_certificate: 281 return consumer 282 else: 283 return None
284