From 5ae497bcd65ca0ffa2b792d9dc88c294425c160e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez?= Date: Sun, 7 Jul 2024 09:58:25 +0200 Subject: [PATCH 1/3] feat: split stream in lines --- .../ollama/src/ollama-chat-language-model.ts | 3 +- packages/ollama/src/utils/index.ts | 2 + .../{utils.ts => utils/remove-undefined.ts} | 0 packages/ollama/src/utils/response-handler.ts | 41 +++++++++++++++++++ packages/ollama/src/utils/text-line-stream.ts | 27 ++++++++++++ 5 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 packages/ollama/src/utils/index.ts rename packages/ollama/src/{utils.ts => utils/remove-undefined.ts} (100%) create mode 100644 packages/ollama/src/utils/response-handler.ts create mode 100644 packages/ollama/src/utils/text-line-stream.ts diff --git a/packages/ollama/src/ollama-chat-language-model.ts b/packages/ollama/src/ollama-chat-language-model.ts index e1e5af9..8695cd0 100644 --- a/packages/ollama/src/ollama-chat-language-model.ts +++ b/packages/ollama/src/ollama-chat-language-model.ts @@ -8,7 +8,6 @@ import { } from '@ai-sdk/provider' import { createJsonResponseHandler, - createJsonStreamResponseHandler, generateId, ParseResult, postJsonToApi, @@ -21,7 +20,7 @@ import { InferToolCallsFromStream } from '@/generate-tool/infer-tool-calls-from- import { mapOllamaFinishReason } from '@/map-ollama-finish-reason' import { OllamaChatModelId, OllamaChatSettings } from '@/ollama-chat-settings' import { ollamaFailedResponseHandler } from '@/ollama-error' -import { removeUndefined } from '@/utils' +import { createJsonStreamResponseHandler, removeUndefined } from '@/utils' interface OllamaChatConfig { baseURL: string diff --git a/packages/ollama/src/utils/index.ts b/packages/ollama/src/utils/index.ts new file mode 100644 index 0000000..3f574e5 --- /dev/null +++ b/packages/ollama/src/utils/index.ts @@ -0,0 +1,2 @@ +export * from './remove-undefined' +export * from './response-handler' diff --git a/packages/ollama/src/utils.ts b/packages/ollama/src/utils/remove-undefined.ts similarity index 100% rename from packages/ollama/src/utils.ts rename to packages/ollama/src/utils/remove-undefined.ts diff --git a/packages/ollama/src/utils/response-handler.ts b/packages/ollama/src/utils/response-handler.ts new file mode 100644 index 0000000..ff8a1a6 --- /dev/null +++ b/packages/ollama/src/utils/response-handler.ts @@ -0,0 +1,41 @@ +import { EmptyResponseBodyError } from '@ai-sdk/provider' +import { + extractResponseHeaders, + ParseResult, + ResponseHandler, + safeParseJSON, +} from '@ai-sdk/provider-utils' +import { ZodSchema } from 'zod' + +import { TextLineStream } from '@/utils/text-line-stream' + +export const createJsonStreamResponseHandler = + ( + chunkSchema: ZodSchema, + ): ResponseHandler>> => + async ({ response }: { response: Response }) => { + const responseHeaders = extractResponseHeaders(response) + + if (response.body === null) { + throw new EmptyResponseBodyError({}) + } + + return { + responseHeaders, + value: response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new TextLineStream()) + .pipeThrough( + new TransformStream>({ + transform(chunkText, controller) { + controller.enqueue( + safeParseJSON({ + schema: chunkSchema, + text: chunkText, + }), + ) + }, + }), + ), + } + } diff --git a/packages/ollama/src/utils/text-line-stream.ts b/packages/ollama/src/utils/text-line-stream.ts new file mode 100644 index 0000000..ea6618d --- /dev/null +++ b/packages/ollama/src/utils/text-line-stream.ts @@ -0,0 +1,27 @@ +export class TextLineStream extends TransformStream { + private buffer = '' + + constructor() { + super({ + flush: (controller) => { + if (this.buffer.length === 0) return + + controller.enqueue(this.buffer) + }, + transform: (chunkText, controller) => { + chunkText = this.buffer + chunkText + + while (true) { + const EOL = chunkText.indexOf('\n') + + if (EOL === -1) break + + controller.enqueue(chunkText.slice(0, EOL)) + chunkText = chunkText.slice(EOL + 1) + } + + this.buffer = chunkText + }, + }) + } +} From 92cb1dcf1b7b107842669b2ab3a67cb3db6a49fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez?= Date: Sun, 7 Jul 2024 10:00:27 +0200 Subject: [PATCH 2/3] test: split stream in lines --- packages/ollama/src/test/NOTICE | 9 +++ .../test/convert-array-to-readable-stream.ts | 15 ++++ .../test/convert-readable-stream-to-array.ts | 14 ++++ packages/ollama/src/test/index.ts | 2 + .../ollama/src/utils/response-handler.test.ts | 73 +++++++++++++++++++ 5 files changed, 113 insertions(+) create mode 100644 packages/ollama/src/test/NOTICE create mode 100644 packages/ollama/src/test/convert-array-to-readable-stream.ts create mode 100644 packages/ollama/src/test/convert-readable-stream-to-array.ts create mode 100644 packages/ollama/src/test/index.ts create mode 100644 packages/ollama/src/utils/response-handler.test.ts diff --git a/packages/ollama/src/test/NOTICE b/packages/ollama/src/test/NOTICE new file mode 100644 index 0000000..10b3735 --- /dev/null +++ b/packages/ollama/src/test/NOTICE @@ -0,0 +1,9 @@ +This folder includes code from https://github.com/vercel/ai, which is licensed under the Apache License, Version 2.0. + +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +You may obtain the original code at + + https://github.com/vercel/ai diff --git a/packages/ollama/src/test/convert-array-to-readable-stream.ts b/packages/ollama/src/test/convert-array-to-readable-stream.ts new file mode 100644 index 0000000..6438c5f --- /dev/null +++ b/packages/ollama/src/test/convert-array-to-readable-stream.ts @@ -0,0 +1,15 @@ +export function convertArrayToReadableStream( + values: T[], +): ReadableStream { + return new ReadableStream({ + start(controller) { + try { + for (const value of values) { + controller.enqueue(value) + } + } finally { + controller.close() + } + }, + }) +} diff --git a/packages/ollama/src/test/convert-readable-stream-to-array.ts b/packages/ollama/src/test/convert-readable-stream-to-array.ts new file mode 100644 index 0000000..e4a6ab3 --- /dev/null +++ b/packages/ollama/src/test/convert-readable-stream-to-array.ts @@ -0,0 +1,14 @@ +export async function convertReadableStreamToArray( + stream: ReadableStream, +): Promise { + const reader = stream.getReader() + const result: T[] = [] + + while (true) { + const { done, value } = await reader.read() + if (done) break + result.push(value) + } + + return result +} diff --git a/packages/ollama/src/test/index.ts b/packages/ollama/src/test/index.ts new file mode 100644 index 0000000..637ecdb --- /dev/null +++ b/packages/ollama/src/test/index.ts @@ -0,0 +1,2 @@ +export * from './convert-array-to-readable-stream' +export * from './convert-readable-stream-to-array' diff --git a/packages/ollama/src/utils/response-handler.test.ts b/packages/ollama/src/utils/response-handler.test.ts new file mode 100644 index 0000000..d339e1e --- /dev/null +++ b/packages/ollama/src/utils/response-handler.test.ts @@ -0,0 +1,73 @@ +import { z } from 'zod' + +import { + convertArrayToReadableStream, + convertReadableStreamToArray, +} from '@/test' + +import { createJsonStreamResponseHandler } from './response-handler' + +describe('createJsonStreamResponseHandler', () => { + it('should return a stream of complete json chunks', async () => { + const handler = createJsonStreamResponseHandler(z.object({ a: z.number() })) + + const { value: stream } = await handler({ + requestBodyValues: {}, + response: new Response( + convertArrayToReadableStream([ + JSON.stringify({ a: 1 }) + '\n', + JSON.stringify({ a: 2 }) + '\n', + ]).pipeThrough(new TextEncoderStream()), + ), + url: 'some url', + }) + + expect(await convertReadableStreamToArray(stream)).toStrictEqual([ + { success: true, value: { a: 1 } }, + { success: true, value: { a: 2 } }, + ]) + }) + + it('should return a stream of partial json chunks', async () => { + const handler = createJsonStreamResponseHandler(z.object({ a: z.number() })) + + const { value: stream } = await handler({ + requestBodyValues: {}, + response: new Response( + convertArrayToReadableStream([ + '{ "a":', // start + '1 }\n', // end + ]).pipeThrough(new TextEncoderStream()), + ), + url: 'some url', + }) + + expect(await convertReadableStreamToArray(stream)).toStrictEqual([ + { success: true, value: { a: 1 } }, + ]) + }) + + it('should return a stream of multiple json chunks', async () => { + const handler = createJsonStreamResponseHandler(z.object({ a: z.number() })) + + const { value: stream } = await handler({ + requestBodyValues: {}, + response: new Response( + convertArrayToReadableStream([ + JSON.stringify({ a: 1 }) + '\n' + JSON.stringify({ a: 2 }) + '\n', + '{ "a":', // start + '3 }\n', // end + JSON.stringify({ a: 4 }) + '\n', + ]).pipeThrough(new TextEncoderStream()), + ), + url: 'some url', + }) + + expect(await convertReadableStreamToArray(stream)).toStrictEqual([ + { success: true, value: { a: 1 } }, + { success: true, value: { a: 2 } }, + { success: true, value: { a: 3 } }, + { success: true, value: { a: 4 } }, + ]) + }) +}) From 6c68a1e93bc7c42c05fba807a16d46b51f0b66c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez?= Date: Sun, 7 Jul 2024 10:07:57 +0200 Subject: [PATCH 3/3] ci: update pnpm actions --- .github/workflows/ci.yml | 2 +- .github/workflows/quality.yml | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 25639ff..de7e522 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: uses: actions/checkout@v3 - name: Setup pnpm - uses: pnpm/action-setup@v2.2.2 + uses: pnpm/action-setup@v4.0.0 with: version: 9 diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index f69c39d..805f6dc 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -15,7 +15,7 @@ jobs: uses: actions/checkout@v3 - name: Setup pnpm - uses: pnpm/action-setup@v2.2.4 + uses: pnpm/action-setup@v4.0.0 with: version: 9 @@ -39,7 +39,7 @@ jobs: uses: actions/checkout@v3 - name: Setup pnpm - uses: pnpm/action-setup@v2.2.4 + uses: pnpm/action-setup@v4.0.0 with: version: 9 @@ -63,7 +63,7 @@ jobs: uses: actions/checkout@v3 - name: Setup pnpm - uses: pnpm/action-setup@v2.2.4 + uses: pnpm/action-setup@v4.0.0 with: version: 9