
Building a Connection Manager for Chat and Notifications
Scale your real-time apps. Learn how to track active users, handle multiple connection rooms, and broadcast messages efficiently.
Building a Connection Manager
Individual WebSockets are easy, but managing 1,000 users in 50 different chat rooms is a challenge. You need a centralized way to track who is connected and where their messages should go.
In this lesson, we build a ConnectionManager, a common pattern for real-time FastAPI applications.
1. The ConnectionManager Pattern
Think of this as the "Switchboard" of your app. It maintains a list of active WebSocket objects and provides methods to send data to them.

    from typing import List

    from fastapi import WebSocket


    class ConnectionManager:
        def __init__(self):
            # List to track active connections
            self.active_connections: List[WebSocket] = []

        async def connect(self, websocket: WebSocket):
            # Complete the WebSocket handshake before tracking the socket
            await websocket.accept()
            self.active_connections.append(websocket)

        def disconnect(self, websocket: WebSocket):
            self.active_connections.remove(websocket)

        async def send_personal_message(self, message: str, websocket: WebSocket):
            await websocket.send_text(message)

        async def broadcast(self, message: str):
            # Iterate over a copy so a disconnect during an await cannot skip entries
            for connection in list(self.active_connections):
                await connection.send_text(message)


    # Global Manager Instance
    manager = ConnectionManager()

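One gap in the class above: if a client vanishes without a clean close, `send_text` typically raises, and the `broadcast` loop stops before reaching the remaining clients. The sketch below shows one way to guard against that; it is not part of the original lesson. It avoids FastAPI imports so it runs on its own: `FakeSocket` is a hypothetical stand-in that models only the `send_text` coroutine, and `SafeConnectionManager` and `register` are illustrative names.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket (only send_text is modelled)."""

    def __init__(self, name: str, broken: bool = False):
        self.name = name
        self.broken = broken
        self.received: list[str] = []

    async def send_text(self, message: str) -> None:
        if self.broken:
            # Simulates a client that vanished without a clean close
            raise RuntimeError(f"{self.name} is gone")
        self.received.append(message)


class SafeConnectionManager:
    def __init__(self):
        self.active_connections: list = []

    def register(self, websocket) -> None:
        # In a FastAPI endpoint this would follow `await websocket.accept()`
        self.active_connections.append(websocket)

    def disconnect(self, websocket) -> None:
        # Tolerate double removal instead of raising ValueError
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str) -> int:
        """Send to everyone, drop sockets that fail, return the delivery count."""
        delivered = 0
        # Copy the list so removals during iteration are safe
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
                delivered += 1
            except Exception:
                # Broad catch is an assumption of this sketch; narrow it in real code
                self.disconnect(connection)
        return delivered


async def demo() -> tuple[int, int]:
    manager = SafeConnectionManager()
    alice = FakeSocket("alice")
    ghost = FakeSocket("ghost", broken=True)
    bob = FakeSocket("bob")
    for sock in (alice, ghost, bob):
        manager.register(sock)
    delivered = await manager.broadcast("hello")
    # Two healthy clients got the message; the broken one was pruned
    return delivered, len(manager.active_connections)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a FastAPI endpoint, `disconnect` would normally run when `receive_text()` raises `WebSocketDisconnect`; the pruning in `broadcast` only covers clients that disappear between those checks.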
2. Implementing Chat Rooms (Topic-Based)
In a real app, users shouldn't see every message. They should only see messages for their specific "Room ID." We can update our tracker to be a dictionary: {room_id: [List of WebSockets]}.
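
    from typing import List

    from fastapi import WebSocket


    class RoomManager:
        def __init__(self):
            # Maps each room ID to the sockets currently in that room
            self.rooms: dict[str, List[WebSocket]] = {}

        async def connect(self, room_id: str, websocket: WebSocket):
            await websocket.accept()
            # Create the room on first join
            if room_id not in self.rooms:
                self.rooms[room_id] = []
            self.rooms[room_id].append(websocket)

        async def broadcast_to_room(self, room_id: str, message: str):
            # An unknown room yields an empty list, so nothing is sent
            for connection in list(self.rooms.get(room_id, [])):
                await connection.send_text(message)
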
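The RoomManager above only ever adds sockets, so a long-running server would accumulate stale entries and empty rooms. A minimal sketch of leave handling follows. The names `RoomRegistry`, `join`, and `leave` are illustrative, and `FakeSocket` is a hypothetical stand-in for FastAPI's `WebSocket` so the example runs without a server.

```python
import asyncio
from collections import defaultdict


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket (only send_text is modelled)."""

    def __init__(self):
        self.received: list[str] = []

    async def send_text(self, message: str) -> None:
        self.received.append(message)


class RoomRegistry:
    def __init__(self):
        # room_id -> set of sockets; a set prevents duplicates and removes in O(1)
        self.rooms: dict[str, set] = defaultdict(set)

    def join(self, room_id: str, websocket) -> None:
        self.rooms[room_id].add(websocket)

    def leave(self, room_id: str, websocket) -> None:
        members = self.rooms.get(room_id)
        if members is None:
            return
        members.discard(websocket)
        if not members:
            # Delete empty rooms so the dictionary does not grow forever
            del self.rooms[room_id]

    async def broadcast_to_room(self, room_id: str, message: str) -> None:
        # .get avoids creating an empty room as a side effect of the lookup
        for connection in list(self.rooms.get(room_id, ())):
            await connection.send_text(message)


async def demo():
    registry = RoomRegistry()
    ann, ben = FakeSocket(), FakeSocket()
    registry.join("python", ann)
    registry.join("rust", ben)
    await registry.broadcast_to_room("python", "hi python")
    registry.leave("rust", ben)
    # Only ann saw the message, and the emptied "rust" room is gone
    return ann.received, ben.received, sorted(registry.rooms)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In an endpoint, `leave` would typically be called from the `except WebSocketDisconnect` branch, using the same room ID the client joined with.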
3. Real-Time Notifications
WebSockets aren't just for chats. They are perfect for "Toast" notifications.
- Use Case: A user's background task (Module 11) finishes processing a video. The server uses the ConnectionManager to find the user's active WebSocket and sends: "Your video is ready!".
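The ConnectionManager from section 1 stores a flat list, so it has no way to tell which socket belongs to which user. Finding "the user's active WebSocket" needs a mapping keyed by user ID. The following is a sketch under assumptions: `UserNotifier`, `notify`, `process_video`, and the `"user-42"` ID are hypothetical names, and `FakeSocket` stands in for FastAPI's `WebSocket`.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket (only send_text is modelled)."""

    def __init__(self):
        self.received: list[str] = []

    async def send_text(self, message: str) -> None:
        self.received.append(message)


class UserNotifier:
    def __init__(self):
        # user_id -> sockets for that user (one per open tab or device)
        self.by_user: dict[str, set] = {}

    def register(self, user_id: str, websocket) -> None:
        self.by_user.setdefault(user_id, set()).add(websocket)

    def unregister(self, user_id: str, websocket) -> None:
        sockets = self.by_user.get(user_id)
        if sockets is None:
            return
        sockets.discard(websocket)
        if not sockets:
            del self.by_user[user_id]

    async def notify(self, user_id: str, message: str) -> bool:
        """Push to every socket the user has open; False means the user is offline."""
        sockets = self.by_user.get(user_id)
        if not sockets:
            return False
        for websocket in list(sockets):
            await websocket.send_text(message)
        return True


async def process_video(notifier: UserNotifier, user_id: str) -> bool:
    # Placeholder for the real background work
    await asyncio.sleep(0)
    return await notifier.notify(user_id, "Your video is ready!")


async def demo():
    notifier = UserNotifier()
    tab = FakeSocket()
    notifier.register("user-42", tab)
    online = await process_video(notifier, "user-42")
    offline = await process_video(notifier, "user-99")
    return online, offline, tab.received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The boolean return value lets the caller fall back to another channel, such as storing the notification for the next login, when the user has no open connection.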
4. Scaling WebSockets (The "Redis Pub/Sub" Secret)
If you have two servers, User A might be connected to Server 1 and User B to Server 2. If User A sends a message, Server 1 cannot deliver it to User B, because User B's WebSocket lives in Server 2's memory.
The Solution: Use Redis Pub/Sub.
- Server 1 publishes the message to a "Redis Channel."
- Server 2 (and every other server, Server 1 included) subscribes to that channel.
- Server 2 receives the message from Redis and pushes it to User B.
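The flow in the steps above can be sketched without a running Redis instance. In this illustration `InMemoryBroker` is a hypothetical stand-in for Redis pub/sub, `Server` models one application process, and `FakeSocket` replaces FastAPI's `WebSocket`; none of these names come from the lesson or from a real library.

```python
import asyncio


class InMemoryBroker:
    """Stand-in for Redis pub/sub: fans each published message out to all subscribers."""

    def __init__(self):
        self.subscribers: dict[str, list] = {}

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.setdefault(channel, []).append(queue)
        return queue

    async def publish(self, channel: str, message: str) -> None:
        for queue in self.subscribers.get(channel, []):
            await queue.put(message)


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket (only send_text is modelled)."""

    def __init__(self):
        self.received: list[str] = []

    async def send_text(self, message: str) -> None:
        self.received.append(message)


class Server:
    """One app process: its own local sockets plus one broker subscription."""

    def __init__(self, broker: InMemoryBroker, channel: str = "chat"):
        self.broker = broker
        self.channel = channel
        self.local_sockets: list = []
        self.inbox = broker.subscribe(channel)

    async def handle_client_message(self, message: str) -> None:
        # Publish instead of sending locally; every server relays from the broker
        await self.broker.publish(self.channel, message)

    async def relay_once(self) -> None:
        # Take one message from the broker and push it to locally connected clients
        message = await self.inbox.get()
        for websocket in list(self.local_sockets):
            await websocket.send_text(message)


async def demo():
    broker = InMemoryBroker()
    server1, server2 = Server(broker), Server(broker)
    user_a, user_b = FakeSocket(), FakeSocket()
    server1.local_sockets.append(user_a)
    server2.local_sockets.append(user_b)
    await server1.handle_client_message("hello from A")
    await asyncio.gather(server1.relay_once(), server2.relay_once())
    return user_a.received, user_b.received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real Redis deployment, the broker role would be played by a client such as redis-py's `redis.asyncio` module, where `publish` sends to a channel and a `pubsub()` object handles the subscription; each server would run the relay step as a long-lived background task rather than once.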
Visualizing Multi-Server Broadcast
graph TD
U1["User A"] -- "WS" --> S1["Server 1"]
U2["User B"] -- "WS" --> S2["Server 2"]
S1 -- "Publish Message" --> R["Redis (Message Broker)"]
R -- "Distribute" --> S1
R -- "Distribute" --> S2
S2 -- "Push to User B" --> U2
Summary
- ConnectionManager: Essential for tracking active users.
- Dictionaries: Use them to handle multiple "Rooms" or "Topics."
- Push Notifications: Move from polling (GET /check-status) to pushing (WS update).
- Redis: The key to scaling real-time apps across multiple servers.
In the next lesson, we wrap up Module 12 with Exercises on building a real-time notification engine.
Exercise: The Notify Guard
Design a manager for a Stock Trading App.
- Users "Subscribe" to specific symbols (e.g., "BTC", "AAPL").
- When the price of "BTC" changes, how does the manager know which users to notify?
- Sketch the dictionary structure that would hold these subscriptions.