Skip to content

Commit

Permalink
Add loop functionality to service
Browse files Browse the repository at this point in the history
Made the lzreposync service continuously loop over the existing
channels and synchronize the corresponding repositories.

Added a status column in the rhnchannel table to indicate the
sync status of a given channel.

Also added some helper arguments to the service that allows us to
perform test operations, like creating a test channel and
associating repositories to it, etc
  • Loading branch information
waterflow80 committed Dec 7, 2024
1 parent 574e43e commit dc76a30
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 88 deletions.
264 changes: 181 additions & 83 deletions python/lzreposync/src/lzreposync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,136 @@
get_compatible_arches,
get_channel_info_by_label,
get_all_arches,
create_channel,
create_test_channel,
ChannelAlreadyExistsException,
NoSourceFoundForChannel,
get_all_channels,
create_content_source,
update_channel_status,
)
from lzreposync.import_utils import (
import_package_batch,
batched,
import_repository_packages_in_batch,
)
from lzreposync.rpm_repo import RPMRepo
from lzreposync.rpm_repo import RPMRepo, SignatureVerificationException
from spacewalk.common.repo import GeneralRepoException
from spacewalk.satellite_tools.repo_plugins.deb_src import DebRepo

SLEEP_TIME = 2 # Num of seconds between one sync and another

def _create_channel(channel_label, channel_arch):
print(
f"Creating a new channel with label: {channel_label}, and arch: {channel_arch}"
)
try:
channel = create_test_channel(
channel_label=channel_label, channel_arch=channel_arch
)
print(
f"Info: successfully created channel: {channel_label} -> id={channel.get_id()}, name={channel.get_label()}"
)
except ChannelAlreadyExistsException:
print(f"Warn: failed to create channel {channel_label}. Already exists !!")


def _add_content_source(channel_label, source_url, source_label, source_type="yum"):
create_content_source(
channel_label,
repo_label=source_label,
source_url=source_url,
source_type=source_type,
)


def _sync_channel(channel_label, cache_dir, batch_size=20, no_errata=False):
"""
Synchronize the repositories of the given channel
"""
channel = get_channel_info_by_label(channel_label)
channel_arch = channel["arch"].split("-")[
1
] # Initially the value is like: 'channel-x86_64'
if not channel:
logging.error("Couldn't fetch channel with label %s", channel_label)
return -1
compatible_arches = get_compatible_arches(channel_label)
if channel_arch and channel_arch != ".*" and channel_arch not in compatible_arches:
logging.error(
"Not compatible arch: %s for channel: %s",
channel_arch,
channel_label,
)
return -1
try:
target_repos = db_utils.get_repositories_by_channel_label(channel_label)
except NoSourceFoundForChannel as e:
print("Error:", e.msg)
return -1

failed_repos = [] # contains the list of labels of failed repos
for repo in target_repos:
if repo.repo_type == "yum":
try:
rpm_repo = RPMRepo(
repo.repo_label, cache_dir, repo.source_url, repo.channel_arch
)
logging.debug("Importing package for repo %s", repo.repo_label)
failed = import_repository_packages_in_batch(
rpm_repo,
batch_size,
channel,
compatible_arches=compatible_arches,
no_errata=no_errata,
)
logging.debug(
"Completed import for repo %s with %d failed packages",
repo.repo_label,
failed,
)
except SignatureVerificationException as e:
print(e.message)
failed_repos.append(repo.repo_label)

elif repo.repo_type == "deb":
dep_repo = DebRepo(
repo.source_url,
cache_dir,
pkg_dir="/tmp",
channel_label=repo.channel_label,
)
try:
dep_repo.verify()
except GeneralRepoException as e:
logging.error("__init__.py: Couldn't verify signature ! %s", e)
failed_repos.append(repo.repo_label)

logging.debug("Importing package for repo %s", repo.repo_label)
failed = import_repository_packages_in_batch(
dep_repo,
batch_size,
channel,
compatible_arches=compatible_arches,
)
logging.debug(
"Completed import for repo %s with %d failed packages",
repo.repo_label,
failed,
)
else:
# TODO: handle repositories other than yum and deb
logging.debug("Not supported repo type: %s", repo.repo_type)
continue
return failed_repos


def _display_failed_repos(repos):
"""
A display helper function
"""
for repo_label in repos:
print(f"=> Failed syncing repository: {repo_label}")


def main():
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -59,7 +176,6 @@ def main():
)

parser.add_argument(
"-c",
"--cache",
help="Path to the cache directory",
dest="cache",
Expand Down Expand Up @@ -119,6 +235,15 @@ def main():
nargs=2,
)

