Package pulp :: Package messaging :: Module endpoint
[hide private]
[frames] | no frames]

Source Code for Module pulp.messaging.endpoint

  1  # 
  2  # Copyright (c) 2010 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  """ 
 17  AMQP endpoint base classes. 
 18  """ 
 19   
 20  from pulp.messaging import * 
 21  from pulp.messaging.broker import Broker 
 22  from pulp.messaging.transport import SSLTransport 
 23  from qpid.messaging import Connection 
 24  from time import sleep 
 25  from logging import getLogger 
 26   
 27  log = getLogger(__name__) 
class Endpoint:
    """
    Base class for QPID endpoint.
    @cvar LOCALHOST: The default broker url.
    @type LOCALHOST: str
    @ivar uuid: The unique endpoint id.
    @type uuid: str
    @ivar url: The broker url.
    @type url: str
    @ivar __session: An AMQP session (created lazily by L{session}).
    @type __session: qpid.messaging.Session
    """

    LOCALHOST = 'tcp://localhost:5672'

    @classmethod
    def shutdown(cls):
        """
        Shutdown all connections.
        Closes every broker registered in I{Broker.domain}.  This is
        best-effort: a failure to close one broker is logged and does
        not prevent the remaining brokers from being closed.
        """
        for broker in Broker.domain.values():
            try:
                broker.close()
            except Exception:
                log.debug('broker close failed', exc_info=True)

    def __init__(self, uuid=None, url=LOCALHOST):
        """
        @param uuid: The endpoint uuid.  When omitted (None), a new
            uuid is generated for this endpoint.
        @type uuid: str
        @param url: The broker url <transport>://<user>/<pass>@<host>:<port>.
        @type url: str
        """
        # The uuid must be generated per call.  A I{getuuid()} default
        # in the signature is evaluated only once (at definition time)
        # which would give every endpoint the same id.
        if uuid is None:
            uuid = getuuid()
        self.uuid = uuid
        self.url = url
        self.__session = None
        self.open()

    def id(self):
        """
        Get the endpoint id
        @return: The id.
        @rtype: str
        """
        return self.uuid

    def connection(self):
        """
        Get cached connection based on I{url}.
        @return: The global connection.
        @rtype: L{Connection}
        """
        broker = Broker.get(self.url)
        con = broker.connect()
        log.info('{%s} connected to AMQP', self.id())
        return con

    def session(self):
        """
        Get a session for the open connection.
        The session is created on first use and cached until L{close}.
        @return: An open session.
        @rtype: qpid.messaging.Session
        """
        if self.__session is None:
            con = self.connection()
            self.__session = con.session()
        return self.__session

    def ack(self):
        """
        Acknowledge all messages received on the session.
        Does nothing when no session has been opened.  A failed
        acknowledge is logged rather than raised (best-effort).
        """
        session = self.__session
        if session is None:
            return
        try:
            session.acknowledge()
        except Exception:
            log.warning('{%s} acknowledge failed', self.id(), exc_info=True)

    def open(self):
        """
        Open and configure the endpoint.
        Called by the constructor; does nothing in this base class.
        """
        pass

    def close(self):
        """
        Close (shutdown) the endpoint.
        The cached session is dropped first so that the endpoint is
        left without a session even when stopping/closing it fails.
        Failures are logged rather than raised (best-effort).
        """
        session = self.__session
        self.__session = None
        if session is None:
            return
        try:
            session.stop()
            session.close()
        except Exception:
            log.debug('{%s} session close failed', self.id(), exc_info=True)

    def __parsedurl(self):
        """
        Split I{url} into its transport and address parts.
        A url without a I{<transport>://} prefix is assumed to be I{tcp}.
        @return: A tuple of (transport, address).
        @rtype: tuple
        """
        transport, sep, address = self.url.partition('://')
        if not sep:
            # No transport specified: the whole url is the address.
            return ('tcp', self.url)
        return (transport, address)

    def __del__(self):
        # A finalizer must never raise.  It may run on a partially
        # constructed object or during interpreter teardown when
        # module globals (such as the logger) are already gone.
        try:
            self.close()
        except Exception:
            pass

    def __str__(self):
        return 'Endpoint id:%s broker @ %s' % (self.id(), self.url)
135