Caching Middleware

Let's create a simple chat interface that uses LanguageModelV1Middleware to cache the assistant's responses in fast KV storage.

Client

Start by building the chat interface that allows users to send messages to the assistant and receive responses. You will use the useChat hook from ai/react to stream responses.

'use client';

import { useChat } from 'ai/react';

export default function Chat() {
  const { messages, input, handleInputChange, handleSubmit, error } = useChat();

  if (error) return <div>{error.message}</div>;

  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      <div className="space-y-4">
        {messages.map(m => (
          <div key={m.id} className="whitespace-pre-wrap">
            <div>
              <div className="font-bold">{m.role}</div>
              {m.toolInvocations ? (
                <pre>{JSON.stringify(m.toolInvocations, null, 2)}</pre>
              ) : (
                <p>{m.content}</p>
              )}
            </div>
          </div>
        ))}
      </div>

      <form onSubmit={handleSubmit}>
        <input
          className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
          value={input}
          placeholder="Say something..."
          onChange={handleInputChange}
        />
      </form>
    </div>
  );
}

Middleware

Next, you will create a LanguageModelV1Middleware that caches the assistant's responses in KV storage. LanguageModelV1Middleware has two methods: wrapGenerate and wrapStream. wrapGenerate is called when using generateText and generateObject, while wrapStream is called when using streamText and streamObject.

For wrapGenerate, you can cache the response directly. For wrapStream, on the other hand, you cache an array of the stream parts, which can then be passed to the simulateReadableStream function to create a simulated ReadableStream that returns the cached response. This way, the cached response is returned chunk by chunk as if it were being generated by the model. You can control the initial delay and the delay between chunks by adjusting the initialDelayInMs and chunkDelayInMs parameters of simulateReadableStream.
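To see the shape of the interface in isolation, here is a minimal pass-through middleware in which both hooks simply delegate to the underlying model (a sketch only; the passThroughMiddleware name is illustrative):

import type { Experimental_LanguageModelV1Middleware as LanguageModelV1Middleware } from 'ai';

// A pass-through middleware: both hooks delegate straight to the model.
// wrapGenerate runs for generateText/generateObject, wrapStream for streamText/streamObject.
export const passThroughMiddleware: LanguageModelV1Middleware = {
  wrapGenerate: async ({ doGenerate }) => doGenerate(),
  wrapStream: async ({ doStream }) => doStream(),
};

The caching middleware below fills in both hooks with Redis-backed lookups and writes.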

import { Redis } from '@upstash/redis';
import type {
  LanguageModelV1,
  Experimental_LanguageModelV1Middleware as LanguageModelV1Middleware,
  LanguageModelV1StreamPart,
} from 'ai';
import { simulateReadableStream } from 'ai/test';

const redis = new Redis({
  url: process.env.KV_URL,
  token: process.env.KV_TOKEN,
});

export const cacheMiddleware: LanguageModelV1Middleware = {
  wrapGenerate: async ({ doGenerate, params }) => {
    const cacheKey = JSON.stringify(params);

    // Check if the result is in the cache
    const cached = (await redis.get(cacheKey)) as Awaited<
      ReturnType<LanguageModelV1['doGenerate']>
    > | null;

    // If cached, return the cached result with its timestamp revived as a Date
    if (cached !== null) {
      return {
        ...cached,
        response: {
          ...cached.response,
          timestamp: cached?.response?.timestamp
            ? new Date(cached?.response?.timestamp)
            : undefined,
        },
      };
    }

    // If not cached, generate the result and store it in the cache
    const result = await doGenerate();
    redis.set(cacheKey, result);

    return result;
  },

  wrapStream: async ({ doStream, params }) => {
    const cacheKey = JSON.stringify(params);

    // Check if the result is in the cache
    const cached = await redis.get(cacheKey);

    // If cached, return a simulated ReadableStream that yields the cached result
    if (cached !== null) {
      // Format the timestamps in the cached response
      const formattedChunks = (cached as LanguageModelV1StreamPart[]).map(p => {
        if (p.type === 'response-metadata' && p.timestamp) {
          return { ...p, timestamp: new Date(p.timestamp) };
        } else return p;
      });

      return {
        stream: simulateReadableStream({
          initialDelayInMs: 0,
          chunkDelayInMs: 10,
          chunks: formattedChunks,
        }),
        rawCall: { rawPrompt: null, rawSettings: {} },
      };
    }

    // If not cached, proceed with streaming
    const { stream, ...rest } = await doStream();

    const fullResponse: LanguageModelV1StreamPart[] = [];

    const transformStream = new TransformStream<
      LanguageModelV1StreamPart,
      LanguageModelV1StreamPart
    >({
      transform(chunk, controller) {
        fullResponse.push(chunk);
        controller.enqueue(chunk);
      },
      flush() {
        // Store the full response in the cache after streaming is complete
        redis.set(cacheKey, fullResponse);
      },
    });

    return {
      stream: stream.pipeThrough(transformStream),
      ...rest,
    };
  },
};

This example uses @upstash/redis to store and retrieve the assistant's responses, but you can use any KV storage provider you would like.
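For instance, a minimal in-memory stand-in with the same get/set shape works for local development (a sketch; the memoryCache name is illustrative and not part of any library):

// An in-memory key-value store with the same get/set shape as the Redis client above.
const store = new Map<string, unknown>();

export const memoryCache = {
  get: async (key: string) => store.get(key) ?? null,
  set: async (key: string, value: unknown) => {
    store.set(key, value);
  },
};

You could then call memoryCache.get and memoryCache.set in place of the redis calls in the middleware. Keep in mind that an in-memory cache is per-process and is cleared on every restart, so it is only suitable for development.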

Server

Finally, you will create an API route handler at /api/chat to handle the assistant's messages and responses. Use the cache middleware by wrapping the model with wrapLanguageModel and passing the middleware as an argument.

import { cacheMiddleware } from '@/ai/middleware';
import { openai } from '@ai-sdk/openai';
import {
  experimental_wrapLanguageModel as wrapLanguageModel,
  streamText,
  tool,
} from 'ai';
import { z } from 'zod';

const wrappedModel = wrapLanguageModel({
  model: openai('gpt-4o-mini'),
  middleware: cacheMiddleware,
});

export async function POST(req: Request) {
  const { messages } = await req.json();

  const result = streamText({
    model: wrappedModel,
    messages,
    tools: {
      weather: tool({
        description: 'Get the weather in a location',
        parameters: z.object({
          location: z.string().describe('The location to get the weather for'),
        }),
        execute: async ({ location }) => ({
          location,
          temperature: 72 + Math.floor(Math.random() * 21) - 10,
        }),
      }),
    },
  });

  return result.toDataStreamResponse();
}
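
To verify that caching works, you could send the identical request to the route twice: the first call should hit the model and the second should replay from KV, since the cache key is JSON.stringify(params) and the params are the same. A rough smoke test follows (assuming the dev server runs at http://localhost:3000; run it with something like npx tsx):

// Send the identical body twice: the second response should come from the cache.
const body = JSON.stringify({
  messages: [{ role: 'user', content: 'What is the weather in San Francisco?' }],
});

for (let i = 0; i < 2; i++) {
  const res = await fetch('http://localhost:3000/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body,
  });
  console.log(await res.text());
}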