parser.add_argument(
"--create-content-source",
help="Adding a new content source to channel by specifying 'channel_label', 'source_url', 'source_label' and 'type'\n"
"Eg: --create-content-source test_channel https://download.opensuse.org/update/leap/15.5/oss/ leap15.5 yum",
dest="source_info",
type=str,
nargs=4,
)

args = parser.parse_args()

# Remove any existing handlers (loggers)
Expand All @@ -128,26 +253,25 @@ def main():

# Creating a new channel
if args.channel_info:
channel_label, channel_arch = args.channel_info[0], args.channel_info[1]
print(
f"Creating a new channel with label: {channel_label}, and arch: {channel_arch}"
_create_channel(args.channel_info[0], args.channel_info[1])
return

# Adding content source to channel
if args.source_info:
_add_content_source(
args.source_info[0],
args.source_info[1],
args.source_info[2],
args.source_info[3],
)
try:
channel = create_channel(
channel_label=channel_label, channel_arch=channel_arch
)
print(
f"Info: successfully created channel: {channel_label} -> id={channel.get_id()}, name={channel.get_label()}"
)
except ChannelAlreadyExistsException:
print(f"Warn: failed to create channel {channel_label}. Already exists !!")
return

arch = args.arch
if arch != ".*":
arch = f"(noarch|{args.arch})"

if args.url:
#### sync using url ###
if not args.repo_type:
print("ERROR: --type (yum/deb) must be specified when using --url")
return
Expand All @@ -169,74 +293,48 @@ def main():
)
logging.debug("Completed import with %d failed packages", failed)

elif args.channel:
#### sync using channel label ###
_sync_channel(args.channel, args.cache, args.batch_size, args.no_errata)

else:
# No url specified
if args.channel:
channel_label = args.channel
channel = get_channel_info_by_label(channel_label)
if not channel:
logging.error("Couldn't fetch channel with label %s", channel_label)
return
compatible_arches = get_compatible_arches(channel_label)
if args.arch and args.arch != ".*" and args.arch not in compatible_arches:
logging.error(
"Not compatible arch: %s for channel: %s",
args.channel_arch,
args.channel,
)
return
### Execute Service Indefinitely ###
### Continuously Looping over all the channels ###
logging.info("Executing lzreposync service")
while True:
try:
target_repos = db_utils.get_repositories_by_channel_label(channel_label)
except NoSourceFoundForChannel as e:
print("Error:", e.msg)
return
for repo in target_repos:
if repo.repo_type == "yum":
rpm_repo = RPMRepo(
repo.repo_label, args.cache, repo.source_url, repo.channel_arch
)
logging.debug("Importing package for repo %s", repo.repo_label)
failed = import_repository_packages_in_batch(
rpm_repo,
args.batch_size,
channel,
compatible_arches=compatible_arches,
no_errata=args.no_errata,
)
logging.debug(
"Completed import for repo %s with %d failed packages",
repo.repo_label,
failed,
)
elif repo.repo_type == "deb":
dep_repo = DebRepo(
repo.source_url,
args.cache,
pkg_dir="/tmp",
channel_label=repo.channel_label,
)
try:
dep_repo.verify()
except GeneralRepoException as e:
logging.error("__init__.py: Couldn't verify signature ! %s", e)
exit(0)

logging.debug("Importing package for repo %s", repo.repo_label)
failed = import_repository_packages_in_batch(
dep_repo,
args.batch_size,
channel,
compatible_arches=compatible_arches,
)
logging.debug(
"Completed import for repo %s with %d failed packages",
repo.repo_label,
failed,
)
else:
# TODO: handle repositories other than yum and deb
logging.debug("Not supported repo type: %s", repo.repo_type)
continue
all_channels = get_all_channels()
if not all_channels:
print("No channels in the database! Leaving..")
return

else:
logging.error("Either --url or --channel must be specified")
for channel in all_channels:
ch_label = channel["channel_label"]
ch_status = channel["status"]
if ch_status == "pending" or ch_status == "failed":
print(f"INFO: Start synchronizing channel: {ch_label}")
# Update the channel status to 'in_progress'
update_channel_status(ch_label, "in_progress")
ret = _sync_channel(
ch_label, args.cache, args.batch_size, args.no_errata
)
if isinstance(ret, int):
channel["status"] = "failed"
update_channel_status(ch_label, "failed")
print(f"Failed synchronizing channel {ch_label}")
elif isinstance(ret, list):
if len(ret) == 0:
# No failed repositories
update_channel_status(ch_label, "done")
print(f"Successfully synchronized channel {ch_label}")
else:
update_channel_status(ch_label, "failed")
print(
f"Failed fully synchronizing channel {ch_label}. Some repos might have been synced."
)
_display_failed_repos(ret)

except KeyboardInterrupt:
print("Lzreposync is being stopped..")
# TODO: we might want to add some cleanup
exit(0)
Loading

0 comments on commit dc76a30

Please sign in to comment.