Package pulp :: Package messaging :: Module policy
[hide private]
[frames | no frames]

Source Code for Module pulp.messaging.policy

  1  # 
  2  # Copyright (c) 2010 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  """ 
 17  Contains request delivery policies. 
 18  """ 
 19   
 20  from pulp.messaging import * 
 21  from pulp.messaging.dispatcher import Return 
 22  from pulp.messaging.consumer import Reader 
 23  from logging import getLogger 
 24   
 25  log = getLogger(__name__) 
 26   
 27   
 28   
class RequestTimeout(Exception):
    """
    Raised when no envelope matching a request arrives in time.
    The serial number of the request is carried as the only
    exception argument.
    """

    def __init__(self, sn):
        """
        @param sn: The serial number of the request that timed out.
        @type sn: str
        """
        super(RequestTimeout, self).__init__(sn)
36 37
class RequestMethod(object):
    """
    Base class for request methods.
    The base implementations of L{send} and L{broadcast} do nothing
    and return None.
    @ivar producer: A queue producer.
    @type producer: L{pulp.messaging.producer.Producer}
    """

    def __init__(self, producer):
        """
        @param producer: A queue producer.
        @type producer: L{pulp.messaging.producer.Producer}
        """
        self.producer = producer

    def send(self, address, request, **any):
        """
        Send the request.
        The base implementation is a no-op.
        @param address: The destination queue address.
        @type address: str
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        """
        pass

    def broadcast(self, addresses, request, **any):
        """
        Broadcast the request.
        The base implementation is a no-op.
        @param addresses: A list of destination queue addresses.
        @type addresses: [str,..]
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        """
        pass

    def close(self):
        """
        Close and release all resources.
        Only the producer is closed here.
        """
        self.producer.close()
79 80
class Synchronous(RequestMethod):
    """
    The synchronous request method.
    This method blocks until a reply is received.
    @ivar timeout: The request timeout (seconds).
    @type timeout: int
    @ivar queue: The non-durable queue that replies are directed to.
    @ivar reader: A queue reader used to read the reply.
    @type reader: L{pulp.messaging.consumer.Reader}
    """

    def __init__(self, producer, timeout):
        """
        @param producer: A queue producer.
        @type producer: L{pulp.messaging.producer.Producer}
        @param timeout: The request timeout (seconds).
        @type timeout: int
        """
        self.timeout = timeout
        self.queue = Queue(getuuid(), durable=False)
        RequestMethod.__init__(self, producer)
        # The reader is attached to the same broker url as the producer.
        # NOTE(review): the reader is started here but the inherited
        # close() only closes the producer - confirm whether the reader
        # needs an explicit shutdown as well.
        reader = Reader(self.queue, url=self.producer.url)
        reader.start()
        self.reader = reader

    def send(self, destination, request, **any):
        """
        Send the request then read the reply.
        Two envelopes matching the serial number are awaited in turn:
        the I{started} notification and then the reply itself.  Each
        wait uses the full timeout.
        @param destination: The destination queue address.
        @type destination: str
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        @return: The result of the request.
        @rtype: object
        @raise RequestTimeout: When either wait times out.
        @raise Exception: returned by the peer.
        """
        sn = self.producer.send(
            destination,
            replyto=str(self.queue),
            request=request,
            **any)
        self.__getstarted(sn)
        return self.__getreply(sn)

    def __getstarted(self, sn):
        """
        Wait for the first envelope matched by serial number and log
        the request as started.
        @param sn: The request serial number.
        @type sn: str
        @raise RequestTimeout: When nothing is matched within the timeout.
        """
        envelope = self.reader.search(sn, self.timeout)
        if not envelope:
            raise RequestTimeout(sn)
        log.info('request (%s), started', sn)

    def __getreply(self, sn):
        """
        Get the reply matched by serial number.
        @param sn: The request serial number.
        @type sn: str
        @return: The value returned by the peer.
        @rtype: object
        @raise RequestTimeout: When nothing is matched within the timeout.
        @raise Exception: Carrying the exception value reported by the peer.
        """
        envelope = self.reader.search(sn, self.timeout)
        if not envelope:
            raise RequestTimeout(sn)
        reply = Return(envelope.result)
        self.reader.ack()
        if reply.succeeded():
            return reply.retval
        # Call form instead of "raise Exception, value": the comma form
        # is a syntax error on python 3.
        raise Exception(reply.exval)
147 148
class Asynchronous(RequestMethod):
    """
    The asynchronous request method.
    Requests are sent without waiting for the reply.
    @ivar tag: A reply I{correlation} tag (may be None).
    @type tag: str
    """

    def __init__(self, producer, tag=None):
        """
        @param producer: A queue producer.
        @type producer: L{pulp.messaging.producer.Producer}
        @param tag: A reply I{correlation} tag.
        @type tag: str
        """
        RequestMethod.__init__(self, producer)
        self.tag = tag

    def send(self, destination, request, **any):
        """
        Send the request; the reply is redirected to the queue for
        the reply I{correlation} tag.
        @param destination: The AMQP destination.
        @type destination: L{Destination}
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        @return: The request serial number.
        @rtype: str
        """
        return self.producer.send(
            destination,
            replyto=self.__replyto(),
            request=request,
            **any)

    def broadcast(self, destinations, request, **any):
        """
        Send the request to every destination; replies are redirected
        to the queue for the reply I{correlation} tag.
        @param destinations: A list of destinations.
        @type destinations: [L{Destination},..]
        @param request: A request to send.
        @type request: object
        @keyword any: Any (extra) data.
        @return: Whatever the producer's broadcast() returns.
        """
        return self.producer.broadcast(
            destinations,
            replyto=self.__replyto(),
            request=request,
            **any)

    def __replyto(self):
        """
        Derive the replyto address from the correlation I{tag}.
        @return: The replyto AMQP address, or None when there is no tag.
        @rtype: str
        """
        if not self.tag:
            return None
        return str(Queue(self.tag))
211