Package pulp :: Package messaging :: Module async
[hide private]
[frames] | no frames]

Source Code for Module pulp.messaging.async

  1  # 
  2  # Copyright (c) 2010 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  """ 
 17  Provides async AMQP message consumer classes. 
 18  """ 
 19   
 20  from pulp.messaging import * 
 21  from pulp.messaging.dispatcher import Return 
 22  from pulp.messaging.consumer import Consumer 
 23  from logging import getLogger 
 24   
 25  log = getLogger(__name__) 
 26   
 27   
class ReplyConsumer(Consumer):
    """
    A request, reply consumer.
    Each received envelope is turned into a reply object which is
    then delivered to the listener registered through L{start}.
    @ivar listener: A reply listener.
    @type listener: any
    """

    def start(self, listener):
        """
        Start processing messages on the queue and
        forward to the listener.
        @param listener: A reply listener.
        @type listener: L{Listener}
        """
        # The listener must be stored before the consumer starts so
        # that dispatch() can find it.
        self.listener = listener
        Consumer.start(self)

    def dispatch(self, envelope):
        """
        Dispatch a received reply to the listener.
        Any exception raised while building or delivering the reply
        is logged and suppressed; nothing is propagated to the caller.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        """
        try:
            reply = self.__getreply(envelope)
            reply.notify(self.listener)
        except Exception:
            # No exception binding is needed: log.exception() records
            # the active exception and its traceback on its own.  The
            # binding-free form is also valid on every python version.
            log.exception('reply dispatch failed')

    def __getreply(self, envelope):
        """
        Build the reply object that matches the envelope.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        @return: A L{Status} when the envelope has a (true) status;
            otherwise L{Succeeded} or L{Failed} depending on whether
            the envelope result reports success.
        @rtype: L{AsyncReply}
        """
        if envelope.status:
            return Status(envelope)
        if Return(envelope.result).succeeded():
            return Succeeded(envelope)
        return Failed(envelope)
65 66 67
class AsyncReply:
    """
    Base class for replies to asynchronous requests.
    @ivar sn: The request serial number.
    @type sn: str
    @ivar origin: Which endpoint sent the reply.
    @type origin: str
    @ivar any: User defined (round-tripped) data.
    @type any: object
    """

    def __init__(self, envelope):
        """
        Copy the serial number, origin and user data from the envelope.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        """
        self.sn = envelope.sn
        self.origin = envelope.origin
        self.any = envelope.any

    def notify(self, listener):
        """
        Notify the specified listener.
        The base implementation does nothing.
        @param listener: The listener to notify.
        @type listener: L{Listener} or callable.
        """

    def __str__(self):
        """
        Render the class name followed by one line per attribute.
        @return: The multi-line description.
        @rtype: str
        """
        heading = self.__class__.__name__
        details = (
            ' sn : %s' % self.sn,
            ' origin : %s' % self.origin,
            ' user data : %s' % self.any,
        )
        return '\n'.join((heading,) + details)
103 104
class FinalReply(AsyncReply):
    """
    A (final) reply.
    """

    def notify(self, listener):
        """
        Deliver this reply to the listener.
        A callable listener is invoked with the reply.  Otherwise the
        listener's succeeded() or failed() method is called, chosen
        by the outcome reported by L{succeeded}.
        @param listener: The listener to notify.
        @type listener: L{Listener} or callable.
        """
        if callable(listener):
            listener(self)
        elif self.succeeded():
            listener.succeeded(self)
        else:
            listener.failed(self)

    def succeeded(self):
        """
        Get whether the reply indicates success.
        This base implementation always reports False.
        @return: True when succeeded.
        @rtype: bool
        """
        return False

    def failed(self):
        """
        Get whether the reply indicates failure.
        This is the negation of L{succeeded}.
        @return: True when failed.
        @rtype: bool
        """
        return not self.succeeded()

    def throw(self):
        """
        Throw contained exception.
        This base implementation contains none and does nothing.
        @raise Exception: When contained.
        """
141 142
class Succeeded(FinalReply):
    """
    Successful reply to asynchronous operation.
    @ivar retval: The returned value.
    @type retval: object
    """

    def __init__(self, envelope):
        """
        Initialize the common attributes and extract the returned
        value from the envelope result.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        """
        AsyncReply.__init__(self, envelope)
        self.retval = Return(envelope.result).retval

    def succeeded(self):
        """
        Get whether the reply indicates success.
        @return: Always True.
        @rtype: bool
        """
        return True

    def __str__(self):
        """
        Render the common description followed by the returned value.
        @return: The multi-line description.
        @rtype: str
        """
        common = AsyncReply.__str__(self)
        return '\n'.join((common, ' retval:', str(self.retval)))
168 169
class Failed(FinalReply):
    """
    Failed reply to asynchronous operation.  This reply
    indicates an exception was raised.
    @ivar exval: The returned exception value, wrapped in an
        L{Exception}.
    @type exval: Exception
    @see: L{Failed.throw}
    """

    def __init__(self, envelope):
        """
        Initialize the common attributes and wrap the exception value
        found in the envelope result in an L{Exception}.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        """
        AsyncReply.__init__(self, envelope)
        self.exval = Exception(Return(envelope.result).exval)

    def throw(self):
        """
        Raise the contained exception.
        @raise Exception: Always; the stored I{exval}.
        """
        raise self.exval

    def __str__(self):
        """
        Render the common description followed by the exception.
        @return: The multi-line description.
        @rtype: str
        """
        common = AsyncReply.__str__(self)
        return '\n'.join((common, ' exception:', str(self.exval)))
197 198
class Status(AsyncReply):
    """
    Status changed for an asynchronous operation.
    @ivar status: The status.
    @type status: str
    """

    def __init__(self, envelope):
        """
        Initialize the common attributes and set the status.
        @param envelope: The received envelope.
        @type envelope: L{Envelope}
        """
        AsyncReply.__init__(self, envelope)
        # NOTE(review): the status is the fixed literal 'started' and
        # is not read from the envelope -- confirm this is intended.
        self.status = 'started'

    def notify(self, listener):
        """
        Deliver this status to the listener.
        A callable listener is invoked with the reply; otherwise the
        listener's status() method is called.
        @param listener: The listener to notify.
        @type listener: L{Listener} or callable.
        """
        if not callable(listener):
            listener.status(self)
            return
        listener(self)

    def __str__(self):
        """
        Render the common description followed by the status.
        @return: The multi-line description.
        @rtype: str
        """
        common = AsyncReply.__str__(self)
        return '\n'.join((common, ' status: %s' % str(self.status)))
226 227
class Listener:
    """
    An asynchronous operation callback listener.
    Every callback here is a no-op.
    """

    def succeeded(self, reply):
        """
        Async request succeeded.
        @param reply: The reply data.
        @type reply: L{Succeeded}.
        """

    def failed(self, reply):
        """
        Async request failed (raised an exception).
        @param reply: The reply data.
        @type reply: L{Failed}.
        """

    def status(self, reply):
        """
        Async request has started.
        @param reply: The request.
        @type reply: L{Status}.
        """
256