1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Provides async AMQP message consumer classes.
18 """
19
20 from pulp.messaging import *
21 from pulp.messaging.dispatcher import Return
22 from pulp.messaging.consumer import Consumer
23 from logging import getLogger
24
25 log = getLogger(__name__)
26
27
29 """
30 A request, reply consumer.
31 @ivar listener: A reply listener.
32 @type listener: any
33 """
34
def start(self, listener):
    """
    Begin consuming messages from the queue, handing each
    one on to the supplied listener.
    @param listener: The reply listener that receives forwarded messages.
    @type listener: L{Listener}
    """
    # Record the listener before the base consumer is started, so it is
    # already set when Consumer.start() runs.
    self.listener = listener
    Consumer.start(self)
44
46 """
47 Dispatch received request.
48 @param envelope: The received envelope.
49 @type envelope: L{Envelope}
50 """
51 try:
52 reply = self.__getreply(envelope)
53 reply.notify(self.listener)
54 except Exception, e:
55 log.exception(e)
56
65
66
67
69 """
70 Asynchronous request reply.
71 @ivar sn: The request serial number.
72 @type sn: str
73 @ivar origin: Which endpoint sent the reply.
74 @type origin: str
75 @ivar any: User defined (round-tripped) data.
76 @type any: object
77 """
78
80 """
81 @param envelope: The received envelope.
82 @type envelope: L{Envelope}
83 """
84 self.sn = envelope.sn
85 self.origin = envelope.origin
86 self.any = envelope.any
87
89 """
90 Notify the specified listener.
91 @param listener: The listener to notify.
92 @type listener: L{Listener} or callable.
93 """
94 pass
95
97 s = []
98 s.append(self.__class__.__name__)
99 s.append(' sn : %s' % self.sn)
100 s.append(' origin : %s' % self.origin)
101 s.append(' user data : %s' % self.any)
102 return '\n'.join(s)
103
104
106 """
107 A (final) reply.
108 """
109
111 if callable(listener):
112 listener(self)
113 return
114 if self.succeeded():
115 listener.succeeded(self)
116 else:
117 listener.failed(self)
118
120 """
121 Get whether the reply indicates success.
122 @return: True when succeeded.
123 @rtype: bool
124 """
125 return False
126
128 """
129 Get whether the reply indicates failure.
130 @return: True when failed.
131 @rtype: bool
132 """
133 return ( not self.succeeded() )
134
136 """
137 Throw contained exception.
138 @raise Exception: When contained.
139 """
140 pass
141
142
144 """
145 Successful reply to asynchronous operation.
146 @ivar retval: The returned value.
147 @type retval: object
148 """
149
151 """
152 @param envelope: The received envelope.
153 @type envelope: L{Envelope}
154 """
155 AsyncReply.__init__(self, envelope)
156 reply = Return(envelope.result)
157 self.retval = reply.retval
158
161
168
169
171 """
172 Failed reply to asynchronous operation. This reply
173 indicates an exception was raised.
174 @ivar exval: The returned exception.
175 @type exval: object
176 @see: L{Failed.throw}
177 """
178
180 """
181 @param envelope: The received envelope.
182 @type envelope: L{Envelope}
183 """
184 AsyncReply.__init__(self, envelope)
185 reply = Return(envelope.result)
186 self.exval = Exception(reply.exval)
187
190
197
198
200 """
201 Status changed for an asynchronous operation.
202 @ivar status: The status.
203 @type status: str
204 @see: L{Failed.throw}
205 """
206
208 """
209 @param envelope: The received envelope.
210 @type envelope: L{Envelope}
211 """
212 AsyncReply.__init__(self, envelope)
213 self.status = 'started'
214
216 if callable(listener):
217 listener(self)
218 else:
219 listener.status(self)
220
226
227
229 """
230 An asynchronous operation callback listener.
231 """
232
234 """
235 Async request succeeded.
236 @param reply: The reply data.
237 @type reply: L{Succeeded}.
238 """
239 pass
240
242 """
243 Async request failed (raised an exception).
244 @param reply: The reply data.
245 @type reply: L{Failed}.
246 """
247 pass
248
250 """
251 Async request has started.
252 @param reply: The status reply.
253 @type reply: L{Status}.
254 """
255 pass
256