1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Provides (local) message storage classes.
18 """
19
20 import os
21 from pulp.messaging import *
22 from pulp.messaging.window import Window
23 from time import sleep
24 from stat import *
25 from threading import Thread, RLock
26 from logging import getLogger
27
28 log = getLogger(__name__)
29
30
32 """
33 Persistent (local) storage of I{pending} envelopes that have
34 been processed of an AMQP queue. Most likely use is for messages
35 with a future I{window} which cannot be processed immediately.
36 @cvar ROOT: The root directory used for storage.
37 @type ROOT: str
38 @ivar id: The queue id.
39 @type id: str
40 @ivar lastmod: Last (directory) modification.
41 @type lastmod: int
42 @ivar pending: The queue of pending envelopes.
43 @type pending: [Envelope,..]
44 @ivar uncommitted: A list (removed) of files pending commit.
45 @type uncommitted: [path,..]
46 """
47
48 ROOT = '/var/lib/pulp/messaging'
49
51 """
52 @param id: The queue id.
53 @type id: str
54 """
55 self.id = id
56 self.pending = []
57 self.uncommitted = []
58 self.__lock = RLock()
59 self.mkdir()
60 self.load()
61
62 - def add(self, envelope):
63 """
64 Enqueue the specified envelope.
65 @param envelope: An L{Envelope}
66 @type envelope: L{Envelope}
67 """
68
69 fn = self.fn(envelope)
70 f = open(fn, 'w')
71 f.write(envelope.dump())
72 f.close()
73 log.info('{%s} add pending:\n%s', self.id, envelope)
74 self.lock()
75 try:
76 self.pending.insert(0, envelope)
77 finally:
78 self.unlock()
79
80 - def next(self, wait=1):
81 """
82 Get the next pending envelope.
83 @param wait: The number of seconds to wait for a pending item.
84 @type wait: int
85 @return: An L{Envelope}
86 @rtype: L{Envelope}
87 """
88 self.lock()
89 try:
90 queue = self.pending[:]
91 finally:
92 self.unlock()
93 while wait:
94 if queue:
95 envelope = queue.pop()
96 window = Window(envelope.window)
97 if window.future():
98 log.info('{%s} deferring:\n%s', self.id, envelope)
99 continue
100 self.remove(envelope)
101 log.info('{%s} next:\n%s', self.id, envelope)
102 return envelope
103 else:
104 sleep(1)
105 wait -= 1
106
108 """
109 Remove the specified envelope and place on the uncommitted list.
110 @param envelope: An L{Envelope}
111 @type envelope: L{Envelope}
112 """
113 self.lock()
114 try:
115 self.pending.remove(envelope)
116 self.uncommitted.append(envelope)
117 finally:
118 self.unlock()
119
121 """
122 Commit envelopes removed from the queue.
123 @return: self
124 @rtype: L{PendingQueue}
125 """
126 self.lock()
127 try:
128 uncommitted = self.uncommitted[:]
129 finally:
130 self.unlock()
131 for envelope in uncommitted:
132 fn = self.fn(envelope)
133 log.info('{%s} commit:%s', self.id, envelope.sn)
134 try:
135 os.remove(fn)
136 except Exception, e:
137 log.exception(e)
138 self.lock()
139 try:
140 self.uncommitted = []
141 finally:
142 self.unlock()
143 return self
144
146 """
147 Load the in-memory queue from filesystem.
148 """
149
150 path = os.path.join(self.ROOT, self.id)
151 pending = []
152 for fn in os.listdir(path):
153 path = os.path.join(self.ROOT, self.id, fn)
154 envelope = Envelope()
155 f = open(path)
156 s = f.read()
157 f.close()
158 envelope.load(s)
159 ctime = self.created(path)
160 pending.append((ctime, envelope))
161 pending.sort()
162 self.lock()
163 try:
164 self.pending = [p[1] for p in pending]
165 finally:
166 self.unlock()
167
169 """
170 Get create timestamp.
171 @return: The file create timestamp.
172 @rtype: int
173 """
174 stat = os.stat(path)
175 return stat[ST_CTIME]
176
178 """
179 Get modification timestamp.
180 @return: The file modification timestamp.
181 @rtype: int
182 """
183 stat = os.stat(path)
184 return stat[ST_MTIME]
185
187 """
188 Ensure the directory exists.
189 """
190 path = os.path.join(self.ROOT, self.id)
191 if not os.path.exists(path):
192 os.makedirs(path)
193
194 - def fn(self, envelope):
195 """
196 Get the qualified file name for the envelope.
197 @param envelope: An L{Envelope}
198 @type envelope: L{Envelope}
199 @return: The absolute file path.
200 @rtype: str
201 """
202 return os.path.join(self.ROOT, self.id, envelope.sn)
203
206
209
210
212 """
213 A pending queue receiver.
214 @ivar __run: The main run loop flag.
215 @type __run: bool
216 @ivar queue: The L{PendingQueue} being read.
217 @type queue: L{PendingQueue}
218 @ivar consumer: The queue listener.
219 @type consumer: L{pulp.messaging.consumer.Consumer}
220 """
221
227
229 """
230 Main receiver (thread).
231 Read and dispatch envelopes.
232 """
233 log.info('{%s} started', self.name)
234 while self.__run:
235 envelope = self.queue.next(3)
236 if envelope:
237 self.dispatch(envelope)
238 self.queue.commit()
239
241 """
242 Dispatch the envelope to the listener.
243 @param envelope: An L{Envelope} to be dispatched.
244 @type envelope: L{Envelope}
245 """
246 try:
247 self.listener.dispatch(envelope)
248 except Exception, e:
249 log.exception(e)
250
252 """
253 Stop the receiver.
254 """
255 self.__run = False
256 log.info('{%s} stopping', self.name)
257