1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Provides AMQP message consumer classes.
18 """
19
20 from pulp.messaging import *
21 from pulp.messaging.endpoint import Endpoint
22 from pulp.messaging.producer import Producer
23 from pulp.messaging.dispatcher import Return
24 from pulp.messaging.window import *
25 from pulp.messaging.store import PendingQueue, PendingReceiver
26 from qpid.messaging import Empty
27 from threading import Thread
28 from logging import getLogger
29
30 log = getLogger(__name__)
31
32
34 """
35 Consumer (worker) thread.
36 @ivar __run: The main run/read flag.
37 @type __run: bool
38 @ivar consumer: A consumer that is notified when
39 messages are read.
40 @type consumer: L{Consumer}
41 """
42
44 """
45 @param consumer: A consumer that is notified when
46 messages are read.
47 @type consumer: L{Consumer}
48 """
49 self.__run = True
50 self.consumer = consumer
51 Thread.__init__(self, name=consumer.id())
52
54 """
55 Messages are read from consumer.receiver and
56 dispatched to the consumer.received().
57 """
58 m = None
59 receiver = self.consumer.receiver
60 while self.__run:
61 try:
62 m = receiver.fetch(timeout=1)
63 self.consumer.received(m)
64 except Empty:
65 pass
66 except Exception:
67 log.error('failed:\n%s', m, exc_info=True)
68
70 """
71 Stop reading the receiver and terminate
72 the thread.
73 """
74 self.__run = False
75
76
78 """
79 An AMQP (abstract) consumer.
80 """
81
82 - def __init__(self, destination, **other):
83 """
84 @param destination: The destination to consumer.
85 @type destination: L{Destination}
86 """
87 self.destination = destination
88 Endpoint.__init__(self, **other)
89
91 """
92 Get the endpoint id
93 @return: The destination (simple) address.
94 @rtype: str
95 """
96 return repr(self.destination)
97
99 """
100 Get the AMQP address for this endpoint.
101 @return: The AMQP address.
102 @rtype: str
103 """
104 return str(self.destination)
105
115
122
124 """
125 Stop processing requests.
126 """
127 try:
128 self.thread.stop()
129 except:
130 pass
131
133 """
134 Join the worker thread.
135 """
136 self.thread.join()
137
139 """
140 Process received request.
141 @param message: The received message.
142 @type message: qpid.messaging.Message
143 """
144 envelope = Envelope()
145 subject = self.__subject(message)
146 envelope.load(message.content)
147 if subject:
148 envelope.subject = subject
149 log.info('{%s} received:\n%s', self.id(), envelope)
150 if self.valid(envelope):
151 self.dispatch(envelope)
152 self.ack()
153
154 - def valid(self, envelope):
155 """
156 Check to see if the envelope is valid.
157 @param envelope: The received envelope.
158 @type envelope: qpid.messaging.Message
159 """
160 valid = True
161 if envelope.version != version:
162 valid = False
163 log.info('{%s} version mismatch (discarded):\n%s',
164 self.id(), envelope)
165 return valid
166
168 """
169 Dispatch received request.
170 @param envelope: The received envelope.
171 @type envelope: qpid.messaging.Message
172 """
173 pass
174
176 """
177 Extract the message subject.
178 @param message: The received message.
179 @type message: qpid.messaging.Message
180 @return: The message subject
181 @rtype: str
182 """
183 return message.properties.get('qpid.subject')
184
185
187
190
193
194 - def next(self, timeout=90):
195 """
196 Get the next envelope from the queue.
197 @param timeout: The read timeout.
198 @type timeout: int
199 @return: The next envelope.
200 @rtype: L{Envelope}
201 """
202 try:
203 message = self.receiver.fetch(timeout=timeout)
204 envelope = Envelope()
205 envelope.load(message.content)
206 log.info('{%s} read next:\n%s', self.id(), envelope)
207 return envelope
208 except Empty:
209 pass
210
211 - def search(self, sn, timeout=90):
212 """
213 Seach the reply queue for the envelope with
214 the matching serial #.
215 @param sn: The expected serial number.
216 @type sn: str
217 @param timeout: The read timeout.
218 @type timeout: int
219 @return: The next envelope.
220 @rtype: L{Envelope}
221 """
222 log.info('{%s} searching for: sn=%s', self.id(), sn)
223 while True:
224 envelope = self.next(timeout)
225 if not envelope:
226 return
227 if sn == envelope.sn:
228 log.info('{%s} search found:\n%s', self.id(), envelope)
229 return envelope
230 else:
231 log.info('{%s} search discarding:\n%s', self.id(), envelope)
232 self.ack()
233
234
236 """
237 An AMQP request consumer.
238 @ivar producer: A reply producer.
239 @type producer: L{pulp.messaging.producer.Producer}
240 @ivar dispatcher: An RMI dispatcher.
241 @type dispatcher: L{pulp.messaging.dispatcher.Dispatcher}
242 """
243
244 - def start(self, dispatcher):
257
274
276 """
277 Send the reply if requested.
278 @param envelope: The received envelope.
279 @type envelope: L{Envelope}
280 @param result: The request result.
281 @type result: object
282 """
283 sn = envelope.sn
284 any = envelope.any
285 replyto = envelope.replyto
286 if not replyto:
287 return
288 try:
289 self.producer.send(
290 replyto,
291 sn=sn,
292 any=any,
293 result=result)
294 except:
295 log.error('send failed:\n%s', result, exc_info=True)
296
298 """
299 Send the a status update if requested.
300 @param envelope: The received envelope.
301 @type envelope: L{Envelope}
302 """
303 sn = envelope.sn
304 any = envelope.any
305 replyto = envelope.replyto
306 if not replyto:
307 return
308 try:
309 self.producer.send(
310 replyto,
311 sn=sn,
312 any=any,
313 status='started')
314 except:
315 log.error('send (started), failed', exc_info=True)
316
330
332 try:
333 self.pending.stop()
334 self.pending.join(10)
335 except:
336 pass
337
338
340 """
341 An AMQP event consumer.
342 """
343
344 - def __init__(self, subject=None, name=None, **other):
345 """
346 @param subject: An (optional) event subject.
347 @type subject: str
348 """
349 topic = Topic('event', subject, name)
350 Consumer.__init__(self, topic, **other)
351
353 """
354 Process received request.
355 @param envelope: The received envelope.
356 @type envelope: L{Envelope}
357 """
358 try:
359 subject = envelope.subject
360 body = envelope.event
361 self.raised(subject, body)
362 except Exception, e:
363 log.exception(e)
364 self.ack()
365
366 - def raised(self, subject, event):
367 """
368 Notify the listener that an event has been raised.
369 @param subject: The event subject.
370 @type subject: str
371 @param event: The event body.
372 @type event: any
373 """
374 pass
375