Overview

GraphQL subscriptions enable real-time communication between clients and servers. Unlike queries and mutations, subscriptions maintain a persistent connection and push updates to clients as data changes.

Subscription Basics

Subscription Definition

type Subscription {
  userCreated: User!
  postUpdated(id: ID!): Post!
  messageReceived(chatId: ID!): Message!
}

Client Usage

const SUBSCRIBE_TO_USER_UPDATES = gql`
  subscription OnUserCreated {
    userCreated {
      id
      name
      email
    }
  }
`;
 
const { data, loading, error } = useSubscription(SUBSCRIBE_TO_USER_UPDATES, {
  onSubscriptionData: ({ subscriptionData }) => {
    console.log('New user created:', subscriptionData.data.userCreated);
  }
});

Transport Protocols

WebSocket

  • Standard: Most common transport for subscriptions
  • Connection: Persistent full-duplex connection
  • Protocol: graphql-ws or subscriptions-transport-ws

Server-Sent Events (SSE)

  • HTTP-based: Uses regular HTTP connections
  • Unidirectional: Server to client only
  • Fallback: Good for environments without WebSocket support

HTTP Polling

  • Fallback: Regular HTTP requests at intervals
  • Compatibility: Works everywhere
  • Efficiency: Less efficient than WebSocket

Apollo Server Implementation

Basic Setup

const { ApolloServer, gql, PubSub } = require('apollo-server');
 
const pubsub = new PubSub();
 
const typeDefs = gql`
  type Subscription {
    postAdded: Post!
  }
  
  type Mutation {
    addPost(title: String!, content: String!): Post!
  }
`;
 
const resolvers = {
  Subscription: {
    postAdded: {
      subscribe: () => pubsub.asyncIterator(['POST_ADDED'])
    }
  },
  
  Mutation: {
    addPost: async (_, { title, content }) => {
      const post = { id: Date.now(), title, content };
      
      // Publish event
      pubsub.publish('POST_ADDED', { postAdded: post });
      
      return post;
    }
  }
};
 
const server = new ApolloServer({
  typeDefs,
  resolvers,
  context: ({ req, res }) => ({ req, res, pubsub })
});

PubSub Implementation

// In-memory PubSub (development)
const pubsub = new PubSub();
 
// Redis PubSub (production)
const { RedisPubSub } = require('graphql-redis-subscriptions');
const pubsub = new RedisPubSub({
  connection: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
    password: process.env.REDIS_PASSWORD
  }
});
 
// Custom PubSub
class CustomPubSub {
  constructor() {
    this.subscriptions = new Map();
  }
  
  async publish(triggerName, payload) {
    const subscribers = this.subscriptions.get(triggerName) || [];
    await Promise.all(
      subscribers.map(async ({ resolve, reject }) => {
        try {
          resolve(payload);
        } catch (error) {
          reject(error);
        }
      })
    );
  }
  
  async subscribe(triggerName) {
    return new Promise((resolve, reject) => {
      if (!this.subscriptions.has(triggerName)) {
        this.subscriptions.set(triggerName, []);
      }
      
      this.subscriptions.get(triggerName).push({ resolve, reject });
    });
  }
}

Subscription Resolvers

Basic Subscription Resolver

const resolvers = {
  Subscription: {
    userCreated: {
      subscribe: (_, args, context, info) => {
        return context.pubsub.asyncIterator('USER_CREATED');
      }
    }
  }
};

Filtered Subscriptions

const resolvers = {
  Subscription: {
    postUpdated: {
      subscribe: (_, { id }, context, info) => {
        return context.pubsub.asyncIterator(`POST_UPDATED_${id}`);
      }
    },
    
    // With filtering
    userPostsUpdated: {
      subscribe: withFilter(
        (_, args, context, info) => {
          return context.pubsub.asyncIterator('POST_UPDATED');
        },
        (payload, variables, context, info) => {
          // Filter based on user ownership
          return payload.postUpdated.authorId === context.user.id;
        }
      )
    }
  }
};

Subscription with Arguments

const typeDefs = gql`
  type Subscription {
    chatMessages(chatId: ID!): Message!
    userActivity(userId: ID!): Activity!
  }
`;
 
const resolvers = {
  Subscription: {
    chatMessages: {
      subscribe: (_, { chatId }, context, info) => {
        // Validate access to chat
        if (!canAccessChat(context.user.id, chatId)) {
          throw new ForbiddenError('Access denied');
        }
        
        return context.pubsub.asyncIterator(`CHAT_MESSAGES_${chatId}`);
      }
    }
  }
};

