Caching Middleware
Let's create a simple chat interface that uses `LanguageModelMiddleware` to cache the assistant's responses in fast KV storage.
Client
First, build a chat interface that allows users to send messages to the assistant and receive responses. You will integrate the `useChat` hook from `ai/react` to stream responses.
```tsx
'use client';

import { useChat } from 'ai/react';

export default function Chat() {
  const { messages, input, handleInputChange, handleSubmit, error } = useChat();

  if (error) return <div>{error.message}</div>;

  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      <div className="space-y-4">
        {messages.map(m => (
          <div key={m.id} className="whitespace-pre-wrap">
            <div>
              <div className="font-bold">{m.role}</div>
              {m.toolInvocations ? (
                <pre>{JSON.stringify(m.toolInvocations, null, 2)}</pre>
              ) : (
                <p>{m.content}</p>
              )}
            </div>
          </div>
        ))}
      </div>

      <form onSubmit={handleSubmit}>
        <input
          className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
          value={input}
          placeholder="Say something..."
          onChange={handleInputChange}
        />
      </form>
    </div>
  );
}
```
Middleware
Next, you will create a `LanguageModelMiddleware` that caches the assistant's responses in KV storage. `LanguageModelMiddleware` has two methods: `wrapGenerate` and `wrapStream`. `wrapGenerate` is called when using `generateText` and `generateObject`, while `wrapStream` is called when using `streamText` and `streamObject`.
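Before adding caching, it helps to see the shape of a middleware. The sketch below is a no-op passthrough that only delegates to the wrapped model; the `passthroughMiddleware` name is illustrative, but the hook signatures match the caching implementation that follows.

```ts
import type { Experimental_LanguageModelV1Middleware as LanguageModelV1Middleware } from 'ai';

// A no-op middleware: both hooks simply delegate to the wrapped model.
// The caching middleware below fills these hooks with cache lookups and writes.
export const passthroughMiddleware: LanguageModelV1Middleware = {
  // Called for generateText and generateObject
  wrapGenerate: async ({ doGenerate }) => {
    return doGenerate();
  },
  // Called for streamText and streamObject
  wrapStream: async ({ doStream }) => {
    return doStream();
  },
};
```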
For `wrapGenerate`, you can cache the response directly. For `wrapStream`, on the other hand, you cache an array of the stream parts, which can then be passed to the `simulateReadableStream` function to create a simulated `ReadableStream` that returns the cached response. This way, the cached response is returned chunk by chunk, as if it were being generated by the model. You can control the initial delay and the delay between chunks by adjusting the `initialDelayInMs` and `chunkDelayInMs` parameters of `simulateReadableStream`.
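To illustrate the replay mechanics in isolation, here is a minimal sketch of `simulateReadableStream` with hand-written stream parts. The chunk contents are made up for the example; in the middleware below, the chunks come from the cache.

```ts
import { simulateReadableStream } from 'ai/test';
import type { LanguageModelV1StreamPart } from 'ai';

// Hypothetical cached parts, replayed as a simulated stream.
const cachedChunks: LanguageModelV1StreamPart[] = [
  { type: 'text-delta', textDelta: 'Hello, ' },
  { type: 'text-delta', textDelta: 'world!' },
  {
    type: 'finish',
    finishReason: 'stop',
    usage: { promptTokens: 10, completionTokens: 2 },
  },
];

const stream = simulateReadableStream({
  initialDelayInMs: 0, // delay before the first chunk
  chunkDelayInMs: 10, // delay between chunks
  chunks: cachedChunks,
});
```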
```ts
import { Redis } from '@upstash/redis';
import type {
  LanguageModelV1,
  Experimental_LanguageModelV1Middleware as LanguageModelV1Middleware,
  LanguageModelV1StreamPart,
} from 'ai';
import { simulateReadableStream } from 'ai/test';

const redis = new Redis({
  url: process.env.KV_URL,
  token: process.env.KV_TOKEN,
});

export const cacheMiddleware: LanguageModelV1Middleware = {
  wrapGenerate: async ({ doGenerate, params }) => {
    const cacheKey = JSON.stringify(params);

    const cached = (await redis.get(cacheKey)) as Awaited<
      ReturnType<LanguageModelV1['doGenerate']>
    > | null;

    if (cached !== null) {
      return {
        ...cached,
        response: {
          ...cached.response,
          timestamp: cached?.response?.timestamp
            ? new Date(cached?.response?.timestamp)
            : undefined,
        },
      };
    }

    const result = await doGenerate();

    redis.set(cacheKey, result);

    return result;
  },
  wrapStream: async ({ doStream, params }) => {
    const cacheKey = JSON.stringify(params);

    // Check if the result is in the cache
    const cached = await redis.get(cacheKey);

    // If cached, return a simulated ReadableStream that yields the cached result
    if (cached !== null) {
      // Format the timestamps in the cached response
      const formattedChunks = (cached as LanguageModelV1StreamPart[]).map(p => {
        if (p.type === 'response-metadata' && p.timestamp) {
          return { ...p, timestamp: new Date(p.timestamp) };
        } else return p;
      });
      return {
        stream: simulateReadableStream({
          initialDelayInMs: 0,
          chunkDelayInMs: 10,
          chunks: formattedChunks,
        }),
        rawCall: { rawPrompt: null, rawSettings: {} },
      };
    }

    // If not cached, proceed with streaming
    const { stream, ...rest } = await doStream();

    const fullResponse: LanguageModelV1StreamPart[] = [];

    const transformStream = new TransformStream<
      LanguageModelV1StreamPart,
      LanguageModelV1StreamPart
    >({
      transform(chunk, controller) {
        fullResponse.push(chunk);
        controller.enqueue(chunk);
      },
      flush() {
        // Store the full response in the cache after streaming is complete
        redis.set(cacheKey, fullResponse);
      },
    });

    return {
      stream: stream.pipeThrough(transformStream),
      ...rest,
    };
  },
};
```
This example uses `@upstash/redis` to store and retrieve the assistant's responses, but you can use any KV storage provider you like.
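For example, a minimal in-memory substitute that satisfies the same `get`/`set` calls might look like the sketch below. This is illustrative only and not suitable for production or serverless deployments, where instances do not share memory.

```ts
// Illustrative in-memory "KV" store matching the get/set calls used above.
const memoryCache = new Map<string, unknown>();

const memoryKV = {
  get: async (key: string) => memoryCache.get(key) ?? null,
  set: async (key: string, value: unknown) => {
    memoryCache.set(key, value);
  },
};
```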
Server
Finally, you will create an API route for `api/chat` to handle the assistant's messages and responses. You can use your cache middleware by wrapping the model with `wrapLanguageModel` and passing the middleware as an argument.
```ts
import { cacheMiddleware } from '@/ai/middleware';
import { openai } from '@ai-sdk/openai';
import {
  experimental_wrapLanguageModel as wrapLanguageModel,
  streamText,
  tool,
} from 'ai';
import { z } from 'zod';

const wrappedModel = wrapLanguageModel({
  model: openai('gpt-4o-mini'),
  middleware: cacheMiddleware,
});

export async function POST(req: Request) {
  const { messages } = await req.json();

  const result = streamText({
    model: wrappedModel,
    messages,
    tools: {
      weather: tool({
        description: 'Get the weather in a location',
        parameters: z.object({
          location: z.string().describe('The location to get the weather for'),
        }),
        execute: async ({ location }) => ({
          location,
          temperature: 72 + Math.floor(Math.random() * 21) - 10,
        }),
      }),
    },
  });

  return result.toDataStreamResponse();
}
```
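Because `wrapGenerate` covers the non-streaming path, the same wrapped model also caches `generateText` calls. Here is a minimal sketch, assuming the same model and middleware as above; the prompt is just an example.

```ts
import { cacheMiddleware } from '@/ai/middleware';
import { openai } from '@ai-sdk/openai';
import {
  experimental_wrapLanguageModel as wrapLanguageModel,
  generateText,
} from 'ai';

const wrappedModel = wrapLanguageModel({
  model: openai('gpt-4o-mini'),
  middleware: cacheMiddleware,
});

async function cachedCompletion() {
  // The first call reaches the model; an identical second call with the same
  // params is served from the KV cache via wrapGenerate.
  const { text } = await generateText({
    model: wrappedModel,
    prompt: 'What is the weather in San Francisco?',
  });
  return text;
}
```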