Building a Connection Manager for Chat and Notifications

Building a Connection Manager for Chat and Notifications

Scale your real-time apps. Learn how to track active users, handle multiple connection rooms, and broadcast messages efficiently.

Building a Connection Manager

Individual WebSockets are easy, but managing 1,000 users in 50 different chat rooms is a challenge. You need a centralized way to track who is connected and where their messages should go.

In this lesson, we build a ConnectionManager—the standard pattern for real-time FastAPI applications.


1. The ConnectionManager Pattern

Think of this as the "Switchboard" of your app. It maintains a list of active WebSocket objects and provides methods to send data to them.

from typing import List
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        # List to track active connections
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

# Global Manager Instance
manager = ConnectionManager()

2. Implementing Chat Rooms (Topic-Based)

In a real app, users shouldn't see every message. They should only see messages for their specific "Room ID." We can update our tracker to be a dictionary: {room_id: [List of WebSockets]}.

class RoomManager:
    def __init__(self):
        self.rooms: dict[str, List[WebSocket]] = {}

    async def connect(self, room_id: str, websocket: WebSocket):
        await websocket.accept()
        if room_id not in self.rooms:
            self.rooms[room_id] = []
        self.rooms[room_id].append(websocket)

    async def broadcast_to_room(self, room_id: str, message: str):
        for connection in self.rooms.get(room_id, []):
            await connection.send_text(message)

3. Real-Time Notifications

WebSockets aren't just for chats. They are perfect for "Toast" notifications.

  • Use Case: A user's background task (Module 11) finishes processing a video. The server uses the ConnectionManager to find the user's active WebSocket and sends: "Your video is ready!".

4. Scaling WebSockets (The "Redis Pub/Sub" Secret)

If you have two servers, User A might be connected to Server 1 and User B to Server 2. If User A sends a message, Server 1 doesn't know about User B.

The Solution: Use Redis Pub/Sub.

  • Server 1 publishes the message to a "Redis Channel."
  • Server 2 (and 100 other servers) subscribe to that channel.
  • Server 2 receives the message from Redis and pushes it to User B.

Visualizing Multi-Server Broadcast

graph TD
    U1["User A"] -- "WS" --> S1["Server 1"]
    U2["User B"] -- "WS" --> S2["Server 2"]
    
    S1 -- "Publish Message" --> R["Redis (Message Broker)"]
    R -- "Distribute" --> S1
    R -- "Distribute" --> S2
    
    S2 -- "Push to User B" --> U2

Summary

  • ConnectionManager: Essential for tracking active users.
  • Dictionaries: Use them to handle multiple "Rooms" or "Topics."
  • Push Notifications: Move from polling (GET /check-status) to pushing (WS update).
  • Redis: The key to scaling real-time apps across multiple servers.

In the next lesson, we wrap up Module 12 with Exercises on building a real-time notification engine.


Exercise: The Notify Guard

Design a manager for a Stock Trading App.

  1. Users "Subscribe" to specific symbols (e.g., "BTC", "AAPL").
  2. When the price of "BTC" changes, how does the manager know which users to notify?
  3. Sketch the dictionary structure that would hold these subscriptions.

Subscribe to our newsletter

Get the latest posts delivered right to your inbox.

Subscribe on LinkedIn