文章目录
系列导航
停止生成:客户端
用户点击「停止」→ abortController.abort("user")。fetch 会 reject AbortError,需在业务层区分用户取消与异常。
try {
await streamChat(body, { signal: ac.signal, onEvent: ... });
} catch (e) {
if ((e as Error).name === "AbortError") {
useChatStore.getState().patchMessage(assistantId, {
status: "aborted",
abortReason: "user",
});
return;
}
useChatStore.getState().patchMessage(assistantId, {
status: "error",
error: { code: "E_NETWORK", message: (e as Error).message, retryable: true },
});
}
不要删除半截 assistant 消息:用户可能需要复制已生成部分或点击「继续生成」(见文末续写)。
级联中断:服务端
仅断浏览器连接不够:若 Bun 仍向上游读流,会继续计费。应把 req.signal 传给 DeepSeek 的 fetch。
export async function handleChat(req: Request): Promise<Response> {
const upstreamSignal = req.signal;
const bodyStream = new ReadableStream({
async start(controller) {
const enc = new TextEncoder();
const send = (obj: unknown) =>
controller.enqueue(enc.encode(`data: ${JSON.stringify(obj)}\n\n`));
const onAbort = () => {
/* 可选:通知上游,若 fetch 已带 signal 会自动中断 reader */
};
upstreamSignal.addEventListener("abort", onAbort);
try {
for await (const delta of deepseekChatStream(messages, { signal: upstreamSignal })) {
send({ type: "delta", text: delta });
}
send({ type: "done" });
} catch (e) {
if ((e as Error).name === "AbortError") {
send({ type: "error", code: "E_ABORTED", message: "aborted", retryable: false });
} else {
send({
type: "error",
code: "E_INTERNAL",
message: (e as Error).message,
retryable: true,
});
}
} finally {
upstreamSignal.removeEventListener("abort", onAbort);
controller.close();
}
},
});
return new Response(bodyStream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
deepseekChatStream内部fetch必须传同一个signal,并在reader.read()循环中捕获 abort。
useChatController:统一 send / stop / regenerate
import { useRef, useCallback } from "react";
import { v4 as uuid } from "uuid";
import { streamChat } from "../services/chatService";
import { useChatStore } from "../store/chatStore";
export function useChatController() {
const acRef = useRef<AbortController | null>(null);
const stop = useCallback(() => {
acRef.current?.abort("user");
acRef.current = null;
useChatStore.getState().setPhase("idle");
}, []);
const send = useCallback(async (content: string) => {
if (useChatStore.getState().phase !== "idle") return; // 并发锁,见下文
const assistantId = uuid();
const requestId = uuid();
acRef.current = new AbortController();
const { appendUserMessage, appendAssistantPlaceholder, patchMessage, setPhase } =
useChatStore.getState();
setPhase("submitting");
appendUserMessage({
id: uuid(),
sessionId: "...",
role: "user",
content,
createdAt: Date.now(),
});
appendAssistantPlaceholder({
id: assistantId,
sessionId: "...",
role: "assistant",
content: "",
createdAt: Date.now(),
});
setPhase("streaming");
try {
await streamChat(
{
sessionId: "...",
messages: useChatStore.getState().messages.map((m) => ({
role: m.role,
content: m.content,
})),
stream: true,
model: useChatStore.getState().settings.model,
temperature: useChatStore.getState().settings.temperature,
requestId,
},
{
signal: acRef.current.signal,
onEvent: (e) => {
if (e.type === "delta") {
const m = useChatStore.getState().messages.find((x) => x.id === assistantId);
patchMessage(assistantId, { content: (m?.content ?? "") + e.text });
}
if (e.type === "done") setPhase("idle");
if (e.type === "error") {
patchMessage(assistantId, {
status: "error",
error: { code: e.code, message: e.message, retryable: e.retryable },
});
setPhase("idle");
}
},
}
);
} catch {
/* 已在 streamChat 外层或 onEvent 处理 */
} finally {
acRef.current = null;
}
}, []);
return { send, stop };
}
错误分类与展示
| code | 含义 | retryable | 典型来源 |
|---|---|---|---|
E_NETWORK | 断网、DNS、Failed to fetch | 是 | fetch 抛错 |
E_TIMEOUT | 软超时(久无 delta)或网关硬超时 | 是 | 客户端定时器 / 504 |
E_RATE_LIMIT | 限流 429 | 是(退避后) | 上游 API |
E_CONTEXT_TOO_LONG | 上下文过长 400 类 | 否 | 模型返回 |
E_INTERNAL | 5xx 或未分类 | 视情况 | 服务端 / 上游 |
网络层:TypeError: Failed to fetch → 映射 E_NETWORK,UI 展示「网络异常」+ 重试按钮。
SSE 协议层:解析到 {"type":"error",...} → 更新消息 status: error,保留已累积 content。
上游归一化(Bun 内):捕获 DeepSeek 响应体 JSON 中的 error.message,映射为 { code, message, retryable } 再写入 SSE,避免把原始堆栈吐给前端。
超时:硬与软
硬超时(服务端):对整段上游 fetch 包 AbortSignal.timeout(120_000)(或 setTimeout + abort),防止连接悬挂。
软超时(客户端):若 N 秒(如 30s)未收到任何 delta,认为卡住,本地 abort() 并 patchMessage(..., { error: { code:'E_TIMEOUT', ... } })。
let last = Date.now();
const STALL_MS = 30_000;
const tick = window.setInterval(() => {
if (Date.now() - last > STALL_MS) {
acRef.current?.abort("stall");
clearInterval(tick);
}
}, 1000);
// 每次 on delta: last = Date.now();
防抖、并发锁与幂等
- 按钮双态:
phase !== 'idle'时禁用发送或显示「停止」并绑定stop()。 - 函数级防抖:
send开头若已在streaming则直接 return(见useChatController)。 - 幂等
request_id:每条请求带uuid();数据库messages.request_id TEXT UNIQUE,插入占位时用ON CONFLICT (request_id) DO NOTHING或先查重,避免用户连点或网络重试导致双份 user/assistant。
INSERT INTO messages (id, session_id, role, content, status, request_id)
VALUES ($1, $2, 'assistant', '', 'streaming', $3)
ON CONFLICT (request_id) DO NOTHING
RETURNING id;
若 RETURNING 为空,说明重复请求,可直接结束或返回「已在处理」SSE。
续写(Continue)
当回复被停止或超长截断,可提供「继续生成」:
- 把当前 assistant 已有内容作为最后一条 assistant 放入
messages,再发一条 user:请从上次中断处继续,不要重复已输出内容。 - 或使用模型自带的
suffix/ 部分厂商的continue参数(视 DeepSeek 当时 API 文档而定)。
关键是 system prompt 约束:续写不得重复大段前文。
刷新与僵尸 streaming
用户刷新页面时,DB 中可能仍有 status='streaming'。服务端定时任务:
UPDATE messages
SET status = 'aborted', content = content
WHERE status = 'streaming' AND created_at < now() - interval '10 minutes';
前端加载历史后,对这些消息显示「已中断」+「继续生成」。
进阶:离线待发队列(简述)
navigator.onLine 为 false 时,将 send 请求写入 IndexedDB,上线后顺序重放;每条仍带同一 requestId 以保证幂等。
小结
- Abort 必须前后端贯通,避免白烧 token。
- 错误分层映射到统一
code + retryable,UI 才可一致。 - 超时 + 幂等 + 并发锁 是生产环境三连,与流式体验同样重要。