1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import itertools
18
19
20
22 """
23 In memory queue storage class.
24 """
26 self.__waiting_tasks = []
27 self.__running_tasks = []
28 self.__complete_tasks = []
29
30
31
33 """
34 Return an iterator over all tasks currently in the queue in descending
35 order by length of time in the queue.
36 @return: iterator
37 """
38 return itertools.chain(self.__complete_tasks[:],
39 self.__running_tasks[:],
40 self.__waiting_tasks[:])
41
43 """
44 Return an iterator over all unfinished tasks in the queue in descending
45 order by length of time in the queue.
46 @return: iterator
47 """
48 return itertools.chain(self.__running_tasks[:], self.__waiting_tasks[:])
49
51 """
52 Return a list of all waiting tasks in the queue, in descending
53 order by the length of time in the queue.
54 @return: list (a copy of the internal waiting-task list)
55 """
56 return self.__waiting_tasks[:]
57
59 """
60 Return a list of all running tasks in the queue, in descending
61 order by the length of time in the queue.
62 @return: list (a copy of the internal running-task list)
63 """
64 return self.__running_tasks[:]
65
67 """
68 Return a list of all complete tasks in the queue, in descending
69 order by the length of time in the queue.
70 @return: list (a copy of the internal complete-task list)
71 """
72 return self.__complete_tasks[:]
73
74
75
77 """
78 Add a task to the wait queue.
79 @type task: Task instance
80 @param task: task to add
81 """
82 self.__waiting_tasks.append(task)
83
85 """
86 Remove a task from the wait queue and add it to the running queue.
87 @type task: Task instance
88 @param task: task to move; it must currently be in the wait queue
89 """
90 self.__waiting_tasks.remove(task)
91 self.__running_tasks.append(task)
92
94 """
95 Remove a task from the running queue and add it to the complete queue.
96 @type task: Task instance
97 @param task: task to move; it must currently be in the running queue
98 """
99 self.__running_tasks.remove(task)
100 self.__complete_tasks.append(task)
101
103 """
104 Remove a task from storage.
105 @type task: Task instance
106 @param task: task to remove
107 """
108 if task in self.__waiting_tasks:
109 self.__waiting_tasks.remove(task)
110 return
111 if task in self.__running_tasks:
112 self.__running_tasks.remove(task)
113 return
114 if task in self.__complete_tasks:
115 self.__complete_tasks.remove(task)
116
117
118
120 """
121 Find a task in the storage based on the given criteria.
122 @type criteria: dict
123 @param criteria: dict of task attr -> value to match against
124 @type include_finished: bool
125 @return: the last (newest) task in the queue that matches on success,
126 None otherwise
127 """
128 num_criteria = len(criteria)
129
130
131
132 for task in reversed(list(self.all_tasks())):
133 matches = 0
134 for attr, value in criteria.items():
135 if not hasattr(task, attr):
136 break;
137 if getattr(task, attr) != value:
138 break;
139 matches += 1
140 if matches == num_criteria:
141 return task
142 return None
143