1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import datetime
18 import logging
19 import sys
20 import time
21 import traceback
22 import uuid
23
24 from pulp.server.tasking.queue.thread import TimeoutException, CancelException
25
26
27 _log = logging.getLogger(__name__)
28
29
30
31 task_waiting = 'waiting'
32 task_running = 'running'
33 task_finished = 'finished'
34 task_error = 'error'
35 task_timed_out = 'timed out'
36 task_canceled = 'canceled'
37 task_reset = 'reset'
38
39 task_states = (
40 task_waiting,
41 task_running,
42 task_finished,
43 task_error,
44 task_timed_out,
45 task_canceled,
46 task_reset,
47 )
48
49 task_ready_states = (
50 task_waiting,
51 task_reset,
52 )
53
54 task_complete_states = (
55 task_finished,
56 task_error,
57 task_timed_out,
58 task_canceled,
59 )
60
61
62
64 """
65 Task class
66 Meta data for executing a long-running task.
67 """
68 - def __init__(self, callable, args=[], kwargs={}, timeout=None):
69 """
70 Create a Task for the passed in callable and arguments.
71 @param callable: function, method, lambda, or object with __call__
72 @param args: positional arguments to be passed into the callable
73 @param kwargs: keyword arguments to be passed into the callable
74 @type timeout: datetime.timedelta instance or None
75 @param timeout: maximum length of time to allow task to run,
76 None means indefinitely
77 """
78
79 self.id = str(uuid.uuid1(clock_seq=int(time.time() * 1000)))
80 self.callable = callable
81 self.args = args
82 self.kwargs = kwargs
83 self._progress_callback = None
84 self.timeout = timeout
85
86
87 self.complete_callback = None
88 self.thread = None
89
90
91 self.method_name = callable.__name__
92 self.state = task_waiting
93 self.progress = None
94 self.start_time = None
95 self.finish_time = None
96 self.result = None
97 self.exception = None
98 self.traceback = None
99
101 """
102 Let the contextual thread know that an exception has been received.
103 """
104 if not hasattr(self.thread, 'exception_event'):
105 return
106 self.thread.exception_event()
107
109 """
110 Setup a progress callback for the task, if it accepts one
111 @type arg: str
112 @param arg: name of the callable's progress callback argument
113 @type callback: callable, returning a dict
114 @param callback: value of the callable's progress callback argument
115 """
116 self.kwargs[arg] = self.progress_callback
117 self._progress_callback = callback
118
120 """
121 Run this task and record the result or exception.
122 """
123 assert self.state in task_ready_states
124 self.state = task_running
125 self.start_time = datetime.datetime.now()
126 try:
127 result = self.callable(*self.args, **self.kwargs)
128 except TimeoutException:
129 self.state = task_timed_out
130 self._exception_event()
131 _log.error('Task id:%s, method_name:%s: TIMED OUT' %
132 (self.id, self.method_name))
133 except CancelException:
134 self.state = task_canceled
135 self._exception_event()
136 _log.info('Task id:%s, method_name:%s: CANCELLED' %
137 (self.id, self.method_name))
138 except Exception, e:
139 self.state = task_error
140 self.exception = repr(e)
141 self.traceback = traceback.format_exception(*sys.exc_info())
142 self._exception_event()
143 _log.error('Task id:%s, method_name:%s:\n%s' %
144 (self.id, self.method_name, ''.join(self.traceback)))
145 else:
146 self.state = task_finished
147 self.result = result
148 self.finish_time = datetime.datetime.now()
149 if self.complete_callback is not None:
150 self.complete_callback(self)
151
153 """
154 Provide a callback for runtime progress reporting.
155 """
156 try:
157
158 self.progress = self._progress_callback(*args, **kwargs)
159 except Exception, e:
160 _log.error('Exception, %s, in task %s progress callback: %s' %
161 (repr(e), self.id, self._progress_callback.__name__))
162 raise
163
165 """
166 Reset this task's recorded data.
167 """
168 if self.state not in task_complete_states:
169 return
170 self.state = task_reset
171 self.progress = None
172 self._progress_callback = None
173 self.start_time = None
174 self.finish_time = None
175 self.result = None
176 self.exception = None
177 self.traceback = None
178