1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Defines AMQP broker objects.
18 """
19
20 from pulp.messaging import *
21 from qpid.messaging import Connection
22 from logging import getLogger
23
# Module-level logger, named after this module.
log = getLogger(__name__)
class Broker:
    """
    Represents an AMQP broker.
    @cvar domain: Registry of known brokers, keyed by broker url.
    @type domain: dict
    @ivar url: The broker's url.
    @type url: L{URL}
    @ivar cacert: Path to a PEM encoded file containing
        the CA certificate used to validate the server certificate.
    @type cacert: str
    @ivar clientcert: Path to a PEM encoded file containing
        the private key & certificate used for client authentication.
    @type clientcert: str
    @ivar connection: The cached AMQP connection; None until
        a connection has been attached.
    @type connection: I{Connection}
    """

    # Shared by all brokers: url -> Broker.
    domain = {}

    @classmethod
    def add(cls, broker):
        """
        Add a broker to the domain.
        An existing entry registered under the same url is replaced.
        @param broker: A broker to add
        @type broker: L{Broker}
        """
        cls.domain[broker.url] = broker

    # NOTE(review): method name restored from its docstring ("Get a broker
    # from the domain") -- confirm against callers.
    @classmethod
    def get(cls, url):
        """
        Get a broker from the domain by I{url}.
        Created and added if not found.
        @param url: A broker url
        @type url: str|L{URL}
        @return: The requested broker.
        @rtype: L{Broker}
        """
        if not isinstance(url, URL):
            url = URL(url)
        found = cls.domain.get(url)
        if found is None:
            found = Broker(url)
            cls.add(found)
        return found

    def __init__(self, url):
        """
        @param url: The broker url <transport>://<host>:<port>.
        @type url: str|L{URL}
        """
        # Accept either a ready-made URL or its string form.
        self.url = url if isinstance(url, URL) else URL(url)
        self.cacert = None
        self.clientcert = None
        self.connection = None

    def id(self):
        """
        Get broker identifier.
        @return: The broker I{simple} url: "<host>:<port>".
        @rtype: str
        """
        return self.url.simple()

    # NOTE(review): method name restored from its docstring ("Connect to
    # the broker") -- confirm against callers.
    def connect(self):
        """
        Connect to the broker.
        The connection is created and attached on the first call and
        cached in I{self.connection}; later calls return the cached
        object.  Only the url, reconnect flag and transport are passed to
        the connection; I{cacert} and I{clientcert} are not used here.
        @return: The AMQP connection object.
        @rtype: I{Connection}
        """
        if self.connection is not None:
            return self.connection
        log.info('connecting:\n%s', self)
        con = Connection(
            url=self.url.simple(),
            reconnect=True,
            transport=self.url.transport)
        con.attach()
        log.info('{%s} connected to AMQP', self.id())
        # Cache only after a successful attach so a failed attempt
        # is retried on the next call.
        self.connection = con
        return con

    # NOTE(review): the listing this class was restored from skips original
    # lines 111-118 at this point; a method (possibly one that closes the
    # cached connection) may be missing -- confirm against upstream.

    def __str__(self):
        """
        Get a multi-line description of the broker.
        @return: The id, transport, host, port and certificate paths,
            one per line.
        @rtype: str
        """
        url = self.url
        lines = [
            '{%s}:' % self.id(),
            'transport=%s' % url.transport.upper(),
            'host=%s' % url.host,
            'port=%d' % url.port,
            'cacert=%s' % self.cacert,
            'clientcert=%s' % self.clientcert,
        ]
        return '\n'.join(lines)
129
class URL:
    """
    Represents a QPID broker URL.
    @ivar transport: A qpid transport.
    @type transport: str
    @ivar host: The host.
    @type host: str
    @ivar port: The tcp port.
    @type port: int
    """

    @classmethod
    def split(cls, s):
        """
        Split the url string.
        @param s: A url string format: <transport>://<host>:<port>.
        @type s: str
        @return: The url parts: (transport, host, port)
        @rtype: tuple
        """
        transport, hostport = cls.spliturl(s)
        host, port = cls.splitport(hostport)
        return (transport, host, port)

    @classmethod
    def spliturl(cls, s):
        """
        Split the transport and url parts.
        The transport defaults to 'tcp' when I{s} has no "://".
        @param s: A url string format: <transport>://<host>:<port>.
        @type s: str
        @return: The urlparts: (transport, hostport)
        @rtype: tuple
        """
        parts = s.split('://', 1)
        if len(parts) > 1:
            return (parts[0], parts[1])
        return ('tcp', parts[0])

    # NOTE(review): the default for I{d} was not visible in the listing this
    # class was restored from; 5672 is the IANA registered AMQP port --
    # confirm against upstream.
    @classmethod
    def splitport(cls, s, d=5672):
        """
        Split the host and port.
        @param s: A url string format: <host>:<port>.
        @type s: str
        @param d: The port used when I{s} has no ":<port>" part.
        @type d: int
        @return: The urlparts: (host, port)
        @rtype: tuple
        @raise ValueError: When the port is not an integer.
        """
        parts = s.split(':')
        host = parts[0]
        port = parts[1] if len(parts) > 1 else d
        return (host, int(port))

    def simple(self):
        """
        Get the I{simple} string representation: <host>:<port>
        @return: "<host>:<port>"
        @rtype: str
        """
        return '%s:%d' % (self.host, self.port)

    def __init__(self, s):
        """
        @param s: A url string format: <transport>://<host>:<port>.
        @type s: str
        """
        self.transport, self.host, self.port = self.split(s)

    def __hash__(self):
        """
        Hash on the I{simple} form so that urls naming the same
        host and port land on the same dictionary key.
        """
        return hash(self.simple())

    def __eq__(self, other):
        """
        Compare by the I{simple} form (host and port), matching
        L{__hash__}.  Without this, a lookup in a dictionary keyed by
        URL would only match the identical object.
        """
        if not isinstance(other, URL):
            return NotImplemented
        return self.simple() == other.simple()

    def __ne__(self, other):
        """
        Inverse of L{__eq__}.
        """
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __str__(self):
        """
        Get the full string representation.
        @return: "<transport>://<host>:<port>"
        @rtype: str
        """
        return '%s://%s:%d' % (self.transport, self.host, self.port)
216