Skip to content

Commit

Permalink
feat: add response callback & message id
Browse files Browse the repository at this point in the history
  • Loading branch information
HuberTRoy committed Jan 2, 2025
1 parent c5fafa1 commit dc07bac
Show file tree
Hide file tree
Showing 12 changed files with 256 additions and 128 deletions.
65 changes: 65 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export async function runApp(config: {
config.projectPath,
config.ipfs,
config.cacheDir,
config.forceReload,
config.forceReload
);

const sandbox = await getDefaultSandbox(loader, config.toolTimeout);
Expand All @@ -39,7 +39,7 @@ export async function runApp(config: {
config.host,
sandbox,
loader,
config.openAiApiKey,
config.openAiApiKey
);

const runnerHost = new RunnerHost(() => {
Expand Down
62 changes: 38 additions & 24 deletions src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const Message = Type.Object({
Type.Literal("assistant"),
Type.Literal("tool"),
]),
id: Type.Optional(Type.String()),
conversation_id: Type.Optional(Type.String()),
});

const CompletionChoice = Type.Object({
Expand Down Expand Up @@ -56,6 +58,7 @@ const ChatResponse = Type.Object({

const ChatChunkResponse = Type.Object({
id: Type.String(),
conversation_id: Type.Optional(Type.String()),
model: Type.String(),
choices: Type.Array(CompletionChunkChoice),
created: Type.Number({ description: "Unix timestamp in seconds" }),
Expand All @@ -73,13 +76,13 @@ export function http(
runnerHost: RunnerHost,
port: number,
streamKeepAlive: number = 0,
onReady?: Promise<unknown>,
onReady?: Promise<unknown>
): Deno.HttpServer<Deno.NetAddr> {
const app = new Hono();

// The ready status should change once the project is fully loaded, including the vector DB
let ready = false;
onReady?.then(() => ready = true);
onReady?.then(() => (ready = true));

app.use("*", cors());

Expand Down Expand Up @@ -120,11 +123,13 @@ export function http(
if (req.stream) {
return streamSSE(c, async (stream) => {
// Send empty data to keep the connection alive in browsers, they have a default timeout of 1m
const interval = streamKeepAlive && setInterval(async () => {
const empty = createChatChunkResponse("", "", new Date());
await stream.writeSSE({ data: JSON.stringify(empty) });
await stream.sleep(20);
}, streamKeepAlive);
const interval =
streamKeepAlive &&
setInterval(async () => {
const empty = createChatChunkResponse("", "", new Date());
await stream.writeSSE({ data: JSON.stringify(empty) });
await stream.sleep(20);
}, streamKeepAlive);

const chatRes = await runner.promptMessages(req.messages);

Expand All @@ -140,6 +145,8 @@ export function http(
chatRes.model,
chatRes.created_at,
last ? "stop" : null,
chatRes.message.id,
chatRes.message.conversation_id
);
await stream.writeSSE({ data: JSON.stringify(res) });
await stream.sleep(20);
Expand All @@ -149,7 +156,7 @@ export function http(
const res_space = createChatChunkResponse(
" ",
chatRes.model,
chatRes.created_at,
chatRes.created_at
);
await stream.writeSSE({ data: JSON.stringify(res_space) });
await stream.sleep(20);
Expand All @@ -161,17 +168,19 @@ export function http(
const chatRes = await runner.promptMessages(req.messages);

const response: ChatResponse = {
id: "0",
id: crypto.randomUUID(),
model: chatRes.model,
choices: [{
index: 0,
message: {
content: chatRes.message.content,
role: "assistant",
choices: [
{
index: 0,
message: {
content: chatRes.message.content,
role: "assistant",
},
logprobs: null,
finish_reason: chatRes.done_reason,
},
logprobs: null,
finish_reason: chatRes.done_reason,
}],
],
created: new Date(chatRes.created_at).getTime() / 1000,
object: "chat.completion",
usage: {
Expand Down Expand Up @@ -204,18 +213,23 @@ function createChatChunkResponse(
model: string,
createdAt: Date,
finish_reason: string | null = null,
id?: string,
conversation_id?: string
): ChatChunkResponse {
const res: ChatChunkResponse = {
id: "0",
id: id ?? "0",
conversation_id: conversation_id ?? "0",
object: "chat.completion.chunk",
model,
created: new Date(createdAt).getTime() / 1000,
choices: [{
index: 0,
delta: { role: "assistant", content: message },
logprobs: null,
finish_reason,
}],
choices: [
{
index: 0,
delta: { role: "assistant", content: message },
logprobs: null,
finish_reason,
},
],
};
Value.Assert(ChatChunkResponse, res);
return res;
Expand Down
37 changes: 18 additions & 19 deletions src/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import { getLogger } from "./logger.ts";
const logger = await getLogger("loader");

export const getOSTempDir = () =>
Deno.env.get("TMPDIR") || Deno.env.get("TMP") || Deno.env.get("TEMP") ||
Deno.env.get("TMPDIR") ||
Deno.env.get("TMP") ||
Deno.env.get("TEMP") ||
"/tmp";

async function loadJson(path: string): Promise<unknown> {
Expand Down Expand Up @@ -58,13 +60,12 @@ async function loadManfiest(path: string): Promise<ProjectManifest> {
*/
async function extractArchive(
readable: ReadableStream<Uint8Array>,
dest: string,
dest: string
): Promise<string | undefined> {
let first: string | undefined;
for await (
const entry of readable.pipeThrough(new DecompressionStream("gzip"))
.pipeThrough(new UntarStream())
) {
for await (const entry of readable
.pipeThrough(new DecompressionStream("gzip"))
.pipeThrough(new UntarStream())) {
const path = resolve(dest, entry.path);
if (!first) {
first = path;
Expand Down Expand Up @@ -92,7 +93,7 @@ export async function pullContent(
fileName: string,
tmpDir?: string,
force?: boolean,
workingPath?: string,
workingPath?: string
): Promise<[string, Source]> {
if (CIDReg.test(path)) {
const cid = path.replace("ipfs://", "");
Expand Down Expand Up @@ -155,7 +156,7 @@ export async function pullContent(
}

const tmp = resolve(
fromFileUrlSafe(tmpDir ?? await Deno.makeTempDir()),
fromFileUrlSafe(tmpDir ?? (await Deno.makeTempDir()))
);
try {
const p = await extractArchive(res.body, tmp);
Expand All @@ -173,7 +174,7 @@ export async function pullContent(

const localPath = resolve(fromFileUrlSafe(workingPath ?? ""), path);
if (localPath.endsWith(".gz")) {
const tmp = resolve(fromFileUrlSafe(tmpDir ?? await Deno.makeTempDir()));
const tmp = resolve(fromFileUrlSafe(tmpDir ?? (await Deno.makeTempDir())));

const archive = await Deno.open(localPath);

Expand All @@ -187,10 +188,7 @@ export async function pullContent(
}

// File urls are used to avoid imports being from the same package registry as the framework is run from
return [
toFileUrlString(localPath),
"local",
];
return [toFileUrlString(localPath), "local"];
}

export class Loader {
Expand All @@ -201,7 +199,7 @@ export class Loader {
readonly projectPath: string,
ipfs: IPFSClient,
readonly tmpDir?: string,
force?: boolean,
force?: boolean
) {
this.#ipfs = ipfs;
this.#force = force ?? false;
Expand All @@ -211,15 +209,15 @@ export class Loader {
path: string,
fileName: string,
tmpDir = this.tmpDir,
workingPath?: string,
workingPath?: string
): Promise<[string, Source]> {
return await pullContent(
path,
this.#ipfs,
fileName,
tmpDir,
this.#force,
workingPath,
workingPath
);
}

Expand All @@ -234,7 +232,7 @@ export class Loader {
this.projectPath,
"manifest.json",
undefined,
Deno.cwd(),
Deno.cwd()
);

logger.debug(`getManifest [${source}] ${manifestPath}`);
Expand All @@ -254,9 +252,10 @@ export class Loader {
manifest.entry,
"project.ts",
dirname(manifestPath),
manifestSource == "local" ? dirname(this.projectPath) : undefined,
manifestSource == "local" ? dirname(this.projectPath) : undefined
);
logger.debug(`getProject [${source}] ${projectPath}`);

return [projectPath, source];
}

Expand All @@ -275,7 +274,7 @@ export class Loader {
manifest.vectorStorage.path,
"db.gz",
dirname(manifestPath),
manifestSource == "local" ? dirname(this.projectPath) : undefined,
manifestSource == "local" ? dirname(this.projectPath) : undefined
);
logger.debug(`getVectorDb [${res[1]}] ${res[0]}`);
return res;
Expand Down
Loading

0 comments on commit dc07bac

Please sign in to comment.