Client-Side Implementation

Apollo Client Setup

import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { WebSocketLink } from '@apollo/client/link/ws';
import { getMainDefinition } from '@apollo/client/utilities';
 
const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql'
});
 
const wsLink = new WebSocketLink({
  uri: 'ws://localhost:4000/graphql',
  options: {
    reconnect: true,
    connectionParams: {
      authToken: localStorage.getItem('authToken')
    }
  }
});
 
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  wsLink,
  httpLink
);
 
const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache()
});

React Hook Usage

import { useSubscription } from '@apollo/client';
 
function UserList() {
  const { data, loading, error } = useSubscription(
    gql`
      subscription OnUserCreated {
        userCreated {
          id
          name
          email
        }
      }
    `,
    {
      onSubscriptionData: ({ subscriptionData }) => {
        // Handle real-time updates
        console.log('New user:', subscriptionData.data.userCreated);
      },
      onError: (error) => {
        console.error('Subscription error:', error);
      }
    }
  );
 
  if (loading) return <p>Connecting...</p>;
  if (error) return <p>Error: {error.message}</p>;
 
  return (
    <div>
      <h2>Real-time User Updates</h2>
      {data?.userCreated && (
        <div>New user: {data.userCreated.name}</div>
      )}
    </div>
  );
}

Subscription Management

function ChatRoom({ chatId }) {
  const [messages, setMessages] = useState([]);
  
  const { data, loading } = useSubscription(CHAT_MESSAGES, {
    variables: { chatId },
    onSubscriptionData: ({ subscriptionData }) => {
      const newMessage = subscriptionData.data.messageReceived;
      setMessages(prev => [...prev, newMessage]);
    }
  });
  
  // Cleanup handled automatically by Apollo Client
  return (
    <div>
      {messages.map(message => (
        <div key={message.id}>{message.content}</div>
      ))}
    </div>
  );
}

Authentication and Authorization

Context with Authentication

const server = new ApolloServer({
  typeDefs,
  resolvers,
  context: async ({ req, connection }) => {
    // Handle both HTTP and WebSocket connections
    if (connection) {
      // WebSocket connection
      return {
        user: connection.context.user,
        pubsub
      };
    } else {
      // HTTP connection
      const token = req.headers.authorization;
      const user = await getUserFromToken(token);
      
      return { user, pubsub };
    }
  },
  
  subscriptions: {
    onConnect: async (connectionParams, webSocket, context) => {
      const token = connectionParams.authToken;
      const user = await getUserFromToken(token);
      
      return { user };
    },
    
    onDisconnect: async (webSocket, context) => {
      // Cleanup resources
      console.log('Client disconnected');
    }
  }
});

Subscription Authorization

const resolvers = {
  Subscription: {
    privateMessages: {
      subscribe: (_, args, { user, pubsub }) => {
        if (!user) {
          throw new AuthenticationError('Not authenticated');
        }
        
        return pubsub.asyncIterator(`PRIVATE_MESSAGES_${user.id}`);
      }
    },
    
    teamUpdates: {
      subscribe: (_, { teamId }, { user, pubsub }) => {
        if (!canAccessTeam(user.id, teamId)) {
          throw new ForbiddenError('Access denied');
        }
        
        return pubsub.asyncIterator(`TEAM_UPDATES_${teamId}`);
      }
    }
  }
};

Error Handling

Subscription Errors

const resolvers = {
  Subscription: {
    dataUpdates: {
      subscribe: (_, args, context, info) => {
        try {
          return context.pubsub.asyncIterator('DATA_UPDATES');
        } catch (error) {
          console.error('Subscription setup failed:', error);
          throw new Error('Failed to establish subscription');
        }
      }
    }
  }
};

Connection Errors

const wsLink = new WebSocketLink({
  uri: 'ws://localhost:4000/graphql',
  options: {
    reconnect: true,
    reconnectionAttempts: 5,
    connectionParams: {
      authToken: localStorage.getItem('authToken')
    },
    onError: (error) => {
      console.error('WebSocket connection error:', error);
    },
    onReconnect: () => {
      console.log('Reconnected to WebSocket');
    }
  }
});

Graceful Degradation

