Commit
Merge pull request #9 from sgomez/split-stream-lines
Split stream lines
sgomez authored Jul 7, 2024
2 parents dc17c5c + 6c68a1e commit ada6968
Showing 12 changed files with 188 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -18,7 +18,7 @@ jobs:
         uses: actions/checkout@v3

       - name: Setup pnpm
-        uses: pnpm/action-setup@v2.2.2
+        uses: pnpm/action-setup@v4.0.0
         with:
           version: 9
6 changes: 3 additions & 3 deletions .github/workflows/quality.yml
@@ -15,7 +15,7 @@ jobs:
         uses: actions/checkout@v3

       - name: Setup pnpm
-        uses: pnpm/action-setup@v2.2.4
+        uses: pnpm/action-setup@v4.0.0
         with:
           version: 9

@@ -39,7 +39,7 @@ jobs:
         uses: actions/checkout@v3

       - name: Setup pnpm
-        uses: pnpm/action-setup@v2.2.4
+        uses: pnpm/action-setup@v4.0.0
         with:
           version: 9

@@ -63,7 +63,7 @@ jobs:
         uses: actions/checkout@v3

       - name: Setup pnpm
-        uses: pnpm/action-setup@v2.2.4
+        uses: pnpm/action-setup@v4.0.0
         with:
           version: 9
3 changes: 1 addition & 2 deletions packages/ollama/src/ollama-chat-language-model.ts
@@ -8,7 +8,6 @@ import {
 } from '@ai-sdk/provider'
 import {
   createJsonResponseHandler,
-  createJsonStreamResponseHandler,
   generateId,
   ParseResult,
   postJsonToApi,
@@ -21,7 +20,7 @@ import { InferToolCallsFromStream } from '@/generate-tool/infer-tool-calls-from-
 import { mapOllamaFinishReason } from '@/map-ollama-finish-reason'
 import { OllamaChatModelId, OllamaChatSettings } from '@/ollama-chat-settings'
 import { ollamaFailedResponseHandler } from '@/ollama-error'
-import { removeUndefined } from '@/utils'
+import { createJsonStreamResponseHandler, removeUndefined } from '@/utils'

 interface OllamaChatConfig {
   baseURL: string
9 changes: 9 additions & 0 deletions packages/ollama/src/test/NOTICE
@@ -0,0 +1,9 @@
This folder includes code from https://github.com/vercel/ai, which is licensed under the Apache License, Version 2.0.

You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

You may obtain the original code at

https://github.com/vercel/ai
15 changes: 15 additions & 0 deletions packages/ollama/src/test/convert-array-to-readable-stream.ts
@@ -0,0 +1,15 @@
export function convertArrayToReadableStream<T>(
  values: T[],
): ReadableStream<T> {
  return new ReadableStream({
    start(controller) {
      try {
        for (const value of values) {
          controller.enqueue(value)
        }
      } finally {
        controller.close()
      }
    },
  })
}
14 changes: 14 additions & 0 deletions packages/ollama/src/test/convert-readable-stream-to-array.ts
@@ -0,0 +1,14 @@
export async function convertReadableStreamToArray<T>(
  stream: ReadableStream<T>,
): Promise<T[]> {
  const reader = stream.getReader()
  const result: T[] = []

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    result.push(value)
  }

  return result
}
2 changes: 2 additions & 0 deletions packages/ollama/src/test/index.ts
@@ -0,0 +1,2 @@
export * from './convert-array-to-readable-stream'
export * from './convert-readable-stream-to-array'
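The two helpers above, re-exported through this barrel, let the tests round-trip fixture arrays through Web streams. A minimal sketch of that round trip, with illustrative values (not part of this commit):

import {
  convertArrayToReadableStream,
  convertReadableStreamToArray,
} from '@/test'

// Turn an array into a ReadableStream, then drain it back into an array.
const stream = convertArrayToReadableStream(['{"a":1}\n', '{"a":2}\n'])
const chunks = await convertReadableStreamToArray(stream)
// chunks: ['{"a":1}\n', '{"a":2}\n']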
2 changes: 2 additions & 0 deletions packages/ollama/src/utils/index.ts
@@ -0,0 +1,2 @@
export * from './remove-undefined'
export * from './response-handler'
File renamed without changes.
73 changes: 73 additions & 0 deletions packages/ollama/src/utils/response-handler.test.ts
@@ -0,0 +1,73 @@
import { z } from 'zod'

import {
  convertArrayToReadableStream,
  convertReadableStreamToArray,
} from '@/test'

import { createJsonStreamResponseHandler } from './response-handler'

describe('createJsonStreamResponseHandler', () => {
  it('should return a stream of complete json chunks', async () => {
    const handler = createJsonStreamResponseHandler(z.object({ a: z.number() }))

    const { value: stream } = await handler({
      requestBodyValues: {},
      response: new Response(
        convertArrayToReadableStream([
          JSON.stringify({ a: 1 }) + '\n',
          JSON.stringify({ a: 2 }) + '\n',
        ]).pipeThrough(new TextEncoderStream()),
      ),
      url: 'some url',
    })

    expect(await convertReadableStreamToArray(stream)).toStrictEqual([
      { success: true, value: { a: 1 } },
      { success: true, value: { a: 2 } },
    ])
  })

  it('should return a stream of partial json chunks', async () => {
    const handler = createJsonStreamResponseHandler(z.object({ a: z.number() }))

    const { value: stream } = await handler({
      requestBodyValues: {},
      response: new Response(
        convertArrayToReadableStream([
          '{ "a":', // start
          '1 }\n', // end
        ]).pipeThrough(new TextEncoderStream()),
      ),
      url: 'some url',
    })

    expect(await convertReadableStreamToArray(stream)).toStrictEqual([
      { success: true, value: { a: 1 } },
    ])
  })

  it('should return a stream of multiple json chunks', async () => {
    const handler = createJsonStreamResponseHandler(z.object({ a: z.number() }))

    const { value: stream } = await handler({
      requestBodyValues: {},
      response: new Response(
        convertArrayToReadableStream([
          JSON.stringify({ a: 1 }) + '\n' + JSON.stringify({ a: 2 }) + '\n',
          '{ "a":', // start
          '3 }\n', // end
          JSON.stringify({ a: 4 }) + '\n',
        ]).pipeThrough(new TextEncoderStream()),
      ),
      url: 'some url',
    })

    expect(await convertReadableStreamToArray(stream)).toStrictEqual([
      { success: true, value: { a: 1 } },
      { success: true, value: { a: 2 } },
      { success: true, value: { a: 3 } },
      { success: true, value: { a: 4 } },
    ])
  })
})
41 changes: 41 additions & 0 deletions packages/ollama/src/utils/response-handler.ts
@@ -0,0 +1,41 @@
import { EmptyResponseBodyError } from '@ai-sdk/provider'
import {
  extractResponseHeaders,
  ParseResult,
  ResponseHandler,
  safeParseJSON,
} from '@ai-sdk/provider-utils'
import { ZodSchema } from 'zod'

import { TextLineStream } from '@/utils/text-line-stream'

export const createJsonStreamResponseHandler =
  <T>(
    chunkSchema: ZodSchema<T>,
  ): ResponseHandler<ReadableStream<ParseResult<T>>> =>
  async ({ response }: { response: Response }) => {
    const responseHeaders = extractResponseHeaders(response)

    if (response.body === null) {
      throw new EmptyResponseBodyError({})
    }

    return {
      responseHeaders,
      value: response.body
        .pipeThrough(new TextDecoderStream())
        .pipeThrough(new TextLineStream())
        .pipeThrough(
          new TransformStream<string, ParseResult<T>>({
            transform(chunkText, controller) {
              controller.enqueue(
                safeParseJSON({
                  schema: chunkSchema,
                  text: chunkText,
                }),
              )
            },
          }),
        ),
    }
  }
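For orientation, a minimal consumption sketch (not part of this commit): the handler wraps an ordinary fetch Response whose body is newline-delimited JSON and yields one ParseResult per line. The endpoint URL and chunk schema below are illustrative assumptions.

import { z } from 'zod'

import { createJsonStreamResponseHandler } from '@/utils'

// Hypothetical NDJSON chunk schema, for illustration only.
const handler = createJsonStreamResponseHandler(
  z.object({ done: z.boolean(), response: z.string() }),
)

async function readNdjsonStream(url: string): Promise<void> {
  const response = await fetch(url)
  const { value: stream } = await handler({
    requestBodyValues: {},
    response,
    url,
  })

  // Each chunk is a ParseResult: a validated value or a parse/validation error.
  const reader = stream.getReader()
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    if (value.success) console.log(value.value)
  }
}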
27 changes: 27 additions & 0 deletions packages/ollama/src/utils/text-line-stream.ts
@@ -0,0 +1,27 @@
export class TextLineStream extends TransformStream<string, string> {
  private buffer = ''

  constructor() {
    super({
      flush: (controller) => {
        if (this.buffer.length === 0) return

        controller.enqueue(this.buffer)
      },
      transform: (chunkText, controller) => {
        chunkText = this.buffer + chunkText

        while (true) {
          const EOL = chunkText.indexOf('\n')

          if (EOL === -1) break

          controller.enqueue(chunkText.slice(0, EOL))
          chunkText = chunkText.slice(EOL + 1)
        }

        this.buffer = chunkText
      },
    })
  }
}
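A small behavioural sketch (not part of this commit) of why the buffer matters: a line split across chunk boundaries is reassembled and emitted only once its newline arrives. The sample chunks reuse the test helpers added above.

import {
  convertArrayToReadableStream,
  convertReadableStreamToArray,
} from '@/test'
import { TextLineStream } from '@/utils/text-line-stream'

// '{"a":1}' arrives split across two chunks; TextLineStream buffers the first
// fragment and emits the full line only when the trailing '\n' shows up.
const lines = await convertReadableStreamToArray(
  convertArrayToReadableStream(['{"a":', '1}\n', '{"a":2}\n']).pipeThrough(
    new TextLineStream(),
  ),
)
// lines: ['{"a":1}', '{"a":2}']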
