Skip to content

Commit

Permalink
WIP - working intial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
marrobi committed Oct 24, 2023
1 parent 90b6fae commit 65c7204
Show file tree
Hide file tree
Showing 22 changed files with 297 additions and 362 deletions.
8 changes: 6 additions & 2 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@
"console": "integratedTerminal",
"preLaunchTask": "Copy_env_file_for_api_debug",
"cwd": "${workspaceFolder}/api_app",
"envFile": "${workspaceFolder}/api_app/.env"
"envFile": "${workspaceFolder}/api_app/.env",
"env": {
"OTEL_RESOURCE_ATTRIBUTES": "service.name=api,service.instance.id=local_debug,service.version=dev"
}
},
{
"name": "E2E Extended",
Expand Down Expand Up @@ -190,7 +193,8 @@
"cwd": "${workspaceFolder}/resource_processor",
"envFile": "${workspaceFolder}/core/private.env",
"env": {
"PYTHONPATH": "."
"PYTHONPATH": ".",
"OTEL_RESOURCE_ATTRIBUTES": "service.name=resource_processor,service.instance.id=local_debug,service.version=dev"
}
},
{
Expand Down
2 changes: 1 addition & 1 deletion api_app/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.15.17"
__version__ = "0.15.19"
2 changes: 1 addition & 1 deletion api_app/db/repositories/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def _get_container(cls, container_name, partition_key_obj) -> ContainerPro
raise UnableToAccessDatabase

async def query(self, query: str, parameters: Optional[dict] = None):
items = self.container.query_items(query=query, parameters=parameters, enable_cross_partition_query=True)
items = self.container.query_items(query=query, parameters=parameters)
return [i async for i in items]

async def read_item_by_id(self, item_id: str) -> dict:
Expand Down
29 changes: 11 additions & 18 deletions api_app/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
from opencensus.ext.azure.trace_exporter import AzureExporter
import uvicorn

from fastapi import FastAPI
Expand All @@ -9,9 +8,6 @@
from fastapi_utils.tasks import repeat_every
from service_bus.airlock_request_status_update import receive_step_result_message_and_update_status

from services.tracing import RequestTracerMiddleware
from opencensus.trace.samplers import ProbabilitySampler

from starlette.exceptions import HTTPException
from starlette.middleware.errors import ServerErrorMiddleware

Expand All @@ -21,10 +17,13 @@
from api.errors.generic_error import generic_error_handler
from core import config
from core.events import create_start_app_handler, create_stop_app_handler
from services.logging import initialize_logging, telemetry_processor_callback_function
from services.logging import initialize_logging
from service_bus.deployment_status_updater import DeploymentStatusUpdater


logger = logging.getLogger()


def get_application() -> FastAPI:
application = FastAPI(
title=config.PROJECT_NAME,
Expand All @@ -36,17 +35,16 @@ def get_application() -> FastAPI:
openapi_url=None
)

if config.DEBUG:
initialize_logging(logging.DEBUG, True, application)
else:
initialize_logging(logging.INFO, False, application)

application.add_event_handler("startup", create_start_app_handler(application))
application.add_event_handler("shutdown", create_stop_app_handler(application))

try:
exporter = AzureExporter(sampler=ProbabilitySampler(1.0))
exporter.add_telemetry_processor(telemetry_processor_callback_function)
application.add_middleware(RequestTracerMiddleware, exporter=exporter)
except Exception:
logging.exception("Failed to add RequestTracerMiddleware")

application.add_middleware(ServerErrorMiddleware, handler=generic_error_handler)

# Allow local UI debugging with local API
if config.ENABLE_LOCAL_DEBUGGING:
application.add_middleware(
Expand All @@ -63,17 +61,12 @@ def get_application() -> FastAPI:
return application


if config.DEBUG:
initialize_logging(logging.DEBUG)
else:
initialize_logging(logging.INFO)

app = get_application()


@app.on_event("startup")
async def watch_deployment_status() -> None:
logging.info("Starting deployment status watcher thread")
logger.info("Starting deployment status watcher thread")
statusWatcher = DeploymentStatusUpdater(app)
await statusWatcher.init_repos()
current_event_loop = asyncio.get_event_loop()
Expand Down
18 changes: 9 additions & 9 deletions api_app/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
# API
azure-core==1.26.1
azure-core==1.29.4
aiohttp==3.8.5
azure-cosmos==4.3.0
azure-identity==1.12.0
azure-mgmt-cosmosdb==9.0.0
azure-cosmos==4.5.1
azure-identity==1.14.1
azure-monitor-opentelemetry==1.0.0
azure-mgmt-cosmosdb==9.3.0
azure-mgmt-compute==29.1.0
azure-mgmt-costmanagement==3.0.0
azure-storage-blob==12.15.0
azure-servicebus==7.8.1
azure-storage-blob==12.18.3
azure-servicebus==7.11.3
azure-eventgrid==4.9.1
fastapi[all]==0.95.0
fastapi-utils==0.2.1
gunicorn==20.1.0
jsonschema[format_nongpl]==4.17.1
msal~=1.20.0
opencensus-ext-azure==1.1.7
opencensus-ext-logging==0.1.1
PyJWT==2.6.0
PyJWT==2.8.0
uvicorn[standard]==0.20.0
semantic-version==2.10.0
pytz~=2022.7
python-dateutil~=2.8.2
azure-mgmt-resource==22.0.0
pandas==1.5.2
opentelemetry.instrumentation.logging==0.41b0
15 changes: 12 additions & 3 deletions api_app/services/aad_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import List, Optional
import jwt
import requests
import rsa

from fastapi import Request, HTTPException, status
from msal import ConfidentialClientApplication
Expand All @@ -19,6 +18,11 @@
from api.dependencies.database import get_db_client_from_request
from db.repositories.workspaces import WorkspaceRepository

from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization


MICROSOFT_GRAPH_URL = config.MICROSOFT_GRAPH_URL.strip("/")


Expand Down Expand Up @@ -179,9 +183,14 @@ def _get_token_key(self, key_id: str) -> str:
for key in keys['keys']:
n = int.from_bytes(base64.urlsafe_b64decode(self._ensure_b64padding(key['n'])), "big")
e = int.from_bytes(base64.urlsafe_b64decode(self._ensure_b64padding(key['e'])), "big")
pub_key = rsa.PublicKey(n, e)
pub_key = rsa.RSAPublicNumbers(e, n).public_key(default_backend())
pub_key_pkcs1 = pub_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)

# Cache the PEM formatted public key.
AzureADAuthorization._jwt_keys[key['kid']] = pub_key.save_pkcs1()
AzureADAuthorization._jwt_keys[key['kid']] = pub_key_pkcs1

return AzureADAuthorization._jwt_keys[key_id]

Expand Down
156 changes: 96 additions & 60 deletions api_app/services/logging.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging
from typing import Optional
import os
import re
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

from opencensus.ext.azure.log_exporter import AzureLogHandler
from opencensus.trace import config_integration
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer

from core.config import VERSION
from fastapi import FastAPI

UNWANTED_LOGGERS = [
"azure.core.pipeline.policies.http_logging_policy",
Expand All @@ -33,74 +33,110 @@
"uamqp.async_ops.session_async",
"uamqp.sender",
"uamqp.client",
"azure.servicebus.aio._base_handler_async"
"azure.servicebus.aio._base_handler_async",
"azure.servicebus._pyamqp.aio._connection_async",
"azure.servicebus._pyamqp.aio._link_async",
"opentelemetry.attributes"
]


def disable_unwanted_loggers():
"""
Disables the unwanted loggers.
"""
debug = os.environ.get("DEBUG", "False").lower() in ("true", "1")


def configure_loggers():

for logger_name in UNWANTED_LOGGERS:
logging.getLogger(logger_name).disabled = True

for logger_name in LOGGERS_FOR_ERRORS_ONLY:
logging.getLogger(logger_name).setLevel(logging.ERROR)


def telemetry_processor_callback_function(envelope):
envelope.tags['ai.cloud.role'] = 'api'
envelope.tags['ai.application.ver'] = VERSION


class ExceptionTracebackFilter(logging.Filter):
"""
If a record contains 'exc_info', it will only show in the 'exceptions' section of Application Insights without showing
in the 'traces' section. In order to show it also in the 'traces' section, we need another log that does not contain 'exc_info'.
"""
def filter(self, record):
if record.exc_info:
logger = logging.getLogger(record.name)
_, exception_value, _ = record.exc_info
message = f"{record.getMessage()}\nException message: '{exception_value}'"
logger.log(record.levelno, message)

return True

def initialize_logging(logging_level: int, add_console_handler: bool, application: FastAPI) -> logging.Logger:

def initialize_logging(logging_level: int, correlation_id: Optional[str] = None) -> logging.LoggerAdapter:
"""
Adds the Application Insights handler for the root logger and sets the given logging level.
Creates and returns a logger adapter that integrates the correlation ID, if given, to the log messages.
:param logging_level: The logging level to set e.g., logging.WARNING.
:param correlation_id: Optional. The correlation ID that is passed on to the operation_Id in App Insights.
:returns: A newly created logger adapter.
"""
logger = logging.getLogger()
logger.setLevel(logging_level)

disable_unwanted_loggers()
if add_console_handler:
console_formatter = logging.Formatter(
fmt="%(module)-7s %(name)-7s %(process)-7s %(asctime)s %(otelServiceName)-7s %(otelTraceID)-7s %(otelSpanID)-7s %(levelname)-7s %(message)s"
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging_level)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)

