1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 AMQP endpoint base classes.
18 """
19
20 from pulp.messaging import *
21 from pulp.messaging.broker import Broker
22 from pulp.messaging.transport import SSLTransport
23 from qpid.messaging import Connection
24 from time import sleep
25 from logging import getLogger
26
27 log = getLogger(__name__)
31 """
32 Base class for QPID endpoint.
33 @cvar connections: An AMQP connection.
34 @type connections: L{Connection}
35 @ivar uuid: The unique endpoint id.
36 @type uuid: str
37 @ivar __session: An AMQP session.
38 @type __session: qpid.messaging.Session
39 """
40
41 LOCALHOST = 'tcp://localhost:5672'
42
43 @classmethod
53
55 """
56 @param uuid: The endpoint uuid.
57 @type uuid: str
58 @param url: The broker url <transport>://<user>/<pass>@<host>:<port>.
59 @type url: str
60 """
61 self.uuid = uuid
62 self.url = url
63 self.__session = None
64 self.open()
65
67 """
68 Get the endpoint id.
69 @return: The id.
70 @rtype: str
71 """
72 return self.uuid
73
75 """
76 Get cached connection based on I{url}.
77 @return: The global connection.
78 @rtype: L{Connection}
79 """
80 broker = Broker.get(self.url)
81 con = broker.connect()
82 log.info('{%s} connected to AMQP' % self.id())
83 return con
84
86 """
87 Get a session for the open connection.
88 @return: An open session.
89 @rtype: qpid.messaging.Session
90 """
91 if self.__session is None:
92 con = self.connection()
93 self.__session = con.session()
94 return self.__session
95
97 """
98 Acknowledge all messages received on the session.
99 """
100 try:
101 self.__session.acknowledge()
102 except:
103 pass
104
106 """
107 Open and configure the endpoint.
108 """
109 pass
110
112 """
113 Close (shutdown) the endpoint.
114 """
115 session = self.__session
116 self.__session = None
117 try:
118 session.stop()
119 session.close()
120 except:
121 pass
122
124 urlpart = self.url.split('://', 1)
125 if len(urlpart) == 1:
126 return (urlpart[0], 'tcp')
127 else:
128 return (urlpart[0], urlpart[1])
129
132
134 return 'Endpoint id:%s broker @ %s' % (self.id(), self.url)
135