1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Agent base classes.
18 """
19
20 from pulp.messaging import *
21 from pulp.messaging.decorators import Remote
22 from pulp.messaging.dispatcher import Dispatcher
23 from pulp.messaging.window import Window
24 from pulp.messaging.policy import *
25 from logging import getLogger
26
27 log = getLogger(__name__)
28
29
30
32 """
33 The agent base provides a dispatcher and automatic
34 registration of methods based on decorators.
35 @ivar consumer: A qpid consumer.
36 @type consumer: L{pulp.messaging.Consumer}
37 """
38
50
52 """
53 Close and release all resources.
54 """
55 self.consumer.close()
56
57
59 """
60 The stub container base
61 @ivar __id: The peer ID.
62 @type __id: str
63 @ivar __producer: An AMQP producer.
64 @type __producer: L{pulp.messaging.producer.Producer}
65 @ivar __stubs: A list of L{Stub} objects.
66 @type __stubs: [L{Stub},..]
67 @ivar __options: Container options.
68 @type __options: L{Options}
69 """
70
71 - def __init__(self, id, producer, **options):
72 """
73 @param id: The peer ID.
74 @type id: str
75 @param producer: An AMQP producer.
76 @type producer: L{pulp.messaging.producer.Producer}
77 @param options: keyword options.
78 @type options: dict
79 """
80 self.__id = id
81 self.__options = Options(window=Window(), timeout=90)
82 self.__stubs = []
83 self.__options.update(options)
84 self.__setmethod(producer)
85 self.__addstubs()
86
88 """
89 Set the request method based on options.
90 The selected method is added to I{options}.
91 @param producer: An AMQP producer.
92 @type producer: L{pulp.messaging.producer.Producer}
93 """
94 if self.__async():
95 ctag = self.__options.ctag
96 self.__options.method = Asynchronous(producer, ctag)
97 else:
98 timeout = int(self.__options.timeout)
99 self.__options.method = Synchronous(producer, timeout)
100
102 """
103 Add stubs found in the I{stubs} dictionary.
104 Each is added as an attribute matching the dictionary key.
105 """
106 destination = self.__destination()
107 for ns, sclass in Remote.stubs.items():
108 stub = sclass(destination, self.__options)
109 setattr(self, ns, stub)
110 self.__stubs.append(stub)
111
113 """
114 Get the stub destination(s).
115 @return: Either a queue destination or a list of queues.
116 @rtype: list
117 """
118 if isinstance(self.__id, (list,tuple)):
119 queues = []
120 for d in self.__id:
121 queues.append(Queue(d))
122 return queues
123 else:
124 return Queue(self.__id)
125
127 """
128 Get whether an I{asynchronous} request method
129 should be used based on selected options.
130 @return: True if async.
131 @rtype: bool
132 """
133 if ( self.__options.ctag
134 or self.__options.async ):
135 return True
136 return isinstance(self.__id, (list,tuple))
137
139 return '{%s} opt:%s' % (self.__id, str(self.__options))
140