Package pulp :: Package messaging :: Module store
[hide private]
[frames] | no frames]

Source Code for Module pulp.messaging.store

  1  # 
  2  # Copyright (c) 2010 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  """ 
 17  Provides (local) message storage classes. 
 18  """ 
 19   
 20  import os 
 21  from pulp.messaging import * 
 22  from pulp.messaging.window import Window 
 23  from time import sleep 
 24  from stat import * 
 25  from threading import Thread, RLock 
 26  from logging import getLogger 
 27   
 28  log = getLogger(__name__) 
 29   
 30   
31 -class PendingQueue:
32 """ 33 Persistent (local) storage of I{pending} envelopes that have 34 been processed of an AMQP queue. Most likely use is for messages 35 with a future I{window} which cannot be processed immediately. 36 @cvar ROOT: The root directory used for storage. 37 @type ROOT: str 38 @ivar id: The queue id. 39 @type id: str 40 @ivar lastmod: Last (directory) modification. 41 @type lastmod: int 42 @ivar pending: The queue of pending envelopes. 43 @type pending: [Envelope,..] 44 @ivar uncommitted: A list (removed) of files pending commit. 45 @type uncommitted: [path,..] 46 """ 47 48 ROOT = '/var/lib/pulp/messaging' 49
50 - def __init__(self, id):
51 """ 52 @param id: The queue id. 53 @type id: str 54 """ 55 self.id = id 56 self.pending = [] 57 self.uncommitted = [] 58 self.__lock = RLock() 59 self.mkdir() 60 self.load()
61
62 - def add(self, envelope):
63 """ 64 Enqueue the specified envelope. 65 @param envelope: An L{Envelope} 66 @type envelope: L{Envelope} 67 """ 68 69 fn = self.fn(envelope) 70 f = open(fn, 'w') 71 f.write(envelope.dump()) 72 f.close() 73 log.info('{%s} add pending:\n%s', self.id, envelope) 74 self.lock() 75 try: 76 self.pending.insert(0, envelope) 77 finally: 78 self.unlock()
79
80 - def next(self, wait=1):
81 """ 82 Get the next pending envelope. 83 @param wait: The number of seconds to wait for a pending item. 84 @type wait: int 85 @return: An L{Envelope} 86 @rtype: L{Envelope} 87 """ 88 self.lock() 89 try: 90 queue = self.pending[:] 91 finally: 92 self.unlock() 93 while wait: 94 if queue: 95 envelope = queue.pop() 96 window = Window(envelope.window) 97 if window.future(): 98 log.info('{%s} deferring:\n%s', self.id, envelope) 99 continue 100 self.remove(envelope) 101 log.info('{%s} next:\n%s', self.id, envelope) 102 return envelope 103 else: 104 sleep(1) 105 wait -= 1
106
107 - def remove(self, envelope):
108 """ 109 Remove the specified envelope and place on the uncommitted list. 110 @param envelope: An L{Envelope} 111 @type envelope: L{Envelope} 112 """ 113 self.lock() 114 try: 115 self.pending.remove(envelope) 116 self.uncommitted.append(envelope) 117 finally: 118 self.unlock()
119
120 - def commit(self):
121 """ 122 Commit envelopes removed from the queue. 123 @return: self 124 @rtype: L{PendingQueue} 125 """ 126 self.lock() 127 try: 128 uncommitted = self.uncommitted[:] 129 finally: 130 self.unlock() 131 for envelope in uncommitted: 132 fn = self.fn(envelope) 133 log.info('{%s} commit:%s', self.id, envelope.sn) 134 try: 135 os.remove(fn) 136 except Exception, e: 137 log.exception(e) 138 self.lock() 139 try: 140 self.uncommitted = [] 141 finally: 142 self.unlock() 143 return self
144
145 - def load(self):
146 """ 147 Load the in-memory queue from filesystem. 148 """ 149 150 path = os.path.join(self.ROOT, self.id) 151 pending = [] 152 for fn in os.listdir(path): 153 path = os.path.join(self.ROOT, self.id, fn) 154 envelope = Envelope() 155 f = open(path) 156 s = f.read() 157 f.close() 158 envelope.load(s) 159 ctime = self.created(path) 160 pending.append((ctime, envelope)) 161 pending.sort() 162 self.lock() 163 try: 164 self.pending = [p[1] for p in pending] 165 finally: 166 self.unlock()
167
168 - def created(self, path):
169 """ 170 Get create timestamp. 171 @return: The file create timestamp. 172 @rtype: int 173 """ 174 stat = os.stat(path) 175 return stat[ST_CTIME]
176
177 - def modified(self, path):
178 """ 179 Get modification timestamp. 180 @return: The file modification timestamp. 181 @rtype: int 182 """ 183 stat = os.stat(path) 184 return stat[ST_MTIME]
185
186 - def mkdir(self):
187 """ 188 Ensure the directory exists. 189 """ 190 path = os.path.join(self.ROOT, self.id) 191 if not os.path.exists(path): 192 os.makedirs(path)
193
194 - def fn(self, envelope):
195 """ 196 Get the qualified file name for the envelope. 197 @param envelope: An L{Envelope} 198 @type envelope: L{Envelope} 199 @return: The absolute file path. 200 @rtype: str 201 """ 202 return os.path.join(self.ROOT, self.id, envelope.sn)
203
204 - def lock(self):
205 self.__lock.acquire()
206
207 - def unlock(self):
208 self.__lock.release()
209 210
211 -class PendingReceiver(Thread):
212 """ 213 A pending queue receiver. 214 @ivar __run: The main run loop flag. 215 @type __run: bool 216 @ivar queue: The L{PendingQueue} being read. 217 @type queue: L{PendingQueue} 218 @ivar consumer: The queue listener. 219 @type consumer: L{pulp.messaging.consumer.Consumer} 220 """ 221
222 - def __init__(self, queue, listener):
223 self.__run = True 224 self.queue = queue 225 self.listener = listener 226 Thread.__init__(self, name='pending:%s' % queue.id)
227
228 - def run(self):
229 """ 230 Main receiver (thread). 231 Read and dispatch envelopes. 232 """ 233 log.info('{%s} started', self.name) 234 while self.__run: 235 envelope = self.queue.next(3) 236 if envelope: 237 self.dispatch(envelope) 238 self.queue.commit()
239
240 - def dispatch(self, envelope):
241 """ 242 Dispatch the envelope to the listener. 243 @param envelope: An L{Envelope} to be dispatched. 244 @type envelope: L{Envelope} 245 """ 246 try: 247 self.listener.dispatch(envelope) 248 except Exception, e: 249 log.exception(e)
250
251 - def stop(self):
252 """ 253 Stop the receiver. 254 """ 255 self.__run = False 256 log.info('{%s} stopping', self.name)
257