Package pulp :: Package server :: Package event :: Module dispatcher
[hide private]
[frames | no frames]

Source Code for Module pulp.server.event.dispatcher

  1  #! /usr/bin/env python 
  2  # 
  3  # Copyright (c) 2010 Red Hat, Inc. 
  4  # 
  5  # This software is licensed to you under the GNU General Public License, 
  6  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  7  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  8  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  9  # along with this software; if not, see 
 10  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 11  # 
 12  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 13  # granted to use or replicate Red Hat trademarks that are incorporated 
 14  # in this software or its documentation. 
 15  # 
 16   
 17  """ 
 18  Contains QPID event classes. 
 19  """ 
 20   
 21  import os 
 22  from pulp.server.event import * 
 23  from pulp.messaging.consumer import EventConsumer 
 24  from threading import RLock as Mutex 
 25  import pulp.server.event.handler as hpx 
 26  from logging import getLogger 
 27   
 28  log = getLogger(__name__) 
 29  mutex = Mutex() 
 30  flags = EventFlags() 
# Decorator workspace used to correlate methods with classes.
# Slot 0 collects the I{inbound} methods and slot 1 the I{outbound}
# methods (both keyed by action) until a handler class is registered.
_method = ({},{})


def handler(entity):
    """
    Event handler (class) decorator.
    Registers the decorated class with L{EventDispatcher} as the handler
    for I{entity}, passing along the method mappings currently held in
    the decorator workspace.  The workspace is then replaced with a
    fresh, empty pair of dictionaries.
    @param entity: The I{entity} part of an AMQP subject.
    @type entity: str
    @return: A decorator that registers a class and returns it unchanged.
    @rtype: callable
    """
    def decorator(cls):
        global _method
        mutex.acquire()
        try:
            inbound_map, outbound_map = _method
            EventDispatcher.register(entity, cls, inbound_map, outbound_map)
            # start the next handler class with an empty workspace.
            _method = ({},{})
        finally:
            mutex.release()
        return cls

    return decorator

def inbound(action):
    """
    Event (inbound) method decorator.
    Records the decorated function in the I{inbound} slot of the
    decorator workspace, keyed by I{action}.
    @param action: The I{action} part of an AMQP subject.
    @type action: str
    @return: A decorator that records a function and returns it unchanged.
    @rtype: callable
    """
    def decorator(fn):
        mutex.acquire()
        try:
            inbound_map = _method[0]
            inbound_map[action] = fn
        finally:
            mutex.release()
        return fn

    return decorator

def outbound(action):
    """
    Event (outbound) method decorator.
    Records the decorated function in the I{outbound} slot of the
    decorator workspace, keyed by I{action}.
    @param action: The I{action} part of an AMQP subject.
    @type action: str
    @return: A decorator that records a function and returns it unchanged.
    @rtype: callable
    """
    def decorator(fn):
        mutex.acquire()
        try:
            outbound_map = _method[1]
            outbound_map[action] = fn
        finally:
            mutex.release()
        return fn

    return decorator

def event(subject):
    """
    The decorator for API methods.
    After the decorated method returns, the I{outbound} handler method
    registered for the subject is looked up through the dispatcher and
    called with the same arguments, unless I{flags} reports the subject
    as suspended.  The decorated method's own return value is returned
    either way.
    @param subject: An AMQP subject form: <entity>.<action>.
    @type subject: str
    @return: A decorator producing the event-raising wrapper.
    @rtype: callable
    """
    # imported locally so this block does not depend on the module's
    # import section.
    from functools import wraps

    def decorator(fn):
        # wraps() keeps the API method's __name__ and __doc__ on the
        # wrapper so introspection and generated docs stay accurate.
        @wraps(fn)
        def call(*args, **kwargs):
            retval = fn(*args, **kwargs)
            if not flags.suspended(subject):
                entity, action = subject.split('.',1)
                inst, method = \
                    EventDispatcher.handler(entity, outbound=action)
                method(inst, *args, **kwargs)
            return retval
        return call

    return decorator

class Handler:
    """
    Handler specification.
    @ivar hclass: The handler class.
    @type hclass: class
    @ivar ibmethod: Registered I{inbound} methods, keyed by action.
    @type ibmethod: dict
    @ivar obmethod: Registered I{outbound} methods, keyed by action.
    @type obmethod: dict
    """

    def __init__(self, hclass, inbound, outbound):
        """
        @param hclass: The handler class.
        @type hclass: class
        @param inbound: Registered I{inbound} methods, keyed by action.
        @type inbound: dict
        @param outbound: Registered I{outbound} methods, keyed by action.
        @type outbound: dict
        """
        self.hclass = hclass
        self.ibmethod = inbound
        self.obmethod = outbound

    def inst(self):
        """
        Get an instance of the handler.
        A new instance is created on every call.
        @return: An instance of the handler.
        @rtype: L{EventHandler}
        """
        return self.hclass()

    def inbound(self, action):
        """
        Find the I{inbound} method for the specified action.
        @param action: An event subject action.
        @type action: str
        @return: The method registered for the (inbound) action.
        @rtype: method
        @raise Exception: When no method is registered for the action.
        """
        method = self.ibmethod.get(action)
        if method:
            return method
        # call form of raise: valid on both python 2 and python 3.
        raise Exception(
            'handler %s has not method registered for (inbound) action: %s' %
            (self.hclass.__name__,
             action))

    def outbound(self, action):
        """
        Find the I{outbound} method for the specified action.
        @param action: An event subject action.
        @type action: str
        @return: The method registered for the (outbound) action.
        @rtype: method
        @raise Exception: When no method is registered for the action.
        """
        method = self.obmethod.get(action)
        if method:
            return method
        # call form of raise: valid on both python 2 and python 3.
        raise Exception(
            'handler %s has not method registered for (outbound) action: %s' %
            (self.hclass.__name__,
             action))

