NestJS tRPC v2

Subscriptions

Subscriptions stream real-time events from the server to the client over Server-Sent Events (SSE). Use them when the server needs to push updates to clients as they happen.

tRPC v11 supports SSE-based subscriptions out of the box. SSE is simpler to set up than WebSockets and works through standard HTTP.
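To make "works through standard HTTP" concrete, the sketch below serializes one event into the generic SSE text format: optional `id:` and `event:` fields, a `data:` field, and a blank line that ends the event. This illustrates the wire format only. `SseEvent` and `formatSseEvent` are hypothetical names, and the payload tRPC actually sends may be structured differently.

```typescript
// Illustrative only: one event in the generic SSE text format.
interface SseEvent {
  id?: string;
  event?: string;
  data: unknown;
}

function formatSseEvent({ id, event, data }: SseEvent): string {
  const lines: string[] = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  // JSON.stringify without an indent argument emits a single line,
  // so one "data:" field is enough here.
  lines.push(`data: ${JSON.stringify(data)}`);
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}
```

The `id:` field is what lets a client report the last event it saw when it reconnects.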

Creating a Subscription

Subscriptions use async generator functions that yield values over time. Use the @Subscription() decorator to mark a method as a subscription procedure.

notification.router.ts
import { Injectable } from '@nestjs/common';
import { Router, Subscription, Signal } from 'nestjs-trpc-v2';
import { z } from 'zod';

const notificationSchema = z.object({
  id: z.string(),
  message: z.string(),
  timestamp: z.number(),
});

@Injectable()
@Router()
export class NotificationRouter {
  @Subscription({ output: notificationSchema })
  async *onNotification(@Signal() signal?: AbortSignal) {
    let id = 0;
    while (!signal?.aborted) {
      yield {
        id: String(id),
        message: `Notification ${id}`,
        timestamp: Date.now(),
      };
      id++;
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}
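Because a subscription method is an ordinary async generator, its lifecycle can be explored without any framework. The sketch below is plain TypeScript rather than the nestjs-trpc-v2 API, and `countdown` and `takeFirst` are hypothetical names. It shows that when the consumer stops iterating, the generator's `finally` block runs, which makes it a natural place to release resources.

```typescript
const log: string[] = [];

// Hypothetical generator: yields n, n-1, ..., 1 and records its cleanup.
async function* countdown(n: number): AsyncGenerator<number> {
  try {
    for (let i = n; i > 0; i--) {
      yield i;
    }
  } finally {
    // Runs on normal completion and when the consumer stops early.
    log.push('cleanup');
  }
}

// Hypothetical consumer: reads at most `limit` values, then stops iterating.
async function takeFirst<T>(
  source: AsyncIterable<T>,
  limit: number,
): Promise<T[]> {
  const values: T[] = [];
  if (limit <= 0) return values;
  for await (const value of source) {
    values.push(value);
    if (values.length >= limit) break; // triggers the generator's return()
  }
  return values;
}
```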

The @Signal Decorator

The @Signal() parameter decorator extracts the AbortSignal from the procedure options. This signal is aborted when the client disconnects, allowing you to clean up resources.

@Subscription({ output: messageSchema })
async *onMessage(@Signal() signal?: AbortSignal) {
  while (!signal?.aborted) {
    // Stream messages until client disconnects
  }
}
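A loop that checks `signal.aborted` only between iterations still waits out any timer that is already running. One way to exit promptly is an abort-aware delay, sketched below in plain TypeScript. `sleep` and `ticker` are hypothetical helpers, not part of nestjs-trpc-v2.

```typescript
// Hypothetical helper: resolves after `ms`, or early if `signal` aborts.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) return resolve();
    const timer = setTimeout(done, ms);
    function done() {
      // Clear the timer so an early abort does not keep the process alive.
      clearTimeout(timer);
      signal?.removeEventListener('abort', done);
      resolve();
    }
    signal?.addEventListener('abort', done, { once: true });
  });
}

// Hypothetical generator with the same shape as a subscription body.
async function* ticker(signal?: AbortSignal, intervalMs = 1000) {
  let id = 0;
  while (!signal?.aborted) {
    yield { id: String(id), message: `Notification ${id}`, timestamp: Date.now() };
    id++;
    // Returns immediately once the signal is aborted.
    await sleep(intervalMs, signal);
  }
}
```

Inside a subscription method, the same `sleep(intervalMs, signal)` call could replace a bare `setTimeout` promise.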

Using Input with Subscriptions

Subscriptions can accept input parameters just like queries and mutations:

post.router.ts
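import { Injectable } from '@nestjs/common';
import { Router, Subscription, Input, Signal } from 'nestjs-trpc-v2';
import { tracked } from '@trpc/server';
import { z } from 'zod';
// Assumed to exist in your application: a service exposing findAfter(lastId).
import { PostService } from './post.service';

const postSchema = z.object({
  id: z.number(),
  title: z.string(),
  createdAt: z.date(),
});

@Injectable()
@Router()
export class PostRouter {
  // Injected by Nest so that this.postService is available below.
  constructor(private readonly postService: PostService) {}
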
  @Subscription({
    input: z.object({ lastEventId: z.coerce.number().optional() }),
    output: postSchema,
  })
  async *onPostCreated(
    @Input() input: { lastEventId?: number },
    @Signal() signal?: AbortSignal,
  ) {
    let lastId = input.lastEventId ?? 0;

    while (!signal?.aborted) {
      const newPosts = await this.postService.findAfter(lastId);

      for (const post of newPosts) {
        yield tracked(String(post.id), post);
        lastId = post.id;
      }

      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

Tracked Events for Auto-Reconnection

Use the tracked() helper from @trpc/server to enable automatic reconnection. Each tracked event carries an ID. When a client reconnects, it sends the ID of the last event it received, which arrives as lastEventId in the input so the subscription can resume from where it left off.

import { tracked } from '@trpc/server';

@Subscription({
  input: z.object({ lastEventId: z.string().nullish() }),
  output: messageSchema,
})
async *onMessage(
  @Input() input: { lastEventId?: string | null },
  @Signal() signal?: AbortSignal,
) {
  // Catch up on missed messages
  if (input.lastEventId) {
    const missed = await this.messageService.getAfter(input.lastEventId);
    for (const msg of missed) {
      yield tracked(msg.id, msg);
    }
  }

  // Stream new messages with tracking
  for await (const msg of this.messageService.subscribe(signal)) {
    yield tracked(msg.id, msg);
  }
}
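The catch-up step can be sketched without tRPC to show the resume logic on its own. Everything here is hypothetical: `StoredMessage`, `history`, `messagesAfter`, and `catchUp` stand in for your own message store, and the `{ id, data }` object stands in for what `tracked(msg.id, msg)` would yield. Replaying the full log when the ID is unknown is one possible policy, not a requirement.

```typescript
interface StoredMessage {
  id: string;
  text: string;
}

// Hypothetical in-memory log, ordered oldest to newest.
const history: StoredMessage[] = [
  { id: '1', text: 'first' },
  { id: '2', text: 'second' },
  { id: '3', text: 'third' },
  { id: '4', text: 'fourth' },
];

// Returns the messages a reconnecting client has not seen yet.
function messagesAfter(
  log: readonly StoredMessage[],
  lastEventId?: string | null,
): StoredMessage[] {
  // Fresh connection: nothing to catch up on.
  if (!lastEventId) return [];
  const index = log.findIndex((m) => m.id === lastEventId);
  // Unknown ID (for example, already pruned): replay the whole log.
  return index === -1 ? [...log] : log.slice(index + 1);
}

// Yields the missed messages with their IDs attached.
function* catchUp(log: readonly StoredMessage[], lastEventId?: string | null) {
  for (const msg of messagesAfter(log, lastEventId)) {
    yield { id: msg.id, data: msg }; // stand-in for tracked(msg.id, msg)
  }
}
```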

Client Setup

On the client side, use httpSubscriptionLink with splitLink to handle subscriptions:

client.ts
import {
  createTRPCClient,
  httpBatchLink,
  httpSubscriptionLink,
  splitLink,
} from '@trpc/client';
import type { AppRouter } from './server/app-router';

const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: httpSubscriptionLink({
        url: 'http://localhost:3000/trpc',
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000/trpc',
      }),
    }),
  ],
});

// Subscribe to notifications
const subscription = trpc.notificationRouter.onNotification.subscribe(
  undefined,
  {
    onData: (notification) => {
      console.log('New notification:', notification);
    },
    onError: (err) => {
      console.error('Subscription error:', err);
    },
  },
);

// Unsubscribe when done
subscription.unsubscribe();
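When a subscription should end together with some other lifecycle, such as a component unmounting or a request being cancelled, one option is to tie unsubscribe() to an AbortSignal. The helper below is a hypothetical sketch. It is typed structurally, so it accepts any object with an unsubscribe() method and does not depend on @trpc/client.

```typescript
// Structural type: anything exposing unsubscribe().
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical helper: ends the subscription when `signal` aborts.
function unsubscribeOnAbort(
  subscription: Unsubscribable,
  signal: AbortSignal,
): void {
  if (signal.aborted) {
    // Already cancelled: unsubscribe right away.
    subscription.unsubscribe();
    return;
  }
  signal.addEventListener('abort', () => subscription.unsubscribe(), {
    once: true,
  });
}
```

With this in place, calling `abort()` on the owning AbortController is enough to close the subscription.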

Event Emitter Pattern

To stream events that originate elsewhere in your application, bridge a Node.js EventEmitter into an async iterator with on() from the events module:

chat.router.ts
import { Injectable } from '@nestjs/common';
import { Router, Subscription, Signal } from 'nestjs-trpc-v2';
import { EventEmitter, on } from 'events';
import { z } from 'zod';

const messageSchema = z.object({
  id: z.string(),
  text: z.string(),
  userId: z.string(),
});

@Injectable()
@Router()
export class ChatRouter {
  private readonly events = new EventEmitter();

  @Subscription({ output: messageSchema })
  async *onMessage(@Signal() signal?: AbortSignal) {
    for await (const [message] of on(this.events, 'message', { signal })) {
      yield message;
    }
  }

  emitMessage(message: z.infer<typeof messageSchema>) {
    this.events.emit('message', message);
  }
}
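When the signal passed to `on()` is aborted, the async iterator ends by throwing an error whose name is `AbortError`. The standalone sketch below shows that behavior using only Node.js built-ins. `collectUntilAbort` is a hypothetical helper, and whether a subscription should catch this error or let it propagate depends on how the surrounding framework treats it, which is not verified here.

```typescript
import { EventEmitter, on } from 'node:events';

// Hypothetical helper: gathers 'message' events until the signal aborts.
async function collectUntilAbort(
  emitter: EventEmitter,
  signal: AbortSignal,
): Promise<string[]> {
  const received: string[] = [];
  try {
    // on() registers its listener as soon as it is called.
    for await (const [message] of on(emitter, 'message', { signal })) {
      received.push(message as string);
    }
  } catch (err) {
    // Aborting the signal surfaces as an AbortError; rethrow anything else.
    if ((err as Error).name !== 'AbortError') throw err;
  }
  return received;
}
```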

Parameter Decorators

| Decorator | Description |
| --- | --- |
| @Signal() | Extracts the AbortSignal for detecting client disconnection |
| @Input() | Extracts the validated input (including lastEventId for reconnection) |
| @Options() | Extracts the full procedure options object |
| @Context() | Extracts the tRPC context |
