Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove default object lifecycle for CDN and use header when possible #369

Merged
merged 5 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions projects/fal/src/fal/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from fal.api import RouteSignature
from fal.exceptions import FalServerlessException, RequestCancelledException
from fal.logging import get_logger
from fal.toolkit.file import get_lifecycle_preference
from fal.toolkit.file.providers.fal import GLOBAL_LIFECYCLE_PREFERENCE
from fal.toolkit.file import request_lifecycle_repference
from fal.toolkit.file.providers.fal import LIFECYCLE_PREFERENCE

REALTIME_APP_REQUIREMENTS = ["websockets", "msgpack"]
REQUEST_ID_KEY = "x-fal-request-id"
Expand Down Expand Up @@ -342,13 +342,11 @@ async def provide_hints_headers(request, call_next):
@app.middleware("http")
async def set_global_object_preference(request, call_next):
try:
preference_dict = get_lifecycle_preference(request) or {}
expiration_duration = preference_dict.get("expiration_duration_seconds")
if expiration_duration is not None:
GLOBAL_LIFECYCLE_PREFERENCE.expiration_duration_seconds = int(
expiration_duration
)

preference_dict = request_lifecycle_repference(request)
if preference_dict is not None:
# This will not work properly for apps with multiplexing enabled
# we may mix up the preferences between requests
LIFECYCLE_PREFERENCE.set(preference_dict)
except Exception:
from fastapi.logger import logger

Expand All @@ -357,7 +355,12 @@ async def set_global_object_preference(request, call_next):
self.__class__.__name__,
)

return await call_next(request)
try:
return await call_next(request)
finally:
# We may miss the global preference if there are operations
# being done in the background that go beyond the request
LIFECYCLE_PREFERENCE.set(None)

@app.middleware("http")
async def set_request_id(request, call_next):
Expand Down
24 changes: 13 additions & 11 deletions projects/fal/src/fal/toolkit/file/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pydantic import BaseModel, Field

from fal.toolkit.file.providers.fal import (
LIFECYCLE_PREFERENCE,
FalCDNFileRepository,
FalFileRepository,
FalFileRepositoryV2,
Expand Down Expand Up @@ -149,7 +150,9 @@ def from_bytes(

fdata = FileData(data, content_type, file_name)

object_lifecycle_preference = get_lifecycle_preference(request)
object_lifecycle_preference = (
request_lifecycle_repference(request) or LIFECYCLE_PREFERENCE.get()
)

try:
url = repo.save(fdata, object_lifecycle_preference, **save_kwargs)
Expand Down Expand Up @@ -203,7 +206,9 @@ def from_path(
fallback_save_kwargs = fallback_save_kwargs or {}

content_type = content_type or "application/octet-stream"
object_lifecycle_preference = get_lifecycle_preference(request)
object_lifecycle_preference = (
request_lifecycle_repference(request) or LIFECYCLE_PREFERENCE.get()
)

try:
url, data = repo.save_file(
Expand Down Expand Up @@ -288,21 +293,18 @@ def __del__(self):
shutil.rmtree(self.extract_dir)


def get_lifecycle_preference(request: Request) -> dict[str, str] | None:
def request_lifecycle_repference(request: Optional[Request]) -> dict[str, str] | None:
import json

preference_str = (
request.headers.get(OBJECT_LIFECYCLE_PREFERENCE_KEY)
if request is not None
else None
)
if request is None:
return None

preference_str = request.headers.get(OBJECT_LIFECYCLE_PREFERENCE_KEY)
if preference_str is None:
return None

object_lifecycle_preference = {}
try:
object_lifecycle_preference = json.loads(preference_str)
return object_lifecycle_preference
return json.loads(preference_str)
except Exception as e:
print(f"Failed to parse object lifecycle preference: {e}")
return None
58 changes: 38 additions & 20 deletions projects/fal/src/fal/toolkit/file/providers/fal.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import dataclasses
import json
import math
import os
Expand All @@ -9,6 +8,7 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Generic, TypeVar
from urllib.error import HTTPError
from urllib.parse import urlparse, urlunparse
from urllib.request import Request, urlopen
Expand Down Expand Up @@ -105,14 +105,21 @@ class FalV3TokenManager(FalV2TokenManager):
fal_v3_token_manager = FalV3TokenManager()


@dataclass
class ObjectLifecyclePreference:
expiration_duration_seconds: int
VariableType = TypeVar("VariableType")


class VariableReference(Generic[VariableType]):
def __init__(self, value: VariableType) -> None:
self.set(value)

def get(self) -> VariableType:
return self.value

def set(self, value: VariableType) -> None:
self.value = value

GLOBAL_LIFECYCLE_PREFERENCE = ObjectLifecyclePreference(
expiration_duration_seconds=86400
)

LIFECYCLE_PREFERENCE: VariableReference[dict[str, str] | None] = VariableReference(None)


@dataclass
Expand Down Expand Up @@ -528,6 +535,16 @@ def save(

@dataclass
class FalCDNFileRepository(FileRepository):
def _object_lifecycle_headers(
self,
headers: dict[str, str],
object_lifecycle_preference: dict[str, str] | None,
):
if object_lifecycle_preference:
headers["X-Fal-Object-Lifecycle-Preference"] = json.dumps(
object_lifecycle_preference
)

@retry(max_retries=3, base_delay=1, backoff_type="exponential", jitter=True)
def save(
self,
Expand All @@ -539,10 +556,10 @@ def save(
"Accept": "application/json",
"Content-Type": file.content_type,
"X-Fal-File-Name": file.file_name,
"X-Fal-Object-Lifecycle-Preference": json.dumps(
dataclasses.asdict(GLOBAL_LIFECYCLE_PREFERENCE)
),
}

self._object_lifecycle_headers(headers, object_lifecycle_preference)

url = os.getenv("FAL_CDN_HOST", _FAL_CDN) + "/files/upload"
request = Request(url, headers=headers, method="POST", data=file.data)
try:
Expand Down Expand Up @@ -578,26 +595,27 @@ class InternalFalFileRepositoryV3(FileRepository):
That way it can avoid the need to refresh the token for every upload.
"""

def _object_lifecycle_headers(
self,
headers: dict[str, str],
object_lifecycle_preference: dict[str, str] | None,
):
if object_lifecycle_preference:
headers["X-Fal-Object-Lifecycle"] = json.dumps(object_lifecycle_preference)

@retry(max_retries=3, base_delay=1, backoff_type="exponential", jitter=True)
def save(
self, file: FileData, object_lifecycle_preference: dict[str, str] | None
) -> str:
lifecycle = dataclasses.asdict(GLOBAL_LIFECYCLE_PREFERENCE)
if object_lifecycle_preference is not None:
lifecycle = {
key: object_lifecycle_preference[key]
if key in object_lifecycle_preference
else value
for key, value in lifecycle.items()
}

headers = {
**self.auth_headers,
"Accept": "application/json",
"Content-Type": file.content_type,
"X-Fal-File-Name": file.file_name,
"X-Fal-Object-Lifecycle-Preference": json.dumps(lifecycle),
}

self._object_lifecycle_headers(headers, object_lifecycle_preference)

url = os.getenv("FAL_CDN_V3_HOST", _FAL_CDN_V3) + "/files/upload"
request = Request(url, headers=headers, method="POST", data=file.data)
try:
Expand Down
Loading