class EventDispatcher(EventConsumer):
    """
    The main event dispatcher.
    Dispatches events by subject to the registered handler.
    @cvar handlers: Registered event handler specifications.
        Key: entity.  Value: L{Handler}.
    @type handlers: dict
    """

    handlers = {}

    def __init__(self):
        # NOTE(review): the meaning of the None argument is defined by
        # EventConsumer, which is not visible here - confirm.
        EventConsumer.__init__(self, None)

    def start(self):
        """
        Start the consumer and log that the dispatcher is running.
        """
        EventConsumer.start(self)
        log.info('Event dispatcher - started.')

    @classmethod
    def register(cls, entity, hclass, inbound, outbound):
        """
        Register a handler.
        An existing registration for the same entity is replaced.
        @param entity: The entity name.
        @type entity: str
        @param hclass: The handler class.
        @type hclass: class
        @param inbound: The I{inbound} method mappings.
        @type inbound: dict
        @param outbound: The I{outbound} method mappings.
        @type outbound: dict
        """
        cls.handlers[entity] = \
            Handler(hclass, inbound, outbound)

    @classmethod
    def load(cls):
        """
        Load handlers.
        Imports the modules of the handler package unless at least one
        handler is already registered.
        """
        mutex.acquire()
        try:
            if cls.handlers:
                return
            loader = DynLoader(hpx)
            loader.load()
        finally:
            mutex.release()

    @classmethod
    def handler(cls, entity, inbound=None, outbound=None):
        """
        Get a handler class associated with the specified entity.
        @param entity: The I{entity} part of an AMQP subject.
        @type entity: str
        @param inbound: The I{inbound} action.
        @type inbound: str
        @param outbound: The I{outbound} action.
        @type outbound: str
        @return: The handler instance and method based on whether the
            I{inbound} or I{outbound} param is specified.
        @rtype: tuple (inst,method)
        @raise Exception: When no handler is registered for the entity
            or the handler has no method for the action.
        """
        # the module mutex is an RLock, so load() may acquire it again
        # while it is held here.
        mutex.acquire()
        try:
            cls.load()
            handler = cls.handlers.get(entity)
            if handler is None:
                # call form of raise: valid on python 2 and python 3.
                raise Exception(
                    'handler for entity "%s", not found' % entity)
            inst = handler.inst()
            if inbound:
                method = handler.inbound(inbound)
            else:
                method = handler.outbound(outbound)
            return (inst, method)
        finally:
            mutex.release()

    def raised(self, subject, event):
        """
        Entry point (callback) for received AMQP events.
        The event is dispatched to the registered handler and the
        I{inbound} method is called.  Failures are logged and not
        propagated to the caller.
        @param subject: The event (message) subject used for routing.
        @type subject: str
        @param event: The event payload.
        @type event: dict
        """
        try:
            log.info('received event (%s): %s',
                subject,
                event)
            entity, action = subject.split('.',1)
            inst, method = self.handler(entity, inbound=action)
            log.info('dispatching event (%s): %s\n to handler: %s.%s()',
                subject,
                event,
                inst.__class__,
                method.__name__)
            # the subject stays suspended while the inbound method runs
            # and is always resumed afterwards, even on failure.
            flags.suspend(subject)
            try:
                method(inst, event)
            finally:
                flags.resume(subject)
        except Exception:
            # Exception rather than a bare except: interpreter exit
            # requests (SystemExit, KeyboardInterrupt) are not swallowed.
            log.error('{inbound} event failed (%s):\n%s',
                subject,
                event,
                exc_info=True)

class EventHandler:
    """
    Base class for event handlers.
    Defines no attributes or methods of its own.
    """

class DynLoader:
    """
    A dynamic module loader.
    @ivar pkg: A package object.
    @type pkg: python module
    """

    def __init__(self, pkg):
        """
        @param pkg: A package object.
        @type pkg: python module
        """
        self.pkg = pkg

    def load(self):
        """
        Load all modules within the package.
        Each python source or byte-compiled file found in the package
        directory is imported once, by its fully qualified name.
        Entries starting with '__' or '.', and entries that are not
        python module files (sub-directories, data files), are skipped.
        @raise ImportError: When a module cannot be imported.
        """
        loaded = set()
        path = os.path.dirname(self.pkg.__file__)
        # the module name is derived from the package name rather than
        # from the filesystem path, so the result does not depend on the
        # path separator or on __file__ being absolute.
        package = self.pkg.__name__.split('.')
        # sorted so modules are imported in a deterministic order.
        for fn in sorted(os.listdir(path)):
            if fn.startswith('__') or fn.startswith('.'):
                continue
            mod, ext = os.path.splitext(fn)
            if ext not in ('.py', '.pyc', '.pyo'):
                continue
            # foo.py and foo.pyc name the same module.
            if mod in loaded:
                continue
            self.__import(package + [mod])
            loaded.add(mod)

    def __import(self, path):
        """
        Import the module at the specified path.
        Errors raised by the import are propagated unchanged so the
        real cause of a failure stays visible.
        @param path: A list of (dotted) module name elements.
        @type path: list
        @raise ImportError: When the module cannot be found.
        """
        mod = '.'.join(path)
        __import__(mod)
        log.info('%s - imported.', mod)
