Package pulp :: Package server :: Package tasking :: Package queue :: Module thread
[hide private]
[frames] | no frames]

Source Code for Module pulp.server.tasking.queue.thread

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # 
  4  # Copyright © 2010 Red Hat, Inc. 
  5  # 
  6  # This software is licensed to you under the GNU General Public License, 
  7  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  8  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  9  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
 10  # along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 12  # 
 13  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 14  # granted to use or replicate Red Hat trademarks that are incorporated 
 15  # in this software or its documentation. 
 16   
 17  import ctypes 
 18  import inspect 
 19  import logging 
 20  import threading 
 21   
 22   
 23  _log = logging.getLogger(__name__) 
# debugging re-entrant lock ---------------------------------------------------

class DRLock(object):
    """
    Re-entrant lock that logs when it is acquired and when it is released at
    the debug log level.

    Wraps a threading.RLock and may be used directly (acquire/release) or as
    a context manager.
    """

    def __init__(self):
        self.__lock = threading.RLock()
        # re-export the wrapped lock's ownership test under the same name
        # (presumably so helpers that probe a lock for _is_owned, such as
        # threading.Condition, keep working -- confirm against callers)
        self._is_owned = self.__lock._is_owned
        # NOTE the two hooks below were already disabled in the original source
        #self._acquire_restore = self.__lock._acquire_restore
        #self._release_save = self.__lock._release_save

    def __repr__(self):
        # delegate so log lines show the wrapped lock's owner/count state
        return repr(self.__lock)

    def acquire(self, blocking=1):
        """
        Acquire the wrapped lock, logging the attempt and the success.

        blocking is handed to RLock.acquire unchanged. Returns True when the
        lock was acquired and False when it was not.
        """
        # arguments are passed to the logger instead of being %-formatted
        # here, so no string is built unless debug logging is enabled
        _log.debug('Thread %s called acquire', threading.current_thread())
        if not self.__lock.acquire(blocking):
            return False
        _log.debug('Lock %s ACQUIRED', self)
        return True

    def release(self):
        """
        Release the wrapped lock, logging the attempt and the completion.
        """
        _log.debug('Thread %s called release', threading.current_thread())
        self.__lock.release()
        _log.debug('Lock %s RELEASED', self)

    # entering a "with" block is a plain blocking acquire
    __enter__ = acquire

    def __exit__(self, *args, **kwargs):
        # exception details are ignored; the lock is always released
        self.release()

58 # interruptable thread base class --------------------------------------------- 59 60 # based on an answer from stack overflow: 61 # http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python 62 63 -def _raise_exception_in_thread(tid, exc_type):
64 """ 65 Raises an exception in the threads with id tid. 66 """ 67 assert inspect.isclass(exc_type) 68 # NOTE this returns the number of threads that it modified, which should 69 # only be 1 or 0 (if the thread id wasn't found) 70 long_tid = ctypes.c_long(tid) 71 exc_ptr = ctypes.py_object(exc_type) 72 num = ctypes.pythonapi.PyThreadState_SetAsyncExc(long_tid, exc_ptr) 73 if num == 1: 74 return 75 if num == 0: 76 raise ValueError('Invalid thread id') 77 # NOTE if it returns a number greater than one, you're in trouble, 78 # and you should call it again with exc=NULL to revert the effect 79 null_ptr = ctypes.py_object() 80 ctypes.pythonapi.PyThreadState_SetAsyncExc(long_tid, null_ptr) 81 raise SystemError('PyThreadState_SetAsyncExc failed')
82
class InterruptableThread(threading.Thread):
    """
    A thread class that supports raising an exception in the thread from
    another thread.
    """

    def __init__(self, *args, **kwargs):
        super(InterruptableThread, self).__init__(*args, **kwargs)
        # seconds raise_exception waits for an acknowledgement between
        # delivery attempts
        self.__default_timeout = 0.05
        # set through exception_event() to stop raise_exception's retry loop
        self.__exception_event = threading.Event()

    @property
    def _tid(self):
        """
        Determine this thread's id.

        Raises threading.ThreadError if the thread is not alive and
        AssertionError if no id is available.
        """
        if not self.is_alive():
            raise threading.ThreadError('Thread is not active')
        # Thread.ident is the id this thread is registered under, so there is
        # no need to scan the private threading._active dict (which can change
        # size while it is being iterated)
        tid = self.ident
        if tid is None:
            raise AssertionError('Could not determine thread id')
        return tid

    def exception_event(self):
        """
        Flag that an exception has been delivered to the thread and handled.
        This will unblock the thread trying to deliver the exception.
        """
        self.__exception_event.set()

    def raise_exception(self, exc_type):
        """
        Raise an exception in this thread.

        NOTE this is executed in the context of the calling thread and blocks
        until the exception has been delivered to this thread and this thread
        exits.

        Delivery is repeated, pausing for the default timeout between
        attempts, until exception_event() has been called. A failed delivery
        is logged as an error and ends the loop; it is not re-raised.
        """
        while not self.__exception_event.is_set():
            try:
                _raise_exception_in_thread(self._tid, exc_type)
                self.__exception_event.wait(self.__default_timeout)
            except (threading.ThreadError, AssertionError,
                    ValueError, SystemError) as e:
                # exc_type may be the very thing that failed validation, so
                # do not assume it has a __name__
                exc_name = getattr(exc_type, '__name__', exc_type)
                _log.error('Failed to deliver exception %s to thread[%s]: %s',
                           exc_name, self.ident, e)
                break

# task thread -----------------------------------------------------------------

class TaskThreadException(Exception):
    """
    Root of the exception types that get raised inside a task thread.
    """

class TimeoutException(TaskThreadException):
    """
    Raised in a task thread to interrupt the task because it timed out.
    """

class CancelException(TaskThreadException):
    """
    Raised in a task thread to interrupt the task because it was cancelled.
    """

class TaskThread(InterruptableThread):
    """
    Interruptable thread specialised for tasks: adds one method per
    task-specific interruption.
    """

    def timeout(self):
        """
        Interrupt the thread by raising a TimeoutException in it.
        """
        self.raise_exception(TimeoutException)

    def cancel(self):
        """
        Interrupt the thread by raising a CancelException in it.
        """
        self.raise_exception(CancelException)