try:
# picks up APPLICATIONINSIGHTS_CONNECTION_STRING automatically
azurelog_handler = AzureLogHandler()
azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function)
azurelog_handler.addFilter(ExceptionTracebackFilter())
logger.addHandler(azurelog_handler)
configure_azure_monitor()
except ValueError as e:
logger.error(f"Failed to set Application Insights logger handler: {e}")

config_integration.trace_integrations(['logging'])
logging.basicConfig(level=logging_level, format='%(asctime)s traceId=%(traceId)s spanId=%(spanId)s %(message)s')
Tracer(sampler=AlwaysOnSampler())
logger.setLevel(logging_level)

extra = {}

if correlation_id:
extra = {'traceId': correlation_id}

adapter = logging.LoggerAdapter(logger, extra)
adapter.debug(f"Logger adapter initialized with extra: {extra}")

return adapter
LoggingInstrumentor().instrument(
set_logging_format=True,
level=logging_level
)

FastAPIInstrumentor.instrument_app(application)
RequestsInstrumentor().instrument()

return logger


def shell_output_logger(
console_output: str,
prefix_item: str,
logger: logging.LoggerAdapter,
logging_level: int,
):

if not console_output:
logging.debug("shell console output is empty.")
return

console_output = console_output.strip()

if (
logging_level != logging.INFO
and len(console_output) < 200
and console_output.startswith("Unable to find image '")
and console_output.endswith("' locally")
):
logging.debug("Image not present locally, setting log to INFO.")
logging_level = logging.INFO

logger.log(logging_level, f"{prefix_item} {console_output}")


class AzureLogFormatter(logging.Formatter):
# 7-bit C1 ANSI sequences
ansi_escape = re.compile(
r"""

Check warning

Code scanning / CodeQL

Overly permissive regular expression range Medium

Suspicious character range that is equivalent to [@A-Z].

Check warning

Code scanning / CodeQL

Overly permissive regular expression range Medium

Suspicious character range that is equivalent to \[0-9:;<=>?\].
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
""",
re.VERBOSE,
)

MAX_MESSAGE_LENGTH = 32000
TRUNCATION_TEXT = "MESSAGE TOO LONG, TAILING..."

def format(self, record):
s = super().format(record)
s = AzureLogFormatter.ansi_escape.sub("", s)

# not doing this here might produce errors if we try to log empty strings.
if s == "":
s = "EMPTY MESSAGE!"

# azure monitor is limiting the message size.
if len(s) > AzureLogFormatter.MAX_MESSAGE_LENGTH:
s = f"{AzureLogFormatter.TRUNCATION_TEXT}\n{s[-1 * AzureLogFormatter.MAX_MESSAGE_LENGTH:]}"

return s
Loading

0 comments on commit 65c7204

Please sign in to comment.