Package pulp :: Package messaging :: Module consumer
[hide private]
[frames] | no frames]

Source Code for Module pulp.messaging.consumer

  1  # 
  2  # Copyright (c) 2010 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  """ 
 17  Provides AMQP message consumer classes. 
 18  """ 
 19   
 20  from pulp.messaging import * 
 21  from pulp.messaging.endpoint import Endpoint 
 22  from pulp.messaging.producer import Producer 
 23  from pulp.messaging.dispatcher import Return 
 24  from pulp.messaging.window import * 
 25  from pulp.messaging.store import PendingQueue, PendingReceiver 
 26  from qpid.messaging import Empty 
 27  from threading import Thread 
 28  from logging import getLogger 
 29   
 30  log = getLogger(__name__) 
 31   
 32   
33 -class ReceiverThread(Thread):
34 """ 35 Consumer (worker) thread. 36 @ivar __run: The main run/read flag. 37 @type __run: bool 38 @ivar consumer: A consumer that is notified when 39 messages are read. 40 @type consumer: L{Consumer} 41 """ 42
43 - def __init__(self, consumer):
44 """ 45 @param consumer: A consumer that is notified when 46 messages are read. 47 @type consumer: L{Consumer} 48 """ 49 self.__run = True 50 self.consumer = consumer 51 Thread.__init__(self, name=consumer.id())
52
53 - def run(self):
54 """ 55 Messages are read from consumer.receiver and 56 dispatched to the consumer.received(). 57 """ 58 m = None 59 receiver = self.consumer.receiver 60 while self.__run: 61 try: 62 m = receiver.fetch(timeout=1) 63 self.consumer.received(m) 64 except Empty: 65 pass 66 except Exception: 67 log.error('failed:\n%s', m, exc_info=True)
68
69 - def stop(self):
70 """ 71 Stop reading the receiver and terminate 72 the thread. 73 """ 74 self.__run = False
75 76
77 -class Consumer(Endpoint):
78 """ 79 An AMQP (abstract) consumer. 80 """ 81
82 - def __init__(self, destination, **other):
83 """ 84 @param destination: The destination to consumer. 85 @type destination: L{Destination} 86 """ 87 self.destination = destination 88 Endpoint.__init__(self, **other)
89
90 - def id(self):
91 """ 92 Get the endpoint id 93 @return: The destination (simple) address. 94 @rtype: str 95 """ 96 return repr(self.destination)
97
98 - def address(self):
99 """ 100 Get the AMQP address for this endpoint. 101 @return: The AMQP address. 102 @rtype: str 103 """ 104 return str(self.destination)
105
106 - def open(self):
107 """ 108 Open and configure the consumer. 109 """ 110 session = self.session() 111 address = self.address() 112 log.info('{%s} opening %s', self.id(), address) 113 receiver = session.receiver(address) 114 self.receiver = receiver
115
116 - def start(self):
117 """ 118 Start processing messages on the queue. 119 """ 120 self.thread = ReceiverThread(self) 121 self.thread.start()
122
123 - def stop(self):
124 """ 125 Stop processing requests. 126 """ 127 try: 128 self.thread.stop() 129 except: 130 pass
131
132 - def join(self):
133 """ 134 Join the worker thread. 135 """ 136 self.thread.join()
137
138 - def received(self, message):
139 """ 140 Process received request. 141 @param message: The received message. 142 @type message: qpid.messaging.Message 143 """ 144 envelope = Envelope() 145 subject = self.__subject(message) 146 envelope.load(message.content) 147 if subject: 148 envelope.subject = subject 149 log.info('{%s} received:\n%s', self.id(), envelope) 150 if self.valid(envelope): 151 self.dispatch(envelope) 152 self.ack()
153
154 - def valid(self, envelope):
155 """ 156 Check to see if the envelope is valid. 157 @param envelope: The received envelope. 158 @type envelope: qpid.messaging.Message 159 """ 160 valid = True 161 if envelope.version != version: 162 valid = False 163 log.info('{%s} version mismatch (discarded):\n%s', 164 self.id(), envelope) 165 return valid
166
167 - def dispatch(self, envelope):
168 """ 169 Dispatch received request. 170 @param envelope: The received envelope. 171 @type envelope: qpid.messaging.Message 172 """ 173 pass
174
175 - def __subject(self, message):
176 """ 177 Extract the message subject. 178 @param message: The received message. 179 @type message: qpid.messaging.Message 180 @return: The message subject 181 @rtype: str 182 """ 183 return message.properties.get('qpid.subject')
184 185
186 -class Reader(Consumer):
187
188 - def start(self):
189 pass
190
191 - def stop(self):
192 pass
193
194 - def next(self, timeout=90):
195 """ 196 Get the next envelope from the queue. 197 @param timeout: The read timeout. 198 @type timeout: int 199 @return: The next envelope. 200 @rtype: L{Envelope} 201 """ 202 try: 203 message = self.receiver.fetch(timeout=timeout) 204 envelope = Envelope() 205 envelope.load(message.content) 206 log.info('{%s} read next:\n%s', self.id(), envelope) 207 return envelope 208 except Empty: 209 pass
210
211 - def search(self, sn, timeout=90):
212 """ 213 Seach the reply queue for the envelope with 214 the matching serial #. 215 @param sn: The expected serial number. 216 @type sn: str 217 @param timeout: The read timeout. 218 @type timeout: int 219 @return: The next envelope. 220 @rtype: L{Envelope} 221 """ 222 log.info('{%s} searching for: sn=%s', self.id(), sn) 223 while True: 224 envelope = self.next(timeout) 225 if not envelope: 226 return 227 if sn == envelope.sn: 228 log.info('{%s} search found:\n%s', self.id(), envelope) 229 return envelope 230 else: 231 log.info('{%s} search discarding:\n%s', self.id(), envelope) 232 self.ack()
233 234
235 -class RequestConsumer(Consumer):
236 """ 237 An AMQP request consumer. 238 @ivar producer: A reply producer. 239 @type producer: L{pulp.messaging.producer.Producer} 240 @ivar dispatcher: An RMI dispatcher. 241 @type dispatcher: L{pulp.messaging.dispatcher.Dispatcher} 242 """ 243
244 - def start(self, dispatcher):
245 """ 246 Start processing messages on the queue using the 247 specified dispatcher. 248 @param dispatcher: An RMI dispatcher. 249 @type dispatcher: L{pulp.messaging.Dispatcher} 250 """ 251 q = PendingQueue(self.id()) 252 self.pending = PendingReceiver(q, self) 253 self.dispatcher = dispatcher 254 self.producer = Producer(url=self.url) 255 Consumer.start(self) 256 self.pending.start()
257
258 - def dispatch(self, envelope):
259 """ 260 Dispatch received request. 261 @param envelope: The received envelope. 262 @type envelope: L{Envelope} 263 """ 264 try: 265 self.checkwindow(envelope) 266 request = envelope.request 267 self.sendstarted(envelope) 268 result = self.dispatcher.dispatch(request) 269 except WindowMissed: 270 result = Return.exception() 271 except WindowPending: 272 return 273 self.sendreply(envelope, result)
274
275 - def sendreply(self, envelope, result):
276 """ 277 Send the reply if requested. 278 @param envelope: The received envelope. 279 @type envelope: L{Envelope} 280 @param result: The request result. 281 @type result: object 282 """ 283 sn = envelope.sn 284 any = envelope.any 285 replyto = envelope.replyto 286 if not replyto: 287 return 288 try: 289 self.producer.send( 290 replyto, 291 sn=sn, 292 any=any, 293 result=result) 294 except: 295 log.error('send failed:\n%s', result, exc_info=True)
296
297 - def sendstarted(self, envelope):
298 """ 299 Send the a status update if requested. 300 @param envelope: The received envelope. 301 @type envelope: L{Envelope} 302 """ 303 sn = envelope.sn 304 any = envelope.any 305 replyto = envelope.replyto 306 if not replyto: 307 return 308 try: 309 self.producer.send( 310 replyto, 311 sn=sn, 312 any=any, 313 status='started') 314 except: 315 log.error('send (started), failed', exc_info=True)
316
317 - def checkwindow(self, envelope):
318 """ 319 Check the window. 320 @param envelope: The received envelope. 321 @type envelope: L{Envelope} 322 """ 323 window = Window(envelope.window) 324 if window.future(): 325 pending = self.pending.queue 326 pending.add(envelope) 327 raise WindowPending(envelope.sn) 328 if window.past(): 329 raise WindowMissed(envelope.sn)
330
331 - def __del__(self):
332 try: 333 self.pending.stop() 334 self.pending.join(10) 335 except: 336 pass
337 338
339 -class EventConsumer(Consumer):
340 """ 341 An AMQP event consumer. 342 """ 343
344 - def __init__(self, subject=None, name=None, **other):
345 """ 346 @param subject: An (optional) event subject. 347 @type subject: str 348 """ 349 topic = Topic('event', subject, name) 350 Consumer.__init__(self, topic, **other)
351
352 - def dispatch(self, envelope):
353 """ 354 Process received request. 355 @param envelope: The received envelope. 356 @type envelope: L{Envelope} 357 """ 358 try: 359 subject = envelope.subject 360 body = envelope.event 361 self.raised(subject, body) 362 except Exception, e: 363 log.exception(e) 364 self.ack()
365
366 - def raised(self, subject, event):
367 """ 368 Notify the listener that an event has been raised. 369 @param subject: The event subject. 370 @type subject: str 371 @param event: The event body. 372 @type event: any 373 """ 374 pass
375