Skip to content

Commit

Permalink
feat(fal_client): introduce priority for subscribe and submit (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop authored Oct 23, 2024
1 parent 05beae0 commit 033d19f
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions projects/fal_client/src/fal_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import base64
from dataclasses import dataclass, field
from functools import cached_property
from typing import Any, AsyncIterator, Iterator, TYPE_CHECKING, Optional
from typing import Any, AsyncIterator, Iterator, TYPE_CHECKING, Optional, Literal

import httpx
from httpx_sse import aconnect_sse, connect_sse
Expand All @@ -18,6 +18,7 @@
from PIL import Image

AnyJSON = dict[str, Any]
Priority = Literal["normal", "low"]

RUN_URL_FORMAT = f"https://{FAL_RUN_HOST}/"
QUEUE_URL_FORMAT = f"https://queue.{FAL_RUN_HOST}/"
Expand Down Expand Up @@ -317,6 +318,7 @@ async def submit(
path: str = "",
hint: str | None = None,
webhook_url: str | None = None,
priority: Optional[Priority] = None,
) -> AsyncRequestHandle:
"""Submit an application with the given arguments (which will be JSON serialized). The path parameter can be used to
specify a subpath when applicable. This method will return a handle to the request that can be used to check the status
Expand All @@ -333,6 +335,9 @@ async def submit(
if hint is not None:
headers["X-Fal-Runner-Hint"] = hint

if priority is not None:
headers["X-Fal-Queue-Priority"] = priority

response = await self._client.post(
url,
json=arguments,
Expand All @@ -359,8 +364,15 @@ async def subscribe(
with_logs: bool = False,
on_enqueue: Optional[callable[[Queued], None]] = None,
on_queue_update: Optional[callable[[Status], None]] = None,
priority: Optional[Priority] = None,
) -> AnyJSON:
handle = await self.submit(application, arguments, path=path, hint=hint)
handle = await self.submit(
application,
arguments,
path=path,
hint=hint,
priority=priority,
)

if on_enqueue is not None:
on_enqueue(handle.request_id)
Expand Down Expand Up @@ -501,6 +513,7 @@ def submit(
path: str = "",
hint: str | None = None,
webhook_url: str | None = None,
priority: Optional[Priority] = None,
) -> SyncRequestHandle:
"""Submit an application with the given arguments (which will be JSON serialized). The path parameter can be used to
specify a subpath when applicable. This method will return a handle to the request that can be used to check the status
Expand All @@ -517,6 +530,9 @@ def submit(
if hint is not None:
headers["X-Fal-Runner-Hint"] = hint

if priority is not None:
headers["X-Fal-Queue-Priority"] = priority

response = self._client.post(
url,
json=arguments,
Expand Down Expand Up @@ -544,8 +560,15 @@ def subscribe(
with_logs: bool = False,
on_enqueue: Optional[callable[[Queued], None]] = None,
on_queue_update: Optional[callable[[Status], None]] = None,
priority: Optional[Priority] = None,
) -> AnyJSON:
handle = self.submit(application, arguments, path=path, hint=hint)
handle = self.submit(
application,
arguments,
path=path,
hint=hint,
priority=priority,
)

if on_enqueue is not None:
on_enqueue(handle.request_id)
Expand Down

0 comments on commit 033d19f

Please sign in to comment.