
Source Code for Module pulp.server.api.repo_sync

#!/usr/bin/python
#
# Copyright (c) 2010 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.

import gzip
import logging
import os
import time
import traceback
import shutil
from urlparse import urlparse

import yum

import pulp.server.comps_util
import pulp.server.crontab
import pulp.server.upload
import pulp.server.util
from grinder.RepoFetch import YumRepoGrinder
from grinder.RHNSync import RHNSync
from pulp.server import updateinfo
from pulp.server.api.errata import ErrataApi
from pulp.server.api.package import PackageApi
from pulp.server import config
from pulp.server.pexceptions import PulpException


log = logging.getLogger(__name__)

# sync api --------------------------------------------------------------------
def yum_rhn_progress_callback(info):
    fields = ('status',
              'item_name',
              'items_left',
              'items_total',
              'size_left',
              'size_total')
    values = tuple(getattr(info, f) for f in fields)
    log.info("Progress: %s on <%s>, %s/%s items %s/%s bytes" % values)
    return dict(zip(fields, values))

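For reference, this callback flattens whichever progress object grinder hands it into a plain dict keyed by the six fields above. A minimal sketch of that contract, using a hypothetical stand-in for the grinder progress object:

# Hypothetical stand-in for the progress object grinder passes to the callback.
class _FakeProgress(object):
    status = 'downloading'
    item_name = 'foo-1.0-1.noarch.rpm'
    items_left = 9
    items_total = 10
    size_left = 9216
    size_total = 10240

report = yum_rhn_progress_callback(_FakeProgress())
# report == {'status': 'downloading', 'item_name': 'foo-1.0-1.noarch.rpm',
#            'items_left': 9, 'items_total': 10, 'size_left': 9216,
#            'size_total': 10240}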
def sync(repo, repo_source, progress_callback=None):
    '''
    Synchronizes content for the given RepoSource.

    @param repo: repo to synchronize; may not be None
    @type repo: L{pulp.model.Repo}

    @param repo_source: indicates the source from which the repo data will be synced; may not be None
    @type repo_source: L{pulp.model.RepoSource}
    '''
    source_type = repo_source['type']
    if source_type not in type_classes:
        raise PulpException('Could not find synchronizer for repo type [%s]', source_type)
    synchronizer = type_classes[source_type]()
    repo_dir = synchronizer.sync(repo, repo_source, progress_callback)
    return synchronizer.add_packages_from_dir(repo_dir, repo)

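A sketch of how a caller might drive this entry point. The repo and repo_source dicts here are hypothetical and show only keys this module actually reads:

# Hypothetical repo/repo_source values; only the keys read by sync() and
# YumSynchronizer.sync() are shown.
repo = {'id': 'f13-x86_64', 'relative_path': 'f13/x86_64',
        'ca': None, 'cert': None, 'key': None}
repo_source = {'type': 'yum', 'url': 'http://example.com/repo/f13/x86_64/'}

# Dispatches on repo_source['type'] via type_classes, then imports the
# downloaded packages and errata into the repo.
added_packages, added_errataids = sync(repo, repo_source,
                                       progress_callback=yum_rhn_progress_callback)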
def update_schedule(repo):
    '''
    Updates the repo sync scheduler entry with the schedule for the given repo.

    @param repo: repo containing the id and sync schedule; may not be None
    @type repo: L{pulp.model.Repo}
    '''
    tab = pulp.server.crontab.CronTab()

    cmd = _cron_command(repo)
    entries = tab.find_command(cmd)

    if len(entries) == 0:
        entry = tab.new(command=cmd)
    else:
        entry = entries[0]

    entry.parse(repo['sync_schedule'] + ' ' + cmd)

    log.debug('Writing updated cron entry [%s]' % entry.render())
    tab.write()

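For a concrete picture of what gets written: the schedule string is prepended to the cron command, so given a hypothetical repo with id 'f13-x86_64' and a standard 5-field cron spec as its sync_schedule, the resulting crontab line would be:

# Hypothetical repo; 'sync_schedule' is assumed to be a 5-field cron spec.
repo = {'id': 'f13-x86_64', 'sync_schedule': '0 2 * * *'}
update_schedule(repo)
# Resulting crontab entry: 0 2 * * * pulp repo sync f13-x86_64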
def delete_schedule(repo):
    '''
    Deletes the repo sync scheduler entries for the given repo.

    @param repo: repo containing the id and sync schedule; may not be None
    @type repo: L{pulp.model.Repo}
    '''
    tab = pulp.server.crontab.CronTab()

    cmd = _cron_command(repo)
    entries = tab.find_command(cmd)

    if len(entries) > 0:
        for entry in entries:
            log.debug('Removing entry [%s]' % entry.render())
            tab.remove(entry)
        tab.write()
    else:
        log.debug('No existing cron entry for repo [%s]' % repo['id'])

def _cron_command(repo):
    return 'pulp repo sync %s' % repo['id']

def repos_location():
    return "%s/%s" % (config.config.get('paths', 'local_storage'), "repos")

def package_location():
    return "%s/%s" % (config.config.get('paths', 'local_storage'), "packages")
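Assuming, purely for illustration, that the [paths] local_storage option in the server config is /var/lib/pulp, these helpers resolve to:

# Hypothetical config value: local_storage = /var/lib/pulp
repos_location()    # -> '/var/lib/pulp/repos'
package_location()  # -> '/var/lib/pulp/packages'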

# synchronization classes -----------------------------------------------------

class InvalidPathError(Exception):
    pass

class BaseSynchronizer(object):

    def __init__(self):
        self.package_api = PackageApi()
        self.errata_api = ErrataApi()

    def add_packages_from_dir(self, dir, repo):
        startTime = time.time()
        log.debug("Begin to add packages from %s into %s" % (dir, repo['id']))
        package_list = pulp.server.util.get_repo_packages(dir)
        added_packages = {}
        added_errataids = []
        log.debug("Processing %s potential packages" % (len(package_list)))
        for package in package_list:
            package = self.import_package(package, repo)
            if (package is not None):
                added_packages[package["id"]] = package
        endTime = time.time()
        log.debug("Repo: %s read [%s] packages took %s seconds" %
                (repo['id'], len(added_packages), endTime - startTime))
        # Import groups metadata if present
        repomd_xml_path = os.path.join(dir.encode("ascii", "ignore"), 'repodata/repomd.xml')
        if os.path.isfile(repomd_xml_path):
            repo["repomd_xml_path"] = repomd_xml_path
            ftypes = pulp.server.util.get_repomd_filetypes(repomd_xml_path)
            log.debug("repodata has filetypes of %s" % (ftypes))
            if "group" in ftypes:
                group_xml_path = pulp.server.util.get_repomd_filetype_path(repomd_xml_path, "group")
                group_xml_path = os.path.join(dir.encode("ascii", "ignore"), group_xml_path)
                if os.path.isfile(group_xml_path):
                    groupfile = open(group_xml_path, "r")
                    repo['group_xml_path'] = group_xml_path
                    self.sync_groups_data(groupfile, repo)
                    log.info("Loaded group info from %s" % (group_xml_path))
                else:
                    log.info("Group info not found at file: %s" % (group_xml_path))
            if "group_gz" in ftypes:
                group_gz_xml_path = pulp.server.util.get_repomd_filetype_path(
                        repomd_xml_path, "group_gz")
                group_gz_xml_path = os.path.join(dir.encode("ascii", "ignore"),
                        group_gz_xml_path)
                repo['group_gz_xml_path'] = group_gz_xml_path
            if "updateinfo" in ftypes:
                updateinfo_xml_path = pulp.server.util.get_repomd_filetype_path(
                        repomd_xml_path, "updateinfo")
                updateinfo_xml_path = os.path.join(dir.encode("ascii", "ignore"),
                        updateinfo_xml_path)
                log.info("updateinfo found in repomd.xml; its path is %s" %
                        (updateinfo_xml_path))
                added_errataids = self.sync_updateinfo_data(updateinfo_xml_path, repo)
                log.debug("Loaded updateinfo from %s for %s" %
                        (updateinfo_xml_path, repo["id"]))
        return added_packages, added_errataids

    def import_package(self, package, repo):
        # Initialized before the try so the except handler can log it even if
        # the relativepath lookup itself fails
        file_name = None
        try:
            retval = None
            file_name = package.relativepath
            hashtype = "sha256"
            checksum = package.checksum
            found = self.package_api.packages(name=package.name,
                    epoch=package.epoch, version=package.version,
                    release=package.release, arch=package.arch,
                    filename=file_name,
                    checksum_type=hashtype, checksum=checksum)
            if len(found) == 1:
                retval = found[0]
            else:
                retval = self.package_api.create(package.name, package.epoch,
                        package.version, package.release, package.arch,
                        package.description, hashtype, checksum, file_name)
                for dep in package.requires:
                    retval.requires.append(dep[0])
                for prov in package.provides:
                    retval.provides.append(prov[0])
            retval.download_url = config.config.get('server', 'base_url') + "/" + \
                    config.config.get('server', 'relative_url') + "/" + \
                    repo["id"] + "/" + file_name
            self.package_api.update(retval)
            return retval
        except Exception:
            log.error("error reading package %s" % (file_name))
            log.debug("%s" % (traceback.format_exc()))

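The download_url assembled above is plain string concatenation of two config values, the repo id, and the package's relative filename. With hypothetical config values of base_url = http://localhost and relative_url = pulp/repos, a package foo-1.0-1.noarch.rpm in repo 'f13-x86_64' would get:

# Hypothetical config: base_url = http://localhost, relative_url = pulp/repos
# download_url == 'http://localhost' + '/' + 'pulp/repos' + '/' \
#                 + 'f13-x86_64' + '/' + 'foo-1.0-1.noarch.rpm'
#              == 'http://localhost/pulp/repos/f13-x86_64/foo-1.0-1.noarch.rpm'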
    def sync_groups_data(self, compsfile, repo):
        """
        Synchronizes package group/category info from a repo's group metadata.
        Caller is responsible for saving the repo to the db.
        """
        try:
            comps = yum.comps.Comps()
            comps.add(compsfile)
            # Remove all "repo_defined" groups/categories; iterate over a copy
            # of the keys since entries are deleted during iteration
            for grp_id in repo["packagegroups"].keys():
                if repo["packagegroups"][grp_id]["repo_defined"]:
                    del repo["packagegroups"][grp_id]
            for cat_id in repo["packagegroupcategories"].keys():
                if repo["packagegroupcategories"][cat_id]["repo_defined"]:
                    del repo["packagegroupcategories"][cat_id]
            # Add all groups/categories from the repo's comps data
            for c in comps.categories:
                ctg = pulp.server.comps_util.yum_category_to_model_category(c)
                ctg["immutable"] = True
                ctg["repo_defined"] = True
                repo['packagegroupcategories'][ctg['id']] = ctg
            for g in comps.groups:
                grp = pulp.server.comps_util.yum_group_to_model_group(g)
                grp["immutable"] = True
                grp["repo_defined"] = True
                repo['packagegroups'][grp['id']] = grp
        except yum.Errors.CompsException:
            log.error("Unable to parse group info for %s" % (compsfile))
            return False
        return True

    def sync_updateinfo_data(self, updateinfo_xml_path, repo):
        """
        @param updateinfo_xml_path: path to updateinfo metadata xml file
        @param repo: model.Repo object we want to sync
        """
        eids = []
        try:
            start = time.time()
            errata = updateinfo.get_errata(updateinfo_xml_path)
            log.debug("Parsed %s, %s UpdateNotices were returned." %
                    (updateinfo_xml_path, len(errata)))
            for e in errata:
                eids.append(e['id'])
                # Replace an existing erratum if the update date is newer
                found = self.errata_api.erratum(e['id'])
                if found:
                    if e['updated'] <= found['updated']:
                        continue
                    log.debug("Updating erratum %s; its updated date %s is newer than %s." %
                            (e['id'], e["updated"], found["updated"]))
                    self.errata_api.delete(e['id'])
                pkglist = e['pkglist']
                self.errata_api.create(id=e['id'], title=e['title'],
                        description=e['description'], version=e['version'],
                        release=e['release'], type=e['type'],
                        status=e['status'], updated=e['updated'],
                        issued=e['issued'], pushcount=e['pushcount'],
                        from_str=e['from_str'], reboot_suggested=e['reboot_suggested'],
                        references=e['references'], pkglist=pkglist,
                        repo_defined=True, immutable=True)
            end = time.time()
            log.debug("%s new/updated errata imported in %s seconds" % (len(eids), (end - start)))
        except yum.Errors.YumBaseError:
            log.error("Unable to parse updateinfo file %s for %s" % (updateinfo_xml_path, repo["id"]))
            return []
        return eids

class YumSynchronizer(BaseSynchronizer):

    def sync(self, repo, repo_source, progress_callback=None):
        cacert = clicert = clikey = None
        if repo['ca'] and repo['cert'] and repo['key']:
            cacert = repo['ca'].encode('utf8')
            clicert = repo['cert'].encode('utf8')
            clikey = repo['key'].encode('utf8')

        yfetch = YumRepoGrinder('', repo_source['url'].encode('ascii', 'ignore'),
                1, cacert=cacert, clicert=clicert,
                clikey=clikey, packages_location=package_location())
        relative_path = repo['relative_path']
        if relative_path:
            store_path = "%s/%s" % (repos_location(), relative_path)
        else:
            store_path = "%s/%s" % (repos_location(), repo['id'])
        yfetch.fetchYumRepo(store_path, callback=progress_callback)

        return store_path

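A quick illustration of where a yum sync lands on disk, reusing the hypothetical local_storage = /var/lib/pulp assumption from above:

# Hypothetical: repo = {'id': 'f13-x86_64', 'relative_path': 'f13/x86_64', ...}
# store_path == '/var/lib/pulp/repos/f13/x86_64'
# With an empty relative_path it falls back to '/var/lib/pulp/repos/f13-x86_64'.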
class LocalSynchronizer(BaseSynchronizer):
    """
    Sync class to synchronize a directory of rpms from a local filer
    """
    def sync(self, repo, repo_source, progress_callback=None):
        pkg_dir = urlparse(repo_source['url']).path.encode('ascii', 'ignore')
        log.debug("sync of %s for repo %s" % (pkg_dir, repo['id']))
        try:
            repo_dir = "%s/%s" % (repos_location(), repo['id'])
            if not os.path.exists(pkg_dir):
                raise InvalidPathError("Path %s is invalid" % pkg_dir)
            if repo['use_symlinks']:
                log.info("create a symlink to src directory %s %s" % (pkg_dir, repo_dir))
                os.symlink(pkg_dir, repo_dir)
            else:
                if not os.path.exists(repo_dir):
                    os.makedirs(repo_dir)

                pkglist = pulp.server.util.listdir(pkg_dir)
                log.debug("Found %s packages in %s" % (len(pkglist), pkg_dir))
                for count, pkg in enumerate(pkglist):
                    if pkg.endswith(".rpm"):
                        if count % 500 == 0:
                            log.debug("Working on %s/%s" % (count, len(pkglist)))
                        pkg_info = pulp.server.util.get_rpm_information(pkg)
                        pkg_location = "%s/%s/%s/%s/%s/%s" % (package_location(),
                                pkg_info.name, pkg_info.version, pkg_info.release,
                                pkg_info.arch, os.path.basename(pkg))
                        log.debug('Expected Package Location: %s' % pkg_location)
                        if not pulp.server.util.check_package_exists(pkg_location,
                                pulp.server.util.get_file_checksum(filename=pkg)):
                            log.debug("Package not yet in packages location; "
                                    "writing it to %s" % pkg_location)
                            pkg_dirname = os.path.dirname(pkg_location)
                            if not os.path.exists(pkg_dirname):
                                os.makedirs(pkg_dirname)
                            shutil.copy(pkg, pkg_location)
                        else:
                            log.debug("Package already exists; skipping copy")

                        repo_pkg_path = os.path.join(repo_dir, os.path.basename(pkg))
                        if not os.path.islink(repo_pkg_path):
                            os.symlink(pkg_location, repo_pkg_path)

                ##TODO: Need to revisit the removal case
                # # Remove rpms which are no longer in source
                # existing_pkgs = []
                # for pkg in pulp.server.util.listdir(repo_dir):
                #     if pkg.endswith(".rpm"):
                #         existing_pkgs.append(os.path.basename(pkg))
                # source_pkgs = [os.path.basename(p) for p in pkglist]
                # for epkg in existing_pkgs:
                #     if epkg not in source_pkgs:
                #         log.info("Remove %s from repo %s because it is not in repo_source" % (epkg, repo["id"]))
                #         os.remove(os.path.join(repo_dir, epkg))
                groups_xml_path = None
                updateinfo_path = None
                src_repomd_xml = os.path.join(pkg_dir, "repodata/repomd.xml")
                if os.path.isfile(src_repomd_xml):
                    ftypes = pulp.server.util.get_repomd_filetypes(src_repomd_xml)
                    log.debug("repodata has filetypes of %s" % (ftypes))
                    if "group" in ftypes:
                        g = pulp.server.util.get_repomd_filetype_path(src_repomd_xml, "group")
                        src_groups = os.path.join(pkg_dir, g)
                        if os.path.isfile(src_groups):
                            shutil.copy(src_groups,
                                    os.path.join(repo_dir, os.path.basename(src_groups)))
                            log.debug("Copied groups over to %s" % (repo_dir))
                            groups_xml_path = os.path.join(repo_dir,
                                    os.path.basename(src_groups))
                    if "updateinfo" in ftypes:
                        f = pulp.server.util.get_repomd_filetype_path(src_repomd_xml, "updateinfo")
                        src_updateinfo_path = os.path.join(pkg_dir, f)
                        if os.path.isfile(src_updateinfo_path):
                            # Copy the updateinfo metadata to 'updateinfo.xml'.
                            # We want to ensure modifyrepo is run with the file
                            # named 'updateinfo.xml'; this results in the correct
                            # metadata type.
                            #
                            # The updateinfo reported in repomd.xml may be gzipped;
                            # if it is, uncompress it and copy it to updateinfo.xml
                            # alongside the packages in the repo.
                            if src_updateinfo_path.endswith('.gz'):
                                f = gzip.open(src_updateinfo_path)
                            else:
                                f = open(src_updateinfo_path, 'rt')
                            shutil.copyfileobj(f, open(
                                    os.path.join(repo_dir, "updateinfo.xml"), "wt"))
                            log.debug("Copied %s to %s" % (src_updateinfo_path, repo_dir))
                            updateinfo_path = os.path.join(repo_dir, "updateinfo.xml")
                log.info("Running createrepo, this may take a few minutes to complete.")
                start = time.time()
                pulp.server.upload.create_repo(repo_dir, groups=groups_xml_path)
                end = time.time()
                log.info("Createrepo finished in %s seconds" % (end - start))
                if updateinfo_path:
                    log.debug("Modifying repo for updateinfo")
                    pulp.server.upload.modify_repo(os.path.join(repo_dir, "repodata"),
                            updateinfo_path)
        except InvalidPathError:
            log.error("Sync aborted due to invalid source path %s" % (pkg_dir))
            raise
        except IOError:
            log.error("Unable to create repo directory %s" % repo_dir)
            raise
        return repo_dir

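To make the layout above concrete: each rpm is copied once into the shared packages store, keyed by its name/version/release/arch, and then symlinked into the repo directory. Assuming the same hypothetical local_storage = /var/lib/pulp and a package foo-1.0-1.noarch.rpm in a repo with id 'myrepo':

# Shared store (pkg_location):
#   /var/lib/pulp/packages/foo/1.0/1/noarch/foo-1.0-1.noarch.rpm
# Repo directory entry (repo_pkg_path), a symlink to the above:
#   /var/lib/pulp/repos/myrepo/foo-1.0-1.noarch.rpm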
class RHNSynchronizer(BaseSynchronizer):

    def sync(self, repo, repo_source, progress_callback=None):
        # Parse the repo source for the necessary pieces
        # Expected format: <server>/<channel>
        pieces = repo_source['url'].split('/')
        if len(pieces) < 2:
            raise PulpException('Feed format for RHN type must be <server>/<channel>. Feed: %s',
                    repo_source['url'])

        host = 'http://' + pieces[0]
        channel = pieces[1]

        log.info('Synchronizing from RHN. Host [%s], Channel [%s]' % (host, channel))

        # Create and configure the grinder hook to RHN
        s = RHNSync()
        s.setURL(host)
        s.setParallel(config.config.get('rhn', 'threads'))

        # Perform the sync
        dest_dir = '%s/%s/' % (config.config.get('paths', 'local_storage'), repo['id'])
        s.syncPackages(channel, savePath=dest_dir, callback=progress_callback)
        s.createRepo(dest_dir)
        updateinfo_path = os.path.join(dest_dir, "updateinfo.xml")
        if os.path.isfile(updateinfo_path):
            log.info("updateinfo_path found; calling updateRepo")
            s.updateRepo(updateinfo_path, os.path.join(dest_dir, "repodata"))

        return dest_dir

# synchronization type map ----------------------------------------------------

type_classes = {
    'yum': YumSynchronizer,
    'local': LocalSynchronizer,
    'rhn': RHNSynchronizer,
}
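This map drives the dispatch in sync() above, so supporting a new source type amounts to registering another BaseSynchronizer subclass here. A hypothetical sketch (FtpSynchronizer and the 'ftp' type are not part of this module):

# Hypothetical: registering an additional synchronizer for a new source type.
class FtpSynchronizer(BaseSynchronizer):
    def sync(self, repo, repo_source, progress_callback=None):
        # Fetch content from repo_source['url'] and return the local
        # directory it was written to, as the other synchronizers do.
        raise NotImplementedError

type_classes['ftp'] = FtpSynchronizer
# sync(repo, {'type': 'ftp', 'url': ...}) would now dispatch to it.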