diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index d73028ff90..25cc986958 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -88,7 +88,10 @@ "console": "integratedTerminal", "preLaunchTask": "Copy_env_file_for_api_debug", "cwd": "${workspaceFolder}/api_app", - "envFile": "${workspaceFolder}/api_app/.env" + "envFile": "${workspaceFolder}/api_app/.env", + "env": { + "OTEL_RESOURCE_ATTRIBUTES": "service.name=api,service.instance.id=local_debug,service.version=dev" + } }, { "name": "E2E Extended", @@ -190,7 +193,8 @@ "cwd": "${workspaceFolder}/resource_processor", "envFile": "${workspaceFolder}/core/private.env", "env": { - "PYTHONPATH": "." + "PYTHONPATH": ".", + "OTEL_RESOURCE_ATTRIBUTES": "service.name=resource_processor,service.instance.id=local_debug,service.version=dev" } }, { diff --git a/api_app/_version.py b/api_app/_version.py index a99557a02f..137e5fb32a 100644 --- a/api_app/_version.py +++ b/api_app/_version.py @@ -1 +1 @@ -__version__ = "0.15.17" +__version__ = "0.15.19" diff --git a/api_app/db/repositories/base.py b/api_app/db/repositories/base.py index 631ea58474..35395fa064 100644 --- a/api_app/db/repositories/base.py +++ b/api_app/db/repositories/base.py @@ -30,7 +30,7 @@ async def _get_container(cls, container_name, partition_key_obj) -> ContainerPro raise UnableToAccessDatabase async def query(self, query: str, parameters: Optional[dict] = None): - items = self.container.query_items(query=query, parameters=parameters, enable_cross_partition_query=True) + items = self.container.query_items(query=query, parameters=parameters) return [i async for i in items] async def read_item_by_id(self, item_id: str) -> dict: diff --git a/api_app/main.py b/api_app/main.py index 0af6dc16d8..eeb5055be3 100644 --- a/api_app/main.py +++ b/api_app/main.py @@ -1,6 +1,5 @@ import asyncio import logging -from opencensus.ext.azure.trace_exporter import AzureExporter import uvicorn from fastapi import FastAPI @@ -9,9 +8,6 @@ from fastapi_utils.tasks import repeat_every from service_bus.airlock_request_status_update import receive_step_result_message_and_update_status -from services.tracing import RequestTracerMiddleware -from opencensus.trace.samplers import ProbabilitySampler - from starlette.exceptions import HTTPException from starlette.middleware.errors import ServerErrorMiddleware @@ -21,10 +17,13 @@ from api.errors.generic_error import generic_error_handler from core import config from core.events import create_start_app_handler, create_stop_app_handler -from services.logging import initialize_logging, telemetry_processor_callback_function +from services.logging import initialize_logging from service_bus.deployment_status_updater import DeploymentStatusUpdater +logger = logging.getLogger() + + def get_application() -> FastAPI: application = FastAPI( title=config.PROJECT_NAME, @@ -36,17 +35,16 @@ def get_application() -> FastAPI: openapi_url=None ) + if config.DEBUG: + initialize_logging(logging.DEBUG, True, application) + else: + initialize_logging(logging.INFO, False, application) + application.add_event_handler("startup", create_start_app_handler(application)) application.add_event_handler("shutdown", create_stop_app_handler(application)) - try: - exporter = AzureExporter(sampler=ProbabilitySampler(1.0)) - exporter.add_telemetry_processor(telemetry_processor_callback_function) - application.add_middleware(RequestTracerMiddleware, exporter=exporter) - except Exception: - logging.exception("Failed to add 
RequestTracerMiddleware") - application.add_middleware(ServerErrorMiddleware, handler=generic_error_handler) + # Allow local UI debugging with local API if config.ENABLE_LOCAL_DEBUGGING: application.add_middleware( @@ -63,17 +61,12 @@ def get_application() -> FastAPI: return application -if config.DEBUG: - initialize_logging(logging.DEBUG) -else: - initialize_logging(logging.INFO) - app = get_application() @app.on_event("startup") async def watch_deployment_status() -> None: - logging.info("Starting deployment status watcher thread") + logger.info("Starting deployment status watcher thread") statusWatcher = DeploymentStatusUpdater(app) await statusWatcher.init_repos() current_event_loop = asyncio.get_event_loop() diff --git a/api_app/requirements.txt b/api_app/requirements.txt index ba5aa64214..336f340919 100644 --- a/api_app/requirements.txt +++ b/api_app/requirements.txt @@ -1,25 +1,25 @@ # API -azure-core==1.26.1 +azure-core==1.29.4 aiohttp==3.8.5 -azure-cosmos==4.3.0 -azure-identity==1.12.0 -azure-mgmt-cosmosdb==9.0.0 +azure-cosmos==4.5.1 +azure-identity==1.14.1 +azure-monitor-opentelemetry==1.0.0 +azure-mgmt-cosmosdb==9.3.0 azure-mgmt-compute==29.1.0 azure-mgmt-costmanagement==3.0.0 -azure-storage-blob==12.15.0 -azure-servicebus==7.8.1 +azure-storage-blob==12.18.3 +azure-servicebus==7.11.3 azure-eventgrid==4.9.1 fastapi[all]==0.95.0 fastapi-utils==0.2.1 gunicorn==20.1.0 jsonschema[format_nongpl]==4.17.1 msal~=1.20.0 -opencensus-ext-azure==1.1.7 -opencensus-ext-logging==0.1.1 -PyJWT==2.6.0 +PyJWT==2.8.0 uvicorn[standard]==0.20.0 semantic-version==2.10.0 pytz~=2022.7 python-dateutil~=2.8.2 azure-mgmt-resource==22.0.0 pandas==1.5.2 +opentelemetry.instrumentation.logging==0.41b0 diff --git a/api_app/services/aad_authentication.py b/api_app/services/aad_authentication.py index 29dc1716a7..d0ccf3175c 100644 --- a/api_app/services/aad_authentication.py +++ b/api_app/services/aad_authentication.py @@ -5,7 +5,6 @@ from typing import List, Optional import jwt import requests -import rsa from fastapi import Request, HTTPException, status from msal import ConfidentialClientApplication @@ -19,6 +18,11 @@ from api.dependencies.database import get_db_client_from_request from db.repositories.workspaces import WorkspaceRepository +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization + + MICROSOFT_GRAPH_URL = config.MICROSOFT_GRAPH_URL.strip("/") @@ -179,9 +183,14 @@ def _get_token_key(self, key_id: str) -> str: for key in keys['keys']: n = int.from_bytes(base64.urlsafe_b64decode(self._ensure_b64padding(key['n'])), "big") e = int.from_bytes(base64.urlsafe_b64decode(self._ensure_b64padding(key['e'])), "big") - pub_key = rsa.PublicKey(n, e) + pub_key = rsa.RSAPublicNumbers(e, n).public_key(default_backend()) + pub_key_pkcs1 = pub_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.PKCS1 + ) + # Cache the PEM formatted public key. 
- AzureADAuthorization._jwt_keys[key['kid']] = pub_key.save_pkcs1() + AzureADAuthorization._jwt_keys[key['kid']] = pub_key_pkcs1 return AzureADAuthorization._jwt_keys[key_id] diff --git a/api_app/services/logging.py b/api_app/services/logging.py index ccccf39ac7..c17728ef19 100644 --- a/api_app/services/logging.py +++ b/api_app/services/logging.py @@ -1,12 +1,12 @@ import logging -from typing import Optional +import os +import re +from opentelemetry.instrumentation.logging import LoggingInstrumentor +from azure.monitor.opentelemetry import configure_azure_monitor +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.requests import RequestsInstrumentor -from opencensus.ext.azure.log_exporter import AzureLogHandler -from opencensus.trace import config_integration -from opencensus.trace.samplers import AlwaysOnSampler -from opencensus.trace.tracer import Tracer - -from core.config import VERSION +from fastapi import FastAPI UNWANTED_LOGGERS = [ "azure.core.pipeline.policies.http_logging_policy", @@ -33,14 +33,18 @@ "uamqp.async_ops.session_async", "uamqp.sender", "uamqp.client", - "azure.servicebus.aio._base_handler_async" + "azure.servicebus.aio._base_handler_async", + "azure.servicebus._pyamqp.aio._connection_async", + "azure.servicebus._pyamqp.aio._link_async", + "opentelemetry.attributes" ] -def disable_unwanted_loggers(): - """ - Disables the unwanted loggers. - """ +debug = os.environ.get("DEBUG", "False").lower() in ("true", "1") + + +def configure_loggers(): + for logger_name in UNWANTED_LOGGERS: logging.getLogger(logger_name).disabled = True @@ -48,59 +52,91 @@ def disable_unwanted_loggers(): logging.getLogger(logger_name).setLevel(logging.ERROR) -def telemetry_processor_callback_function(envelope): - envelope.tags['ai.cloud.role'] = 'api' - envelope.tags['ai.application.ver'] = VERSION - - -class ExceptionTracebackFilter(logging.Filter): - """ - If a record contains 'exc_info', it will only show in the 'exceptions' section of Application Insights without showing - in the 'traces' section. In order to show it also in the 'traces' section, we need another log that does not contain 'exc_info'. - """ - def filter(self, record): - if record.exc_info: - logger = logging.getLogger(record.name) - _, exception_value, _ = record.exc_info - message = f"{record.getMessage()}\nException message: '{exception_value}'" - logger.log(record.levelno, message) - - return True - +def initialize_logging(logging_level: int, add_console_handler: bool, application: FastAPI) -> logging.Logger: -def initialize_logging(logging_level: int, correlation_id: Optional[str] = None) -> logging.LoggerAdapter: - """ - Adds the Application Insights handler for the root logger and sets the given logging level. - Creates and returns a logger adapter that integrates the correlation ID, if given, to the log messages. - - :param logging_level: The logging level to set e.g., logging.WARNING. - :param correlation_id: Optional. The correlation ID that is passed on to the operation_Id in App Insights. - :returns: A newly created logger adapter. 
- """ logger = logging.getLogger() + logger.setLevel(logging_level) - disable_unwanted_loggers() + if add_console_handler: + console_formatter = logging.Formatter( + fmt="%(module)-7s %(name)-7s %(process)-7s %(asctime)s %(otelServiceName)-7s %(otelTraceID)-7s %(otelSpanID)-7s %(levelname)-7s %(message)s" + ) + console_handler = logging.StreamHandler() + console_handler.setLevel(logging_level) + console_handler.setFormatter(console_formatter) + logger.addHandler(console_handler) try: - # picks up APPLICATIONINSIGHTS_CONNECTION_STRING automatically - azurelog_handler = AzureLogHandler() - azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function) - azurelog_handler.addFilter(ExceptionTracebackFilter()) - logger.addHandler(azurelog_handler) + configure_azure_monitor() except ValueError as e: logger.error(f"Failed to set Application Insights logger handler: {e}") - config_integration.trace_integrations(['logging']) - logging.basicConfig(level=logging_level, format='%(asctime)s traceId=%(traceId)s spanId=%(spanId)s %(message)s') - Tracer(sampler=AlwaysOnSampler()) - logger.setLevel(logging_level) - - extra = {} - - if correlation_id: - extra = {'traceId': correlation_id} - - adapter = logging.LoggerAdapter(logger, extra) - adapter.debug(f"Logger adapter initialized with extra: {extra}") - - return adapter + LoggingInstrumentor().instrument( + set_logging_format=True, + level=logging_level + ) + + FastAPIInstrumentor.instrument_app(application) + RequestsInstrumentor().instrument() + + return logger + + +def shell_output_logger( + console_output: str, + prefix_item: str, + logger: logging.LoggerAdapter, + logging_level: int, +): + + if not console_output: + logging.debug("shell console output is empty.") + return + + console_output = console_output.strip() + + if ( + logging_level != logging.INFO + and len(console_output) < 200 + and console_output.startswith("Unable to find image '") + and console_output.endswith("' locally") + ): + logging.debug("Image not present locally, setting log to INFO.") + logging_level = logging.INFO + + logger.log(logging_level, f"{prefix_item} {console_output}") + + +class AzureLogFormatter(logging.Formatter): + # 7-bit C1 ANSI sequences + ansi_escape = re.compile( + r""" + \x1B # ESC + (?: # 7-bit C1 Fe (except CSI) + [@-Z\\-_] + | # or [ for CSI, followed by a control sequence + \[ + [0-?]* # Parameter bytes + [ -/]* # Intermediate bytes + [@-~] # Final byte + ) + """, + re.VERBOSE, + ) + + MAX_MESSAGE_LENGTH = 32000 + TRUNCATION_TEXT = "MESSAGE TOO LONG, TAILING..." + + def format(self, record): + s = super().format(record) + s = AzureLogFormatter.ansi_escape.sub("", s) + + # not doing this here might produce errors if we try to log empty strings. + if s == "": + s = "EMPTY MESSAGE!" + + # azure monitor is limiting the message size. 
+ if len(s) > AzureLogFormatter.MAX_MESSAGE_LENGTH: + s = f"{AzureLogFormatter.TRUNCATION_TEXT}\n{s[-1 * AzureLogFormatter.MAX_MESSAGE_LENGTH:]}" + + return s diff --git a/api_app/services/tracing.py b/api_app/services/tracing.py deleted file mode 100644 index cf269af625..0000000000 --- a/api_app/services/tracing.py +++ /dev/null @@ -1,90 +0,0 @@ -import logging - -from fastapi import Request -from opencensus.trace import ( - attributes_helper, - execution_context, - print_exporter, - samplers, -) -from opencensus.trace import span as span_module -from opencensus.trace import tracer as tracer_module -from opencensus.trace import utils -from opencensus.trace.propagation import trace_context_http_header_format -from starlette.types import ASGIApp -from starlette.middleware.base import BaseHTTPMiddleware - -HTTP_HOST = attributes_helper.COMMON_ATTRIBUTES["HTTP_HOST"] -HTTP_METHOD = attributes_helper.COMMON_ATTRIBUTES["HTTP_METHOD"] -HTTP_PATH = attributes_helper.COMMON_ATTRIBUTES["HTTP_PATH"] -HTTP_ROUTE = attributes_helper.COMMON_ATTRIBUTES["HTTP_ROUTE"] -HTTP_URL = attributes_helper.COMMON_ATTRIBUTES["HTTP_URL"] -HTTP_STATUS_CODE = attributes_helper.COMMON_ATTRIBUTES["HTTP_STATUS_CODE"] - -module_logger = logging.getLogger(__name__) - - -class RequestTracerMiddleware(BaseHTTPMiddleware): - def __init__( - self, - app: ASGIApp, - excludelist_paths=None, - excludelist_hostnames=None, - sampler=None, - exporter=None, - propagator=None, - ) -> None: - super().__init__(app) - self.app = app - self.excludelist_paths = excludelist_paths - self.excludelist_hostnames = excludelist_hostnames - self.sampler = sampler or samplers.AlwaysOnSampler() - self.exporter = exporter or print_exporter.PrintExporter() - self.propagator = ( - propagator or trace_context_http_header_format.TraceContextPropagator() - ) - - async def dispatch(self, request: Request, call_next): - - # Do not trace if the url is in the exclude list - if utils.disable_tracing_url(str(request.url), self.excludelist_paths): - return await call_next(request) - - try: - span_context = self.propagator.from_headers(request.headers) - - tracer = tracer_module.Tracer( - span_context=span_context, - sampler=self.sampler, - exporter=self.exporter, - propagator=self.propagator, - ) - except Exception: # pragma: NO COVER - module_logger.error("Failed to trace request", exc_info=True) - return await call_next(request) - - try: - span = tracer.start_span() - span.span_kind = span_module.SpanKind.SERVER - span.name = "[{}]{}".format(request.method, request.url) - - tracer.add_attribute_to_current_span(HTTP_HOST, request.url.hostname) - tracer.add_attribute_to_current_span(HTTP_METHOD, request.method) - tracer.add_attribute_to_current_span(HTTP_PATH, request.url.path) - tracer.add_attribute_to_current_span(HTTP_ROUTE, request.url.path) - tracer.add_attribute_to_current_span(HTTP_URL, str(request.url)) - - execution_context.set_opencensus_attr( - "excludelist_hostnames", self.excludelist_hostnames - ) - except Exception: # pragma: NO COVER - module_logger.error("Failed to trace request", exc_info=True) - - response = await call_next(request) - try: - tracer.add_attribute_to_current_span(HTTP_STATUS_CODE, response.status_code) - except Exception: # pragma: NO COVER - module_logger.error("Failed to trace response", exc_info=True) - finally: - tracer.end_span() - return response diff --git a/api_app/tests_ma/test_db/test_repositories/test_airlock_request_repository.py b/api_app/tests_ma/test_db/test_repositories/test_airlock_request_repository.py index 
8e817513c1..ffff8a3c18 100644 --- a/api_app/tests_ma/test_db/test_repositories/test_airlock_request_repository.py +++ b/api_app/tests_ma/test_db/test_repositories/test_airlock_request_repository.py @@ -152,4 +152,4 @@ async def test_get_airlock_requests_queries_db(airlock_request_repo): ] await airlock_request_repo.get_airlock_requests(WORKSPACE_ID) - airlock_request_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=expected_parameters, enable_cross_partition_query=True) + airlock_request_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=expected_parameters) diff --git a/api_app/tests_ma/test_db/test_repositories/test_workpaces_repository.py b/api_app/tests_ma/test_db/test_repositories/test_workpaces_repository.py index ce4960c54e..7ae495fade 100644 --- a/api_app/tests_ma/test_db/test_repositories/test_workpaces_repository.py +++ b/api_app/tests_ma/test_db/test_repositories/test_workpaces_repository.py @@ -53,7 +53,7 @@ async def test_get_workspaces_queries_db(workspace_repo): expected_query = workspace_repo.workspaces_query_string() await workspace_repo.get_workspaces() - workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True) + workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None) @pytest.mark.asyncio @@ -62,7 +62,7 @@ async def test_get_active_workspaces_queries_db(workspace_repo): expected_query = workspace_repo.active_workspaces_query_string() await workspace_repo.get_active_workspaces() - workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True) + workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None) @pytest.mark.asyncio @@ -94,7 +94,7 @@ async def test_get_workspace_by_id_queries_db(workspace_repo, workspace): expected_query = f'SELECT * FROM c WHERE c.resourceType = "workspace" AND c.id = "{workspace.id}"' await workspace_repo.get_workspace_by_id(workspace.id) - workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None, enable_cross_partition_query=True) + workspace_repo.container.query_items.assert_called_once_with(query=expected_query, parameters=None) @pytest.mark.asyncio diff --git a/cli/requirements.txt b/cli/requirements.txt index 35218691b7..5aefcf956d 100644 --- a/cli/requirements.txt +++ b/cli/requirements.txt @@ -4,8 +4,8 @@ httpx~=0.23.0 msal==1.20.0 jmespath==1.0.1 tabulate==0.9.0 -pygments==2.15.0 -PyJWT==2.6.0 +pygments==2.16.1 +PyJWT==2.8.0 azure-cli-core==2.47.0 -azure-identity==1.12.0 +azure-identity==1.14.1 aiohttp==3.8.5 diff --git a/cli/setup.py b/cli/setup.py index 955a58aa98..123fef1803 100644 --- a/cli/setup.py +++ b/cli/setup.py @@ -45,10 +45,10 @@ "msal==1.20.0", "jmespath==1.0.1", "tabulate==0.9.0", - "pygments==2.15.0", - "PyJWT==2.6.0", + "pygments==2.16.1", + "PyJWT==2.8.0", "azure-cli-core==2.47.0", - "azure-identity==1.12.0", + "azure-identity==1.14.1", "aiohttp==3.8.5" ], diff --git a/core/terraform/api-webapp.tf b/core/terraform/api-webapp.tf index 902e8b374a..cf73002896 100644 --- a/core/terraform/api-webapp.tf +++ b/core/terraform/api-webapp.tf @@ -58,6 +58,7 @@ resource "azurerm_linux_web_app" "api" { RESOURCE_MANAGER_ENDPOINT = module.terraform_azurerm_environment_configuration.resource_manager_endpoint MICROSOFT_GRAPH_URL = 
module.terraform_azurerm_environment_configuration.microsoft_graph_endpoint STORAGE_ENDPOINT_SUFFIX = module.terraform_azurerm_environment_configuration.storage_suffix + OTEL_RESOURCE_ATTRIBUTES = "service.name=api,service.version=${local.version}" } identity { diff --git a/core/terraform/resource_processor/vmss_porter/cloud-config.yaml b/core/terraform/resource_processor/vmss_porter/cloud-config.yaml index b1d1223543..d4bb9a2cd2 100644 --- a/core/terraform/resource_processor/vmss_porter/cloud-config.yaml +++ b/core/terraform/resource_processor/vmss_porter/cloud-config.yaml @@ -57,6 +57,7 @@ write_files: AZURE_ENVIRONMENT=${azure_environment} AAD_AUTHORITY_URL=${aad_authority_url} MICROSOFT_GRAPH_FQDN=${microsoft_graph_fqdn} + OTEL_RESOURCE_ATTRIBUTES=service.name=resource_processor,service.version=${resource_processor_vmss_porter_image_tag} ${rp_bundle_values} - path: /etc/cron.hourly/docker-prune # An hourly cron job to have docker free disk space. Running this frquently diff --git a/resource_processor/_version.py b/resource_processor/_version.py index f6104e0c26..4e27eedb8d 100644 --- a/resource_processor/_version.py +++ b/resource_processor/_version.py @@ -1 +1 @@ -__version__ = "0.6.7" +__version__ = "0.7.7" diff --git a/resource_processor/resources/commands.py b/resource_processor/resources/commands.py index 8c61220f93..e6214f7805 100644 --- a/resource_processor/resources/commands.py +++ b/resource_processor/resources/commands.py @@ -10,21 +10,34 @@ def azure_login_command(config): - set_cloud_command = f"az cloud set --name {config['azure_environment']}" + set_cloud_command = f"az cloud set --name {config['azure_environment']} >/dev/null " if config["vmss_msi_id"]: # Use the Managed Identity when in VMSS context - login_command = f"az login --identity -u {config['vmss_msi_id']}" + login_command = f"az login --identity -u {config['vmss_msi_id']} >/dev/null " + else: # Use a Service Principal when running locally - login_command = f"az login --service-principal --username {config['arm_client_id']} --password {config['arm_client_secret']} --tenant {config['arm_tenant_id']}" + login_command = f"az login --service-principal --username {config['arm_client_id']} --password {config['arm_client_secret']} --tenant {config['arm_tenant_id']} >/dev/null" return f"{set_cloud_command} && {login_command}" +def apply_porter_credentials_sets_command(config): + if config["vmss_msi_id"]: + # Use the Managed Identity when in VMSS context + porter_credential_sets = "porter credentials apply vmss_porter/arm_auth.json >/dev/null 2>&1 && porter credentials apply vmss_porter/aad_auth.json >/dev/null 2>&1" + + else: + # Use a Service Principal when running locally + porter_credential_sets = "porter credentials apply vmss_porter/arm_auth_local_debugging.json >/dev/null 2>&1 && porter credentials apply vmss_porter/aad_auth_local_debugging.json >/dev/null 2>&1" + + return f"{porter_credential_sets}" + + def azure_acr_login_command(config): acr_name = _get_acr_name(acr_fqdn=config['registry_server']) - return f"az acr login --name {acr_name}" + return f"az acr login --name {acr_name} >/dev/null " async def build_porter_command(config, logger, msg_body, custom_action=False): @@ -87,8 +100,8 @@ async def build_porter_command_for_outputs(msg_body): async def get_porter_parameter_keys(config, logger, msg_body): - command = [f"{azure_login_command(config)} >/dev/null && \ - {azure_acr_login_command(config)} >/dev/null && \ + command = [f"{azure_login_command(config)} && \ + {azure_acr_login_command(config)} && \ 
porter explain --reference {config['registry_server']}/{msg_body['name']}:v{msg_body['version']} --output json"] proc = await asyncio.create_subprocess_shell( diff --git a/resource_processor/run.sh b/resource_processor/run.sh index 307d69d389..7fd1a8837f 100755 --- a/resource_processor/run.sh +++ b/resource_processor/run.sh @@ -55,11 +55,6 @@ else echo "Porter v0 state doesn't exist." fi -# Can't be in the image since DB connection is needed. -echo "Applying credential sets..." -porter credentials apply vmss_porter/arm_auth_local_debugging.json -porter credentials apply vmss_porter/aad_auth.json - # Launch the runner echo "Starting resource processor..." python -u vmss_porter/runner.py diff --git a/resource_processor/shared/config.py b/resource_processor/shared/config.py index 1e6a3e5c45..5c9712f63e 100644 --- a/resource_processor/shared/config.py +++ b/resource_processor/shared/config.py @@ -1,12 +1,16 @@ +import logging import os - +from opentelemetry import trace from _version import __version__ VERSION = __version__ +logger = logging.getLogger() + -def get_config(logger_adapter) -> dict: - config = {} +def get_config(tracer: trace.Tracer) -> dict: + with tracer.start_as_current_span("get_config"): + config = {} config["registry_server"] = os.environ["REGISTRY_SERVER"] config["tfstate_container_name"] = os.environ["TERRAFORM_STATE_CONTAINER_NAME"] @@ -26,7 +30,7 @@ def get_config(logger_adapter) -> dict: try: config["number_processes_int"] = int(config["number_processes"]) except ValueError: - logger_adapter.info("Invalid setting for NUMBER_PROCESSES, will default to 1") + logger.info("Invalid setting for NUMBER_PROCESSES, will default to 1") config["number_processes_int"] = 1 # Needed for running porter diff --git a/resource_processor/shared/logging.py b/resource_processor/shared/logging.py index e19c78b7f0..796e977242 100644 --- a/resource_processor/shared/logging.py +++ b/resource_processor/shared/logging.py @@ -1,17 +1,14 @@ import logging import os import re - -from opencensus.ext.azure.log_exporter import AzureLogHandler -from opencensus.trace import config_integration -from opencensus.trace.samplers import AlwaysOnSampler -from opencensus.trace.tracer import Tracer - -from shared.config import VERSION +from opentelemetry.instrumentation.logging import LoggingInstrumentor +from azure.monitor.opentelemetry import configure_azure_monitor UNWANTED_LOGGERS = [ "azure.core.pipeline.policies.http_logging_policy", "azure.eventhub._eventprocessor.event_processor", + # suppressing, have support case open + "azure.servicebus._pyamqp.aio._session_async", "azure.identity.aio._credentials.managed_identity", "azure.identity.aio._credentials.environment", "azure.identity.aio._internal.get_token_mixin", @@ -34,96 +31,58 @@ "uamqp.async_ops.session_async", "uamqp.sender", "uamqp.client", - "azure.servicebus.aio._base_handler_async" + "azure.servicebus.aio._base_handler_async", + "azure.monitor.opentelemetry.exporter.export._base", + "azure.servicebus.aio._base_handler_async", + "azure.servicebus._pyamqp.aio._connection_async", + "azure.servicebus._pyamqp.aio._link_async", + "opentelemetry.attributes", + "azure.servicebus._pyamqp.aio._management_link_async", + "azure.servicebus._pyamqp.aio._cbs_async", + "azure.servicebus._pyamqp.aio._client_async" ] -debug = os.environ.get('DEBUG', 'False').lower() in ('true', '1') +debug = os.environ.get("DEBUG", "False").lower() in ("true", "1") +logger = logging.getLogger() -def disable_unwanted_loggers(): - """ - Disables the unwanted loggers. 
- """ - for logger_name in UNWANTED_LOGGERS: - logging.getLogger(logger_name).disabled = True +def configure_loggers(): -def telemetry_processor_callback_function(envelope): - envelope.tags['ai.cloud.role'] = 'resource_processor' - envelope.tags['ai.application.ver'] = VERSION + for logger_name in UNWANTED_LOGGERS: + logging.getLogger(logger_name).disabled = True + for logger_name in LOGGERS_FOR_ERRORS_ONLY: + logging.getLogger(logger_name).setLevel(logging.ERROR) -def initialize_logging(logging_level: int, correlation_id: str, add_console_handler: bool = False) -> logging.LoggerAdapter: - """ - Adds the Application Insights handler for the root logger and sets the given logging level. - Creates and returns a logger adapter that integrates the correlation ID, if given, to the log messages. - Note: This should be called only once, otherwise duplicate log entries could be produced. - :param logging_level: The logging level to set e.g., logging.WARNING. - :param correlation_id: Optional. The correlation ID that is passed on to the operation_Id in App Insights. - :returns: A newly created logger adapter. - """ - logger = logging.getLogger() +def initialize_logging(logging_level: int, add_console_handler: bool = False) -> logging.Logger: - # When using sessions and NEXT_AVAILABLE_SESSION we see regular exceptions which are actually expected - # See https://github.com/Azure/azure-sdk-for-python/issues/9402 - # Other log entries such as 'link detach' also confuse the logs, and are expected. - # We don't want these making the logs any noisier so we raise the logging level for that logger here - # To inspect all the loggers, use -> loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] - for logger_name in LOGGERS_FOR_ERRORS_ONLY: - logging.getLogger(logger_name).setLevel(logging.ERROR) + logger.setLevel(logging_level) if add_console_handler: - console_formatter = logging.Formatter(fmt='%(module)-7s %(name)-7s %(process)-7s %(asctime)s %(levelname)-7s %(message)s') + console_formatter = logging.Formatter( + fmt="%(module)-7s %(name)-7s %(process)-7s %(asctime)s %(otelServiceName)-7s %(otelTraceID)-7s %(otelSpanID)-7s %(levelname)-7s %(message)s" + ) console_handler = logging.StreamHandler() + console_handler.setLevel(logging_level) console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) try: - # picks up APPLICATIONINSIGHTS_CONNECTION_STRING automatically - azurelog_handler = AzureLogHandler() - azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function) - azurelog_formatter = AzureLogFormatter() - azurelog_handler.setFormatter(azurelog_formatter) - logger.addHandler(azurelog_handler) + configure_azure_monitor() except ValueError as e: logger.error(f"Failed to set Application Insights logger handler: {e}") - config_integration.trace_integrations(['logging']) - logging.basicConfig(level=logging_level, format='%(asctime)s traceId=%(traceId)s spanId=%(spanId)s %(message)s') - Tracer(sampler=AlwaysOnSampler()) - logger.setLevel(logging_level) - - extra = None - - if correlation_id: - extra = {'traceId': correlation_id} - - adapter = logging.LoggerAdapter(logger, extra) - adapter.debug(f"Logger adapter initialized with extra: {extra}") - - return adapter - - -def get_message_id_logger(correlation_id: str) -> logging.LoggerAdapter: - """ - Gets a logger that includes message id for easy correlation between log entries. - :param correlation_id: Optional. The correlation ID that is passed on to the operation_Id in App Insights. 
- :returns: A modified logger adapter (from the original initiated one). - """ - logger = logging.getLogger() - extra = None - - if correlation_id: - extra = {'traceId': correlation_id} - - adapter = logging.LoggerAdapter(logger, extra) - adapter.debug(f"Logger adapter now includes extra: {extra}") + LoggingInstrumentor().instrument( + set_logging_format=True, + level=logging_level + ) - return adapter + return logger -def shell_output_logger(console_output: str, prefix_item: str, logger: logging.LoggerAdapter, logging_level: int): +def shell_output_logger(console_output: str, prefix_item: str, logging_level: int): """ Logs the shell output (stdout/err) a line at a time with an option to remove ANSI control chars. """ diff --git a/resource_processor/vmss_porter/Dockerfile b/resource_processor/vmss_porter/Dockerfile index 1724e5c1e8..d91fd0b552 100644 --- a/resource_processor/vmss_porter/Dockerfile +++ b/resource_processor/vmss_porter/Dockerfile @@ -8,7 +8,7 @@ RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloa # Install Azure CLI ARG AZURE_CLI_VERSION=2.47.0-1~bullseye COPY scripts/azure-cli.sh /tmp/ -RUN --mount=type=cache,target=/var/cache/apt --mount=type=cache,target=/var/lib/apt \ +RUN \ export AZURE_CLI_VERSION=${AZURE_CLI_VERSION} \ && /tmp/azure-cli.sh @@ -49,7 +49,7 @@ RUN export PORTER_VERSION=${PORTER_VERSION} \ ENV PATH ${PORTER_HOME_V1}:$PATH # Install Docker -RUN --mount=type=cache,target=/var/cache/apt --mount=type=cache,target=/var/lib/apt \ +RUN \ apt-get update && apt-get install -y apt-transport-https ca-certificates curl gnupg lsb-release --no-install-recommends \ && curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \ && echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian $(lsb_release -cs) stable" \ diff --git a/resource_processor/vmss_porter/requirements.txt b/resource_processor/vmss_porter/requirements.txt index 6bbf937a9e..f192f2a6e0 100644 --- a/resource_processor/vmss_porter/requirements.txt +++ b/resource_processor/vmss_porter/requirements.txt @@ -1,6 +1,6 @@ -azure-servicebus==7.8.1 -opencensus-ext-azure==1.1.7 -opencensus-ext-logging==0.1.1 -azure-identity==1.12.0 +azure-servicebus==7.11.3 +azure-identity==1.14.1 aiohttp==3.8.5 -azure-cli-core==2.46.0 +azure-cli-core==2.47.0 +azure-monitor-opentelemetry==1.0.0 +opentelemetry.instrumentation.logging==0.41b0 diff --git a/resource_processor/vmss_porter/runner.py b/resource_processor/vmss_porter/runner.py index 118eafe8d4..d989efe195 100644 --- a/resource_processor/vmss_porter/runner.py +++ b/resource_processor/vmss_porter/runner.py @@ -5,12 +5,12 @@ import asyncio import logging import sys -from resources.commands import build_porter_command, build_porter_command_for_outputs +from resources.commands import build_porter_command, build_porter_command_for_outputs, apply_porter_credentials_sets_command from shared.config import get_config from resources.helpers import get_installation_id from resources.httpserver import start_server -from shared.logging import disable_unwanted_loggers, initialize_logging, get_message_id_logger, shell_output_logger # pylint: disable=import-error # noqa +from shared.logging import initialize_logging, configure_loggers, shell_output_logger # pylint: disable=import-error # noqa from shared.config import VERSION from resources import strings, statuses # pylint: disable=import-error # noqa from contextlib import 
asynccontextmanager @@ -19,20 +19,18 @@ from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer from azure.identity.aio import DefaultAzureCredential +from opentelemetry import trace -def set_up_logger(enable_console_logging: bool) -> logging.LoggerAdapter: - # Initialise logging - logger_adapter = initialize_logging(logging.INFO, socket.gethostname(), enable_console_logging) - disable_unwanted_loggers() - return logger_adapter +logger = logging.getLogger() -def set_up_config(logger_adapter: logging.LoggerAdapter) -> Optional[dict]: +def set_up_config(tracer: trace.Tracer) -> Optional[dict]: + try: - config = get_config(logger_adapter) + config = get_config(tracer) return config except KeyError as e: - logger_adapter.error(f"Environment variable {e} is not set correctly...Exiting") + logger.error(f"Environment variable {e} is not set correctly...Exiting") sys.exit(1) @@ -46,7 +44,7 @@ async def default_credentials(msi_id): await credential.close() -async def receive_message(service_bus_client, logger_adapter: logging.LoggerAdapter, config: dict): +async def receive_message(tracer: trace.Tracer, service_bus_client, config: dict): """ This method is run per process. Each process will connect to service bus and try to establish a session. If messages are there, the process will continue to receive all the messages associated with that session. @@ -56,10 +54,10 @@ async def receive_message(service_bus_client, logger_adapter: logging.LoggerAdap while True: try: - logger_adapter.info("Looking for new session...") + logger.info("Looking for new session...") # max_wait_time=1 -> don't hold the session open after processing of the message has finished async with service_bus_client.get_queue_receiver(queue_name=q_name, max_wait_time=1, session_id=NEXT_AVAILABLE_SESSION) as receiver: - logger_adapter.info(f"Got a session containing messages: {receiver.session.session_id}") + logger.info(f"Got a session containing messages: {receiver.session.session_id}") async with AutoLockRenewer() as renewer: # allow a session to be auto lock renewed for up to an hour - if it's processing a message renewer.register(receiver, receiver.session, max_lock_renewal_duration=3600) @@ -70,36 +68,42 @@ async def receive_message(service_bus_client, logger_adapter: logging.LoggerAdap try: message = json.loads(str(msg)) - logger_adapter.info(f"Message received for resource_id={message['id']}, operation_id={message['operationId']}, step_id={message['stepId']}") - message_logger_adapter = get_message_id_logger(message['operationId']) # correlate messages per operation - result = await invoke_porter_action(message, service_bus_client, message_logger_adapter, config) except (json.JSONDecodeError) as e: logging.error(f"Received bad service bus resource request message: {e}") - if result: - logging.info(f"Resource request for {message} is complete") - else: - logging.error('Message processing failed!') + with tracer.start_as_current_span("invoke_porter_action") as current_span: + current_span.set_attribute("resource_id", message["id"]) + current_span.set_attribute("action", message["action"]) + current_span.set_attribute("step_id", message["stepId"]) + current_span.set_attribute("operation_id", message["operationId"]) + logger.info(f"Message received for resource_id={message['id']}, operation_id={message['operationId']}, step_id={message['stepId']}") + + result = await invoke_porter_action(tracer, message, service_bus_client, config) + + if result: + logger.info(f"Resource request for {message} is complete") + else: + 
logger.error('Message processing failed!') - logger_adapter.info(f"Message for resource_id={message['id']}, operation_id={message['operationId']} processed as {result} and marked complete.") - await receiver.complete_message(msg) + logger.info(f"Message for resource_id={message['id']}, operation_id={message['operationId']} processed as {result} and marked complete.") + await receiver.complete_message(msg) - logger_adapter.info(f"Closing session: {receiver.session.session_id}") + logger.info(f"Closing session: {receiver.session.session_id}") except OperationTimeoutError: # Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available - logger_adapter.debug("No sessions for this process. Will look again...") + logger.debug("No sessions for this process. Will look again...") except ServiceBusConnectionError: # Occasionally there will be a transient / network-level error in connecting to SB. - logger_adapter.info("Unknown Service Bus connection error. Will retry...") + logger.info("Unknown Service Bus connection error. Will retry...") except Exception: # Catch all other exceptions, log them via .exception to get the stack trace, sleep, and reconnect - logger_adapter.exception("Unknown exception. Will retry...") + logger.exception("Unknown exception. Will retry...") -async def run_porter(command, logger_adapter: logging.LoggerAdapter, config: dict): +async def run_porter(command, config: dict): """ Run a Porter command """ @@ -116,11 +120,11 @@ async def run_porter(command, logger_adapter: logging.LoggerAdapter, config: dic if stdout: result_stdout = stdout.decode() - shell_output_logger(result_stdout, '[stdout]', logger_adapter, logging.INFO) + shell_output_logger(result_stdout, '[stdout]', logging.INFO) if stderr: result_stderr = stderr.decode() - shell_output_logger(result_stderr, '[stderr]', logger_adapter, logging.WARN) + shell_output_logger(result_stderr, '[stderr]', logging.WARN) return (proc.returncode, result_stdout, result_stderr) @@ -141,30 +145,31 @@ def service_bus_message_generator(sb_message: dict, status: str, deployment_mess message_dict["outputs"] = outputs resource_request_message = json.dumps(message_dict) - logger_adapter.info(f"Deployment Status Message: {resource_request_message}") + logger.info(f"Deployment Status Message: {resource_request_message}") return resource_request_message -async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, message_logger_adapter: logging.LoggerAdapter, config: dict) -> bool: +async def invoke_porter_action(tracer: trace.Tracer, msg_body: dict, sb_client: ServiceBusClient, config: dict) -> bool: """ Handle resource message by invoking specified porter action (i.e. 
install, uninstall) """ + installation_id = get_installation_id(msg_body) action = msg_body["action"] - message_logger_adapter.info(f"{installation_id}: {action} action starting...") + logger.info(f"{installation_id}: {action} action starting...") sb_sender = sb_client.get_queue_sender(queue_name=config["deployment_status_queue"]) # post an update message to set the status to an 'in progress' one resource_request_message = service_bus_message_generator(msg_body, statuses.in_progress_status_string_for[action], "Job starting") await sb_sender.send_messages(ServiceBusMessage(body=resource_request_message, correlation_id=msg_body["id"], session_id=msg_body["operationId"])) - message_logger_adapter.info(f'Sent status message for {installation_id} - {statuses.in_progress_status_string_for[action]} - Job starting') + logger.info(f'Sent status message for {installation_id} - {statuses.in_progress_status_string_for[action]} - Job starting') # Build and run porter command (flagging if its a built-in action or custom so we can adapt porter command appropriately) is_custom_action = action not in ["install", "upgrade", "uninstall"] - porter_command = await build_porter_command(config, message_logger_adapter, msg_body, is_custom_action) - message_logger_adapter.debug("Starting to run porter execution command...") - returncode, _, err = await run_porter(porter_command, message_logger_adapter, config) - message_logger_adapter.debug("Finished running porter execution command.") + porter_command = await build_porter_command(config, logger, msg_body, is_custom_action) + logger.debug("Starting to run porter execution command...") + returncode, _, err = await run_porter(porter_command, config) + logger.debug("Finished running porter execution command.") action_completed_without_error = True # Handle command output @@ -173,7 +178,7 @@ async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, mess action_completed_without_error = False if "uninstall" == action and "could not find installation" in err: - message_logger_adapter.warning("The installation doesn't exist. Treating as a successful action to allow the flow to proceed.") + logger.warning("The installation doesn't exist. Treating as a successful action to allow the flow to proceed.") action_completed_without_error = True error_message = f"A success despite of underlying error. 
{error_message}" @@ -185,11 +190,11 @@ async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, mess resource_request_message = service_bus_message_generator(msg_body, status_for_sb_message, error_message) # Post message on sb queue to notify receivers of action failure - message_logger_adapter.info(f"{installation_id}: Porter action failed with error = {error_message}") + logger.info(f"{installation_id}: Porter action failed with error = {error_message}") else: # Get the outputs - get_porter_outputs_successful, outputs = await get_porter_outputs(msg_body, message_logger_adapter, config) + get_porter_outputs_successful, outputs = await get_porter_outputs(msg_body, config) if get_porter_outputs_successful: status_for_sb_message = statuses.pass_status_string_for[action] @@ -202,24 +207,24 @@ async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, mess resource_request_message = service_bus_message_generator(msg_body, status_for_sb_message, status_message, outputs) await sb_sender.send_messages(ServiceBusMessage(body=resource_request_message, correlation_id=msg_body["id"], session_id=msg_body["operationId"])) - message_logger_adapter.info(f"Sent status message for {installation_id}: {status_for_sb_message}") + logger.info(f"Sent status message for {installation_id}: {status_for_sb_message}") # return true as want to continue processing the message return action_completed_without_error -async def get_porter_outputs(msg_body: dict, message_logger_adapter: logging.LoggerAdapter, config: dict): +async def get_porter_outputs(msg_body: dict, config: dict): """ Get outputs JSON from a Porter command """ porter_command = await build_porter_command_for_outputs(msg_body) - message_logger_adapter.debug("Starting to run porter output command...") - returncode, stdout, err = await run_porter(porter_command, message_logger_adapter, config) - message_logger_adapter.debug("Finished running porter output command.") + logger.debug("Starting to run porter output command...") + returncode, stdout, err = await run_porter(porter_command, config) + logger.debug("Finished running porter output command.") if returncode != 0: error_message = "Error context message = " + " ".join(err.split('\n')) - message_logger_adapter.info(f"{get_installation_id(msg_body)}: Failed to get outputs with error = {error_message}") + logger.info(f"{get_installation_id(msg_body)}: Failed to get outputs with error = {error_message}") return False, {} else: outputs_json = {} @@ -231,55 +236,60 @@ async def get_porter_outputs(msg_body: dict, message_logger_adapter: logging.Log if "{" in outputs_json[i]['value'] or "[" in outputs_json[i]['value']: outputs_json[i]['value'] = json.loads(outputs_json[i]['value'].replace("\\", "")) - message_logger_adapter.info(f"Got outputs as json: {outputs_json}") + logger.info(f"Got outputs as json: {outputs_json}") except ValueError: - message_logger_adapter.error(f"Got outputs invalid json: {stdout}") + logger.error(f"Got outputs invalid json: {stdout}") return True, outputs_json -async def runner(logger_adapter: logging.LoggerAdapter, config: dict): - async with default_credentials(config["vmss_msi_id"]) as credential: - service_bus_client = ServiceBusClient(config["service_bus_namespace"], credential) - await receive_message(service_bus_client, logger_adapter, config) - +async def runner(process_number: int, config: dict): + tracer = trace.get_tracer(f"{socket.gethostname()}_{process_number}") + with tracer.start_as_current_span(process_number): + async with 
default_credentials(config["vmss_msi_id"]) as credential: + service_bus_client = ServiceBusClient(config["service_bus_namespace"], credential) + await receive_message(tracer, service_bus_client, config) -def start_runner_process(config: dict): - # Set up logger adapter copy for this process - logger_adapter = set_up_logger(enable_console_logging=False) - asyncio.run(runner(logger_adapter, config)) - -async def check_runners(processes: list, httpserver: Process, logger_adapter: logging.LoggerAdapter): - logger_adapter.info("Starting runners check...") +async def check_runners(processes: list, httpserver: Process): + logger.info("Starting runners check...") while True: await asyncio.sleep(30) - if all(not process.is_alive() for process in processes): - logger_adapter.error("All runner processes have failed!") + logger.error("All runner processes have failed!") httpserver.kill() +def apply_porter_credential_sets(config: dict): + porter_credential_command = apply_porter_credentials_sets_command(config) + run_porter(porter_credential_command, config) + + if __name__ == "__main__": - logger_adapter: logging.LoggerAdapter = set_up_logger(enable_console_logging=True) - config = set_up_config(logger_adapter) + configure_loggers() + logger = initialize_logging(logging.INFO, True) + tracer = trace.get_tracer(f"{socket.gethostname()}_main") + config = set_up_config(tracer) httpserver = Process(target=start_server) httpserver.start() - logger_adapter.info("Started http server") + logger.info("Started http server") + + apply_porter_credential_sets(config) + logger.info("Applied porter credential sets") processes = [] num = config["number_processes_int"] - logger_adapter.info(f"Starting {num} processes...") + logger.info(f"Starting {num} processes...") for i in range(num): - logger_adapter.info(f"Starting process {str(i)}") - process = Process(target=lambda: start_runner_process(config)) + logger.info(f"Starting process {str(i)}") + process = Process(target=lambda: asyncio.run(runner(i, config))) processes.append(process) process.start() - logger_adapter.info("All proceesses have been started. Version is: %s", VERSION) + logger.info("All processes have been started. Version is: %s", VERSION) - asyncio.run(check_runners(processes, httpserver, logger_adapter)) + asyncio.run(check_runners(processes, httpserver)) - logger_adapter.warn("Exiting main...") + logger.warn("Exiting main...")