Skip to content

08-WebSocket

Python 3.11+

本章讲解 FastAPI WebSocket 实时通信。


第一部分:基础 WebSocket

1.1 实际场景

需要实现实时聊天功能,客户端和服务器之间保持长连接,实时推送消息。

问题:如何使用 WebSocket 实现实时通信?

1.2 基本使用

python
from fastapi import FastAPI, WebSocket

app: FastAPI = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            data: str = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except Exception:
        await websocket.close()

1.3 客户端示例

javascript
const ws = new WebSocket('ws://localhost:8000/ws');

ws.onmessage = (event) => {
    console.log('Received:', event.data);
};

ws.send('Hello Server');

第二部分:连接管理

2.1 实际场景

聊天室有多个用户,需要管理所有 WebSocket 连接,支持广播消息。

问题:如何管理多个 WebSocket 连接?

2.2 连接管理器

python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List


class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket) -> None:
        self.active_connections.remove(websocket)
    
    async def send_personal_message(self, message: str, websocket: WebSocket) -> None:
        await websocket.send_text(message)
    
    async def broadcast(self, message: str) -> None:
        for connection in self.active_connections:
            await connection.send_text(message)


manager: ConnectionManager = ConnectionManager()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data: str = await websocket.receive_text()
            await manager.broadcast(f"Broadcast: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)

第三部分:完整示例

python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
import json

app: FastAPI = FastAPI()


# ==================== 连接管理 ====================
class ConnectionManager:
    def __init__(self):
        self.connections: Dict[str, WebSocket] = {}
    
    async def connect(self, client_id: str, websocket: WebSocket) -> None:
        await websocket.accept()
        self.connections[client_id] = websocket
    
    def disconnect(self, client_id: str) -> None:
        if client_id in self.connections:
            del self.connections[client_id]
    
    async def send(self, client_id: str, message: str) -> None:
        if client_id in self.connections:
            await self.connections[client_id].send_text(message)
    
    async def broadcast(self, message: str) -> None:
        for ws in self.connections.values():
            await ws.send_text(message)


manager: ConnectionManager = ConnectionManager()

# ==================== WebSocket 路由 ====================
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(client_id: str, websocket: WebSocket):
    await manager.connect(client_id, websocket)
    try:
        while True:
            data: str = await websocket.receive_text()
            message: dict = json.loads(data)
            
            if message.get("type") == "broadcast":
                await manager.broadcast(f"[{client_id}] {message.get('content')}")
            elif message.get("type") == "private":
                target: str = message.get("to", "")
                content: str = f"[{client_id} -> {target}] {message.get('content')}"
                await manager.send(target, content)
                await manager.send(client_id, content)
    except WebSocketDisconnect:
        manager.disconnect(client_id)
        await manager.broadcast(f"Client {client_id} disconnected")

# ==================== HTTP 路由 ====================
@app.get("/")
def root() -> dict[str, str]:
    return {"message": "WebSocket server running"}

第四部分:L3 专家层

4.1 WebSocket 握手过程(HTTP Upgrade)

WebSocket 连接始于一次 HTTP 协议的 Upgrade 请求,完成握手后升级为独立的 WebSocket 协议:

Client                              Server
  │── GET /ws ─────────────────────→│
  │   HTTP/1.1                      │
  │   Upgrade: websocket            │
  │   Connection: Upgrade           │
  │   Sec-WebSocket-Key: dGhlI...   │  ← 随机 16 字节 Base64
  │   Sec-WebSocket-Version: 13     │
  │                                 │
  │←─ 101 Switching Protocols ──────│
  │   HTTP/1.1                      │
  │   Upgrade: websocket            │
  │   Connection: Upgrade           │
  │   Sec-WebSocket-Accept: s3pP... │  ← SHA1(key + magic) 的 Base64
  │                                 │
  │═══════════════════════════════════│  ← 此后使用 WebSocket 帧通信
  │◄══════ 二进制帧 / 文本帧 ═══════►│
python
# Starlette 握手验证核心逻辑(简化版)
MAGIC_STRING = "258EAFA5-E914-47DA-95CA-5AB5AC88DD11"

def compute_accept_key(key: str) -> str:
    import hashlib
    import base64
    digest = hashlib.sha1((key + MAGIC_STRING).encode()).digest()
    return base64.b64encode(digest).decode()

# FastAPI /ws 路由被调用时的流程:
# 1. ASGI scope["type"] == "websocket"
# 2. FastAPI 匹配 WebSocket 路由
# 3. 调用 websocket.accept() 时发送 101 响应
# 4. 此后通过 ASGI websocket.send / websocket.receive 消息类型通信
握手 Header作用
Upgrade: websocket声明协议升级意图
Connection: Upgrade告知中间件此连接需要升级
Sec-WebSocket-Key客户端生成的随机值,防止非 WebSocket 客户端误连
Sec-WebSocket-Version协议版本(当前为 13)
Sec-WebSocket-Accept服务端验证签名,证明理解 WebSocket 协议
Sec-WebSocket-Protocol子协议协商(如 graphql-ws

4.2 帧结构(FIN / Opcode / Mask / Payload)

WebSocket 数据传输基于帧(Frame),每帧结构如下:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len == 126/127)  |
| |1|2|3|       |K|             |                                |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
 4               5               6                               7
+-+-+-+-+-------+-+-------------+-------------------------------+
|     Extended payload length (cont.)   | Masking-key (4 bytes) |
+ - - - - - - - - - - - - - - - +-------------------------------+
| Masking-key (cont.)           |       Actual payload data     |
+-------------------------------+ - - - - - - - - - - - - - - - +
:                       Actual payload data                     :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
字段位数说明
FIN1 bit1=消息最后一帧,0=还有后续帧(分片)
RSV1/2/33 bits保留位,默认为 0(扩展协议使用)
Opcode4 bits帧类型:0x0 续帧, 0x1 文本, 0x2 二进制, 0x8 关闭, 0x9 Ping, 0xA Pong
MASK1 bit1=客户端到服务端(必须掩码),0=服务端到客户端
Payload Length7/23/71 bits数据长度:0-125 直接存储,126 用后续 2 字节,127 用后续 8 字节
Masking Key4 bytes仅客户端帧有,用于对 payload 做 XOR 解码
Payload Data可变实际数据(文本需 UTF-8 编码)
python
# 客户端帧掩码解码(RFC 6455)
def unmask_payload(masked_data: bytes, masking_key: bytes) -> bytes:
    """客户端发送的帧数据需要与 masking_key 做 XOR 运算解码"""
    return bytes(
        masked_data[i] ^ masking_key[i % 4]
        for i in range(len(masked_data))
    )

# 示例:Opcode 含义
OPCODES: dict[int, str] = {
    0x0: "continuation",   # 分片消息的后续帧
    0x1: "text",            # 文本帧(UTF-8)
    0x2: "binary",          # 二进制帧
    0x8: "close",           # 连接关闭帧
    0x9: "ping",            # 心跳探测
    0xA: "pong",            # 心跳响应
}

4.3 连接保活(Ping/Pong)

WebSocket 基于 TCP,需要应用层心跳检测连接存活:

Client                              Server
  │                                 │
  │── Ping (0x9, optional data) ──→│
  │                                 │
  │←─ Pong (0xA, same data) ──────│  ← 必须原样返回 data
  │                                 │
  │                                 │
  │  ... 空闲一段时间 ...             │
  │                                 │
  │←─ Ping ────────────────────────│  ← 服务端主动探测
  │                                 │
  │── Pong ───────────────────────→│  ← 客户端回应
  │                                 │
  │  (超时未收到 Pong → 判定断连)    │
python
import asyncio
from fastapi import FastAPI, WebSocket

app: FastAPI = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    await websocket.accept()
    
    # 启动心跳任务
    async def ping_loop() -> None:
        try:
            while True:
                await asyncio.sleep(30)  # 每 30 秒发送一次 Ping
                await websocket.send_bytes(b"")  # 触发 ASGI ping
        except Exception:
            pass
    
    ping_task: asyncio.Task = asyncio.create_task(ping_loop())
    
    try:
        while True:
            data: str = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except Exception:
        pass
    finally:
        ping_task.cancel()
        await websocket.close()
机制说明
Ping 帧 (0x9)心跳探测,可携带最多 125 字节数据
Pong 帧 (0xA)必须回复,data 字段与 Ping 一致
超时检测发送 Ping 后启动定时器,超时未收到 Pong 视为断连
代理层问题Nginx 等代理可能有 proxy_read_timeout,超时后主动断开 TCP
ws vs wsswss:// 使用 TLS 加密,防中间人探测/劫持

4.4 性能考量

操作耗时级别说明
HTTP 握手~1ms一次完整的 HTTP 请求-响应
帧封装/解封装~0.001ms/帧纯内存操作
客户端掩码(XOR)~0.01ms/帧逐字节异或,125 字节以内可忽略
Ping/Pong~0.001ms控制帧,无 payload 开销
单连接消息吞吐~10000 msg/s受限于 asyncio 事件循环
并发连接数~10000-50000受限于文件描述符和内存

4.5 设计动机

设计选择动机
HTTP Upgrade 握手复用现有 HTTP 基础设施(端口、防火墙、代理)
客户端帧必须掩码防止早期 HTTP/1.1 中间缓存污染攻击
服务端帧不掩码服务端可控,无需额外 XOR 开销
Ping/Pong 控制帧应用层心跳,与 TCP Keep-Alive 互补(TCP 检测网络层,Ping/Pong 检测应用层)
FastAPI 的 websocket.accept()显式接受连接,允许在 accept 前做认证检查(如 query 参数中的 token)

4.6 知识关联

WebSocket
├── 握手协议
│   ├── HTTP/1.1 Upgrade 请求
│   ├── Sec-WebSocket-Key → Sec-WebSocket-Accept (SHA1)
│   └── 101 Switching Protocols

├── 帧格式(RFC 6455)
│   ├── FIN:分片控制
│   ├── Opcode:帧类型(text/binary/close/ping/pong)
│   ├── MASK:客户端掩码(XOR)
│   └── Payload Length:变长编码(7/23/71 bits)

├── 连接管理
│   ├── accept():握手接受
│   ├── receive_text()/receive_bytes():接收消息
│   ├── send_text()/send_bytes():发送消息
│   └── close():优雅关闭(发送 Close 帧)

├── 保活机制
│   ├── Ping/Pong 控制帧
│   ├── 心跳间隔(30-60s)
│   └── 超时检测(断连判定)

├── 安全考量
│   ├── wss:// 加密传输
│   ├── 认证(accept 前验证 token)
│   └── 掩码防缓存污染

└── 生产实践
    ├── 连接管理器(广播/私聊)
    ├── 消息序列化(JSON / MessagePack)
    ├── 分片处理(大消息拆分)
    └── Nginx 配置(proxy_read_timeout, upgrade header)

总结

知识点说明
WebSocket实时通信
accept/close连接管理
receive_text接收消息
send_text发送消息
连接管理器广播和私聊