Home | Trees | Indices | Help |
|
---|
|
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 # 4 # Copyright © 2010 Red Hat, Inc. 5 # 6 # This software is licensed to you under the GNU General Public License, 7 # version 2 (GPLv2). There is NO WARRANTY for this software, express or 8 # implied, including the implied warranties of MERCHANTABILITY or FITNESS 9 # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 10 # along with this software; if not, see 11 # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 12 # 13 # Red Hat trademarks are not licensed under GPLv2. No permission is 14 # granted to use or replicate Red Hat trademarks that are incorporated 15 # in this software or its documentation. 16 17 import threading 18 from datetime import datetime, timedelta 19 20 from pulp.server.tasking.queue.base import TaskQueue 21 from pulp.server.tasking.queue.thread import DRLock, TaskThread 22 from pulp.server.tasking.queue.storage import VolatileStorage 23 from pulp.server.tasking.task import task_complete_states 24 25 # fifo task queue ------------------------------------------------------------- 2628 """ 29 Task queue with threaded dispatcher that fires off tasks in the order in 30 which they were enqueued and stores the finished tasks for a specified 31 amount of time. 32 """17536 """ 37 @type max_running: int 38 @param max_running: maximum number of tasks to run simultaneously 39 None means indefinitely 40 @type finished_lifetime: datetime.timedelta instance 41 @param finished_lifetime: length of time to keep finished tasks 42 @return: FIFOTaskQueue instance 43 """ 44 self.max_running = max_running 45 self.finished_lifetime = finished_lifetime 46 47 self.__lock = threading.RLock() 48 #self.__lock = DRLock() 49 self.__condition = threading.Condition(self.__lock) 50 51 self.__running_count = 0 52 self.__storage = VolatileStorage() 53 self.__canceled_tasks = [] 54 55 self.__dispatcher_timeout = 0.5 56 self.__dispatcher = threading.Thread(target=self._dispatch) 57 self.__dispatcher.daemon = True 58 self.__dispatcher.start()59 60 # protected methods: scheduling 6163 """ 64 Scheduling method that that executes the scheduling hooks. 65 """ 66 self.__lock.acquire() 67 try: 68 while True: 69 self.__condition.wait(self.__dispatcher_timeout) 70 for task in self._get_tasks(): 71 self.run(task) 72 self._cancel_tasks() 73 self._timeout_tasks() 74 self._cull_tasks() 75 finally: 76 self.__lock.release()7779 """ 80 Get the next 'n' tasks to run, where is max - currently running tasks 81 """ 82 num_tasks = self.max_running - self.__running_count 83 return self.__storage.waiting_tasks()[:num_tasks]8486 """ 87 Stop any tasks that have been flagged as canceled. 88 """ 89 for task in self.__canceled_tasks[:]: 90 if task.state in task_complete_states: 91 self.__canceled_tasks.remove(task) 92 continue 93 if task.thread is None: 94 continue 95 task.thread.cancel() 96 self.__canceled_tasks.remove(task)9799 """ 100 Stop tasks that have met or exceeded their timeout length. 101 """ 102 running_tasks = self.__storage.running_tasks() 103 if not running_tasks: 104 return 105 now = datetime.now() 106 for task in running_tasks: 107 # the task.start_time can be None if the task has been 'run' by the 108 # queue, but the task thread has not had a chance to execute yet 109 if None in (task.timeout, task.start_time): 110 continue 111 if now - task.start_time < task.timeout: 112 continue 113 task.thread.timeout()114116 """ 117 Clean up finished task data 118 """ 119 complete_tasks = self.__storage.complete_tasks() 120 if not complete_tasks: 121 return 122 now = datetime.now() 123 for task in complete_tasks: 124 if now - task.finish_time > self.finished_lifetime: 125 self.__storage.remove_task(task)126 127 # public methods: queue operations 128130 self.__lock.acquire() 131 try: 132 fields = ('method_name', 'args', 'kwargs') 133 if unique and self.exists(task, fields, include_finished=False): 134 return False 135 task.complete_callback = self.complete 136 self.__storage.add_waiting_task(task) 137 self.__condition.notify() 138 return True 139 finally: 140 self.__lock.release()141143 self.__lock.acquire() 144 try: 145 self.__running_count += 1 146 self.__storage.add_running_task(task) 147 task.thread = TaskThread(target=task.run) 148 task.thread.start() 149 finally: 150 self.__lock.release()151153 self.__lock.acquire() 154 try: 155 self.__running_count -= 1 156 self.__storage.add_complete_task(task) 157 task.thread = None 158 task.complete_callback = None 159 finally: 160 self.__lock.release()161163 self.__lock.acquire() 164 try: 165 self.__canceled_tasks.append(task) 166 finally: 167 self.__lock.release()168
Home | Trees | Indices | Help |
|
---|
Generated by Epydoc 3.0.1 on Tue Sep 7 12:30:36 2010 | http://epydoc.sourceforge.net |