import gzip
import logging
import os
import time
import traceback
import shutil
from urlparse import urlparse

import yum

import pulp.server.comps_util
import pulp.server.crontab
import pulp.server.upload
import pulp.server.util
from grinder.RepoFetch import YumRepoGrinder
from grinder.RHNSync import RHNSync
from pulp.server import updateinfo
from pulp.server.api.errata import ErrataApi
from pulp.server.api.package import PackageApi
from pulp.server import config
from pulp.server.pexceptions import PulpException


log = logging.getLogger(__name__)


def yum_rhn_progress_callback(info):
    # Progress callback handed to grinder; converts its progress object
    # into a plain dict of the fields below.
    fields = ('status',
              'item_name',
              'items_left',
              'items_total',
              'size_left',
              'size_total')
    values = tuple(getattr(info, f) for f in fields)
    log.info("Progress: %s on <%s>, %s/%s items %s/%s bytes" % values)
    return dict(zip(fields, values))

def sync(repo, repo_source, progress_callback=None):
    '''
    Synchronizes content for the given RepoSource.

    @param repo: repo to synchronize; may not be None
    @type repo: L{pulp.model.Repo}

    @param repo_source: indicates the source from which the repo data will be synced; may not be None
    @type repo_source: L{pulp.model.RepoSource}
    '''
    source_type = repo_source['type']
    if source_type not in type_classes:
        raise PulpException('Could not find synchronizer for repo type [%s]' % source_type)
    synchronizer = type_classes[source_type]()
    repo_dir = synchronizer.sync(repo, repo_source, progress_callback)
    return synchronizer.add_packages_from_dir(repo_dir, repo)
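
# A minimal usage sketch (the repo and repo_source values are hypothetical;
# they normally come from the model layer):
#
#   repo = {'id': 'f13-x86_64', 'sync_schedule': '0 2 * * *'}
#   repo_source = {'type': 'yum', 'url': 'http://example.com/repo/'}
#   added_packages, added_errata_ids = sync(repo, repo_source)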


def update_schedule(repo):
    '''
    Updates the repo sync scheduler entry with the schedule for the given repo.

    @param repo: repo containing the id and sync schedule; may not be None
    @type repo: L{pulp.model.Repo}
    '''
    tab = pulp.server.crontab.CronTab()

    cmd = _cron_command(repo)
    entries = tab.find_command(cmd)

    if len(entries) == 0:
        entry = tab.new(command=cmd)
    else:
        entry = entries[0]

    entry.parse(repo['sync_schedule'] + ' ' + cmd)

    log.debug('Writing updated cron entry [%s]' % entry.render())
    tab.write()
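
# For example, a (hypothetical) sync_schedule of '0 2 * * *' for repo
# 'f13-x86_64' renders the crontab entry:
#
#   0 2 * * * pulp repo sync f13-x86_64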


def delete_schedule(repo):
    '''
    Deletes the repo sync schedule file for the given repo.

    @param repo: repo containing the id and sync schedule; may not be None
    @type repo: L{pulp.model.Repo}
    '''
    tab = pulp.server.crontab.CronTab()

    cmd = _cron_command(repo)
    entries = tab.find_command(cmd)

    if len(entries) > 0:
        for entry in entries:
            log.debug('Removing entry [%s]' % entry.render())
            tab.remove(entry)
        tab.write()
    else:
        log.debug('No existing cron entry for repo [%s]' % repo['id'])


def _cron_command(repo):
    return 'pulp repo sync %s' % repo['id']


def repos_location():
    return "%s/%s" % (config.config.get('paths', 'local_storage'), "repos")


def package_location():
    return "%s/%s" % (config.config.get('paths', 'local_storage'), "packages")
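
# Layout note: packages land under package_location() in a
# <name>/<version>/<release>/<arch>/<filename> tree (see LocalSynchronizer.sync
# below); repos land under repos_location(), keyed by the repo's
# 'relative_path' when set, or its id otherwise.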


class InvalidPathError(Exception):
    pass


class BaseSynchronizer(object):

    def __init__(self):
        self.package_api = PackageApi()
        self.errata_api = ErrataApi()

    def add_packages_from_dir(self, dir, repo):
        startTime = time.time()
        log.debug("Adding packages from %s to repo %s" % (dir, repo['id']))
        package_list = pulp.server.util.get_repo_packages(dir)
        added_packages = {}
        added_errataids = []
        log.debug("Processing %s potential packages" % (len(package_list)))
        for package in package_list:
            package = self.import_package(package, repo)
            if (package is not None):
                added_packages[package["id"]] = package
        endTime = time.time()
        log.debug("Repo %s: reading [%s] packages took %s seconds" %
                  (repo['id'], len(added_packages), endTime - startTime))

        repomd_xml_path = os.path.join(dir.encode("ascii", "ignore"), 'repodata/repomd.xml')
        if os.path.isfile(repomd_xml_path):
            repo["repomd_xml_path"] = repomd_xml_path
            ftypes = pulp.server.util.get_repomd_filetypes(repomd_xml_path)
            log.debug("repodata has filetypes of %s" % (ftypes))
            if "group" in ftypes:
                group_xml_path = pulp.server.util.get_repomd_filetype_path(repomd_xml_path, "group")
                group_xml_path = os.path.join(dir.encode("ascii", "ignore"), group_xml_path)
                if os.path.isfile(group_xml_path):
                    groupfile = open(group_xml_path, "r")
                    repo['group_xml_path'] = group_xml_path
                    self.sync_groups_data(groupfile, repo)
                    log.info("Loaded group info from %s" % (group_xml_path))
                else:
                    log.info("Group info not found at file: %s" % (group_xml_path))
            if "group_gz" in ftypes:
                group_gz_xml_path = pulp.server.util.get_repomd_filetype_path(
                    repomd_xml_path, "group_gz")
                group_gz_xml_path = os.path.join(dir.encode("ascii", "ignore"),
                                                 group_gz_xml_path)
                repo['group_gz_xml_path'] = group_gz_xml_path
            if "updateinfo" in ftypes:
                updateinfo_xml_path = pulp.server.util.get_repomd_filetype_path(
                    repomd_xml_path, "updateinfo")
                updateinfo_xml_path = os.path.join(dir.encode("ascii", "ignore"),
                                                   updateinfo_xml_path)
                log.info("updateinfo found in repomd.xml; its path is %s" %
                         (updateinfo_xml_path))
                added_errataids = self.sync_updateinfo_data(updateinfo_xml_path, repo)
                log.debug("Loaded updateinfo from %s for %s" %
                          (updateinfo_xml_path, repo["id"]))
        return added_packages, added_errataids

    def import_package(self, package, repo):
        file_name = package.relativepath
        try:
            retval = None
            hashtype = "sha256"
            checksum = package.checksum
            found = self.package_api.packages(name=package.name,
                                              epoch=package.epoch, version=package.version,
                                              release=package.release, arch=package.arch,
                                              filename=file_name,
                                              checksum_type=hashtype, checksum=checksum)
            if len(found) == 1:
                retval = found[0]
            else:
                retval = self.package_api.create(package.name, package.epoch,
                                                 package.version, package.release,
                                                 package.arch, package.description,
                                                 hashtype, checksum, file_name)
                for dep in package.requires:
                    retval.requires.append(dep[0])
                for prov in package.provides:
                    retval.provides.append(prov[0])
                retval.download_url = config.config.get('server', 'base_url') + "/" + \
                        config.config.get('server', 'relative_url') + "/" + \
                        repo["id"] + "/" + file_name
                self.package_api.update(retval)
            return retval
        except Exception:
            log.error("error reading package %s" % (file_name))
            log.debug("%s" % (traceback.format_exc()))

    def sync_groups_data(self, compsfile, repo):
        """
        Synchronizes package group/category info from a repo's group metadata.
        Caller is responsible for saving repo to db.
        """
        try:
            comps = yum.comps.Comps()
            comps.add(compsfile)
            # Drop the repo-defined groups/categories first; they are about to
            # be rebuilt from the metadata just parsed. Iterate over .keys()
            # so entries can be deleted while looping.
            for grp_id in repo["packagegroups"].keys():
                if repo["packagegroups"][grp_id]["repo_defined"]:
                    del repo["packagegroups"][grp_id]
            for cat_id in repo["packagegroupcategories"].keys():
                if repo["packagegroupcategories"][cat_id]["repo_defined"]:
                    del repo["packagegroupcategories"][cat_id]

            for c in comps.categories:
                ctg = pulp.server.comps_util.yum_category_to_model_category(c)
                ctg["immutable"] = True
                ctg["repo_defined"] = True
                repo['packagegroupcategories'][ctg['id']] = ctg
            for g in comps.groups:
                grp = pulp.server.comps_util.yum_group_to_model_group(g)
                grp["immutable"] = True
                grp["repo_defined"] = True
                repo['packagegroups'][grp['id']] = grp
        except yum.Errors.CompsException:
            log.error("Unable to parse group info for %s" % (compsfile))
            return False
        return True

    def sync_updateinfo_data(self, updateinfo_xml_path, repo):
        """
        @param updateinfo_xml_path: path to updateinfo metadata xml file
        @param repo: model.Repo object we want to sync
        """
        eids = []
        try:
            start = time.time()
            errata = updateinfo.get_errata(updateinfo_xml_path)
            log.debug("Parsed %s, %s UpdateNotices were returned." %
                      (updateinfo_xml_path, len(errata)))
            for e in errata:
                eids.append(e['id'])
                # If the erratum already exists, only re-import it when the
                # incoming copy is newer.
                found = self.errata_api.erratum(e['id'])
                if found:
                    if e['updated'] <= found['updated']:
                        continue
                    log.debug("Updating errata %s; its updated date %s is newer than %s." %
                              (e['id'], e["updated"], found["updated"]))
                    self.errata_api.delete(e['id'])
                pkglist = e['pkglist']
                self.errata_api.create(id=e['id'], title=e['title'],
                        description=e['description'], version=e['version'],
                        release=e['release'], type=e['type'],
                        status=e['status'], updated=e['updated'],
                        issued=e['issued'], pushcount=e['pushcount'],
                        from_str=e['from_str'], reboot_suggested=e['reboot_suggested'],
                        references=e['references'], pkglist=pkglist,
                        repo_defined=True, immutable=True)
            end = time.time()
            log.debug("%s new/updated errata imported in %s seconds" % (len(eids), (end - start)))
        except yum.Errors.YumBaseError:
            log.error("Unable to parse updateinfo file %s for %s" % (updateinfo_xml_path, repo["id"]))
            return []
        return eids


class YumSynchronizer(BaseSynchronizer):

    def sync(self, repo, repo_source, progress_callback=None):
        cacert = clicert = clikey = None
        if repo['ca'] and repo['cert'] and repo['key']:
            cacert = repo['ca'].encode('utf8')
            clicert = repo['cert'].encode('utf8')
            clikey = repo['key'].encode('utf8')

        yfetch = YumRepoGrinder('', repo_source['url'].encode('ascii', 'ignore'),
                                1, cacert=cacert, clicert=clicert,
                                clikey=clikey, packages_location=package_location())
        relative_path = repo['relative_path']
        if relative_path:
            store_path = "%s/%s" % (repos_location(), relative_path)
        else:
            store_path = "%s/%s" % (repos_location(), repo['id'])
        yfetch.fetchYumRepo(store_path, callback=progress_callback)

        return store_path


class LocalSynchronizer(BaseSynchronizer):
    """
    Sync class to synchronize a directory of rpms from a local filer.
    """
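
    # The repo_source is expected to carry a file URL, e.g. (hypothetical):
    #   {'type': 'local', 'url': 'file:///mnt/filer/f13-repo'}
    # urlparse() below extracts the filesystem path from that URL.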

    def sync(self, repo, repo_source, progress_callback=None):
        pkg_dir = urlparse(repo_source['url']).path.encode('ascii', 'ignore')
        log.debug("sync of %s for repo %s" % (pkg_dir, repo['id']))
        try:
            repo_dir = "%s/%s" % (repos_location(), repo['id'])
            if not os.path.exists(pkg_dir):
                raise InvalidPathError("Path %s is invalid" % pkg_dir)
            if repo['use_symlinks']:
                log.info("creating symlink to source directory %s at %s" % (pkg_dir, repo_dir))
                os.symlink(pkg_dir, repo_dir)
            else:
                if not os.path.exists(repo_dir):
                    os.makedirs(repo_dir)

                pkglist = pulp.server.util.listdir(pkg_dir)
                log.debug("Found %s packages in %s" % (len(pkglist), pkg_dir))
                for count, pkg in enumerate(pkglist):
                    if pkg.endswith(".rpm"):
                        if count % 500 == 0:
                            log.debug("Working on %s/%s" % (count, len(pkglist)))
                        pkg_info = pulp.server.util.get_rpm_information(pkg)
                        pkg_location = "%s/%s/%s/%s/%s/%s" % (package_location(),
                                pkg_info.name, pkg_info.version, pkg_info.release,
                                pkg_info.arch, os.path.basename(pkg))
                        log.debug('Expected Package Location: %s' % pkg_location)
                        if not pulp.server.util.check_package_exists(pkg_location,
                                pulp.server.util.get_file_checksum(filename=pkg)):
                            log.info("Package not yet in packages location; copying it to %s" % pkg_location)
                            pkg_dirname = os.path.dirname(pkg_location)
                            if not os.path.exists(pkg_dirname):
                                os.makedirs(pkg_dirname)
                            shutil.copy(pkg, pkg_location)
                        else:
                            log.debug("Package already exists at %s; skipping copy" % pkg_location)

                        repo_pkg_path = os.path.join(repo_dir, os.path.basename(pkg))
                        if not os.path.islink(repo_pkg_path):
                            os.symlink(pkg_location, repo_pkg_path)

                groups_xml_path = None
                updateinfo_path = None
                src_repomd_xml = os.path.join(pkg_dir, "repodata/repomd.xml")
                if os.path.isfile(src_repomd_xml):
                    ftypes = pulp.server.util.get_repomd_filetypes(src_repomd_xml)
                    log.debug("repodata has filetypes of %s" % (ftypes))
                    if "group" in ftypes:
                        g = pulp.server.util.get_repomd_filetype_path(src_repomd_xml, "group")
                        src_groups = os.path.join(pkg_dir, g)
                        if os.path.isfile(src_groups):
                            shutil.copy(src_groups,
                                    os.path.join(repo_dir, os.path.basename(src_groups)))
                            log.debug("Copied groups over to %s" % (repo_dir))
                        groups_xml_path = os.path.join(repo_dir,
                                os.path.basename(src_groups))
                    if "updateinfo" in ftypes:
                        f = pulp.server.util.get_repomd_filetype_path(src_repomd_xml, "updateinfo")
                        src_updateinfo_path = os.path.join(pkg_dir, f)
                        if os.path.isfile(src_updateinfo_path):
                            # Copy the updateinfo metadata to updateinfo.xml,
                            # decompressing a .gz source first so the copy is
                            # always plain XML. The and/or chain picks
                            # gzip.open for .gz files and a plain open
                            # otherwise.
                            f = src_updateinfo_path.endswith('.gz') and gzip.open(src_updateinfo_path) \
                                    or open(src_updateinfo_path, 'rt')
                            shutil.copyfileobj(f, open(
                                os.path.join(repo_dir, "updateinfo.xml"), "wt"))
                            log.debug("Copied %s to %s" % (src_updateinfo_path, repo_dir))
                            updateinfo_path = os.path.join(repo_dir, "updateinfo.xml")
                log.info("Running createrepo, this may take a few minutes to complete.")
                start = time.time()
                pulp.server.upload.create_repo(repo_dir, groups=groups_xml_path)
                end = time.time()
                log.info("Createrepo finished in %s seconds" % (end - start))
                if updateinfo_path:
                    log.debug("Modifying repo for updateinfo")
                    pulp.server.upload.modify_repo(os.path.join(repo_dir, "repodata"),
                            updateinfo_path)
        except InvalidPathError:
            log.error("Sync aborted due to invalid source path %s" % (pkg_dir))
            raise
        except IOError:
            log.error("Unable to create repo directory %s" % repo_dir)
            raise
        return repo_dir


class RHNSynchronizer(BaseSynchronizer):

    def sync(self, repo, repo_source, progress_callback=None):
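        # The feed is '<server>/<channel>', e.g. (hypothetical):
        #   satellite.example.com/rhel-x86_64-server-5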
        pieces = repo_source['url'].split('/')
        if len(pieces) < 2:
            raise PulpException('Feed format for RHN type must be <server>/<channel>. Feed: %s' %
                                repo_source['url'])

        host = 'http://' + pieces[0]
        channel = pieces[1]

        log.info('Synchronizing from RHN. Host [%s], Channel [%s]' % (host, channel))

        # Configure the RHN connection
        s = RHNSync()
        s.setURL(host)
        s.setParallel(config.config.get('rhn', 'threads'))

        # Perform the sync, then generate repo metadata for the result
        dest_dir = '%s/%s/' % (config.config.get('paths', 'local_storage'), repo['id'])
        s.syncPackages(channel, savePath=dest_dir, callback=progress_callback)
        s.createRepo(dest_dir)
        updateinfo_path = os.path.join(dest_dir, "updateinfo.xml")
        if os.path.isfile(updateinfo_path):
            log.info("updateinfo.xml found; calling updateRepo")
            s.updateRepo(updateinfo_path, os.path.join(dest_dir, "repodata"))

        return dest_dir


# Maps a repo_source['type'] value to the synchronizer that handles it;
# sync() at the top of this module dispatches through this table.
type_classes = {
    'yum': YumSynchronizer,
    'local': LocalSynchronizer,
    'rhn': RHNSynchronizer,
}