diff --git a/python/lzreposync/src/lzreposync/__init__.py b/python/lzreposync/src/lzreposync/__init__.py index 291d0e513786..b57e4b42f4ed 100644 --- a/python/lzreposync/src/lzreposync/__init__.py +++ b/python/lzreposync/src/lzreposync/__init__.py @@ -8,19 +8,136 @@ get_compatible_arches, get_channel_info_by_label, get_all_arches, - create_channel, + create_test_channel, ChannelAlreadyExistsException, NoSourceFoundForChannel, + get_all_channels, + create_content_source, + update_channel_status, ) from lzreposync.import_utils import ( import_package_batch, batched, import_repository_packages_in_batch, ) -from lzreposync.rpm_repo import RPMRepo +from lzreposync.rpm_repo import RPMRepo, SignatureVerificationException from spacewalk.common.repo import GeneralRepoException from spacewalk.satellite_tools.repo_plugins.deb_src import DebRepo +SLEEP_TIME = 2 # Num of seconds between one sync and another + +def _create_channel(channel_label, channel_arch): + print( + f"Creating a new channel with label: {channel_label}, and arch: {channel_arch}" + ) + try: + channel = create_test_channel( + channel_label=channel_label, channel_arch=channel_arch + ) + print( + f"Info: successfully created channel: {channel_label} -> id={channel.get_id()}, name={channel.get_label()}" + ) + except ChannelAlreadyExistsException: + print(f"Warn: failed to create channel {channel_label}. Already exists !!") + + +def _add_content_source(channel_label, source_url, source_label, source_type="yum"): + create_content_source( + channel_label, + repo_label=source_label, + source_url=source_url, + source_type=source_type, + ) + + +def _sync_channel(channel_label, cache_dir, batch_size=20, no_errata=False): + """ + Synchronize the repositories of the given channel + """ + channel = get_channel_info_by_label(channel_label) + channel_arch = channel["arch"].split("-")[ + 1 + ] # Initially the value is like: 'channel-x86_64' + if not channel: + logging.error("Couldn't fetch channel with label %s", channel_label) + return -1 + compatible_arches = get_compatible_arches(channel_label) + if channel_arch and channel_arch != ".*" and channel_arch not in compatible_arches: + logging.error( + "Not compatible arch: %s for channel: %s", + channel_arch, + channel_label, + ) + return -1 + try: + target_repos = db_utils.get_repositories_by_channel_label(channel_label) + except NoSourceFoundForChannel as e: + print("Error:", e.msg) + return -1 + + failed_repos = [] # contains the list of labels of failed repos + for repo in target_repos: + if repo.repo_type == "yum": + try: + rpm_repo = RPMRepo( + repo.repo_label, cache_dir, repo.source_url, repo.channel_arch + ) + logging.debug("Importing package for repo %s", repo.repo_label) + failed = import_repository_packages_in_batch( + rpm_repo, + batch_size, + channel, + compatible_arches=compatible_arches, + no_errata=no_errata, + ) + logging.debug( + "Completed import for repo %s with %d failed packages", + repo.repo_label, + failed, + ) + except SignatureVerificationException as e: + print(e.message) + failed_repos.append(repo.repo_label) + + elif repo.repo_type == "deb": + dep_repo = DebRepo( + repo.source_url, + cache_dir, + pkg_dir="/tmp", + channel_label=repo.channel_label, + ) + try: + dep_repo.verify() + except GeneralRepoException as e: + logging.error("__init__.py: Couldn't verify signature ! %s", e) + failed_repos.append(repo.repo_label) + + logging.debug("Importing package for repo %s", repo.repo_label) + failed = import_repository_packages_in_batch( + dep_repo, + batch_size, + channel, + compatible_arches=compatible_arches, + ) + logging.debug( + "Completed import for repo %s with %d failed packages", + repo.repo_label, + failed, + ) + else: + # TODO: handle repositories other than yum and deb + logging.debug("Not supported repo type: %s", repo.repo_type) + continue + return failed_repos + + +def _display_failed_repos(repos): + """ + A display helper function + """ + for repo_label in repos: + print(f"=> Failed syncing repository: {repo_label}") + def main(): parser = argparse.ArgumentParser( @@ -59,7 +176,6 @@ def main(): ) parser.add_argument( - "-c", "--cache", help="Path to the cache directory", dest="cache", @@ -119,6 +235,15 @@ def main(): nargs=2, ) + parser.add_argument( + "--create-content-source", + help="Adding a new content source to channel by specifying 'channel_label', 'source_url', 'source_label' and 'type'\n" + "Eg: --create-content-source test_channel https://download.opensuse.org/update/leap/15.5/oss/ leap15.5 yum", + dest="source_info", + type=str, + nargs=4, + ) + args = parser.parse_args() # Remove any existing handlers (loggers) @@ -128,19 +253,17 @@ def main(): # Creating a new channel if args.channel_info: - channel_label, channel_arch = args.channel_info[0], args.channel_info[1] - print( - f"Creating a new channel with label: {channel_label}, and arch: {channel_arch}" + _create_channel(args.channel_info[0], args.channel_info[1]) + return + + # Adding content source to channel + if args.source_info: + _add_content_source( + args.source_info[0], + args.source_info[1], + args.source_info[2], + args.source_info[3], ) - try: - channel = create_channel( - channel_label=channel_label, channel_arch=channel_arch - ) - print( - f"Info: successfully created channel: {channel_label} -> id={channel.get_id()}, name={channel.get_label()}" - ) - except ChannelAlreadyExistsException: - print(f"Warn: failed to create channel {channel_label}. Already exists !!") return arch = args.arch @@ -148,6 +271,7 @@ def main(): arch = f"(noarch|{args.arch})" if args.url: + #### sync using url ### if not args.repo_type: print("ERROR: --type (yum/deb) must be specified when using --url") return @@ -169,74 +293,48 @@ def main(): ) logging.debug("Completed import with %d failed packages", failed) + elif args.channel: + #### sync using channel label ### + _sync_channel(args.channel, args.cache, args.batch_size, args.no_errata) + else: - # No url specified - if args.channel: - channel_label = args.channel - channel = get_channel_info_by_label(channel_label) - if not channel: - logging.error("Couldn't fetch channel with label %s", channel_label) - return - compatible_arches = get_compatible_arches(channel_label) - if args.arch and args.arch != ".*" and args.arch not in compatible_arches: - logging.error( - "Not compatible arch: %s for channel: %s", - args.channel_arch, - args.channel, - ) - return + ### Execute Service Indefinitely ### + ### Continuously Looping over all the channels ### + logging.info("Executing lzreposync service") + while True: try: - target_repos = db_utils.get_repositories_by_channel_label(channel_label) - except NoSourceFoundForChannel as e: - print("Error:", e.msg) - return - for repo in target_repos: - if repo.repo_type == "yum": - rpm_repo = RPMRepo( - repo.repo_label, args.cache, repo.source_url, repo.channel_arch - ) - logging.debug("Importing package for repo %s", repo.repo_label) - failed = import_repository_packages_in_batch( - rpm_repo, - args.batch_size, - channel, - compatible_arches=compatible_arches, - no_errata=args.no_errata, - ) - logging.debug( - "Completed import for repo %s with %d failed packages", - repo.repo_label, - failed, - ) - elif repo.repo_type == "deb": - dep_repo = DebRepo( - repo.source_url, - args.cache, - pkg_dir="/tmp", - channel_label=repo.channel_label, - ) - try: - dep_repo.verify() - except GeneralRepoException as e: - logging.error("__init__.py: Couldn't verify signature ! %s", e) - exit(0) - - logging.debug("Importing package for repo %s", repo.repo_label) - failed = import_repository_packages_in_batch( - dep_repo, - args.batch_size, - channel, - compatible_arches=compatible_arches, - ) - logging.debug( - "Completed import for repo %s with %d failed packages", - repo.repo_label, - failed, - ) - else: - # TODO: handle repositories other than yum and deb - logging.debug("Not supported repo type: %s", repo.repo_type) - continue + all_channels = get_all_channels() + if not all_channels: + print("No channels in the database! Leaving..") + return - else: - logging.error("Either --url or --channel must be specified") + for channel in all_channels: + ch_label = channel["channel_label"] + ch_status = channel["status"] + if ch_status == "pending" or ch_status == "failed": + print(f"INFO: Start synchronizing channel: {ch_label}") + # Update the channel status to 'in_progress' + update_channel_status(ch_label, "in_progress") + ret = _sync_channel( + ch_label, args.cache, args.batch_size, args.no_errata + ) + if isinstance(ret, int): + channel["status"] = "failed" + update_channel_status(ch_label, "failed") + print(f"Failed synchronizing channel {ch_label}") + elif isinstance(ret, list): + if len(ret) == 0: + # No failed repositories + update_channel_status(ch_label, "done") + print(f"Successfully synchronized channel {ch_label}") + else: + update_channel_status(ch_label, "failed") + print( + f"Failed fully synchronizing channel {ch_label}. Some repos might have been synced." + ) + _display_failed_repos(ret) + + except KeyboardInterrupt: + print("Lzreposync is being stopped..") + # TODO: we might want to add some cleanup + exit(0) diff --git a/python/lzreposync/src/lzreposync/db_utils.py b/python/lzreposync/src/lzreposync/db_utils.py index f1ebfdca91f4..eaf7f1cabdd1 100644 --- a/python/lzreposync/src/lzreposync/db_utils.py +++ b/python/lzreposync/src/lzreposync/db_utils.py @@ -37,6 +37,7 @@ def _new_channel_dict(**kwargs): "name": kwargs.get("name") or label, "summary": kwargs.get("summary") or label, "description": kwargs.get("description") or label, + "status": kwargs.get("status") or "pending", "basedir": kwargs.get("basedir") or "/", "channel_arch": kwargs.get("channel_arch") or "i386", "channel_families": [kwargs.get("channel_family") or label], @@ -61,7 +62,7 @@ class ChannelAlreadyExistsException(Exception): """ -def create_channel(channel_label, channel_arch, org_id=1): +def create_test_channel(channel_label, channel_arch, status="pending", org_id=1): """ Create a new test channel with label :channel_label using the channel family private-channel-family-1 :channel_arch: eg: "x86_64" @@ -77,6 +78,7 @@ def create_channel(channel_label, channel_arch, org_id=1): channel_family=channel_family_label, org_id=org_id, channel_arch=channel_arch, + status=status, ) c = rhnChannel.Channel() c.load_from_dict(vdict) @@ -97,7 +99,7 @@ def create_content_source( metadata_signed="N", org_id=1, source_type="yum", - repo_id=1, + repo_id=5, ): """ Create a new content source and associate it with the given channel @@ -128,7 +130,7 @@ def create_content_source( fetch_source_id_query = rhnSQL.prepare( """ - SELECT id from rhnContentSource LIMIT 1""" + SELECT id from rhnContentSource ORDER BY id DESC LIMIT 1""" ) fetch_source_id_query.execute() source_id = fetch_source_id_query.fetchone_dict()["id"] @@ -147,8 +149,13 @@ def create_content_source( ) associate_repo_channel_query.execute(source_id=source_id, channel_id=channel_id) rhnSQL.commit() + print( + f"INFO: Successfully created new source {repo_label} for channel {channel_label}" + ) except errors.lookup(UNIQUE_VIOLATION): - print(f"INFO: Source {repo_label} already exists!") + print( + f"INFO: Source '{repo_label}' for channel '{channel_label}' already exists!" + ) finally: rhnSQL.closeDB() @@ -211,6 +218,36 @@ def get_channel_info_by_label(channel_label): return channel or None +def get_all_channels(): + """ + Return the list of all channels in a dict containing the channel label with the corresponding status + + """ + rhnSQL.initDB() + h = rhnSQL.prepare( + """ + SELECT label as channel_label, status from rhnChannel + """ + ) + h.execute() + return h.fetchall_dict() + + +def update_channel_status(channel_label, new_status): + """ + Update the status ('pending', 'in_progress', 'failed', ...) of the channel that has the given channel_label + """ + rhnSQL.initDB() + print(f"INFO: Updating the status of channel {channel_label} to {new_status}") + h = rhnSQL.prepare( + """ + UPDATE rhnChannel SET status = :new_status WHERE label = :channel_label + """ + ) + h.execute(new_status=new_status, channel_label=channel_label) + rhnSQL.closeDB() + + class NoSourceFoundForChannel(Exception): """Raised when no source(repository) was found""" diff --git a/python/spacewalk/server/importlib/backendOracle.py b/python/spacewalk/server/importlib/backendOracle.py index e78b52399b9f..44704f5e3454 100644 --- a/python/spacewalk/server/importlib/backendOracle.py +++ b/python/spacewalk/server/importlib/backendOracle.py @@ -423,6 +423,7 @@ class OracleBackend(Backend): "checksum_type_id": DBint(), "channel_access": DBstring(10), "update_tag": DBstring(128), + "status": DBstring(11), "installer_updates": DBstring(1), }, pk=["label"], diff --git a/python/spacewalk/server/rhnChannel.py b/python/spacewalk/server/rhnChannel.py index ec6d5da044c0..b3b2373ff82b 100644 --- a/python/spacewalk/server/rhnChannel.py +++ b/python/spacewalk/server/rhnChannel.py @@ -232,6 +232,7 @@ class Channel(BaseChannelObject): "name", "summary", "description", + "status", "basedir", "org_id", "gpg_key_url", @@ -656,6 +657,7 @@ def channel_info(channel): c.name, c.summary, c.description, + c.status, c.update_tag, to_char(c.last_modified, 'YYYYMMDDHH24MISS') last_modified from @@ -687,6 +689,7 @@ def get_base_channel(server_id, none_ok=0): c.name, c.summary, c.description, + c.status, to_char(c.last_modified, 'YYYYMMDDHH24MISS') last_modified from rhnChannel c, rhnChannelArch ca, rhnServerChannel sc where sc.server_id = :server_id @@ -735,6 +738,7 @@ def channels_for_server(server_id): c.name, c.summary, c.description, + c.status, c.gpg_key_url, case when c.gpg_check = 'Y' then 1 ELSE 0 end gpgcheck, :metadata_signed metadata_signed, @@ -836,6 +840,7 @@ def base_channel_for_rel_arch(release, server_arch, org_id=-1, user_id=None): c.name, c.summary, c.description, + c.status, to_char(c.last_modified, 'YYYYMMDDHH24MISS') last_modified from rhnChannel c, rhnChannelArch ca @@ -960,6 +965,7 @@ def get_channel_for_release_arch(release, server_arch, org_id=None): c.name, c.summary, c.description, + c.status, to_char(c.last_modified, 'YYYYMMDDHH24MISS') last_modified from rhnDistChannelMap dcm, rhnChannel c, @@ -988,6 +994,7 @@ def get_channel_for_release_arch(release, server_arch, org_id=None): c.name, c.summary, c.description, + c.status, to_char(c.last_modified, 'YYYYMMDDHH24MISS') last_modified from rhnOrgDistChannelMap odcm, rhnChannel c, @@ -1030,6 +1037,7 @@ def applet_channels_for_uuid(uuid): c.name, c.summary, c.description, + c.status, to_char(c.last_modified, 'YYYYMMDDHH24MISS') last_modified, to_char(s.channels_changed, 'YYYYMMDDHH24MISS') server_channels_changed from rhnChannelArch ca, @@ -1087,6 +1095,7 @@ def channels_for_release_arch(release, server_arch, org_id=-1, user_id=None): c.name, c.summary, c.description, + c.status, to_char(c.last_modified, 'YYYYMMDDHH24MISS') last_modified, -- If user_id is null, then the channel is subscribable rhn_channel.loose_user_role_check(c.id, :user_id, 'subscribe') @@ -1157,6 +1166,7 @@ def list_packages_source(channel_id): new_evr["release"], new_evr["epoch"], ] + # pylint: disable-next=possibly-used-before-assignment # TODO: Enhance ret.append(new_evr_list) return ret diff --git a/schema/spacewalk/common/tables/rhnChannel.sql b/schema/spacewalk/common/tables/rhnChannel.sql index 196817c5b0ed..be04d36a15ce 100644 --- a/schema/spacewalk/common/tables/rhnChannel.sql +++ b/schema/spacewalk/common/tables/rhnChannel.sql @@ -13,7 +13,6 @@ -- in this software or its documentation. -- - CREATE TABLE rhnChannel ( id NUMERIC NOT NULL @@ -32,6 +31,7 @@ CREATE TABLE rhnChannel basedir VARCHAR(256) NOT NULL, name VARCHAR(256) NOT NULL, summary VARCHAR(500) NOT NULL, + status VARCHAR(25) NOT NULL, description VARCHAR(4000), product_name_id NUMERIC CONSTRAINT rhn_channel_product_name_ch_fk diff --git a/schema/spacewalk/upgrade/susemanager-schema-5.0.7-to-susemanager-schema-5.0.8/add-status-column-to-rhnChannel-table.sql b/schema/spacewalk/upgrade/susemanager-schema-5.0.7-to-susemanager-schema-5.0.8/add-status-column-to-rhnChannel-table.sql new file mode 100644 index 000000000000..0e46417a0e73 --- /dev/null +++ b/schema/spacewalk/upgrade/susemanager-schema-5.0.7-to-susemanager-schema-5.0.8/add-status-column-to-rhnChannel-table.sql @@ -0,0 +1,2 @@ + +ALTER TABLE rhnchannel ADD COLUMN IF NOT EXISTS status VARCHAR(25) NOT NULL;