Package pulp :: Package client :: Package agent :: Module main
[hide private]
[frames] | no frames]

Source Code for Module pulp.client.agent.main

  1  # 
  2  # Copyright (c) 2010 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  import sys 
 17  import os 
 18  from getopt import getopt 
 19  from pulp.client import * 
 20  from pulp.client.config import Config 
 21  from pulp.client.lock import Lock, LockFailed 
 22  from pulp.client.agent import * 
 23  from pulp.client.agent.action import Action 
 24  from pulp.client.agent.plugin import PluginLoader 
 25  from pulp.client.agent.plugins import * 
 26  from pulp.client.logutil import getLogger 
 27  from pulp.messaging import Queue 
 28  from pulp.messaging.broker import Broker 
 29  from pulp.messaging.base import Agent as Base 
 30  from pulp.messaging.consumer import RequestConsumer 
 31  from time import sleep 
 32  from threading import Thread 
 33   
 34  log = getLogger(__name__) 
35 36 37 -class PluginLoader:
38 """ 39 Agent plugins loader. 40 """ 41 42 ROOT = '/var/lib/pulp/agent' 43 PLUGINS = 'plugins' 44 45 @classmethod
46 - def abspath(cls):
47 return os.path.join(cls.ROOT, cls.PLUGINS)
48
49 - def __init__(self):
50 path = self.abspath() 51 if os.path.exists(path): 52 return 53 os.makedirs(path) 54 pkg = os.path.join(path, '__init__.py') 55 f = open(pkg, 'w') 56 f.close()
57
58 - def load(self):
59 """ 60 Load the plugins. 61 """ 62 sys.path.append(self.ROOT) 63 path = self.abspath() 64 for fn in os.listdir(path): 65 if fn.startswith('__'): 66 continue 67 if not fn.endswith('.py'): 68 continue 69 self.__import(fn)
70
71 - def __import(self, fn):
72 """ 73 Import a module by file name. 74 @param fn: The module file name. 75 @type fn: str 76 """ 77 mod = fn.rsplit('.', 1)[0] 78 imp = '%s.%s' % (self.PLUGINS, mod) 79 try: 80 __import__(imp) 81 log.info('plugin "%s", imported', imp) 82 except: 83 log.error('plugin "%s", import failed', imp, exc_info=True)
84
85 86 -class ActionThread(Thread):
87 """ 88 Run actions independantly of main thread. 89 @ivar actions: A list of actions to run. 90 @type actions: [L{Action},..] 91 """ 92
93 - def __init__(self, actions):
94 """ 95 @param actions: A list of actions to run. 96 @type actions: [L{Action},..] 97 """ 98 self.actions = actions 99 Thread.__init__(self, name='Actions')
100
101 - def run(self):
102 """ 103 Run actions. 104 """ 105 while True: 106 for action in self.actions: 107 action() 108 sleep(10)
109
110 111 -class Agent(Base):
112 """ 113 Pulp agent. 114 """ 115
116 - def __init__(self, actions=[]):
117 id = self.id() 118 actionThread = ActionThread(actions) 119 actionThread.start() 120 cfg = Config() 121 queue = Queue(id) 122 url = cfg.messaging.url 123 if url and isinstance(url, str): 124 broker = Broker.get(url) 125 broker.cacert = cfg.messaging.cacert 126 broker.clientcert = cfg.messaging.clientcert 127 consumer = RequestConsumer(queue, url=url) 128 Base.__init__(self, consumer) 129 else: 130 log.warn('agent {%s} has messaging disabled.', id) 131 log.info('agent {%s} - started.', id) 132 actionThread.join()
133
134 - def id(self):
135 """ 136 Get agent id. 137 @return: The agent UUID. 138 """ 139 cid = ConsumerId() 140 while ( not cid.uuid ): 141 log.info('Not registered.') 142 sleep(60) 143 cid.read() 144 return cid.uuid
145
146 147 -class AgentLock(Lock):
148 """ 149 Agent lock ensure that agent only has single instance running. 150 @cvar PATH: The lock file absolute path. 151 @type PATH: str 152 """ 153 154 PATH = '/var/run/pulpd.pid' 155
156 - def __init__(self):
157 Lock.__init__(self, self.PATH)
158
159 160 -def start(daemon=True):
161 """ 162 Agent main. 163 Add recurring, time-based actions here. 164 All actions must be subclass of L{action.Action}. 165 """ 166 lock = AgentLock() 167 try: 168 lock.acquire(wait=False) 169 except LockFailed, e: 170 raise Exception('Agent already running') 171 if daemon: 172 daemonize(lock) 173 try: 174 pl = PluginLoader() 175 pl.load() 176 actions = [] 177 for cls, interval in Action.actions: 178 action = cls(**interval) 179 actions.append(action) 180 agent = Agent(actions) 181 agent.close() 182 finally: 183 lock.release()
184
185 -def usage():
186 """ 187 Show usage. 188 """ 189 s = [] 190 s.append('\npulpd <optoins>') 191 s.append(' -h, --help') 192 s.append(' Show help') 193 s.append(' -c, --console') 194 s.append(' Run in the foreground and not as a daemon.') 195 s.append(' default: 0') 196 s.append('\n') 197 print '\n'.join(s)
198
199 -def daemonize(lock):
200 """ 201 Daemon configuration. 202 """ 203 pid = os.fork() 204 if pid == 0: # child 205 os.setsid() 206 os.chdir('/') 207 os.close(0) 208 os.close(1) 209 os.close(2) 210 dn = os.open('/dev/null', os.O_RDWR) 211 os.dup(dn) 212 os.dup(dn) 213 os.dup(dn) 214 else: # parent 215 lock.update(pid) 216 os.waitpid(pid, os.WNOHANG) 217 os._exit(0)
218
219 -def main():
220 daemon = True 221 opts, args = getopt(sys.argv[1:], 'hc', ['help','console']) 222 for opt,arg in opts: 223 if opt in ('-h', '--help'): 224 usage() 225 sys.exit(0) 226 if opt in ('-c', '--console'): 227 daemon = False 228 continue 229 start(daemon)
230 231 if __name__ == '__main__': 232 main() 233