function useSubscriptionWithFallback(query, options) {
  const subscription = useSubscription(query, options);
  
  // Fallback to polling if subscription fails
  const [pollingData, setPollingData] = useState(null);
  
  useEffect(() => {
    if (subscription.error) {
      const interval = setInterval(async () => {
        try {
          const result = await client.query({ query });
          setPollingData(result.data);
        } catch (error) {
          console.error('Polling failed:', error);
        }
      }, 5000);
      
      return () => clearInterval(interval);
    }
  }, [subscription.error]);
  
  return subscription.error ? 
    { ...subscription, data: pollingData } : 
    subscription;
}

Performance Optimization

Connection Pooling

// Server-side connection limits
const server = new ApolloServer({
  subscriptions: {
    path: '/graphql',
    keepAlive: 30000, // 30 seconds
    onConnect: (connectionParams) => {
      // Limit concurrent connections per user
      return checkConnectionLimit(connectionParams.userId);
    }
  }
});

Payload Optimization

const resolvers = {
  Subscription: {
    userUpdated: {
      subscribe: withFilter(
        (_, args, context) => {
          return context.pubsub.asyncIterator('USER_UPDATED');
        },
        (payload, variables, context) => {
          // Only send updates relevant to the subscriber
          return payload.userUpdated.id === variables.userId;
        }
      )
    }
  }
};

Batching Updates

class BatchedPubSub {
  constructor(pubsub, batchDelay = 100) {
    this.pubsub = pubsub;
    this.batchDelay = batchDelay;
    this.batches = new Map();
  }
  
  async publish(triggerName, payload) {
    if (!this.batches.has(triggerName)) {
      this.batches.set(triggerName, []);
      
      setTimeout(() => {
        const batch = this.batches.get(triggerName);
        this.batches.delete(triggerName);
        
        // Publish batched updates
        this.pubsub.publish(triggerName, { updates: batch });
      }, this.batchDelay);
    }
    
    this.batches.get(triggerName).push(payload);
  }
}

Testing Subscriptions

Unit Testing

const { createTestClient } = require('apollo-server-testing');
 
describe('Subscriptions', () => {
  let pubsub;
  
  beforeEach(() => {
    pubsub = new PubSub();
  });
  
  it('should publish user created events', async () => {
    const { subscribe } = createTestClient(server);
    
    const subscription = subscribe({
      query: `
        subscription {
          userCreated {
            id
            name
          }
        }
      `
    });
    
    // Trigger mutation that publishes event
    await createUser({ name: 'John Doe' });
    
    const result = await subscription.next();
    expect(result.value.data.userCreated.name).toBe('John Doe');
  });
});

Integration Testing

const WebSocket = require('ws');
const { SubscriptionClient } = require('subscriptions-transport-ws');
 
describe('WebSocket Subscriptions', () => {
  let client;
  
  beforeAll(() => {
    client = new SubscriptionClient(
      'ws://localhost:4000/graphql',
      {},
      WebSocket
    );
  });
  
  afterAll(() => {
    client.close();
  });
  
  it('should receive real-time updates', (done) => {
    client.request({
      query: `
        subscription {
          postAdded {
            id
            title
          }
        }
      `
    }).subscribe({
      next: (result) => {
        expect(result.data.postAdded.title).toBeDefined();
        done();
      }
    });
    
    // Trigger the event
    createPost({ title: 'Test Post' });
  });
});

Production Considerations

Scaling

  • Load Balancing: Distribute WebSocket connections
  • Redis: Use Redis for cross-server pub/sub
  • Clustering: Handle subscriptions across multiple server instances

Monitoring

  • Connection Count: Monitor active connections
  • Message Rate: Track subscription message frequency
  • Error Rates: Monitor subscription failures

Security

  • Rate Limiting: Limit subscription frequency
  • Connection Limits: Restrict connections per user/IP
  • Timeout: Set connection timeouts

Common Patterns

Real-time Chat

type Subscription {
  messageReceived(chatId: ID!): Message!
  userJoined(chatId: ID!): User!
  userLeft(chatId: ID!): User!
}

Live Data Updates

type Subscription {
  stockPriceUpdated(symbol: String!): StockPrice!
  weatherUpdated(location: String!): Weather!
  sensorDataUpdated(sensorId: ID!): SensorData!
}

Collaborative Editing

type Subscription {
  documentUpdated(documentId: ID!): Document!
  userCursorMoved(documentId: ID!, userId: ID!): CursorPosition!
  userSelectionChanged(documentId: ID!, userId: ID!): Selection!
}

0 items under this folder.