Package pulp :: Package server :: Package tasking :: Package queue :: Module storage
[hide private]
[frames] | no frames]

Source Code for Module pulp.server.tasking.queue.storage

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # Copyright © 2010 Red Hat, Inc. 
  5  # 
  6  # This software is licensed to you under the GNU General Public License, 
  7  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  8  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  9  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
 10  # along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 12  # 
 13  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 14  # granted to use or replicate Red Hat trademarks that are incorporated 
 15  # in this software or its documentation. 
 16   
 17  import itertools 
 18   
 19  # storage class for in-memory task queues ------------------------------------- 
 20      
class VolatileStorage(object):
    """
    In memory queue storage class.

    Tasks are kept in three plain lists (waiting, running, complete). Each
    list is appended to as tasks enter that state, so within a list the task
    that has been there longest comes first.
    """

    def __init__(self):
        self.__waiting_tasks = []
        self.__running_tasks = []
        self.__complete_tasks = []

    # iterable methods

    def all_tasks(self):
        """
        Return an iterator over all tasks currently in storage: complete
        tasks first, then running tasks, then waiting tasks, each group in
        the order the tasks were added to it.

        The iterator runs over copies of the internal lists, so storage may
        be modified while it is being consumed.
        @return: iterator
        """
        return itertools.chain(self.__complete_tasks[:],
                               self.__running_tasks[:],
                               self.__waiting_tasks[:])

    def unfinished_tasks(self):
        """
        Return an iterator over all unfinished tasks in storage: running
        tasks first, then waiting tasks, each group in the order the tasks
        were added to it.

        The iterator runs over copies of the internal lists, so storage may
        be modified while it is being consumed.
        @return: iterator
        """
        return itertools.chain(self.__running_tasks[:],
                               self.__waiting_tasks[:])

    def waiting_tasks(self):
        """
        Return the waiting tasks, in the order they were added.
        @rtype: list
        @return: a copy of the wait queue; mutating it does not affect storage
        """
        return self.__waiting_tasks[:]

    def running_tasks(self):
        """
        Return the running tasks, in the order they started running.
        @rtype: list
        @return: a copy of the running queue; mutating it does not affect
                 storage
        """
        return self.__running_tasks[:]

    def complete_tasks(self):
        """
        Return the complete tasks, in the order they were completed.
        @rtype: list
        @return: a copy of the complete queue; mutating it does not affect
                 storage
        """
        return self.__complete_tasks[:]

    # add/remove tasks methods

    def add_waiting_task(self, task):
        """
        Add a task to the wait queue.
        @type task: Task instance
        @param task: task to add
        """
        self.__waiting_tasks.append(task)

    def add_running_task(self, task):
        """
        Remove a task from the wait queue and add it to the running queue.
        @type task: Task instance
        @param task: task to add
        @raise ValueError: if the task is not in the wait queue
        """
        # Remove first: if the task is not waiting, list.remove raises and
        # the running queue is left untouched.
        self.__waiting_tasks.remove(task)
        self.__running_tasks.append(task)

    def add_complete_task(self, task):
        """
        Remove a task from the running queue and add it to the complete queue.
        @type task: Task instance
        @param task: task to add
        @raise ValueError: if the task is not in the running queue
        """
        # Remove first: if the task is not running, list.remove raises and
        # the complete queue is left untouched.
        self.__running_tasks.remove(task)
        self.__complete_tasks.append(task)

    def remove_task(self, task):
        """
        Remove a task from storage.

        The waiting, running and complete queues are checked in that order
        and the task is removed from the first one that contains it. Nothing
        happens if the task is not stored.
        @type task: Task instance
        @param task: task to remove
        """
        for queue in (self.__waiting_tasks,
                      self.__running_tasks,
                      self.__complete_tasks):
            if task in queue:
                queue.remove(task)
                return

    # query methods

    def find_task(self, criteria):
        """
        Find a task in the storage based on the given criteria.

        A task matches when it has every attribute named in criteria and
        each of those attributes compares equal to the given value. An empty
        criteria dict therefore matches any task.
        @type criteria: dict
        @param criteria: dict of task attr -> value to match against
        @return: the last (newest) task in the queue that matches on success,
                 None otherwise
        """
        # Search newest-first so the first hit is the answer. reversed()
        # needs a sequence, so the chained iterator is materialised into a
        # throwaway list; the stored queues themselves are not modified.
        for task in reversed(list(self.all_tasks())):
            for attr, value in criteria.items():
                if not hasattr(task, attr) or getattr(task, attr) != value:
                    break
            else:
                # Inner loop finished without a break: every criterion held.
                return task
        return None
143