Package pulp :: Package messaging :: Module broker
[hide private]
[frames | no frames]

Source Code for Module pulp.messaging.broker

  1  # 
  2  # Copyright (c) 2010 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  """ 
 17  Defined AMQP broker objects. 
 18  """ 
 19   
 20  from pulp.messaging import * 
 21  from qpid.messaging import Connection 
 22  from logging import getLogger 
 23   
 24  log = getLogger(__name__) 
class Broker:
    """
    Represents an AMQP broker.
    @cvar domain: Registry of known brokers, keyed by broker L{URL}.
    @type domain: dict
    @ivar url: The broker's url.
    @type url: L{URL}
    @ivar cacert: Path to a PEM encoded file containing
        the CA certificate used to validate the server certificate.
    @type cacert: str
    @ivar clientcert: Path to a PEM encoded file containing
        the private key & certificate used for client authentication.
    @type clientcert: str
    @ivar connection: The cached AMQP connection, or None when
        not connected.
    @type connection: I{Connection}
    """

    domain = {}

    @classmethod
    def add(cls, broker):
        """
        Add a broker to the domain.
        An existing entry registered under the same url is replaced.
        @param broker: A broker to add
        @type broker: L{Broker}
        """
        cls.domain[broker.url] = broker

    @classmethod
    def get(cls, url):
        """
        Get a broker from the domain by I{url}.
        Created and added if not found.
        @param url: A broker url
        @type url: L{URL} or str
        @return: The requested broker.
        @rtype: L{Broker}
        """
        if not isinstance(url, URL):
            url = URL(url)
        b = cls.domain.get(url)
        if b is None:
            b = Broker(url)
            cls.add(b)
        return b

    def __init__(self, url):
        """
        @param url: The broker url <transport>://<host>:<port>.
            A string is parsed into a L{URL}; a L{URL} is used as-is.
        @type url: str or L{URL}
        """
        if isinstance(url, URL):
            self.url = url
        else:
            self.url = URL(url)
        self.cacert = None
        self.clientcert = None
        self.connection = None

    def id(self):
        """
        Get broker identifier.
        @return: The broker I{simple} url.
        @rtype: str
        """
        return self.url.simple()

    def connect(self):
        """
        Connect to the broker.
        The connection is created and attached on the first call and
        cached; subsequent calls return the cached connection until
        L{close} is called.
        @return: The AMQP connection object.
        @rtype: I{Connection}
        """
        if self.connection is not None:
            return self.connection
        url = self.url.simple()
        transport = self.url.transport
        log.info('connecting:\n%s', self)
        con = Connection(url=url, reconnect=True, transport=transport)
        con.attach()
        log.info('{%s} connected to AMQP', self.id())
        # cache only after a successful attach so a failed attempt
        # is retried by the next call.
        self.connection = con
        return con

    def close(self):
        """
        Close the connection to the broker.
        Does nothing when there is no open connection.  The cached
        connection is always discarded so that a later L{connect}
        creates a fresh one rather than returning a closed connection.
        Errors raised while closing are logged, not propagated.
        """
        con = self.connection
        if con is None:
            return
        # forget the connection first: it must not be reused even
        # when close() itself fails.
        self.connection = None
        try:
            con.close()
        except Exception:
            log.exception(str(self))

    def __str__(self):
        s = []
        s.append('{%s}:' % self.id())
        s.append('transport=%s' % self.url.transport.upper())
        s.append('host=%s' % self.url.host)
        s.append('port=%d' % self.url.port)
        s.append('cacert=%s' % self.cacert)
        s.append('clientcert=%s' % self.clientcert)
        return '\n'.join(s)

class URL:
    """
    Represents a QPID broker URL.
    Equality and hashing are based on the I{simple} form
    (<host>:<port>) only; the transport is not considered.
    @ivar transport: A qpid transport.
    @type transport: str
    @ivar host: The host.
    @type host: str
    @ivar port: The tcp port.
    @type port: int
    """

    @classmethod
    def split(cls, s):
        """
        Split the url string.
        @param s: A url string format: <transport>://<host>:<port>.
        @type s: str
        @return: The url parts: (transport, host, port)
        @rtype: tuple
        """
        transport, hp = cls.spliturl(s)
        host, port = cls.splitport(hp)
        return (transport, host, port)

    @classmethod
    def spliturl(cls, s):
        """
        Split the transport and url parts.
        The transport defaults to 'tcp' when I{s} has no
        '<transport>://' prefix.
        @param s: A url string format: <transport>://<host>:<port>.
        @type s: str
        @return: The urlparts: (transport, hostport)
        @rtype: tuple
        """
        part = s.split('://', 1)
        if len(part) > 1:
            transport, hp = (part[0], part[1])
        else:
            transport, hp = ('tcp', part[0])
        return (transport, hp)

    @classmethod
    def splitport(cls, s, d=5672):
        """
        Split the host and port.
        @param s: A url string format: <host>:<port>.
        @type s: str
        @param d: The port used when I{s} does not specify one.
        @type d: int
        @return: The urlparts: (host, port)
        @rtype: tuple
        @raise ValueError: When the port is not an integer.
        """
        part = s.split(':')
        host = part[0]
        if len(part) < 2:
            port = d
        else:
            port = part[1]
        return (host, int(port))

    def simple(self):
        """
        Get the I{simple} string representation: <host>:<port>
        @return: "<host>:<port>"
        @rtype: str
        """
        return '%s:%d' % (self.host, self.port)

    def __init__(self, s):
        """
        @param s: A url string format: <transport>://<host>:<port>.
        @type s: str
        """
        self.transport, self.host, self.port = self.split(s)

    def __hash__(self):
        # must agree with __eq__: both are derived from simple().
        return hash(self.simple())

    def __eq__(self, other):
        """
        Compare by I{simple} form (<host>:<port>).
        @param other: The object to compare with.
        @return: True when I{other} is a L{URL} with the same host
            and port; NotImplemented when I{other} is not a L{URL}
            so that the interpreter falls back to its default
            (unequal) result instead of raising.
        """
        if not isinstance(other, URL):
            return NotImplemented
        return self.simple() == other.simple()

    def __ne__(self, other):
        """
        Inverse of L{__eq__}.
        Defined explicitly because python 2 does not derive
        I{!=} from I{__eq__}.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __str__(self):
        return '%s://%s:%d' % (self.transport, self.host, self.port)
