Subscriptions
Subscriptions stream real-time events from the server to the client over Server-Sent Events (SSE). Use them when the server needs to push updates to clients as they happen.
tRPC v11 supports SSE-based subscriptions out of the box. SSE is simpler to set up than WebSockets and works through standard HTTP.
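To illustrate what "works through standard HTTP" means, the sketch below formats a single event the way the generic SSE wire format expects it: optional `id:` and `event:` fields, a `data:` field, and a blank line as terminator. This is only an illustration of the protocol; `SseEvent` and `formatSseEvent` are hypothetical names, and tRPC's own serializer may frame events differently.

```typescript
// Generic SSE framing sketch. Not tRPC's internal serializer.
interface SseEvent {
  id?: string; // becomes the client's Last-Event-ID on reconnect
  event?: string; // optional event name
  data: unknown; // JSON-serializable payload
}

function formatSseEvent({ id, event, data }: SseEvent): string {
  const lines: string[] = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  // JSON.stringify without an indent argument never emits raw newlines,
  // so the payload fits on a single data: line.
  lines.push(`data: ${JSON.stringify(data)}`);
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}
```

For example, `formatSseEvent({ id: '7', data: { message: 'hi' } })` returns the string `id: 7\ndata: {"message":"hi"}\n\n`.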
Creating a Subscription
Subscriptions use async generator functions that yield values over time. Use the @Subscription() decorator to mark a method as a subscription procedure.
import { Injectable } from '@nestjs/common';
import { Router, Subscription, Signal } from 'nestjs-trpc-v2';
import { z } from 'zod';

const notificationSchema = z.object({
  id: z.string(),
  message: z.string(),
  timestamp: z.number(),
});

@Injectable()
@Router()
export class NotificationRouter {
  @Subscription({ output: notificationSchema })
  async *onNotification(@Signal() signal?: AbortSignal) {
    let id = 0;
    // Emit one notification per second until the client disconnects
    while (!signal?.aborted) {
      yield {
        id: String(id),
        message: `Notification ${id}`,
        timestamp: Date.now(),
      };
      id++;
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

The @Signal Decorator
The @Signal() parameter decorator extracts the AbortSignal from the procedure options. This signal is aborted when the client disconnects, allowing you to clean up resources.
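The interaction between an abort signal and an async generator can be tried out without tRPC or NestJS. The following is a minimal sketch in plain TypeScript: `ticker`, `onCleanup`, and `collectUntilAborted` are illustrative names, and the `AbortController` stands in for the client disconnecting.

```typescript
// Plain TypeScript sketch: no tRPC or NestJS involved.
async function* ticker(
  signal: AbortSignal,
  intervalMs: number,
  onCleanup: () => void,
): AsyncGenerator<number> {
  let count = 0;
  try {
    while (!signal.aborted) {
      yield count++;
      await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
    }
  } finally {
    // Runs when the loop exits or when the consumer stops iterating early.
    onCleanup();
  }
}

async function collectUntilAborted(): Promise<{ values: number[]; cleaned: boolean }> {
  const controller = new AbortController();
  const values: number[] = [];
  let cleaned = false;
  const source = ticker(controller.signal, 5, () => {
    cleaned = true;
  });
  for await (const n of source) {
    values.push(n);
    if (n === 2) controller.abort(); // simulate the client disconnecting
  }
  return { values, cleaned };
}
```

Note that the generator only notices the abort on its next loop check, after the current timer has elapsed. Putting release logic in a `finally` block keeps cleanup in one place regardless of how the stream ends.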
@Subscription({ output: messageSchema })
async *onMessage(@Signal() signal?: AbortSignal) {
  while (!signal?.aborted) {
    // Stream messages until client disconnects
  }
}

Using Input with Subscriptions
Subscriptions can accept input parameters just like queries and mutations:
import { Injectable } from '@nestjs/common';
import { Router, Subscription, Input, Signal } from 'nestjs-trpc-v2';
import { tracked } from '@trpc/server';
import { z } from 'zod';
// Assumption: PostService is your own application service exposing findAfter(lastId);
// the import path is hypothetical
import { PostService } from './post.service';

const postSchema = z.object({
  id: z.number(),
  title: z.string(),
  createdAt: z.date(),
});

@Injectable()
@Router()
export class PostRouter {
  constructor(private readonly postService: PostService) {}

  @Subscription({
    input: z.object({ lastEventId: z.coerce.number().optional() }),
    output: postSchema,
  })
  async *onPostCreated(
    @Input() input: { lastEventId?: number },
    @Signal() signal?: AbortSignal,
  ) {
    // Start after the last event the client has seen, or from the beginning
    let lastId = input.lastEventId ?? 0;
    while (!signal?.aborted) {
      const newPosts = await this.postService.findAfter(lastId);
      for (const post of newPosts) {
        yield tracked(String(post.id), post);
        lastId = post.id;
      }
      // Poll once per second
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

Tracked Events for Auto-Reconnection
Use the tracked() helper from @trpc/server to enable automatic reconnection. When a client reconnects, it will receive the lastEventId in the input, allowing you to resume from where it left off.
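The resume logic itself is independent of tRPC and can be sketched in isolation. The example below assumes a hypothetical in-memory `eventLog` with numeric ids; `replayAfter` and `collectIds` are illustrative names. It yields plain `{ id, data }` objects where a real procedure would yield `tracked(id, data)`.

```typescript
interface StoredEvent {
  id: number;
  text: string;
}

// Hypothetical in-memory event log, ordered by id.
const eventLog: StoredEvent[] = [
  { id: 1, text: 'first' },
  { id: 2, text: 'second' },
  { id: 3, text: 'third' },
];

// Yields every stored event whose id is greater than lastEventId.
async function* replayAfter(
  lastEventId: string | null | undefined,
): AsyncGenerator<{ id: string; data: StoredEvent }> {
  // No lastEventId means a fresh connection: replay from the start.
  const after = lastEventId ? Number(lastEventId) : 0;
  for (const event of eventLog) {
    if (event.id > after) {
      // In a real subscription: yield tracked(String(event.id), event)
      yield { id: String(event.id), data: event };
    }
  }
}

async function collectIds(lastEventId?: string | null): Promise<string[]> {
  const ids: string[] = [];
  for await (const event of replayAfter(lastEventId)) ids.push(event.id);
  return ids;
}
```

A fresh connection (`collectIds()`) receives ids `1`, `2`, and `3`, while a client reconnecting with `lastEventId` of `'2'` receives only `3`.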
import { tracked } from '@trpc/server';

@Subscription({
  input: z.object({ lastEventId: z.string().nullish() }),
  output: messageSchema,
})
async *onMessage(
  @Input() input: { lastEventId?: string | null },
  @Signal() signal?: AbortSignal,
) {
  // Catch up on missed messages
  if (input.lastEventId) {
    const missed = await this.messageService.getAfter(input.lastEventId);
    for (const msg of missed) {
      yield tracked(msg.id, msg);
    }
  }
  // Stream new messages with tracking
  for await (const msg of this.messageService.subscribe(signal)) {
    yield tracked(msg.id, msg);
  }
}

Client Setup
On the client side, use httpSubscriptionLink with splitLink to handle subscriptions:
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from './server/app-router';

const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      // Route subscriptions over SSE and everything else over batched HTTP
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: 'http://localhost:3000/trpc',
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000/trpc',
      }),
    }),
  ],
});

// Subscribe to notifications
const subscription = trpc.notificationRouter.onNotification.subscribe(
  undefined,
  {
    onData: (notification) => {
      console.log('New notification:', notification);
    },
    onError: (err) => {
      console.error('Subscription error:', err);
    },
  },
);

// Unsubscribe when done
subscription.unsubscribe();

Event Emitter Pattern
For real-time events from your application, use Node.js EventEmitter:
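One detail of this pattern is worth seeing in isolation: when the signal passed to `on()` from the Node.js `events` module is aborted, the async iterator rejects with an `AbortError` rather than ending quietly. The sketch below is plain Node.js with no tRPC involved; `collectMessages` and `demo` are illustrative names. Whether you need to catch the error yourself inside a subscription depends on how the framework handles it, which is not covered here.

```typescript
import { EventEmitter, on } from 'events';

// Collects 'message' events until the signal aborts.
async function collectMessages(
  emitter: EventEmitter,
  signal: AbortSignal,
): Promise<string[]> {
  const received: string[] = [];
  try {
    for await (const [message] of on(emitter, 'message', { signal })) {
      received.push(message as string);
    }
  } catch (err) {
    // on() rejects with an AbortError once the signal fires;
    // treat that as a normal end of stream and rethrow anything else.
    if ((err as Error).name !== 'AbortError') throw err;
  }
  return received;
}

async function demo(): Promise<string[]> {
  const emitter = new EventEmitter();
  const controller = new AbortController();
  const pending = collectMessages(emitter, controller.signal);
  emitter.emit('message', 'hello');
  emitter.emit('message', 'world');
  // Give the consumer a turn to drain both events before aborting.
  await new Promise<void>((resolve) => setTimeout(resolve, 0));
  controller.abort(); // simulate the client disconnecting
  return pending;
}
```

Passing the signal to `on()` also removes the event listeners on abort, so a disconnected client does not leave a listener attached to the emitter.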
import { Injectable } from '@nestjs/common';
import { Router, Subscription, Signal } from 'nestjs-trpc-v2';
import { EventEmitter, on } from 'events';
import { z } from 'zod';

const messageSchema = z.object({
  id: z.string(),
  text: z.string(),
  userId: z.string(),
});

@Injectable()
@Router()
export class ChatRouter {
  private readonly events = new EventEmitter();

  @Subscription({ output: messageSchema })
  async *onMessage(@Signal() signal?: AbortSignal) {
    // on() yields an array of the arguments passed to each emit() call
    for await (const [message] of on(this.events, 'message', { signal })) {
      yield message;
    }
  }

  emitMessage(message: z.infer<typeof messageSchema>) {
    this.events.emit('message', message);
  }
}

Parameter Decorators
| Decorator | Description |
|---|---|
| @Signal() | Extracts the AbortSignal from the procedure options |
| @Input() | Extracts the validated input (including lastEventId when a tracked subscription reconnects) |
| @Options() | Extracts the full procedure options object |
| @Context() | Extracts the tRPC context |