Package pulp :: Package server :: Package tasking :: Package queue :: Module fifo
[hide private]
[frames] | no frames]

Source Code for Module pulp.server.tasking.queue.fifo

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # Copyright © 2010 Red Hat, Inc. 
  5  # 
  6  # This software is licensed to you under the GNU General Public License, 
  7  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  8  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  9  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
 10  # along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 12  # 
 13  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 14  # granted to use or replicate Red Hat trademarks that are incorporated 
 15  # in this software or its documentation. 
 16   
 17  import threading 
 18  from datetime import datetime, timedelta 
 19   
 20  from pulp.server.tasking.queue.base import TaskQueue 
 21  from pulp.server.tasking.queue.thread import  DRLock, TaskThread 
 22  from pulp.server.tasking.queue.storage import VolatileStorage 
 23  from pulp.server.tasking.task import task_complete_states 
 24   
 25  # fifo task queue ------------------------------------------------------------- 
 26   
27 -class FIFOTaskQueue(TaskQueue):
28 """ 29 Task queue with threaded dispatcher that fires off tasks in the order in 30 which they were enqueued and stores the finished tasks for a specified 31 amount of time. 32 """
33 - def __init__(self, 34 max_running=4, 35 finished_lifetime=timedelta(seconds=3600)):
36 """ 37 @type max_running: int 38 @param max_running: maximum number of tasks to run simultaneously 39 None means indefinitely 40 @type finished_lifetime: datetime.timedelta instance 41 @param finished_lifetime: length of time to keep finished tasks 42 @return: FIFOTaskQueue instance 43 """ 44 self.max_running = max_running 45 self.finished_lifetime = finished_lifetime 46 47 self.__lock = threading.RLock() 48 #self.__lock = DRLock() 49 self.__condition = threading.Condition(self.__lock) 50 51 self.__running_count = 0 52 self.__storage = VolatileStorage() 53 self.__canceled_tasks = [] 54 55 self.__dispatcher_timeout = 0.5 56 self.__dispatcher = threading.Thread(target=self._dispatch) 57 self.__dispatcher.daemon = True 58 self.__dispatcher.start()
59 60 # protected methods: scheduling 61
62 - def _dispatch(self):
63 """ 64 Scheduling method that that executes the scheduling hooks. 65 """ 66 self.__lock.acquire() 67 try: 68 while True: 69 self.__condition.wait(self.__dispatcher_timeout) 70 for task in self._get_tasks(): 71 self.run(task) 72 self._cancel_tasks() 73 self._timeout_tasks() 74 self._cull_tasks() 75 finally: 76 self.__lock.release()
77
78 - def _get_tasks(self):
79 """ 80 Get the next 'n' tasks to run, where is max - currently running tasks 81 """ 82 num_tasks = self.max_running - self.__running_count 83 return self.__storage.waiting_tasks()[:num_tasks]
84
85 - def _cancel_tasks(self):
86 """ 87 Stop any tasks that have been flagged as canceled. 88 """ 89 for task in self.__canceled_tasks[:]: 90 if task.state in task_complete_states: 91 self.__canceled_tasks.remove(task) 92 continue 93 if task.thread is None: 94 continue 95 task.thread.cancel() 96 self.__canceled_tasks.remove(task)
97
98 - def _timeout_tasks(self):
99 """ 100 Stop tasks that have met or exceeded their timeout length. 101 """ 102 running_tasks = self.__storage.running_tasks() 103 if not running_tasks: 104 return 105 now = datetime.now() 106 for task in running_tasks: 107 # the task.start_time can be None if the task has been 'run' by the 108 # queue, but the task thread has not had a chance to execute yet 109 if None in (task.timeout, task.start_time): 110 continue 111 if now - task.start_time < task.timeout: 112 continue 113 task.thread.timeout()
114
115 - def _cull_tasks(self):
116 """ 117 Clean up finished task data 118 """ 119 complete_tasks = self.__storage.complete_tasks() 120 if not complete_tasks: 121 return 122 now = datetime.now() 123 for task in complete_tasks: 124 if now - task.finish_time > self.finished_lifetime: 125 self.__storage.remove_task(task)
126 127 # public methods: queue operations 128
129 - def enqueue(self, task, unique=False):
130 self.__lock.acquire() 131 try: 132 fields = ('method_name', 'args', 'kwargs') 133 if unique and self.exists(task, fields, include_finished=False): 134 return False 135 task.complete_callback = self.complete 136 self.__storage.add_waiting_task(task) 137 self.__condition.notify() 138 return True 139 finally: 140 self.__lock.release()
141
142 - def run(self, task):
143 self.__lock.acquire() 144 try: 145 self.__running_count += 1 146 self.__storage.add_running_task(task) 147 task.thread = TaskThread(target=task.run) 148 task.thread.start() 149 finally: 150 self.__lock.release()
151
152 - def complete(self, task):
153 self.__lock.acquire() 154 try: 155 self.__running_count -= 1 156 self.__storage.add_complete_task(task) 157 task.thread = None 158 task.complete_callback = None 159 finally: 160 self.__lock.release()
161
162 - def cancel(self, task):
163 self.__lock.acquire() 164 try: 165 self.__canceled_tasks.append(task) 166 finally: 167 self.__lock.release()
168
169 - def find(self, **kwargs):
170 self.__lock.acquire() 171 try: 172 return self.__storage.find_task(kwargs) 173 finally: 174 self.__lock.release()
175