1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Contains request delivery policies.
18 """
19
20 from pulp.messaging import *
21 from pulp.messaging.dispatcher import Return
22 from pulp.messaging.consumer import Reader
23 from logging import getLogger
24
25 log = getLogger(__name__)
26
27
28
30 """
31 Request timeout.
32 """
33
36
37
39 """
40 Base class for request methods.
41 @ivar producer: A queue producer.
42 @type producer: L{pulp.messaging.producer.Producer}
43 """
44
46 """
47 @param producer: A queue producer.
48 @type producer: L{pulp.messaging.producer.Producer}
49 """
50 self.producer = producer
51
52 - def send(self, address, request, **any):
53 """
54 Send the request..
55 @param address: The destination queue address.
56 @type address: str
57 @param request: A request to send.
58 @type request: object
59 @keyword any: Any (extra) data.
60 """
61 pass
62
63 - def broadcast(self, addresses, request, **any):
64 """
65 Broadcast the request.
66 @param addresses: A list of destination queue addresses.
67 @type addresses: [str,..]
68 @param request: A request to send.
69 @type request: object
70 @keyword any: Any (extra) data.
71 """
72 pass
73
75 """
76 Close and release all resources.
77 """
78 self.producer.close()
79
80
82 """
83 The synchronous request method.
84 This method blocks until a reply is received.
85 @ivar reader: A queue reader used to read the reply.
86 @type reader: L{pulp.messaging.consumer.Reader}
87 """
88
102
103 - def send(self, destination, request, **any):
104 """
105 Send the request then read the reply.
106 @param destination: The destination queue address.
107 @type destination: str
108 @param request: A request to send.
109 @type request: object
110 @keyword any: Any (extra) data.
111 @return: The result of the request.
112 @rtype: object
113 @raise Exception: returned by the peer.
114 """
115 sn = self.producer.send(
116 destination,
117 replyto=str(self.queue),
118 request=request,
119 **any)
120 self.__getstarted(sn)
121 return self.__getreply(sn)
122
129
131 """
132 Get the reply matched by serial number.
133 @param sn: The request serial number.
134 @type sn: str
135 @return: The matched reply envelope.
136 @rtype: L{Envelope}
137 """
138 envelope = self.reader.search(sn, self.timeout)
139 if not envelope:
140 raise RequestTimeout(sn)
141 reply = Return(envelope.result)
142 self.reader.ack()
143 if reply.succeeded():
144 return reply.retval
145 else:
146 raise Exception, reply.exval
147
148
150 """
151 The asynchronous request method.
152 """
153
154 - def __init__(self, producer, tag=None):
155 """
156 @param producer: A queue producer.
157 @type producer: L{pulp.messaging.producer.Producer}
158 @param tag: A reply I{correlation} tag.
159 @type tag: str
160 """
161 RequestMethod.__init__(self, producer)
162 self.tag = tag
163
164 - def send(self, destination, request, **any):
165 """
166 Send the specified request and redirect the reply to the
167 queue for the specified reply I{correlation} tag.
168 @param destination: The AMQP destination.
169 @type destination: L{Destination}
170 @param request: A request to send.
171 @type request: object
172 @keyword any: Any (extra) data.
173 @return: The request serial number.
174 @rtype: str
175 """
176 sn = self.producer.send(
177 destination,
178 replyto=self.__replyto(),
179 request=request,
180 **any)
181 return sn
182
183 - def broadcast(self, destinations, request, **any):
184 """
185 Send the specified request and redirect the reply to the
186 queue for the specified reply I{correlation} tag.
187 @param destinations: A list of destinations.
188 @type destinations: [L{Destination},..]
189 @param request: A request to send.
190 @type request: object
191 @keyword any: Any (extra) data.
192 """
193 sns = self.producer.broadcast(
194 destinations,
195 replyto=self.__replyto(),
196 request=request,
197 **any)
198 return sns
199
201 """
202 Get replyto based on the correlation I{tag}.
203 @return: The replyto AMQP address.
204 @rtype: str
205 """
206 if self.tag:
207 queue = Queue(self.tag)
208 return str(queue)
209 else:
210 return None
211