Written by Gareth Simono, Founder and CEO of Agentik {OS}. Full-stack developer and AI architect with years of experience shipping production applications across SaaS, mobile, and enterprise platforms. Gareth orchestrates 267 specialized AI agents to deliver production software 10x faster than traditional development teams.
AI interfaces need streaming responses, connection resilience, and scalable pub/sub. Here is how to build WebSocket infrastructure that handles it all.

The moment you add AI to a product, users expect streaming responses. They expect to see tokens appear word by word. They expect the typing effect. They expect it because ChatGPT trained them to expect it.
And honestly? They are right to expect it. The difference between waiting 8 seconds for a complete response and seeing text stream in after 200 milliseconds is enormous. Not in total time. In perceived responsiveness. The streaming version feels fast even when it takes the same total time. Psychology beats physics here every single time.
WebSockets make this possible. HTTP is request-response. WebSocket is a persistent, bidirectional channel. The server can push data to the client whenever it wants, as fast as it wants, without the client polling. Perfect for streaming AI outputs.
But streaming text is just the beginning. The real power of WebSockets in AI applications goes much deeper than a typing effect.
Server-Sent Events (SSE) and HTTP streaming work for the simplest case: one user, one conversation, text only. Many AI products start here and hit walls later.
SSE is unidirectional. Server pushes to client. Client cannot respond over the same connection. For a conversation where the user might interrupt, cancel a generation, or send follow-up context mid-stream, you need bidirectional communication.
HTTP streaming also has reconnection complexity. When a connection drops mid-stream, SSE has a built-in reconnection with event ID tracking, but the implementation details around resuming partial AI responses are gnarly. WebSocket reconnection libraries handle this more cleanly.
For multi-user scenarios, shared AI workspaces, real-time collaboration on AI outputs, or broadcasting updates to multiple subscribers, SSE requires multiple connections per user. WebSocket handles all of this on one persistent connection.
Start with WebSockets. The added complexity is worth it.
The basic pattern is deceptively simple. Client sends a message over WebSocket. Server forwards it to the AI provider with streaming enabled. As tokens arrive from the provider, the server immediately pushes them to the client. The client appends each token to the display.
The implementation details are where things get interesting.
First, message framing. Every WebSocket message needs a type, a request ID, and a payload. The request ID is critical. Without it, you cannot support multiple concurrent requests, and users will have them. Someone submits a long query, gets impatient, submits a follow-up question, and now you have two streams running simultaneously.
// Message type definitions
type WSMessageType = 'request' | 'token' | 'done' | 'error' | 'cancel' | 'ping' | 'pong';
interface WSMessage {
type: WSMessageType;
requestId: string;
payload?: unknown;
}
interface TokenMessage extends WSMessage {
type: 'token';
payload: { content: string; index: number };
}
interface DoneMessage extends WSMessage {
type: 'done';
payload: {
totalInputTokens: number;
totalOutputTokens: number;
stopReason: string;
};
}
interface ErrorMessage extends WSMessage {
type: 'error';
payload: { code: string; message: string; retryable: boolean };
}

The index field in TokenMessage is your sequence number. When a client reconnects and needs to sync state, it sends the last index it received. The server knows exactly where to resume.
import WebSocket, { WebSocketServer } from 'ws';
import Anthropic from '@anthropic-ai/sdk';
const wss = new WebSocketServer({ port: 8080 });
const anthropic = new Anthropic();
// Track active streams so we can cancel them
const activeStreams = new Map<string, AbortController>();
wss.on('connection', (ws) => {
console.log('Client connected');
// Track which request IDs belong to this connection so that closing
// one socket does not abort streams owned by other clients
const connectionRequestIds = new Set<string>();
ws.on('message', async (data) => {
let parsed: WSMessage;
try {
parsed = JSON.parse(data.toString());
} catch {
return; // Ignore malformed messages
}
if (parsed.type === 'request') {
connectionRequestIds.add(parsed.requestId);
await handleAIRequest(ws, parsed);
} else if (parsed.type === 'cancel') {
const controller = activeStreams.get(parsed.requestId);
controller?.abort();
activeStreams.delete(parsed.requestId);
connectionRequestIds.delete(parsed.requestId);
} else if (parsed.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong', requestId: parsed.requestId }));
}
});
ws.on('close', () => {
// Abort only the streams started by this connection
connectionRequestIds.forEach((requestId) => {
activeStreams.get(requestId)?.abort();
activeStreams.delete(requestId);
});
});
});
async function handleAIRequest(ws: WebSocket, message: WSMessage): Promise<void> {
const { requestId } = message;
const { userMessage } = message.payload as { userMessage: string };
const abortController = new AbortController();
activeStreams.set(requestId, abortController);
let tokenIndex = 0;
try {
const stream = anthropic.messages.stream(
{
model: 'claude-sonnet-4-6',
max_tokens: 2048,
messages: [{ role: 'user', content: userMessage }],
},
{ signal: abortController.signal }
);
for await (const event of stream) {
if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
const tokenMsg: TokenMessage = {
type: 'token',
requestId,
payload: { content: event.delta.text, index: tokenIndex++ },
};
ws.send(JSON.stringify(tokenMsg));
}
}
const finalMessage = await stream.finalMessage();
const doneMsg: DoneMessage = {
type: 'done',
requestId,
payload: {
totalInputTokens: finalMessage.usage.input_tokens,
totalOutputTokens: finalMessage.usage.output_tokens,
stopReason: finalMessage.stop_reason || 'end_turn',
},
};
ws.send(JSON.stringify(doneMsg));
} catch (error) {
if (error instanceof Anthropic.APIUserAbortError || (error as Error).name === 'AbortError') {
return; // User cancelled, not an error
}
const errorMsg: ErrorMessage = {
type: 'error',
requestId,
payload: {
code: 'STREAM_ERROR',
message: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
},
};
ws.send(JSON.stringify(errorMsg));
} finally {
activeStreams.delete(requestId);
}
}

The AbortController integration is important. Users cancel generations constantly. Developers forget to handle this. When the client sends a cancel message, you need to stop the upstream API call immediately, or you are paying for tokens the user will never see.
Backpressure is the issue nobody thinks about until production. If your AI provider sends tokens faster than your WebSocket can transmit them, you need a buffer. If the buffer fills, the client is on a very slow connection and you have a different problem entirely.
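One way to implement that buffer is to check the socket's send buffer before each token and pause when it is backed up. A sketch, assuming the `ws` library (both it and the browser `WebSocket` expose a `bufferedAmount` byte counter); the 1 MB threshold and 50 ms poll interval are illustrative, not tuned values:

```typescript
// Throttle outgoing tokens when the socket's send buffer is backed up.
// MAX_BUFFERED_BYTES is an assumed threshold, not a tuned value.
interface BufferedSocket {
  bufferedAmount: number;
  send(data: string): void;
}

const MAX_BUFFERED_BYTES = 1_000_000;

async function sendWithBackpressure(
  socket: BufferedSocket,
  data: string,
  pollMs = 50
): Promise<void> {
  // Wait for the send buffer to drain before queueing another token
  while (socket.bufferedAmount > MAX_BUFFERED_BYTES) {
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
  socket.send(data);
}
```

Calling this in place of a raw `ws.send` inside the streaming loop keeps a fast AI provider from flooding a slow client.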
WebSocket connections die. Constantly. Mobile users switch between WiFi and cellular. Laptops go to sleep. Network hiccups interrupt connections for a few seconds. Hotel WiFi happens. The application must handle all of this gracefully.
Heartbeat mechanisms detect dead connections before the OS does. TCP keep-alive defaults are measured in hours, not seconds. Your application needs to know within seconds.
Send a ping every 30 seconds from the server. Expect a pong within 10 seconds. If no pong arrives, close the connection and clean up server-side resources. Without this, dead connections accumulate. Each one holds a file descriptor and memory. At scale, this becomes a resource leak that requires periodic server restarts.
function setupHeartbeat(ws: WebSocket): NodeJS.Timeout {
let isAlive = true;
ws.on('pong', () => {
isAlive = true;
});
const interval = setInterval(() => {
if (!isAlive) {
console.log('Terminating dead connection');
ws.terminate();
return;
}
isAlive = false;
ws.ping();
}, 30_000);
ws.on('close', () => clearInterval(interval));
return interval;
}

Automatic reconnection on the client is non-negotiable. When the connection drops, reconnect with exponential backoff: 1 second, 2 seconds, 4 seconds, 8 seconds, capped at 30 seconds. Show the user a subtle indicator. Do not show an error modal. Reconnection is normal infrastructure behavior, not an application failure.
// Client-side reconnection logic
class ReconnectingWebSocket {
private ws: WebSocket | null = null;
private reconnectAttempts = 0;
private readonly maxDelay = 30_000;
private readonly baseDelay = 1_000;
private messageQueue: string[] = [];
constructor(private readonly url: string) {
this.connect();
}
private connect(): void {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
this.reconnectAttempts = 0;
this.flushQueue();
};
this.ws.onclose = (event) => {
if (!event.wasClean) {
this.scheduleReconnect();
}
};
this.ws.onerror = () => {
this.ws?.close();
};
this.ws.onmessage = (event) => {
this.handleMessage(JSON.parse(event.data));
};
}
private scheduleReconnect(): void {
const delay = Math.min(
this.baseDelay * Math.pow(2, this.reconnectAttempts),
this.maxDelay
);
this.reconnectAttempts++;
setTimeout(() => this.connect(), delay);
}
send(message: WSMessage): void {
const serialized = JSON.stringify(message);
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(serialized);
} else {
this.messageQueue.push(serialized);
}
}
private flushQueue(): void {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift()!;
this.ws?.send(message);
}
}
private handleMessage(message: WSMessage): void {
// Dispatch to appropriate handlers
}
}

Here is where most implementations fall apart. The user was receiving a streaming response. The connection dropped mid-stream. They reconnected. What happens?
Option A: Resume from where you left off. The server buffered all tokens with sequence numbers. On reconnect, the client sends the last sequence number it received. The server replays from that point. Clean experience. Requires server-side buffering.
Option B: Mark the partial response as incomplete and let the user retry. Simpler to implement. Slightly worse experience. Works reliably without server-side memory.
Option C: Discard the partial response and start fresh. Worst experience. Never do this silently.
I recommend Option B for most applications. The buffer management in Option A adds significant complexity for an edge case. The user experience difference is minor. A visible "Connection interrupted, click to retry" message is honest and clear.
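Option B needs little more than a status flag per in-flight request on the client. A minimal sketch, where the `PendingRequest` shape is illustrative rather than part of the protocol above:

```typescript
// Option B sketch: on reconnect, flag in-flight requests as interrupted
// instead of resuming them. The UI can then offer a retry button.
type RequestStatus = 'streaming' | 'interrupted' | 'done';

interface PendingRequest {
  requestId: string;
  text: string; // tokens accumulated so far
  status: RequestStatus;
}

function markInterrupted(pending: Map<string, PendingRequest>): string[] {
  const interrupted: string[] = [];
  for (const req of pending.values()) {
    if (req.status === 'streaming') {
      // Keep the partial text visible; just stop pretending it will continue
      req.status = 'interrupted';
      interrupted.push(req.requestId);
    }
  }
  return interrupted;
}
```

Call this from the reconnect handler and render a retry affordance for every ID it returns.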
If you go with Option A, here is the buffer structure:
interface StreamBuffer {
requestId: string;
userId: string;
tokens: Array<{ content: string; index: number }>;
status: 'streaming' | 'complete' | 'error';
createdAt: number;
expiresAt: number; // Clean up after 5 minutes
}
// Store in Redis with TTL (node-redis v4 client, same library as the pub/sub setup below)
import { createClient } from 'redis';
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();
async function saveTokenToBuffer(
requestId: string,
content: string,
index: number
): Promise<void> {
const key = `stream_buffer:${requestId}`;
await redis.rPush(key, JSON.stringify({ content, index }));
await redis.expire(key, 300); // 5 minute TTL
}
async function replayFromIndex(
ws: WebSocket,
requestId: string,
fromIndex: number
): Promise<void> {
const key = `stream_buffer:${requestId}`;
const allTokens = await redis.lRange(key, 0, -1);
for (const tokenJson of allTokens) {
const token = JSON.parse(tokenJson);
if (token.index > fromIndex) {
ws.send(JSON.stringify({ type: 'token', requestId, payload: token }));
}
}
}

Streaming a response to one user is the simple case. Real products eventually need to broadcast AI outputs to multiple users simultaneously.
A shared research document where an AI is writing and multiple team members watch it develop. A meeting assistant where multiple attendees see the live transcript. A multiplayer game where an AI dungeon master narrates to all players simultaneously.
These scenarios require a pub/sub layer connecting your WebSocket servers.
import { createClient } from 'redis';
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
// Track which WebSocket connections are subscribed to which rooms
const roomSubscriptions = new Map<string, Set<WebSocket>>();
// When an AI generates a token in a shared session
async function broadcastToken(
roomId: string,
requestId: string,
content: string,
index: number
): Promise<void> {
const message = JSON.stringify({ type: 'token', requestId, payload: { content, index } });
await pubClient.publish(`room:${roomId}`, message);
}
// Subscribe to room broadcasts on this server instance
await subClient.pSubscribe('room:*', (message, channel) => {
const roomId = channel.replace('room:', '');
const subscribers = roomSubscriptions.get(roomId) ?? new Set();
subscribers.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
});
});
// Handle client subscribing to a room
function subscribeToRoom(ws: WebSocket, roomId: string): void {
if (!roomSubscriptions.has(roomId)) {
roomSubscriptions.set(roomId, new Set());
}
roomSubscriptions.get(roomId)!.add(ws);
ws.on('close', () => {
roomSubscriptions.get(roomId)?.delete(ws);
});
}

This is the foundation for scaling WebSocket applications horizontally. Server 1 handles the AI API call and publishes tokens. All servers, including Server 1, receive the message via Redis pub/sub and forward to their connected clients. Adding more WebSocket servers is just adding more subscribers.
HTTP is stateless. WebSocket is stateful. That single difference makes horizontal scaling dramatically harder.
When a user connects via WebSocket, they connect to a specific server instance. The connection lives on that server. A message intended for that user might originate from a different server instance.
Sticky sessions via load balancer are the first approach. Route each user to the same server consistently using IP hash or session cookie. Works until that server goes down and all its connections die simultaneously.
The pub/sub approach shown above is the production solution. Messages flow through Redis regardless of which server instance the client is on. This adds a few milliseconds of latency. For streaming AI tokens, imperceptible.
Connection limits per server deserve attention. Each WebSocket connection consumes a file descriptor and some memory, roughly 20-50KB depending on your buffer sizes. A standard cloud instance handles 10,000-50,000 concurrent WebSocket connections comfortably. Plan your scaling thresholds accordingly.
Monitor active connection count, memory usage, and open file descriptors per server. When active connections exceed 70% of your server's comfortable capacity, add more instances. Do not wait for 90%.
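The 70% rule is easy to encode in your monitoring. A sketch, where the capacity figure is illustrative and should come from your own load tests:

```typescript
// Flag a server for scale-out once connections pass 70% of measured capacity.
// COMFORTABLE_CAPACITY is an assumed figure; derive yours from load testing.
const COMFORTABLE_CAPACITY = 20_000;

function scalingStatus(
  activeConnections: number,
  capacity: number = COMFORTABLE_CAPACITY
): 'ok' | 'scale-out' {
  return activeConnections > capacity * 0.7 ? 'scale-out' : 'ok';
}
```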
This is also why Convex's real-time backend is often the right choice for new projects: it handles WebSocket complexity, pub/sub, connection management, and horizontal scaling behind a clean API. You trade control for speed. For most applications, the trade is worth it.
Modern AI applications do not just stream text. They stream tool calls, structured data, and intermediate thinking steps. Handling these requires extending your message protocol.
type TokenPayload =
| { type: 'text'; content: string; index: number }
| { type: 'tool_call_start'; toolName: string; toolUseId: string }
| { type: 'tool_call_input'; toolUseId: string; partialJson: string }
| { type: 'tool_call_complete'; toolUseId: string; input: Record<string, unknown> }
| { type: 'tool_result'; toolUseId: string; result: unknown }
| { type: 'thinking'; content: string };
// Extended streaming handler for tool use
let currentToolUseId = '';
for await (const event of stream) {
if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') {
currentToolUseId = event.content_block.id;
ws.send(JSON.stringify({
type: 'token',
requestId,
payload: {
type: 'tool_call_start',
toolName: event.content_block.name,
toolUseId: event.content_block.id,
},
}));
} else if (event.type === 'content_block_delta') {
if (event.delta.type === 'text_delta') {
ws.send(JSON.stringify({
type: 'token',
requestId,
payload: { type: 'text', content: event.delta.text, index: tokenIndex++ },
}));
} else if (event.delta.type === 'input_json_delta') {
// input_json_delta events carry a content block index, not the tool use ID,
// so reuse the ID captured at content_block_start
ws.send(JSON.stringify({
type: 'token',
requestId,
payload: { type: 'tool_call_input', toolUseId: currentToolUseId, partialJson: event.delta.partial_json },
}));
}
}
}

The client renders tool calls as live activity indicators. "Searching the web..." with animated ellipsis while the tool call input streams. "Reading file..." as the model constructs its file path argument. This makes multi-step agent workflows feel responsive rather than blank and then suddenly complete.
WebSocket security has different considerations than HTTP.
Authenticate on connection. The WebSocket upgrade request is an HTTP request. Include your auth token as a query parameter (browsers cannot set custom headers on the WebSocket handshake) or, for non-browser clients, in the Authorization header. Validate it before accepting the connection. After the connection is established, you cannot re-authenticate per message without significant overhead.
wss.on('connection', (ws, request) => {
const url = new URL(request.url!, `http://${request.headers.host}`);
const token = url.searchParams.get('token');
if (!token) {
ws.close(4001, 'Authentication required');
return;
}
const user = validateToken(token);
if (!user) {
ws.close(4003, 'Invalid token');
return;
}
// Attach user to the connection
(ws as WebSocket & { user: User }).user = user;
setupHeartbeat(ws);
}

Input validation on every message. Do not trust any data from the client. Validate message types, request IDs, and payload structures before processing. A malicious client can send anything.
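A minimal structural check might look like this, written without a schema library (production code would likely use one, such as zod; the length limits are illustrative):

```typescript
// Reject anything that is not a well-formed client message before processing.
// Only client-originated types are allowed; the 128-character requestId cap
// is an assumed limit.
const VALID_CLIENT_TYPES = new Set(['request', 'cancel', 'ping']);

function isValidClientMessage(
  value: unknown
): value is { type: string; requestId: string } {
  if (typeof value !== 'object' || value === null) return false;
  const msg = value as Record<string, unknown>;
  return (
    typeof msg.type === 'string' &&
    VALID_CLIENT_TYPES.has(msg.type) &&
    typeof msg.requestId === 'string' &&
    msg.requestId.length > 0 &&
    msg.requestId.length <= 128
  );
}
```

Run this on every parsed message before dispatching; anything failing the check can be dropped silently, just like malformed JSON.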
Rate limiting applies to WebSocket connections too. Use the same token bucket pattern from the previous section. Apply it per connection, checking on each incoming message.
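The token bucket referenced above can be sketched per connection like this; capacity and refill rate are illustrative and should be tuned per product:

```typescript
// Token bucket rate limiter, one instance per WebSocket connection.
// Capacity and refill rate here are assumed values.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private readonly capacity: number,
    private readonly refillPerSecond: number,
    now: number = Date.now()
  ) {
    this.tokens = capacity;
    this.lastRefill = now;
  }

  // Returns true if the message is allowed, false if it should be rejected
  tryConsume(now: number = Date.now()): boolean {
    const elapsedSeconds = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.lastRefill = now;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

Instantiate one bucket when the connection is accepted and call `tryConsume()` at the top of the message handler, sending an error message or closing the connection when it returns false.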
Q: How do WebSockets work with AI streaming?
WebSockets provide a persistent bidirectional connection between client and server, enabling real-time AI response streaming. As the AI model generates tokens, they are immediately pushed to the client through the WebSocket, creating a typing effect. This is faster and more responsive than polling-based approaches.
Q: What is the best approach for streaming AI responses?
Use Server-Sent Events (SSE) for simple one-way streaming (AI to client) and WebSockets for bidirectional communication (chat interfaces where the user can cancel or reply mid-stream). Libraries such as the Vercel AI SDK abstract much of the streaming complexity. For AI products that will grow beyond a single one-way stream, starting with WebSockets avoids a painful migration later.
Q: When should you use WebSockets vs SSE for AI?
Use SSE for simple AI response streaming (one-way, server to client) — it is simpler to implement and works with HTTP/2. Use WebSockets when you need bidirectional communication (user can interrupt or send messages while AI is responding), presence indicators, or real-time collaboration features alongside AI.