Package pulp :: Package server :: Package tasking :: Module task
[hide private]
[frames] | no frames]

Source Code for Module pulp.server.tasking.task

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # Copyright © 2010 Red Hat, Inc. 
  5  # 
  6  # This software is licensed to you under the GNU General Public License, 
  7  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  8  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  9  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
 10  # along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 12  # 
 13  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 14  # granted to use or replicate Red Hat trademarks that are incorporated 
 15  # in this software or its documentation. 
 16   
 17  import datetime 
 18  import logging 
 19  import sys 
 20  import time 
 21  import traceback 
 22  import uuid 
 23   
 24  from pulp.server.tasking.queue.thread import TimeoutException, CancelException 
 25   
 26   
 27  _log = logging.getLogger(__name__) 
 28   
 29  # task states ----------------------------------------------------------------- 
 30   
 31  task_waiting = 'waiting' 
 32  task_running = 'running' 
 33  task_finished = 'finished' 
 34  task_error = 'error' 
 35  task_timed_out = 'timed out' 
 36  task_canceled = 'canceled' 
 37  task_reset = 'reset' 
 38   
 39  task_states = ( 
 40      task_waiting, 
 41      task_running, 
 42      task_finished, 
 43      task_error, 
 44      task_timed_out, 
 45      task_canceled, 
 46      task_reset, 
 47  ) 
 48   
 49  task_ready_states = ( 
 50      task_waiting, 
 51      task_reset, 
 52  ) 
 53   
 54  task_complete_states = ( 
 55      task_finished, 
 56      task_error, 
 57      task_timed_out, 
 58      task_canceled, 
 59  ) 
 60   
 61  # task ------------------------------------------------------------------------ 
 62   
63 -class Task(object):
64 """ 65 Task class 66 Meta data for executing a long-running task. 67 """
68 - def __init__(self, callable, args=[], kwargs={}, timeout=None):
69 """ 70 Create a Task for the passed in callable and arguments. 71 @param callable: function, method, lambda, or object with __call__ 72 @param args: positional arguments to be passed into the callable 73 @param kwargs: keyword arguments to be passed into the callable 74 @type timeout: datetime.timedelta instance or None 75 @param timeout: maximum length of time to allow task to run, 76 None means indefinitely 77 """ 78 # task resources 79 self.id = str(uuid.uuid1(clock_seq=int(time.time() * 1000))) 80 self.callable = callable 81 self.args = args 82 self.kwargs = kwargs 83 self._progress_callback = None 84 self.timeout = timeout 85 86 # resources managed by the task queue to deliver events 87 self.complete_callback = None 88 self.thread = None 89 90 # resources for a task run 91 self.method_name = callable.__name__ 92 self.state = task_waiting 93 self.progress = None 94 self.start_time = None 95 self.finish_time = None 96 self.result = None 97 self.exception = None 98 self.traceback = None
99
100 - def _exception_event(self):
101 """ 102 Let the contextual thread know that an exception has been received. 103 """ 104 if not hasattr(self.thread, 'exception_event'): 105 return 106 self.thread.exception_event()
107
108 - def set_progress(self, arg, callback):
109 """ 110 Setup a progress callback for the task, if it accepts one 111 @type arg: str 112 @param arg: name of the callable's progress callback argument 113 @type callback: callable, returning a dict 114 @param callback: value of the callable's progress callback argument 115 """ 116 self.kwargs[arg] = self.progress_callback 117 self._progress_callback = callback
118
119 - def run(self):
120 """ 121 Run this task and record the result or exception. 122 """ 123 assert self.state in task_ready_states 124 self.state = task_running 125 self.start_time = datetime.datetime.now() 126 try: 127 result = self.callable(*self.args, **self.kwargs) 128 except TimeoutException: 129 self.state = task_timed_out 130 self._exception_event() 131 _log.error('Task id:%s, method_name:%s: TIMED OUT' % 132 (self.id, self.method_name)) 133 except CancelException: 134 self.state = task_canceled 135 self._exception_event() 136 _log.info('Task id:%s, method_name:%s: CANCELLED' % 137 (self.id, self.method_name)) 138 except Exception, e: 139 self.state = task_error 140 self.exception = repr(e) 141 self.traceback = traceback.format_exception(*sys.exc_info()) 142 self._exception_event() 143 _log.error('Task id:%s, method_name:%s:\n%s' % 144 (self.id, self.method_name, ''.join(self.traceback))) 145 else: 146 self.state = task_finished 147 self.result = result 148 self.finish_time = datetime.datetime.now() 149 if self.complete_callback is not None: 150 self.complete_callback(self)
151
152 - def progress_callback(self, *args, **kwargs):
153 """ 154 Provide a callback for runtime progress reporting. 155 """ 156 try: 157 # NOTE, the self._progress_callback method should return a dict 158 self.progress = self._progress_callback(*args, **kwargs) 159 except Exception, e: 160 _log.error('Exception, %s, in task %s progress callback: %s' % 161 (repr(e), self.id, self._progress_callback.__name__)) 162 raise
163
164 - def reset(self):
165 """ 166 Reset this task's recorded data. 167 """ 168 if self.state not in task_complete_states: 169 return 170 self.state = task_reset 171 self.progress = None 172 self._progress_callback = None 173 self.start_time = None 174 self.finish_time = None 175 self.result = None 176 self.exception = None 177 self.traceback = None
178