Package pulp :: Package messaging
[hide private]
[frames] | no frames]

Source Code for Package pulp.messaging

  1  # 
  2  # Copyright (c) 2010 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  from uuid import uuid4 
 17  import simplejson as json 
 18   
 19  version = '0.1' 
 20   
 21   
def getuuid():
    """
    Generate a new random identifier.
    @return: The string form of a freshly generated I{uuid4}.
    @rtype: str
    """
    identifier = uuid4()
    return str(identifier)
24 25
class Options(dict):
    """
    Dictionary of container options that can also be read, written and
    removed using dot (.) style attribute access.
    Reading an option that has not been set yields None.
    Removing an option that has not been set raises KeyError.
    Options:
      - async : Indicates that requests are asynchronous.
          Default = False
      - ctag : The asynchronous correlation tag.
          When specified, it implies all requests are asynchronous.
      - window : The request window.  See I{Window}.
          Default = any time.
      - timeout : The synchronous timeout (seconds).
          Default = 90 seconds.
    """

    # Attribute operations are mapped straight onto the dict operations.
    __delattr__ = dict.__delitem__
    __setattr__ = dict.__setitem__
    __getattr__ = dict.get
42 43
class Envelope(dict):
    """
    A dictionary that is exchanged as json text and whose entries
    can also be reached using dot (.) style attribute access.
    Reading an entry that is not present yields None.
    """

    # Attribute operations are mapped straight onto the dict operations.
    __delattr__ = dict.__delitem__
    __setattr__ = dict.__setitem__
    __getattr__ = dict.get

    def load(self, s):
        """
        Merge the content of a json document into this envelope.
        Entries already present are kept unless the document
        supplies the same key.
        @param s: A json encoded string.
        @type s: str
        @return: self, so that calls may be chained.
        @rtype: L{Envelope}
        """
        self.update(json.loads(s))
        return self

    def dump(self):
        """
        Encode the envelope as json text (indented by 2).
        @return: A json encoded string.
        @rtype: str
        """
        return json.dumps(self, indent=2)

    def __str__(self):
        # The printable form is the json encoding.
        return self.dump()
75 76
class Destination:
    """
    AMQP destinations (topics & queues)
    Subclasses provide the I{formal} address by overriding L{address}.
    The string form of a destination is its formal address and the
    repr() form is the part of that address preceding the first ';'.
    """

    def address(self):
        """
        Get the destination I{formal} AMQP address which contains
        properties used to create the destination.
        The base implementation is a placeholder and returns None.
        @return: The destination address.
        @rtype: str
        """
        return None

    def delete(self, session):
        """
        Delete the destination.
        Implemented using a hack because the python API does not
        directly support removing destinations: a sender is opened on
        the destination using the I{delete:always} option and then
        closed straight away.
        @param session: An AMQP session.
        @type session: I{qpid.messaging.Session}
        """
        address = '%s;{delete:always}' % repr(self)
        sender = session.sender(address)
        sender.close()

    def __str__(self):
        """
        Get the formal address as the string form.
        Defined here because __repr__() is built on str(self); without
        it the default string conversion falls back on __repr__() and
        the two call each other without end.
        @return: The destination address.
        @rtype: str
        @raise NotImplementedError: When L{address} provides nothing.
        """
        address = self.address()
        if address is None:
            raise NotImplementedError(
                '%s does not define an address' % self.__class__.__name__)
        return address

    def __repr__(self):
        # Only the part preceding the address options.
        return str(self).split(';', 1)[0]
105 106
class Topic(Destination):
    """
    Represents an AMQP topic.
    @ivar topic: The name of the topic.
    @type topic: str
    @ivar subject: The subject.
    @type subject: str
    @ivar name: The (optional) subscription name.
        Used for durable subscriptions.
    @type name: str
    """

    def __init__(self, topic, subject=None, name=None):
        """
        @param topic: The name of the topic.
        @type topic: str
        @param subject: The subject.
        @type subject: str
        @param name: The (optional) subscription name.
            Used for durable subscriptions.
        @type name: str
        """
        self.name = name
        self.subject = subject
        self.topic = topic

    def address(self):
        """
        Get the topic I{formal} AMQP address which contains
        properties used to create the topic.
        The subject, when there is one, follows the topic name
        separated by '/'.
        @return: The topic address.
        @rtype: str
        """
        target = [self.topic]
        if self.subject:
            target.append('/%s' % self.subject)
        return ''.join((
            ''.join(target),
            ';{create:always',
            ',node:{type:topic,durable:True}',
            ',link:{durable:True}}',
        ))

    def queuedAddress(self):
        """
        Get the topic I{durable} AMQP address which contains
        properties used to create the topic.
        The address is rooted at the subscription name and binds it
        to the topic exchange, keyed by the subject when there is one.
        @return: The topic address.
        @rtype: str
        """
        binding = ['{exchange:%s' % self.topic]
        if self.subject:
            binding.append(',key:%s' % self.subject)
        binding.append('}')
        return ''.join((
            self.name,
            ';{create:always',
            ',node:{type:topic,durable:True}',
            ',link:{durable:True',
            ',x-bindings:[',
            ''.join(binding),
            ']}}',
        ))

    def __str__(self):
        # A named subscription selects the durable (queued) address.
        if not self.name:
            return self.address()
        return self.queuedAddress()
177 178
class Queue(Destination):
    """
    Represents an AMQP queue.
    @ivar name: The name of the queue.
    @type name: str
    @ivar durable: The durable flag.
    @type durable: bool
    """

    def __init__(self, name, durable=True):
        """
        @param name: The name of the queue.
        @type name: str
        @param durable: The durable flag.
        @type durable: bool
        """
        self.durable = durable
        self.name = name

    def address(self):
        """
        Get the queue I{formal} AMQP address which contains
        properties used to create the queue.
        @return: The queue address.
        @rtype: str
        """
        return ''.join((
            self.name,
            ';{create:always',
            ',node:{type:queue,durable:True}',
            ',link:{durable:True}}',
        ))

    def tmpAddress(self):
        """
        Get the queue AMQP address which contains
        properties used to create a temporary queue.
        @return: The queue address.
        @rtype: str
        """
        return ''.join((
            self.name,
            ';{create:always,delete:receiver',
            ',node:{type:queue}',
            ',link:{durable:True}}',
        ))

    def __str__(self):
        # The durable flag selects between the formal and temporary address.
        if not self.durable:
            return self.tmpAddress()
        return self.address()
235