1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from uuid import uuid4
17 import simplejson as json
18
19 version = '0.1'
20
21
24
25
27 """
28 Container options.
29 Options:
30 - async : Indicates that requests are asynchronous.
31 Default = False
32 - ctag : The asynchronous correlation tag.
33 When specified, it implies all requests are asynchronous.
34 - window : The request window. See I{Window}.
35 Default = any time.
36 - timeout : The synchronous timeout (seconds).
37 Default = 90 seconds.
38 """
39 __getattr__ = dict.get
40 __setattr__ = dict.__setitem__
41 __delattr__= dict.__delitem__
42
43
45 """
46 Basic envelope is a json encoded/decoded dictionary
47 that provides dot (.) style access.
48 """
49
50 __getattr__ = dict.get
51 __setattr__= dict.__setitem__
52 __delattr__= dict.__delitem__
53
55 """
56 Load using a json string.
57 @param s: A json encoded string.
58 @type s: str
59 """
60 d = json.loads(s)
61 self.update(d)
62 return self
63
65 """
66 Dump to a json string.
67 @return: A json encoded string.
68 @rtype: str
69 """
70 d = self
71 return json.dumps(d, indent=2)
72
75
76
78 """
79 AMQP destinations (topics & queues)
80 """
81
83 """
84 Get the destination I{formal} AMQP address which contains
85 properties used to create the destination.
86 @return: The destination address.
87 @rtype: str
88 """
89 pass
90
92 """
93 Delete the destination.
94 Implemented using a hack because the python API does not
95 directly support removing destinations.
96 @param session: An AMQP session.
97 @type session: I{qpid.messaging.Session}
98 """
99 address = '%s;{delete:always}' % repr(self)
100 sender = session.sender(address)
101 sender.close()
102
104 return str(self).split(';', 1)[0]
105
106
107 -class Topic(Destination):
108 """
109 Represents an AMQP topic.
110 @ivar topic: The name of the topic.
111 @type topic: str
112 @ivar subject: The subject.
113 @type subject: str
114 @ivar name: The (optional) subscription name.
115 Used for durable subscriptions.
116 @type name: str
117 """
118
119 - def __init__(self, topic, subject=None, name=None):
120 """
121 @param topic: The name of the topic.
122 @type topic: str
123 @param subject: The subject.
124 @type subject: str
125 @param name: The (optional) subscription name.
126 Used for durable subscriptions.
127 @type name: str
128 """
129 self.topic = topic
130 self.subject = subject
131 self.name = name
132
134 """
135 Get the topic I{formal} AMQP address which contains
136 properties used to create the topic.
137 @return: The topic address.
138 @rtype: str
139 """
140 s = []
141 s.append(self.topic)
142 if self.subject:
143 s.append('/%s' % self.subject)
144 s.append(';{')
145 s.append('create:always')
146 s.append(',node:{type:topic,durable:True}')
147 s.append(',link:{durable:True}')
148 s.append('}')
149 return ''.join(s)
150
152 """
153 Get the topic I{durable} AMQP address which contains
154 properties used to create the topic.
155 @return: The topic address.
156 @rtype: str
157 """
158 s = []
159 s.append(self.name)
160 s.append(';{')
161 s.append('create:always')
162 s.append(',node:{type:topic,durable:True}')
163 s.append(',link:{durable:True')
164 s.append(',x-bindings:[')
165 s.append('{exchange:%s' % self.topic)
166 if self.subject:
167 s.append(',key:%s' % self.subject)
168 s.append('}]')
169 s.append('}}')
170 return ''.join(s)
171
177
178
179 -class Queue(Destination):
180 """
181 Represents an AMQP queue.
182 @ivar name: The name of the queue.
183 @type name: str
184 @ivar durable: The durable flag.
185 @type durable: bool
186 """
187
188 - def __init__(self, name, durable=True):
189 """
190 @param name: The name of the queue.
191 @type name: str
192 @param durable: The durable flag.
193 @type durable: bool
194 """
195 self.name = name
196 self.durable = durable
197
199 """
200 Get the queue I{formal} AMQP address which contains
201 properties used to create the queue.
202 @return: The queue address.
203 @rtype: str
204 """
205 s = []
206 s.append(self.name)
207 s.append(';{')
208 s.append('create:always')
209 s.append(',node:{type:queue,durable:True}')
210 s.append(',link:{durable:True}')
211 s.append('}')
212 return ''.join(s)
213
215 """
216 Get the queue AMQP address which contains
217 properties used to create a temporary queue.
218 @return: The queue address.
219 @rtype: str
220 """
221 s = []
222 s.append(self.name)
223 s.append(';{')
224 s.append('create:always,delete:receiver')
225 s.append(',node:{type:queue}')
226 s.append(',link:{durable:True}')
227 s.append('}')
228 return ''.join(s)
229
231 if self.durable:
232 return self.address()
233 else:
234 return self.tmpAddress()
235