diff --git a/sigeca_data_import_microservice/app/application/email_notification.py b/sigeca_data_import_microservice/app/application/email_notification.py
new file mode 100644
index 0000000..af2341f
--- /dev/null
+++ b/sigeca_data_import_microservice/app/application/email_notification.py
@@ -0,0 +1,63 @@
+from typing import List
+
+from app.config import Config
+from app.infrastructure.smtp_client import SMTPClient
+
+
+template = """Dear Administrator,
+
+The recent synchronization process has finished.
+
+Please review the resources below and make the necessary manual adjustments.
+
+Resources:
+{}
+Please manually update the resources listed above and verify the changes.
+"""
+
+resource_template = """    Resource: {facility_name} {facility_code} ({operation})
+        {geo_zone_mismatch}
+        {facility_type_mismatch}
+"""
+
+
+def _emails_formatter(facilities_response):
+    emails = []
+    current_email = ""
+    max_facilities_per_email = 500
+    subject_prefix = "Facilities Sync Completion Notification"
+
+    for i, resource in enumerate(facilities_response):
+        resource_info = resource_template.format(
+            facility_name=resource["name"],
+            facility_code=resource["code"],
+            operation=resource["operation"],
+            geo_zone_mismatch=f"Geographic Zone: {resource['municipality']}",
+            facility_type_mismatch=f"Facility Type: {resource['type']}",
+        )
+
+        # Start a new email once the current one holds max_facilities_per_email entries
+        if (i % max_facilities_per_email) == 0 and i != 0:
+            emails.append((f"{subject_prefix} - Part {len(emails) + 1}", current_email))
+            current_email = ""
+
+        current_email += resource_info + "\n\n"
+
+    # Add the last email
+    if current_email:
+        emails.append((f"{subject_prefix} - Part {len(emails) + 1}", current_email))
+
+    formatted_emails = []
+    for subject, email_body in emails:
+        formatted_emails.append((subject, template.format(email_body.strip())))
+
+    return formatted_emails
+
+
+def notify_administrator(sync_results: List[dict], mailing_list: List[str]):
+    emails = _emails_formatter(sync_results)
+    smtp_client = SMTPClient(Config().smtp)
+
+    for receiver in mailing_list:
+        for subject, content in emails:
+            smtp_client.send_email(receiver, subject, content)
\ No newline at end of file
diff --git a/sigeca_data_import_microservice/app/application/scheduler.py b/sigeca_data_import_microservice/app/application/scheduler.py
index 3291848..48f4eac 100644
--- a/sigeca_data_import_microservice/app/application/scheduler.py
+++ b/sigeca_data_import_microservice/app/application/scheduler.py
@@ -1,4 +1,4 @@
-from .synchronization.facilities import FacilitySynchronizationService
+from .synchronization.facilities.synchronization import FacilitySynchronizationService
 import logging
 
 from apscheduler.schedulers.background import BlockingScheduler
diff --git a/sigeca_data_import_microservice/app/application/synchronization/facilities.py b/sigeca_data_import_microservice/app/application/synchronization/facilities.py
deleted file mode 100644
index bd3e4f4..0000000
--- a/sigeca_data_import_microservice/app/application/synchronization/facilities.py
+++ /dev/null
@@ -1,404 +0,0 @@
-import logging
-from pyspark.sql import DataFrame
-from pyspark.sql import SparkSession
-import uuid
-from pyspark.sql.functions import (
-    col,
-    udf,
-    from_json,
-    when,
-    array_except,
-    concat,
-    size,
-    lit,
-)
-from app.domain.resources import (
-    FacilityResourceRepository,
-    GeographicZoneResourceRepository,
-    FacilityOperatorResourceRepository,
-    ProgramResourceRepository,
-    FacilityTypeResourceRepository,
-)
-from app.infrastructure.sigeca_api_client import SigecaApiClient
-from app.infrastructure.open_lmis_api_client import OpenLmisApiClient
-from pyspark.sql.types import (
-    StructType,
-    StructField,
-    StringType,
-    BooleanType,
-    TimestampType,
-    ArrayType,
-    DoubleType,
-    DataType,
-)
-from app.infrastructure.jdbc_reader import JDBCReader
-from .validators import validate_facilities_dataframe
-from .geo_zones import ProvinceSynchronization, MunicipalitySynchronization
-from .facility_types import FacilityTypeSynchronization
-from .product import ProgramSynchronization
-import json
-
-facility_schema = StructType(
-    [
-        StructField("id", StringType(), True),
-        StructField("reference_id", StringType(), True),
-        StructField("is_deleted", BooleanType(), True),
-        StructField("last_updated", StringType(), True),  # TODO: Change to timestamp
-        StructField("name", StringType(), True),
-        StructField("code", StringType(), True),
-        StructField("acronym", StringType(), True),
-        StructField("category", StringType(), True),
-        StructField("ownership", StringType(), True),
-        StructField("management", StringType(), True),
-        StructField("municipality", StringType(), True),
-        StructField("province", StringType(), True),
-        StructField("is_operational", BooleanType(), True),
-        StructField(
-            "latitude", StringType(), True
-        ),  # Data of latitude and longitude inconsitent
-        StructField("longitude", StringType(), True),
-        StructField(
-            "services",
-            ArrayType(
-                StructType(
-                    [
-                        StructField("name", StringType(), True),
-                        StructField("code", StringType(), True),
-                    ]
-                )
-            ),
-            True,
-        ),
-    ]
-)
-
-
-class FacilitySynchronizationService:
-    def __init__(
-        self,
-        jdbc_reader: JDBCReader,
-        facility_client: SigecaApiClient,
-        lmis_client: OpenLmisApiClient,
-        facility_repository: FacilityResourceRepository,
-        geo_zone_repo: GeographicZoneResourceRepository,
-        facility_type_repo: FacilityTypeResourceRepository,
-        operator_repo: FacilityOperatorResourceRepository,
-        program_repo: ProgramResourceRepository,
-    ):
-        self.facility_client = facility_client
-        self.facility_repository = facility_repository
-        self.lmis_client = lmis_client
-        self.geo_zone_repo = geo_zone_repo
-        self.facility_type_repo = facility_type_repo
-        self.operator_repo = operator_repo
-        self.program_repo = program_repo
-        self.jdbc_reader = jdbc_reader
-
-    def synchronize_facilities(self):
-        try:
-            # Step 1: Fetch data from the external API
-            external_facilities = self.facility_client.fetch_facilities()
-
-            # Step 2: Validate and transform the data
-            valid_external_df = self.validate_and_transform(external_facilities)
-
-            # Step 3: Fetch existing data from the database
-            existing_facilities = self.facility_repository.get_all().alias(
-                "existing_facilities"
-            )
-
-            joined = valid_external_df.join(
-                existing_facilities,
-                valid_external_df["facilities.code"] == existing_facilities["code"],
-                "left",
-            )
-
-            create, update, delete = self._split_df(joined)
-
-            logging.info("Synchronizing Facilities")
-            self._create_new_facilities(create)
-            logging.info("Updating Facilities")
-            self._update_existing_facilities(update)
-
-            logging.info("Deactivating Deleted Facilities")
-            self._delete_removed_facilities(delete)
-
-            # Log the results
-            logging.info("Facility synchronization completed successfully")
-
-        except Exception as e:
-            logging.error(f"An error occurred during facility synchronization: {e}")
-            raise
-
-    def validate_and_transform(self, facilities):
-        # Extract services names from the nested structure
-        df = self.jdbc_reader.spark.createDataFrame(
-            facilities, schema=facility_schema
-        ).alias("facilities")
-
-        # Check for mandatory fields and valid relations
-        df = validate_facilities_dataframe(df).filter(
-            col("facilities.is_deleted") == False
-        )
-
-        self.synchronize_mising_geographic_zones(df)
-        self.synchronize_mising_types(df)
-        self.synchronize_products(df)
-
-        # Currently skip, on UAT no operators exist
-        # self.synchronize_operators()
-
-        df = self.jdbc_reader.spark.createDataFrame(
-            facilities, schema=facility_schema
-        ).alias("facilities")
-        df = validate_facilities_dataframe(df)
-
-        df = self._add_zones_to_df(df)
-        df = self._add_types_to_df(df)
-        df = self._add_supported_program_info_to_df(df)
-
-        return df
-
-    def _add_zones_to_df(self, df):
-        geo_zone_df = self.geo_zone_repo.get_all().alias("geo_zone")
-        municipalities = geo_zone_df.filter(col("levelnumber") == 3).alias(
-            "municipality"
-        )
-        province = geo_zone_df.filter(col("levelnumber") == 2).alias("province")
-
-        # Validate foreign keys
-        df = df.join(
-            municipalities, df["municipality"] == municipalities["name"], "left"
-        )
-        df = df.join(province, df["province"] == province["name"], "left")
-        return df
-
-    def _add_types_to_df(self, df):
-        types = self.facility_type_repo.get_all().alias("facility_type")
-        df = df.join(types, df["category"] == types["name"], "left")
-        return df
-
-    def _add_supported_program_info_to_df(self, df):
-        programs = self.program_repo.get_all().alias("program")
-        code_id_dict = {row["code"]: row["id"] for row in programs.collect()}
-
-        add_info = udf(
-            lambda supported_programs: json.dumps(
-                {
-                    entry["code"]: {"id": code_id_dict.get(entry["code"], None)}
-                    for entry in supported_programs
-                    if entry["code"] in code_id_dict
-                }
-            )
-        )
-        df = df.withColumn("code_id_dict", add_info(col("services")))
-        return df
-
-    def synchronize_mising_geographic_zones(self, df: DataFrame):
-        ProvinceSynchronization(
-            self.jdbc_reader, df, self.geo_zone_repo, self.lmis_client
-        ).synchronize()
-
-        MunicipalitySynchronization(
-            self.jdbc_reader, df, self.geo_zone_repo, self.lmis_client
-        ).synchronize()
-
-    def synchronize_mising_types(self, df):
-        FacilityTypeSynchronization(
-            self.jdbc_reader, df, self.facility_type_repo, self.lmis_client
-        ).synchronize()
-
-    def synchronize_products(self, df):
-        ProgramSynchronization(
-            self.jdbc_reader, df, self.program_repo, self.lmis_client
-        ).synchronize()
-
-    def _split_df(self, df):
-        deleted = df.filter(col("facilities.is_deleted") == True).filter(
-            col("existing_facilities.id").isNotNull()
-        )
-
-        existing = df.filter(col("facilities.is_deleted") == False)
-
-        new_facilities = existing.filter(col("existing_facilities.id").isNull())
-        updated_facilities = existing.filter(col("existing_facilities.id").isNotNull())
-        return new_facilities, updated_facilities, deleted
-
-    def _create_new_facilities(self, facilities):
-        format_payload_f = self.format_payload_f()
-        df = facilities.withColumn(
-            "payload",
-            format_payload_f(
-                col(f"existing_facilities.id"),
-                col(f"facilities.name"),
-                col(f"facilities.code"),
-                col("municipality.id"),
-                col("facility_type.id"),
-                lit('{}'),
-                col("is_operational"),
-                lit(True)
-            ),
-        )
-
-
-        logging.info(F"New Facilities to be created: { df.count()}")
-        for row in df.collect():
-            self._create_request(row)
-
-    def format_payload_f(self):
-        format_payload_f = udf(
-            lambda id, name, code, geographic_zone, facility_type, supported_programs, operational, enabled: json.dumps(
-                {
-                    "id": id,
-                    "code": code,
-                    "name": name,
-                    "geographicZone": {"id": geographic_zone},
-                    "type": {"id": facility_type},
-                    "active": operational,
-                    "enabled": enabled,
-                    "openLmisAccessible": enabled,
-                    "supportedPrograms": [
-                        {
-                            "id": data['id'],
-                            "supportActive": data.get('supportActive', True),
-                            "supportLocallyFulfilled": data.get('supportLocallyFulfilled', False),
-                            "supportStartDate": data.get("supportStartDate")
-
-                        }
-                        for data in json.loads(supported_programs).values() if 'id' in data.keys()
-                    ],
-                }
-            )
-        )
-
-        return format_payload_f
-
-    def _create_request(self, data):
-        try:
-            self.lmis_client.send_post_request("facilities", data["payload"])
-        except Exception as e:
-            logging.error(
-                f"An error occurred during facility creation request ({data}): {e}"
-            )
-
-    def _update_request(self, data):
-        try:
-            self.lmis_client.send_put_request("facilities", data["id"], data["payload"])
-        except Exception as e:
-            logging.error(
-                f"An error occurred during facility update request ({data}): {e}"
-            )
-
-    def _delete_request(self, data):
-        try:
-            self.lmis_client.send_delete_request("facilities", data["id"])
-        except Exception as e:
-            logging.error(
-                f"An error occurred during facility delete request ({data}): {e}"
-            )
-
-    def merge_json_f(self):
-        def _inner_merge(json1, json2):
-            dict1 = json.loads(json1)
-            dict2 = json.loads(json2)
-            merged_dict = {**dict2, **dict1}
-            return json.dumps(merged_dict)
-
-        return udf(_inner_merge, StringType())
-
-    def _update_existing_facilities(self, facilities: DataFrame, is_deleted=False):
-        merge_json_udf = self.merge_json_f()
-        facilities = facilities.withColumn(
-            "mergedServices",
-            merge_json_udf(
-                col("existing_facilities.supported_programs"), col("code_id_dict")
-            ),
-        )
-
-        format_payload_f = self.format_payload_f()
-
-        facilities = facilities.withColumn(
-            "payload",
-            format_payload_f(
-                col(f"existing_facilities.id"),
-                col(f"facilities.name"),
-                col(f"facilities.code"),
-                col("municipality.id"),
-                col("facility_type.id"),
-                col("existing_facilities.supported_programs"),  # Use Existing Services
-                col("is_operational"),
-                lit(not is_deleted),
-            ),
-        )
-
-        facilities = facilities.withColumn(
-            "oldPayload",
-            format_payload_f(
-                col(f"existing_facilities.id"),
-                col(f"existing_facilities.name"),
-                col(f"facilities.code"),
-                col("existing_facilities.geographiczoneid"),
-                col("existing_facilities.typeid"),
-                col("existing_facilities.supported_programs"),
-                col("existing_facilities.active"),
-                col("existing_facilities.enabled"),
-            ),
-        )
-
-        schema = self.jdbc_reader.spark.read.json(
-            facilities.rdd.map(lambda row: row.payload)
-        ).schema  # Infer schema from the first JSON column
-        facilities = facilities.withColumn(
-            "json1_struct", from_json(col("payload"), schema)
-        )
-        facilities = facilities.withColumn(
-            "json2_struct", from_json(col("oldPayload"), schema)
-        )
-
-        def compare_for_any_change(df, col1, col2):
-            changes = []
-            for field in schema.fields:
-                field_name = field.name
-                # Skip supported programs as they remain the same.
-                if field_name == 'supportedPrograms':
-                    continue
-                else:
-                    changes.append(
-                        col(f"{col1}.{field_name}") != col(f"{col2}.{field_name}")
-                    )
-
-            # Aggregate all change flags into a single boolean indicating any change
-            if changes:
-                # Aggregate all change flags into a single boolean indicating any change
-                change_column = (
-                    when(sum([change.cast("int") for change in changes]) > 0, True)
-                    .otherwise(False)
-                    .alias("any_change")
-                )
-            else:
-                change_column = lit(False).alias(
-                    "any_change"
-                )  # Handle the case when changes list is empty
-
-            # Select the original JSON payloads and the any_change flag
-            return df.select(
-                "payload", "existing_facilities.id", "oldPayload", change_column
-            )
-
-
-        logging.info(F"Comparing Changes For Facilities: {facilities.count()}")
-        # Apply the comparison function
-        result_df = compare_for_any_change(facilities, "json1_struct", "json2_struct")
-
-        result_df = result_df.filter(col("any_change") == True)[
-            ["payload", "existing_facilities.id"]
-        ]
-
-        logging.info(F"Facilities That Changed since last update: {result_df.count()}")
-
-        for row in result_df.collect():
-            self._update_request(row)
-
-    def _delete_removed_facilities(self, facilities):
-        # Delete doesn't remove the facility, it set's it to disaled and unactive
-        self._update_existing_facilities(facilities, True)
diff --git a/sigeca_data_import_microservice/app/application/synchronization/facilities/__init__.py b/sigeca_data_import_microservice/app/application/synchronization/facilities/__init__.py
new file mode 100644
index 0000000..3b72975
--- /dev/null
+++ b/sigeca_data_import_microservice/app/application/synchronization/facilities/__init__.py
@@ -0,0 +1,3 @@
+from .data_transformator import FacilityDataTransformer
+from .facility_suplement_synchronization import FacilitySupplementSync
+from .synchronization import FacilitySynchronizationService
diff --git a/sigeca_data_import_microservice/app/application/synchronization/facilities/data_changes_detection.py b/sigeca_data_import_microservice/app/application/synchronization/facilities/data_changes_detection.py
new file mode 100644
index 0000000..1883706
--- /dev/null
+++ b/sigeca_data_import_microservice/app/application/synchronization/facilities/data_changes_detection.py
@@ -0,0 +1,80 @@
+import json
+import logging
+
+from pyspark.sql import DataFrame
+from pyspark.sql.functions import col, lit, udf, from_json, when
+
+from app.config import Config
+
+
+def get_fallbacks():
+    return Config().fallbacks
+
+
+def basic_comparison(column):
+    return lambda col1, col2: col(f"{col1}.{column}") != col(f"{col2}.{column}")
+
+
+def basic_with_fallback(column):
+    # For resources that don't have an internal object and fell back to a default
+    # one (like UNKNOWN) but were later reassigned to another, actual object.
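+    # Example (hypothetical value): if fallbacks.geographicZone is "<uuid>", an
+    # incoming zone that still resolves to "<uuid>" is not flagged as a change,
+    # while one that resolves to a real zone differing from the stored one is.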
+    fallback_value = get_fallbacks() or {}
+
+    return lambda col1, col2: (
+        (col(f"{col1}.{column}") != col(f"{col2}.{column}"))
+        & (col(f"{col1}.{column}.id") != lit(fallback_value[column]))
+    )
+
+
+def get_default_field_comparison_strategy():
+    return {
+        "supportedPrograms": lambda col1, col2: lit(
+            False
+        ),  # Skip supported programs; they are always reported as unchanged
+        "name": basic_comparison("name"),
+        "code": basic_comparison("code"),
+        "active": basic_comparison("active"),
+        "enabled": basic_comparison("enabled"),
+        "openLmisAccessible": basic_comparison("openLmisAccessible"),
+        "geographicZone": basic_with_fallback("geographicZone"),
+        "type": basic_with_fallback("type"),
+    }
+
+
+def compare_json_content(
+    facilities: DataFrame,
+    json_1_column_name: str,
+    json_2_column_name: str,
+    comparison_strategy: dict = None,
+):
+    if not comparison_strategy:
+        comparison_strategy = get_default_field_comparison_strategy()
+
+    def compare_for_any_change(df, col1, col2):
+        changes = []
+        for field, strategy in comparison_strategy.items():
+            # Each strategy yields a boolean column flagging a change in that field
+            changes.append(strategy(col1, col2))
+
+        # Aggregate all change flags into a single boolean indicating any change
+        if changes:
+            change_column = (
+                when(sum([change.cast("int") for change in changes]) > 0, True)
+                .otherwise(False)
+                .alias("any_change")
+            )
+        else:
+            change_column = lit(False).alias(
+                "any_change"
+            )  # Handle the case when changes list is empty
+
+        # Select the original JSON payloads and the any_change flag
+        return df.withColumn("any_change", change_column)
+
+    logging.info(f"Comparing Changes For Facilities: {facilities.count()}")
+    # Apply the comparison function
+    result_df = compare_for_any_change(
+        facilities, json_1_column_name, json_2_column_name
+    )
+    return result_df
diff --git a/sigeca_data_import_microservice/app/application/synchronization/facilities/data_transformator.py b/sigeca_data_import_microservice/app/application/synchronization/facilities/data_transformator.py
new file mode 100644
index 0000000..d5ea679
--- /dev/null
+++ b/sigeca_data_import_microservice/app/application/synchronization/facilities/data_transformator.py
@@ -0,0 +1,120 @@
+import json
+
+from pyspark.sql.functions import col, udf
+
+from app.application.synchronization.facilities.facility_schema import facility_schema
+from app.config import Config
+from app.domain.resources import (
+    GeographicZoneResourceRepository,
+    FacilityTypeResourceRepository,
+    ProgramResourceRepository,
+)
+from app.infrastructure.jdbc_reader import JDBCReader
+from app.application.synchronization.validators import validate_facilities_dataframe
+
+
+class FacilityDataTransformer:
+    def __init__(
+        self,
+        jdbc_reader: JDBCReader,
+        geo_zone_repo: GeographicZoneResourceRepository,
+        facility_type_repo: FacilityTypeResourceRepository,
+        program_repo: ProgramResourceRepository,
+    ):
+        self.jdbc_reader = jdbc_reader
+        self.geo_zone_repo = geo_zone_repo
+        self.facility_type_repo = facility_type_repo
+        self.program_repo = program_repo
+
+    def get_data_frame_with_full_information(self, facilities):
+        df = self.get_validated_dataframe(facilities)
+        df = self._add_zones_to_df(df)
+        df = self._add_types_to_df(df)
+        df = self._add_supported_program_info_to_df(df)
+        return df
+
+    def get_validated_dataframe(self, facilities):
+        df = self.jdbc_reader.spark.createDataFrame(
+            facilities, schema=facility_schema
+        ).alias("facilities")
+        df = validate_facilities_dataframe(df)
+        return df
+
+    def _add_zones_to_df(self, df):
+        geo_zone_df = self.geo_zone_repo.get_all().alias("geo_zone")
+        municipalities = geo_zone_df.filter(col("levelnumber") == 3).alias(
+            "municipality"
+        )
+        province = geo_zone_df.filter(col("levelnumber") == 2).alias("province")
+
+        # Validate foreign keys
+        df = df.join(
+            municipalities, df["municipality"] == municipalities["name"], "left"
+        )
+        df = df.join(province, df["province"] == province["name"], "left")
+        return df
+
+    def _add_types_to_df(self, df):
+        types = self.facility_type_repo.get_all().alias("facility_type")
+        df = df.join(types, df["category"] == types["name"], "left")
+        return df
+
+    def _add_supported_program_info_to_df(self, df):
+        programs = self.program_repo.get_all().alias("program")
+        code_id_dict = {row["code"]: row["id"] for row in programs.collect()}
+
+        add_info = udf(
+            lambda supported_programs: json.dumps(
+                {
+                    entry["code"]: {"id": code_id_dict.get(entry["code"], None)}
+                    for entry in supported_programs
+                    if entry["code"] in code_id_dict
+                }
+            )
+        )
+        df = df.withColumn("code_id_dict", add_info(col("services")))
+        return df
+
+
+def get_format_payload_f():
+    fallbacks = Config().fallbacks
+    format_payload_f = udf(
+        lambda id, name, code, geographic_zone, facility_type, supported_programs, operational, enabled: json.dumps(
+            {
+                "id": id,
+                "code": code,
+                "name": name,
+                "geographicZone": {"id": geographic_zone or fallbacks.geographicZone},
+                "type": {"id": facility_type or fallbacks.type},
+                "active": operational,
+                "enabled": enabled,
+                "openLmisAccessible": enabled,
+                "supportedPrograms": [
+                    {
+                        "id": data["id"],
+                        "supportActive": data.get("supportActive", True),
+                        "supportLocallyFulfilled": data.get(
+                            "supportLocallyFulfilled", False
+                        ),
+                        "supportStartDate": data.get("supportStartDate"),
+                    }
+                    for data in json.loads(supported_programs).values()
+                    if "id" in data.keys()
+                ],
+            }
+        )
+    )
+
+    return format_payload_f
+
+
+def get_email_response_f():
+    return udf(
+        lambda name, code, municipality_name, facility_type, operation: json.dumps({
+            "name": name,
+            "code": code,
+            "municipality": municipality_name or "UNDEFINED",
+            "type": facility_type or "UNDEFINED",
+            "operation": operation
+        })
+    )
\ No newline at end of file
diff --git a/sigeca_data_import_microservice/app/application/synchronization/facilities/facility_schema.py b/sigeca_data_import_microservice/app/application/synchronization/facilities/facility_schema.py
new file mode 100644
index 0000000..1348fa1
--- /dev/null
+++ b/sigeca_data_import_microservice/app/application/synchronization/facilities/facility_schema.py
@@ -0,0 +1,41 @@
+from pyspark.sql.types import (
+    StructType,
+    StructField,
+    StringType,
+    BooleanType,
+    ArrayType,
+)
+
+facility_schema = StructType(
+    [
+        StructField("id", StringType(), True),
+        StructField("reference_id", StringType(), True),
+        StructField("is_deleted", BooleanType(), True),
+        StructField("last_updated", StringType(), True),  # TODO: Change to timestamp
+        StructField("name", StringType(), True),
+        StructField("code", StringType(), True),
+        StructField("acronym", StringType(), True),
+        StructField("category", StringType(), True),
+        StructField("ownership", StringType(), True),
+        StructField("management", StringType(), True),
+        StructField("municipality", StringType(), True),
+        StructField("province", StringType(), True),
+        StructField("is_operational", BooleanType(), True),
+        StructField(
+            "latitude", StringType(), True
+        ),  # Latitude and longitude data are inconsistent in the source
StructField("longitude", StringType(), True), + StructField( + "services", + ArrayType( + StructType( + [ + StructField("name", StringType(), True), + StructField("code", StringType(), True), + ] + ) + ), + True, + ), + ] +) diff --git a/sigeca_data_import_microservice/app/application/synchronization/facilities/facility_suplement_synchronization.py b/sigeca_data_import_microservice/app/application/synchronization/facilities/facility_suplement_synchronization.py new file mode 100644 index 0000000..4fa9557 --- /dev/null +++ b/sigeca_data_import_microservice/app/application/synchronization/facilities/facility_suplement_synchronization.py @@ -0,0 +1,61 @@ +from app.application.synchronization.facility_types import FacilityTypeSynchronization +from app.application.synchronization.geo_zones import ( + ProvinceSynchronization, + MunicipalitySynchronization, +) +from pyspark.sql import DataFrame + +from app.application.synchronization.product import ProgramSynchronization +from app.domain.resources import ( + FacilityResourceRepository, + GeographicZoneResourceRepository, + FacilityTypeResourceRepository, + FacilityOperatorResourceRepository, + ProgramResourceRepository, +) +from app.infrastructure.jdbc_reader import JDBCReader +from app.infrastructure.open_lmis_api_client import OpenLmisApiClient +from app.infrastructure.sigeca_api_client import SigecaApiClient + + +class FacilitySupplementSync: + + def __init__( + self, + jdbc_reader: JDBCReader, + lmis_client: OpenLmisApiClient, + geo_zone_repo: GeographicZoneResourceRepository, + facility_type_repo: FacilityTypeResourceRepository, + program_repo: ProgramResourceRepository, + ): + self.lmis_client = lmis_client + self.geo_zone_repo = geo_zone_repo + self.facility_type_repo = facility_type_repo + self.program_repo = program_repo + self.jdbc_reader = jdbc_reader + + def synchronize_supplement_data(self, df): + self.synchronize_mising_geographic_zones(df) + self.synchronize_mising_types(df) + self.synchronize_products(df) + # Currently skip, on UAT no operators exist + # self.synchronize_operators() + + def synchronize_mising_geographic_zones(self, df: DataFrame): + ProvinceSynchronization( + self.jdbc_reader, df, self.geo_zone_repo, self.lmis_client + ).synchronize() + + MunicipalitySynchronization( + self.jdbc_reader, df, self.geo_zone_repo, self.lmis_client + ).synchronize() + + def synchronize_mising_types(self, df): + FacilityTypeSynchronization( + self.jdbc_reader, df, self.facility_type_repo, self.lmis_client + ).synchronize() + + def synchronize_products(self, df): + ProgramSynchronization( + self.jdbc_reader, df, self.program_repo, self.lmis_client + ).synchronize() diff --git a/sigeca_data_import_microservice/app/application/synchronization/facilities/synchronization.py b/sigeca_data_import_microservice/app/application/synchronization/facilities/synchronization.py new file mode 100644 index 0000000..af203ef --- /dev/null +++ b/sigeca_data_import_microservice/app/application/synchronization/facilities/synchronization.py @@ -0,0 +1,316 @@ +import json +import logging + +from pyspark.sql import DataFrame +from pyspark.sql.functions import col, lit, udf, from_json, when +from pyspark.sql.types import StringType + +from app.application.synchronization.facilities import ( + FacilitySupplementSync, + FacilityDataTransformer, +) +from app.config import Config +from app.domain.resources import ( + FacilityResourceRepository, + GeographicZoneResourceRepository, + FacilityTypeResourceRepository, + FacilityOperatorResourceRepository, + 
+    ProgramResourceRepository,
+)
+from app.infrastructure.jdbc_reader import JDBCReader
+from app.infrastructure.open_lmis_api_client import OpenLmisApiClient
+from app.infrastructure.sigeca_api_client import SigecaApiClient
+from .data_changes_detection import compare_json_content
+from .data_transformator import FacilityDataTransformer, get_format_payload_f, get_email_response_f
+from ...email_notification import notify_administrator
+
+
+class FacilitySynchronizationService:
+    def __init__(
+        self,
+        jdbc_reader: JDBCReader,
+        facility_client: SigecaApiClient,
+        lmis_client: OpenLmisApiClient,
+        facility_repository: FacilityResourceRepository,
+        geo_zone_repo: GeographicZoneResourceRepository,
+        facility_type_repo: FacilityTypeResourceRepository,
+        operator_repo: FacilityOperatorResourceRepository,
+        program_repo: ProgramResourceRepository,
+    ):
+        self.facility_client = facility_client
+        self.facility_repository = facility_repository
+        self.lmis_client = lmis_client
+        self.geo_zone_repo = geo_zone_repo
+        self.facility_type_repo = facility_type_repo
+        self.operator_repo = operator_repo
+        self.program_repo = program_repo
+        self.jdbc_reader = jdbc_reader
+        self.supplement_sync = FacilitySupplementSync(
+            self.jdbc_reader,
+            self.lmis_client,
+            self.geo_zone_repo,
+            self.facility_type_repo,
+            self.program_repo,
+        )
+
+        self.facility_data_transformer = FacilityDataTransformer(
+            self.jdbc_reader,
+            self.geo_zone_repo,
+            self.facility_type_repo,
+            self.program_repo,
+        )
+
+        self.config = Config()
+
+    def synchronize_facilities(self):
+        try:
+            # Step 1: Fetch data from the external API
+            external_facilities = self.facility_client.fetch_facilities()
+
+            # Step 2: Validate and transform the data
+            valid_external_df = self.validate_and_transform(external_facilities)
+
+            # Step 3: Fetch existing data from the database
+            existing_facilities = self.facility_repository.get_all().alias(
+                "existing_facilities"
+            )
+
+            joined = valid_external_df.join(
+                existing_facilities,
+                valid_external_df["facilities.code"] == existing_facilities["code"],
+                "left",
+            )
+
+            create, update, delete = self._split_df(joined)
+
+            responses = []
+            responses.extend(self._create_new_facilities(create))
+            responses.extend(self._update_existing_facilities(update))
+            # Deactivations are reported as DELETE operations
+            responses.extend(self._delete_removed_facilities(delete))
+
+            if self.config.sync.email_report_list and responses:
+                logging.info("Sending completion report to mailing list.")
+                notify_administrator(responses, self.config.sync.email_report_list)
+            elif not self.config.sync.email_report_list:
+                logging.info("Email list for the sync report is not configured; skipping report.")
+            # Log the results
+            logging.info("Facility synchronization completed successfully")
+
+        except Exception as e:
+            logging.error(f"An error occurred during facility synchronization: {e}")
+            raise
+
+    def validate_and_transform(self, facilities):
+        # Supplement sync checks the `synchronize_relevant` config flag internally
+        self.synchronize_supplement_data(facilities)
+
+        df = self.get_full_facilities_data_frame(facilities)
+
+        return df
+
+    def get_full_facilities_data_frame(self, facilities):
+        return self.facility_data_transformer.get_data_frame_with_full_information(
+            facilities
+        )
+
+    def synchronize_supplement_data(self, facilities):
+        df = self.facility_data_transformer.get_validated_dataframe(facilities)
+        # Check for mandatory fields and valid relations
+        if self.config.sync.synchronize_relevant:
+            logging.info("Synchronizing relevant resources.")
+            self.supplement_sync.synchronize_supplement_data(df)
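+            # FacilitySupplementSync provisions the geographic zones, facility
+            # types and programs referenced by the incoming facilities.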
+        else:
+            logging.info(
+                "Synchronization of relevant resources (Types, Geo Zones, Products) is disabled"
+            )
+        return df
+
+    def _split_df(self, df):
+        deleted = df.filter(col("facilities.is_deleted") == True).filter(
+            col("existing_facilities.id").isNotNull()
+        )
+
+        existing = df.filter(col("facilities.is_deleted") == False)
+        new_facilities = existing.filter(col("existing_facilities.id").isNull())
+        updated_facilities = existing.filter(col("existing_facilities.id").isNotNull())
+        return new_facilities, updated_facilities, deleted
+
+    def _create_new_facilities(self, facilities):
+        logging.info("Synchronizing Facilities")
+        format_payload_f = get_format_payload_f()
+        df = facilities.withColumn(
+            "payload",
+            format_payload_f(
+                col("existing_facilities.id"),
+                col("facilities.name"),
+                col("facilities.code"),
+                col("municipality.id"),
+                col("facility_type.id"),
+                lit("{}"),
+                col("is_operational"),
+                lit(True),
+            ),
+        )
+
+        email_response_f = get_email_response_f()
+        df = df.withColumn(
+            "response",
+            email_response_f(
+                col("facilities.name"),
+                col("facilities.code"),
+                col("municipality.name"),
+                col("facility_type.name"),
+                lit("CREATE"),
+            ),
+        )
+
+        responses = []
+        new_count = df.count()
+        if new_count > 0:
+            logging.info(f"New Facilities to be created: {new_count}")
+            for row in df.collect():
+                responses.append(json.loads(row.response))
+                self._create_request(row)
+        else:
+            logging.info("No new facilities created")
+        return responses
+
+    def _create_request(self, data):
+        try:
+            return self.lmis_client.send_post_request("facilities", data["payload"])
+        except Exception as e:
+            logging.error(
+                f"An error occurred during facility creation request ({data}): {e}"
+            )
+
+    def _update_request(self, data):
+        try:
+            return self.lmis_client.send_put_request(
+                "facilities", data["id"], data["payload"]
+            )
+        except Exception as e:
+            logging.error(
+                f"An error occurred during facility update request ({data}): {e}"
+            )
+
+    def _delete_request(self, data):
+        try:
+            return self.lmis_client.send_delete_request("facilities", data["id"])
+        except Exception as e:
+            logging.error(
+                f"An error occurred during facility delete request ({data}): {e}"
+            )
+
+    def merge_json_f(self):
+        def _inner_merge(json1, json2):
+            dict1 = json.loads(json1)
+            dict2 = json.loads(json2)
+            merged_dict = {**dict2, **dict1}
+            return json.dumps(merged_dict)
+
+        return udf(_inner_merge, StringType())
+
+    def _update_existing_facilities(self, facilities: DataFrame, is_deleted=False):
+        logging.info("Updating Facilities")
+        merge_json_udf = self.merge_json_f()
+        facilities = facilities.withColumn(
+            "mergedServices",
+            merge_json_udf(
+                col("existing_facilities.supported_programs"), col("code_id_dict")
+            ),
+        )
+        format_payload_f = get_format_payload_f()
+
+        facilities = facilities.withColumn(
+            "payload",
+            format_payload_f(
+                col("existing_facilities.id"),
+                col("facilities.name"),
+                col("facilities.code"),
+                when(
+                    col("municipality.id").isNotNull(), col("municipality.id")
+                ).otherwise(lit(self.config.fallbacks.geographicZone)),
+                when(
+                    col("facility_type.id").isNotNull(), col("facility_type.id")
+                ).otherwise(lit(self.config.fallbacks.type)),
+                col("existing_facilities.supported_programs"),  # Use Existing Services
+                col("is_operational"),
+                lit(not is_deleted),
+            ),
+        )
+
+        facilities = facilities.withColumn(
+            "oldPayload",
+            format_payload_f(
+                col("existing_facilities.id"),
+                col("existing_facilities.name"),
+                col("facilities.code"),
+                col("existing_facilities.geographiczoneid"),
+                col("existing_facilities.typeid"),
col("existing_facilities.supported_programs"), + col("existing_facilities.active"), + col("existing_facilities.enabled"), + ), + ) + + schema = self.jdbc_reader.spark.read.json( + facilities.rdd.map(lambda row: row.payload) + ).schema # Infer schema from the first JSON column + + facilities = facilities.withColumn( + "json1_struct", from_json(col("payload"), schema) + ) + facilities = facilities.withColumn( + "json2_struct", from_json(col("oldPayload"), schema) + ) + # Apply the comparison function + + result_df = compare_json_content(facilities, "json1_struct", "json2_struct") + + result_df = result_df.filter(col("any_change") == True)[ + [ + "facilities.name", + "facilities.code", + "municipality.name", + "facility_type.name", + "payload", + "existing_facilities.id", + ] + ] + + email_response_f = get_email_response_f() + result_df = result_df.withColumn( + "response", + email_response_f( + col(f"facilities.name"), + col(f"facilities.code"), + col("municipality.name"), + col("facility_type.name"), + lit("UPDATE" if not is_deleted else "DELETE") + ), + ) + + responses = [] + if result_df.count() > 0: + logging.info( + f"Facilities That Changed since last update: {result_df.count()}" + ) + for row in result_df.collect(): + self._update_request(row) + responses.append(json.loads(row.response)) + else: + logging.info("No facilities were updated") + return responses + + def _delete_removed_facilities(self, facilities): + logging.info("Deactivating Deleted Facilities") + # Delete doesn't remove the facility, it set's it to disaled and unactive + if facilities.count() > 0: + self._update_existing_facilities(facilities, True) + else: + logging.info("No facilities were deleted.") + + return [] diff --git a/sigeca_data_import_microservice/app/application/synchronization/facility_supported_products.py b/sigeca_data_import_microservice/app/application/synchronization/facility_supported_products.py deleted file mode 100644 index 639e16b..0000000 --- a/sigeca_data_import_microservice/app/application/synchronization/facility_supported_products.py +++ /dev/null @@ -1,112 +0,0 @@ -import logging -from pyspark.sql import DataFrame -from pyspark.sql import SparkSession -from pyspark.sql.functions import col, udf, upper -from app.domain.resources import ( - FacilityResourceRepository, - GeographicZoneResourceRepository, - FacilityOperatorResourceRepository, - ProgramResourceRepository, - FacilityTypeResourceRepository, -) -from app.infrastructure.sigeca_api_client import SigecaApiClient - -from app.infrastructure.jdbc_reader import JDBCReader -from .validators import validate_facilities_dataframe -import unidecode -import json - -logger = logging.getLogger(__name__) - - -class FacilitySupportedProductsSynchronization: - def __init__( - self, - jdbc_reader: JDBCReader, - facilities: DataFrame, - repo: ProgramResourceRepository, - ): - self.facilities = facilities - self.repo = repo - self.jdbc_reader = jdbc_reader - - def synchronize(self): - try: - self._add_missing_facility_types() - logging.info("Facility products synchronization completed successfully") - except Exception as e: - logging.error( - f"An error occurred during facility products synchronization: {e}" - ) - raise - - def _create_joined_df(self): - df = self.facilities.alias("facilities") - types = self.repo.get_all().alias("facility_type") - # Validate foreign keys - df = df.join(types, df["category"] == types["name"], "left") - - return df - - def validate(self, facilities_df): - # Extract services names from the nested structure - missing = 
-        return missing
-
-    def _validate_missing(self, facilities_df):
-        missing = facilities_df.filter((col(f"facility_type.id").isNull()))
-
-        num_invalid = missing.count()
-        if num_invalid > 0:
-            logger.warning(
-                f"Found {num_invalid} facilities with non existing type present:"
-            )
-            # Log details of invalid entries
-            missing.distinct()[
-                ["facilities.code", "facilities.name", "category"]
-            ].show()
-        else:
-            logger.info(f"All facility products matching.")
-        return missing
-
-    def _add_missing_facility_types(self):
-        df = self._create_joined_df()
-        missing: DataFrame = self.validate(df)
-
-        reduced_df = missing[["category"]].distinct()
-        min_display_order = (
-            self.repo.get_all()
-            .alias("facility_type")
-            .sort(col("displayorder").desc())
-            .first()["displayorder"]
-        )
-        display_orders = [
-            min_display_order + n for n in range(1, reduced_df.count() + 1)
-        ]
-        add_display_order_f = udf(lambda: display_orders.pop())
-        add_category_code_f = udf(lambda name: f"{unidecode.unidecode(name.lower())}")
-        format_payload_f = udf(
-            lambda name, code, display: json.dumps(
-                {"code": code, "name": name, "displayOrder": display}
-            )
-        )
-        reduced_df = (
-            reduced_df.withColumn("display", add_display_order_f())
-            .withColumn("code", add_category_code_f(col("category")))
-            .withColumn(
-                "payload",
-                format_payload_f(
-                    col(f"category"),
-                    col("code"),
-                    col("display"),
-                ),
-            )
-        )
-
-        reduced_df.show()
-        for row in reduced_df.collect():
-            self._sent_to_client(row)
-
-    def _sent_to_client(self, data):
-        print("THIS IS LEGIT CLIENT")
-        print(data["payload"])
diff --git a/sigeca_data_import_microservice/app/config.py b/sigeca_data_import_microservice/app/config.py
new file mode 100644
index 0000000..324f0e1
--- /dev/null
+++ b/sigeca_data_import_microservice/app/config.py
@@ -0,0 +1,145 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, Optional, List
+
+
+@dataclass
+class OpenLmisApiConfig:
+    api_url: Optional[str] = None
+    username: Optional[str] = None
+    password: Optional[str] = None
+    login_token: Optional[str] = None
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
+
+
+@dataclass
+class FallbackConfig:
+    geographicZone: Optional[str] = None
+    type: Optional[str] = None
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
+
+
+@dataclass
+class SigecaApiConfig:
+    api_url: Optional[str] = None
+    headers: Optional[Dict[str, str]] = field(default_factory=dict)
+    credentials: Optional[Dict[str, str]] = field(default_factory=dict)
+    skip_verification: Optional[bool] = None
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
+
+
+@dataclass
+class DatabaseConfig:
+    username: Optional[str] = None
+    password: Optional[str] = None
+    host: Optional[str] = None
+    port: Optional[int] = 5432  # default PostgreSQL port, matching the previous config.get("port", 5432)
+    database: Optional[str] = None
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
+
+
+@dataclass
+class JdbcReaderConfig:
+    jdbc_url: Optional[str] = None
+    jdbc_user: Optional[str] = None
+    jdbc_password: Optional[str] = None
+    jdbc_driver: Optional[str] = None
+    log_level: Optional[str] = None
+    ssh_host: Optional[str] = None
+    ssh_port: Optional[int] = None
+    ssh_user: Optional[str] = None
+    ssh_private_key_path: Optional[str] = None
+    remote_bind_address: Optional[str] = None
+    remote_bind_port: Optional[int] = None
+    local_bind_port: Optional[int] = None
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
+
+
+@dataclass
+class SyncConfig:
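+    # interval_minutes drives the scheduler cadence; synchronize_relevant enables
+    # provisioning of related resources (geo zones, types, programs);
+    # email_report_list holds the recipients of the sync completion report.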
+    interval_minutes: Optional[int] = None
+    synchronize_relevant: Optional[bool] = False
+    email_report_list: Optional[List[str]] = field(default_factory=list)
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
+
+
+@dataclass
+class SMTPClientConfig:
+    server_url: Optional[str] = None
+    username: Optional[str] = None
+    password: Optional[str] = None
+    sender: Optional[str] = None
+    server_port: Optional[int] = 465
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
+
+
+class SingletonMeta(type):
+    """
+    A Singleton metaclass that creates a single instance of a class.
+    """
+
+    _instances = {}
+
+    def __call__(cls, *args, **kwargs):
+        if cls not in cls._instances:
+            instance = super().__call__(*args, **kwargs)
+            cls._instances[cls] = instance
+        return cls._instances[cls]
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
+
+
+@dataclass
+class Config(metaclass=SingletonMeta):
+    open_lmis_api: Optional[OpenLmisApiConfig] = field(
+        default_factory=OpenLmisApiConfig
+    )
+    sigeca_api: Optional[SigecaApiConfig] = field(default_factory=SigecaApiConfig)
+    database: Optional[DatabaseConfig] = field(default_factory=DatabaseConfig)
+    jdbc_reader: Optional[JdbcReaderConfig] = field(default_factory=JdbcReaderConfig)
+    sync: Optional[SyncConfig] = field(default_factory=SyncConfig)
+    additional_settings: Dict[str, Any] = field(default_factory=dict)
+    fallbacks: Optional[FallbackConfig] = field(default_factory=FallbackConfig)
+    smtp: Optional[SMTPClientConfig] = field(default_factory=SMTPClientConfig)
+
+    @classmethod
+    def from_dict(cls, config_dict: Dict[str, Any]) -> "Config":
+        open_lmis_api = OpenLmisApiConfig(**config_dict.get("open_lmis_api", {}))
+        sigeca_api = SigecaApiConfig(**config_dict.get("sigeca_api", {}))
+        database = DatabaseConfig(**config_dict.get("database", {}))
+        jdbc_reader = JdbcReaderConfig(**config_dict.get("jdbc_reader", {}))
+        sync = SyncConfig(**config_dict.get("sync", {}))
+        fallback = FallbackConfig(**config_dict.get("fallbacks", {}))
+        smtp = SMTPClientConfig(**config_dict.get("smtp", {}))
+        additional_settings = {
+            k: v for k, v in config_dict.items() if k not in cls.__dataclass_fields__
+        }
+
+        instance = cls()
+        instance.open_lmis_api = open_lmis_api
+        instance.sigeca_api = sigeca_api
+        instance.database = database
+        instance.jdbc_reader = jdbc_reader
+        instance.sync = sync
+        instance.additional_settings = additional_settings
+        instance.fallbacks = fallback
+        instance.smtp = smtp
+
+        return instance
+
+    def __getitem__(self, key: str) -> Any:
+        return getattr(self, key)
diff --git a/sigeca_data_import_microservice/app/infrastructure/database.py b/sigeca_data_import_microservice/app/infrastructure/database.py
index 67c51bb..60b3bb2 100644
--- a/sigeca_data_import_microservice/app/infrastructure/database.py
+++ b/sigeca_data_import_microservice/app/infrastructure/database.py
@@ -2,16 +2,18 @@
 from sqlalchemy.orm import sessionmaker, declarative_base
 import urllib.parse
 
+from app.config import DatabaseConfig
+
 Base = declarative_base()
 
 
-def get_engine(config: dict):
+def get_engine(config: DatabaseConfig):
     url = (
         f"postgresql+psycopg2://"
-        + urllib.parse.quote_plus(config["username"])
+        + urllib.parse.quote_plus(config.username)
         + ":"
-        + urllib.parse.quote_plus(config["password"])
-        + f"@{config['host']}:{config.get('port', 5432)}/{config['database']}"
+        + urllib.parse.quote_plus(config.password)
+        + f"@{config.host}:{config.port}/{config.database}"
     )
     return create_engine(url)
diff --git a/sigeca_data_import_microservice/app/infrastructure/jdbc_reader.py b/sigeca_data_import_microservice/app/infrastructure/jdbc_reader.py
index 32f1226..b2427d3 100644
--- a/sigeca_data_import_microservice/app/infrastructure/jdbc_reader.py
+++ b/sigeca_data_import_microservice/app/infrastructure/jdbc_reader.py
@@ -31,7 +31,7 @@ def __init__(self, config: Any):
             .config("spark.jars.packages", "org.postgresql:postgresql:42.2.23")
             .getOrCreate()
         )
-        self.spark.sparkContext.setLogLevel(config.get("log_level", "WARN"))
+        self.spark.sparkContext.setLogLevel(config["log_level"] or "WARN")
         self.tunnel = None
 
     def setup_ssh_tunnel(self):
diff --git a/sigeca_data_import_microservice/app/infrastructure/open_lmis_api_client.py b/sigeca_data_import_microservice/app/infrastructure/open_lmis_api_client.py
index 1eff4ef..f82b416 100644
--- a/sigeca_data_import_microservice/app/infrastructure/open_lmis_api_client.py
+++ b/sigeca_data_import_microservice/app/infrastructure/open_lmis_api_client.py
@@ -3,13 +3,15 @@
 import logging
 from requests.auth import HTTPBasicAuth
 
+from app.config import OpenLmisApiConfig
+
 
 class OpenLmisApiClient:
     LOGIN_URI = "oauth/token?grant_type=password"
     FACILITIES_URI = "facilities"
     GEOGRAPHICAL_ZONES_URI = "geographicZones"
 
-    def __init__(self, api_config: dict):
+    def __init__(self, api_config: OpenLmisApiConfig):
         api_url: str = api_config["api_url"]
         username: str = api_config["username"]
         password: str = api_config["password"]
diff --git a/sigeca_data_import_microservice/app/infrastructure/sigeca_api_client.py b/sigeca_data_import_microservice/app/infrastructure/sigeca_api_client.py
index ce9c8f6..b114001 100644
--- a/sigeca_data_import_microservice/app/infrastructure/sigeca_api_client.py
+++ b/sigeca_data_import_microservice/app/infrastructure/sigeca_api_client.py
@@ -3,17 +3,19 @@
 import logging
 from requests.auth import HTTPBasicAuth
 
+from app.config import SigecaApiConfig
+
 
 class SigecaApiClient:
     LOGIN_URI = "token/"
     FACILITIES_URI = "facilities/"
     GEOGRAPHICAL_ZONES_URI = "geographicZones"
 
-    def __init__(self, api_config: dict):
+    def __init__(self, api_config: SigecaApiConfig):
         api_url: str = api_config["api_url"]
         headers: str = api_config["headers"]
-        self.credentials = api_config['credentials']
-        self.skip_verification: bool = api_config.get('skip_verification') or False
+        self.credentials = api_config["credentials"]
+        self.skip_verification: bool = api_config["skip_verification"] or False
 
         if api_url.endswith("/"):
             api_url = api_url[:-1]
@@ -30,29 +32,32 @@
     def fetch_facilities(self, **kwargs):
         self._get_token()
         url = f"{self.api_url}/{self.FACILITIES_URI}"
-        response = requests.get(url, headers=self.headers, verify=not self.skip_verification)
+        response = requests.get(
+            url, headers=self.headers, verify=not self.skip_verification
+        )
 
         if response.status_code == 200:
-                return response.json()
+            return response.json()
         else:
             logging.error(
-                f"Failed to log into OpenLMIS API: {response.status_code} {response}"
+                f"Failed to Fetch Facilities from SIGECA API: {response.status_code} {response}"
             )
-            raise Exception("Failed to log into sigeca central API")
-
+            raise Exception("Failed to Fetch Facilities from SIGECA API")
 
     def _get_token(self):
         """Login to get access token"""
         url = f"{self.api_url}/{self.LOGIN_URI}"
         data = self.credentials
-        response = requests.post(url, headers=self.headers, data=data, verify=not self.skip_verification)
+        response = requests.post(
+            url, headers=self.headers, data=data, verify=not self.skip_verification
+        )
 
         if response.status_code == 200:
             self.token = response.json().get("access_token")
= response.json().get("access_token") - self.headers['Authorization'] = F"Bearer {self.token}" + self.headers["Authorization"] = f"Bearer {self.token}" else: logging.error( - f"Failed to log into OpenLMIS API: {response.status_code} {response}" + f"Failed to log into SIGECA API: {response.status_code} {response.text}" ) - raise Exception("Failed to log into sigeca central API") \ No newline at end of file + raise Exception("Failed to log into SIGECA API") diff --git a/sigeca_data_import_microservice/app/infrastructure/smtp_client.py b/sigeca_data_import_microservice/app/infrastructure/smtp_client.py new file mode 100644 index 0000000..850fcd1 --- /dev/null +++ b/sigeca_data_import_microservice/app/infrastructure/smtp_client.py @@ -0,0 +1,33 @@ +import logging +import smtplib +import ssl +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +from app.config import SMTPClientConfig + + +class SMTPClient: + def __init__(self, config: SMTPClientConfig): + self._url = config.server_url + self._port = config.server_port + self._username = config.username + self._password = config.password + self._sender = config.sender + + def send_email(self, address: str, subject: str, message: str): + try: + context = ssl.create_default_context() + with smtplib.SMTP_SSL(self._url, self._port, context=context) as smtp: + smtp.login(self._username, self._password) + msg = MIMEMultipart() + msg['From'] = self._sender + msg['To'] = address + msg['Subject'] = subject + # Attach the email body to the MIME message + msg.attach(MIMEText(message, 'plain')) + # Send the email + smtp.send_message(msg) + except Exception as e: + logging.error("Failed to send email via SMTP: %s", str(e), exc_info=e) + raise e diff --git a/sigeca_data_import_microservice/config_example.json b/sigeca_data_import_microservice/config_example.json index 4e7d195..f5e85d5 100644 --- a/sigeca_data_import_microservice/config_example.json +++ b/sigeca_data_import_microservice/config_example.json @@ -36,7 +36,18 @@ "remote_bind_port": 5432, "local_bind_port": 5559 }, + "smtp": { + "server_url": "example.com", + "server_port": 465, + "username": "username", + "password": "password", + "sender": "sender" + }, "sync": { "interval_minutes": 5 + }, + "fallbacks": { + "geographicZone" : "[UUID]", + "type": "[UUID]" } } \ No newline at end of file diff --git a/sigeca_data_import_microservice/docs/IntegrationOfFacilities.pdf b/sigeca_data_import_microservice/docs/IntegrationOfFacilities.pdf index fca14dc..327ceb9 100644 Binary files a/sigeca_data_import_microservice/docs/IntegrationOfFacilities.pdf and b/sigeca_data_import_microservice/docs/IntegrationOfFacilities.pdf differ diff --git a/sigeca_data_import_microservice/main.py b/sigeca_data_import_microservice/main.py index d81d8fc..b1b945c 100644 --- a/sigeca_data_import_microservice/main.py +++ b/sigeca_data_import_microservice/main.py @@ -4,7 +4,9 @@ import os from app.application.scheduler import FacilitySyncScheduler -from app.application.synchronization.facilities import FacilitySynchronizationService +from app.application.synchronization.facilities.synchronization import ( + FacilitySynchronizationService, +) from app.domain.resources import ( FacilityOperatorResourceRepository, FacilityResourceRepository, @@ -14,9 +16,11 @@ ) from app.infrastructure.database import get_engine from app.infrastructure.jdbc_reader import JDBCReader +from app.infrastructure.smtp_client import SMTPClient from app.infrastructure.open_lmis_api_client import OpenLmisApiClient from 
 from app.infrastructure.sigeca_api_client import SigecaApiClient
-from dotenv import dotenv_values, load_dotenv
+from dotenv import load_dotenv
+from app.config import Config
 
 
 def load_config(from_env=False):
@@ -60,11 +64,14 @@
     logging.basicConfig(level=logging.INFO)
 
     config = load_config(args.env_config)
-    engine = get_engine(config["database"])
+    config = Config.from_dict(config)
+
+    engine = get_engine(Config().database)
 
-    lmis_client = OpenLmisApiClient(config["open_lmis_api"])
-    sigeca_client = SigecaApiClient(config["sigeca_api"])
-    jdbc_reader = JDBCReader(config["jdbc_reader"])
+    lmis_client = OpenLmisApiClient(config.open_lmis_api)
+    sigeca_client = SigecaApiClient(config.sigeca_api)
+    jdbc_reader = JDBCReader(config.jdbc_reader)
+    smtp_client = SMTPClient(config.smtp)
 
     sync_service = FacilitySynchronizationService(
         jdbc_reader,
@@ -76,8 +83,9 @@
         FacilityOperatorResourceRepository(jdbc_reader),
         ProgramResourceRepository(jdbc_reader),
     )
+
     try:
-        if config["jdbc_reader"].get("ssh_user"):
+        if config.jdbc_reader.ssh_user:
             jdbc_reader.setup_ssh_tunnel()
 
         if args.run_mode == "continuous":
@@ -87,9 +95,8 @@
         elif args.run_mode == "one-time":
             lmis_client.login()
             sync_service.synchronize_facilities()
-
     except Exception as e:
         logging.exception(e)
     finally:
-        if config["jdbc_reader"].get("ssh_user"):
+        if config.jdbc_reader.ssh_user:
            jdbc_reader.close_ssh_tunnel()
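Example wiring of the new config and email report introduced by this change, as a minimal sketch; the values mirror the config_example.json placeholders, and the recipient address is hypothetical:

    from app.application.email_notification import notify_administrator
    from app.config import Config

    # Populate the Config singleton from a dict shaped like config_example.json.
    config = Config.from_dict({
        "smtp": {"server_url": "example.com", "server_port": 465,
                 "username": "username", "password": "password", "sender": "sender"},
        "sync": {"interval_minutes": 5, "synchronize_relevant": False,
                 "email_report_list": ["admin@example.com"]},
        "fallbacks": {"geographicZone": "[UUID]", "type": "[UUID]"},
    })

    # One dict per synchronized facility, matching the shape produced by
    # get_email_response_f() in data_transformator.py.
    results = [{"name": "Facility A", "code": "FA-01", "municipality": "UNDEFINED",
                "type": "UNDEFINED", "operation": "CREATE"}]

    # Chunks the results into at most 500 resources per email ("Part N" subjects)
    # and sends each part to every recipient via the configured SMTP server.
    notify_administrator(results, config.sync.email_report_list)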