streamingassistant

Stream Assistant Responses

In this example, you'll learn how to stream responses along with tool calls from OpenAI's Assistant API using ai/rsc.

Client

In your client component, you will create a simple chat interface that allows users to send messages to the assistant and receive responses. The assistant's responses will be streamed in two parts: the status of the current run and the text content of the messages.

'use client';
import { useState } from 'react';
import { ClientMessage, submitMessage } from './actions';
import { useActions } from 'ai/rsc';
export default function Home() {
const [input, setInput] = useState('');
const [messages, setMessages] = useState<ClientMessage[]>([]);
const { submitMessage } = useActions();
const handleSubmission = async () => {
setMessages(currentMessages => [
...currentMessages,
{
id: '123',
status: 'user.message.created',
text: input,
gui: null,
},
]);
const response = await submitMessage(input);
setMessages(currentMessages => [...currentMessages, response]);
setInput('');
};
return (
<div className="flex flex-col-reverse">
<div className="flex flex-row gap-2 p-2 bg-zinc-100 w-full">
<input
className="bg-zinc-100 w-full p-2 outline-none"
value={input}
onChange={event => setInput(event.target.value)}
placeholder="Ask a question"
onKeyDown={event => {
if (event.key === 'Enter') {
handleSubmission();
}
}}
/>
<button
className="p-2 bg-zinc-900 text-zinc-100 rounded-md"
onClick={handleSubmission}
>
Send
</button>
</div>
<div className="flex flex-col h-[calc(100dvh-56px)] overflow-y-scroll">
<div>
{messages.map(message => (
<div key={message.id} className="flex flex-col gap-1 border-b p-2">
<div className="flex flex-row justify-between">
<div className="text-sm text-zinc-500">{message.status}</div>
</div>
<div className="flex flex-col gap-2">{message.gui}</div>
<div>{message.text}</div>
</div>
))}
</div>
</div>
</div>
);
}
'use client';
import { StreamableValue, useStreamableValue } from 'ai/rsc';
export function Message({ textStream }: { textStream: StreamableValue }) {
const [text] = useStreamableValue(textStream);
return <div>{text}</div>;
}

Server

In your server action, you will create a function called submitMessage that adds the user's message to the thread. The function will create a new thread if one does not exist and add the user's message to the thread. If a thread already exists, the function will add the user's message to the existing thread. The function will then create a run and stream the assistant's response to the client. Furthermore, the run queue is used to manage multiple runs in the same thread during the lifetime of the server action.

In case the assistant requires a tool call, the server action will handle the tool call and return the output to the assistant. In this example, the assistant requires a tool call to search for emails. The server action will search for emails based on the query and has_attachments parameters and return the output to the both the assistant and the client.

'use server';
import { generateId } from 'ai';
import { createStreamableUI, createStreamableValue } from 'ai/rsc';
import { OpenAI } from 'openai';
import { ReactNode } from 'react';
import { searchEmails } from './function';
import { Message } from './message';
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
export interface ClientMessage {
id: string;
status: ReactNode;
text: ReactNode;
gui: ReactNode;
}
const ASSISTANT_ID = 'asst_xxxx';
let THREAD_ID = '';
let RUN_ID = '';
export async function submitMessage(question: string): Promise<ClientMessage> {
const status = createStreamableUI('thread.init');
const textStream = createStreamableValue('');
const textUIStream = createStreamableUI(
<Message textStream={textStream.value} />,
);
const gui = createStreamableUI();
const runQueue = [];
(async () => {
if (THREAD_ID) {
await openai.beta.threads.messages.create(THREAD_ID, {
role: 'user',
content: question,
});
const run = await openai.beta.threads.runs.create(THREAD_ID, {
assistant_id: ASSISTANT_ID,
stream: true,
});
runQueue.push({ id: generateId(), run });
} else {
const run = await openai.beta.threads.createAndRun({
assistant_id: ASSISTANT_ID,
stream: true,
thread: {
messages: [{ role: 'user', content: question }],
},
});
runQueue.push({ id: generateId(), run });
}
while (runQueue.length > 0) {
const latestRun = runQueue.shift();
if (latestRun) {
for await (const delta of latestRun.run) {
const { data, event } = delta;
status.update(event);
if (event === 'thread.created') {
THREAD_ID = data.id;
} else if (event === 'thread.run.created') {
RUN_ID = data.id;
} else if (event === 'thread.message.delta') {
data.delta.content?.map((part: any) => {
if (part.type === 'text') {
if (part.text) {
textStream.append(part.text.value);
}
}
});
} else if (event === 'thread.run.requires_action') {
if (data.required_action) {
if (data.required_action.type === 'submit_tool_outputs') {
const { tool_calls } = data.required_action.submit_tool_outputs;
const tool_outputs = [];
for (const tool_call of tool_calls) {
const { id: toolCallId, function: fn } = tool_call;
const { name, arguments: args } = fn;
if (name === 'search_emails') {
const { query, has_attachments } = JSON.parse(args);
gui.append(
<div className="flex flex-row gap-2 items-center">
<div>
Searching for emails: {query}, has_attachments:
{has_attachments ? 'true' : 'false'}
</div>
</div>,
);
await new Promise(resolve => setTimeout(resolve, 2000));
const fakeEmails = searchEmails({ query, has_attachments });
gui.append(
<div className="flex flex-col gap-2">
{fakeEmails.map(email => (
<div
key={email.id}
className="p-2 bg-zinc-100 rounded-md flex flex-row gap-2 items-center justify-between"
>
<div className="flex flex-row gap-2 items-center">
<div>{email.subject}</div>
</div>
<div className="text-zinc-500">{email.date}</div>
</div>
))}
</div>,
);
tool_outputs.push({
tool_call_id: toolCallId,
output: JSON.stringify(fakeEmails),
});
}
}
const nextRun: any =
await openai.beta.threads.runs.submitToolOutputs(
THREAD_ID,
RUN_ID,
{
tool_outputs,
stream: true,
},
);
runQueue.push({ id: generateId(), run: nextRun });
}
}
} else if (event === 'thread.run.failed') {
console.log(data);
}
}
}
}
status.done();
textUIStream.done();
gui.done();
})();
return {
id: generateId(),
status: status.value,
text: textUIStream.value,
gui: gui.value,
};
}
import { createAI } from 'ai/rsc';
import { submitMessage } from './actions';
export const AI = createAI({
actions: {
submitMessage,
},
initialAIState: [],
initialUIState: [],
});

And finally, make sure to update your layout component to wrap the children with the AI component.

import { ReactNode } from 'react';
import { AI } from './ai';
export default function Layout({ children }: { children: ReactNode }) {
return <AI>{children}</AI>;
}