Building an AI Assistant from Scratch - Abort, Errors, and Debouncing (Part 4)


Stopping Generation: Client Side

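The user clicks "Stop" → abortController.abort("user"). fetch then rejects (with the abort reason when one is passed, otherwise with an AbortError), so the application layer must distinguish user cancellation from genuine failures.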

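try {
  await streamChat(body, { signal: ac.signal, onEvent: ... });
} catch (e) {
  // abort("user") makes fetch reject with the reason itself ("user"), not an
  // AbortError object, so check the signal instead of the error's name.
  if (ac.signal.aborted) {
    useChatStore.getState().patchMessage(assistantId, {
      status: "aborted",
      abortReason: "user",
    });
    return;
  }
  // Anything else is treated as a retryable network failure.
  useChatStore.getState().patchMessage(assistantId, {
    status: "error",
    error: {
      code: "E_NETWORK",
      message: e instanceof Error ? e.message : String(e),
      retryable: true,
    },
  });
}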

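Do not delete the partial assistant message: the user may want to copy what has already been generated, or click "Continue generating" (see the continuation section near the end).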


Cascading Abort: Server Side

Closing the browser connection alone is not enough: if Bun keeps reading the upstream stream, billing continues. Pass req.signal to the DeepSeek fetch.

// deepseekChatStream is defined elsewhere in the project.
export async function handleChat(req: Request): Promise<Response> {
  const upstreamSignal = req.signal;
  // Parse the request body; `messages` is forwarded to the upstream model.
  const { messages } = await req.json();

  const bodyStream = new ReadableStream({
    async start(controller) {
      const enc = new TextEncoder();
      const send = (obj: unknown) =>
        controller.enqueue(enc.encode(`data: ${JSON.stringify(obj)}\n\n`));

      const onAbort = () => {
        /* Optional: notify upstream. If fetch already carries the signal, the reader is interrupted automatically. */
      };
      upstreamSignal.addEventListener("abort", onAbort);

      try {
        for await (const delta of deepseekChatStream(messages, { signal: upstreamSignal })) {
          send({ type: "delta", text: delta });
        }
        send({ type: "done" });
      } catch (e) {
        if (upstreamSignal.aborted) {
          // Best effort: the client has usually disconnected, so enqueue may throw.
          try {
            send({ type: "error", code: "E_ABORTED", message: "aborted", retryable: false });
          } catch {
            /* nobody is listening any more */
          }
        } else {
          send({
            type: "error",
            code: "E_INTERNAL",
            message: e instanceof Error ? e.message : String(e),
            retryable: true,
          });
        }
      } finally {
        upstreamSignal.removeEventListener("abort", onAbort);
        try {
          controller.close();
        } catch {
          /* stream was already cancelled by the runtime */
        }
      }
    },
  });

  return new Response(bodyStream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

The fetch inside deepseekChatStream must receive the same signal, and the reader.read() loop must handle the abort.
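To make the signal plumbing concrete, here is a minimal sketch of such a generator. It is not the series' actual implementation: the name deepseekChatStreamSketch, the apiKey and fetchImpl parameters, the model name, and the assumption that the upstream speaks the OpenAI-compatible SSE format (`data: {...}` lines ending with `data: [DONE]`) are all illustrative.

```typescript
// Hypothetical sketch: names, endpoint, and model are assumptions.
type ChatMessage = { role: "system" | "user" | "assistant"; content: string };
type FetchLike = (url: string, init: RequestInit) => Promise<Response>;

export async function* deepseekChatStreamSketch(
  messages: ChatMessage[],
  opts: { signal: AbortSignal; apiKey: string; fetchImpl?: FetchLike },
): AsyncGenerator<string> {
  const doFetch: FetchLike = opts.fetchImpl ?? ((url, init) => fetch(url, init));
  const res = await doFetch("https://api.deepseek.com/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${opts.apiKey}`,
    },
    body: JSON.stringify({ model: "deepseek-chat", messages, stream: true }),
    signal: opts.signal, // the same signal the incoming request carries
  });
  if (!res.ok || !res.body) throw new Error(`upstream HTTP ${res.status}`);

  const reader = res.body.getReader();
  const dec = new TextDecoder();
  let buf = "";
  try {
    while (true) {
      // Stop promptly even if the fetch implementation ignores the signal.
      opts.signal.throwIfAborted();
      const { done, value } = await reader.read();
      if (done) break;
      buf += dec.decode(value, { stream: true });
      const lines = buf.split("\n");
      buf = lines.pop() ?? ""; // keep the trailing partial line for the next chunk
      for (const line of lines) {
        if (!line.startsWith("data:")) continue;
        const payload = line.slice(5).trim();
        if (payload === "[DONE]") return;
        const text = JSON.parse(payload).choices?.[0]?.delta?.content;
        if (text) yield text;
      }
    }
  } finally {
    // Runs on normal completion, on abort, and when the consumer stops iterating.
    await reader.cancel().catch(() => {});
  }
}
```

Cancelling the reader in finally releases the upstream connection on every exit path, which is what stops further tokens from being generated and billed.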


useChatController: Unified send / stop / regenerate

import { useRef, useCallback } from "react";
import { v4 as uuid } from "uuid";
import { streamChat } from "../services/chatService";
import { useChatStore } from "../store/chatStore";

export function useChatController() {
  const acRef = useRef<AbortController | null>(null);

  const stop = useCallback(() => {
    acRef.current?.abort("user");
    acRef.current = null;
    useChatStore.getState().setPhase("idle");
  }, []);

  const send = useCallback(async (content: string) => {
    if (useChatStore.getState().phase !== "idle") return; // concurrency lock, see below

    const assistantId = uuid();
    const requestId = uuid();
    // Keep a local reference so this call never touches a newer request's controller.
    const ac = new AbortController();
    acRef.current = ac;

    const { appendUserMessage, appendAssistantPlaceholder, patchMessage, setPhase } =
      useChatStore.getState();

    setPhase("submitting");
    appendUserMessage({
      id: uuid(),
      sessionId: "...",
      role: "user",
      content,
      createdAt: Date.now(),
    });
    appendAssistantPlaceholder({
      id: assistantId,
      sessionId: "...",
      role: "assistant",
      content: "",
      createdAt: Date.now(),
    });
    setPhase("streaming");

    try {
      await streamChat(
        {
          sessionId: "...",
          // Leave the empty assistant placeholder out of the request payload.
          messages: useChatStore
            .getState()
            .messages.filter((m) => m.id !== assistantId)
            .map((m) => ({
              role: m.role,
              content: m.content,
            })),
          stream: true,
          model: useChatStore.getState().settings.model,
          temperature: useChatStore.getState().settings.temperature,
          requestId,
        },
        {
          signal: ac.signal,
          onEvent: (e) => {
            if (e.type === "delta") {
              const m = useChatStore.getState().messages.find((x) => x.id === assistantId);
              patchMessage(assistantId, { content: (m?.content ?? "") + e.text });
            }
            if (e.type === "done") setPhase("idle");
            if (e.type === "error") {
              patchMessage(assistantId, {
                status: "error",
                error: { code: e.code, message: e.message, retryable: e.retryable },
              });
              setPhase("idle");
            }
          },
        }
      );
    } catch (e) {
      // Rejections that never reached onEvent: a local abort or a network failure.
      if (ac.signal.aborted) {
        patchMessage(assistantId, { status: "aborted", abortReason: "user" });
      } else {
        patchMessage(assistantId, {
          status: "error",
          error: {
            code: "E_NETWORK",
            message: e instanceof Error ? e.message : String(e),
            retryable: true,
          },
        });
      }
    } finally {
      // Release the lock only if this request still owns the controller;
      // stop() has already cleared the ref and reset the phase.
      if (acRef.current === ac) {
        acRef.current = null;
        setPhase("idle");
      }
    }
  }, []);

  return { send, stop };
}

Error Classification and Display

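| code | Meaning | retryable | Typical source |
| --- | --- | --- | --- |
| E_NETWORK | Offline, DNS failure, Failed to fetch | Yes | fetch throws |
| E_TIMEOUT | Soft timeout (no delta for a long time) or hard gateway timeout | | Client timer / 504 |
| E_RATE_LIMIT | Rate limited (429) | Yes (after backoff) | Upstream API |
| E_CONTEXT_TOO_LONG | Context too long | | 400-class response from the model |
| E_INTERNAL | 5xx or unclassified | Depends | Server / upstream |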

Network layer: TypeError: Failed to fetch → map to E_NETWORK; the UI shows "Network error" plus a retry button.

SSE protocol layer: on parsing {"type":"error",...} → set the message to status: error and keep the content accumulated so far.

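Upstream normalization (inside Bun): read error.message from the DeepSeek response body JSON, map it to { code, message, retryable }, and only then write it to the SSE stream, so raw stack traces never reach the frontend.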
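The normalization step might look like the sketch below. The function name normalizeUpstreamError, the user-facing messages, the keyword heuristic for context-length errors, and the retryable values chosen for E_TIMEOUT and E_CONTEXT_TOO_LONG are assumptions for illustration, not values stated in this article.

```typescript
type ErrorCode =
  | "E_NETWORK"
  | "E_TIMEOUT"
  | "E_RATE_LIMIT"
  | "E_CONTEXT_TOO_LONG"
  | "E_INTERNAL";

export interface SseError {
  type: "error";
  code: ErrorCode;
  message: string;
  retryable: boolean;
}

// Hypothetical helper: maps an upstream HTTP status and raw body to the SSE error shape.
export function normalizeUpstreamError(status: number, rawBody: string): SseError {
  let upstreamMessage = "";
  try {
    upstreamMessage = String(JSON.parse(rawBody)?.error?.message ?? "");
  } catch {
    // Non-JSON body (for example an HTML gateway page): keep the message empty.
  }
  const make = (code: ErrorCode, message: string, retryable: boolean): SseError => ({
    type: "error",
    code,
    message,
    retryable,
  });

  if (status === 429) {
    return make("E_RATE_LIMIT", "Rate limited, retry after a short backoff.", true);
  }
  if (status === 504) {
    // Assumption: a gateway timeout is worth retrying.
    return make("E_TIMEOUT", "Upstream gateway timed out.", true);
  }
  // Assumption: context-length failures are 400s whose message mentions context or tokens.
  if (status === 400 && /context|token/i.test(upstreamMessage)) {
    return make("E_CONTEXT_TOO_LONG", "The conversation is too long for the model.", false);
  }
  // Everything else is unclassified; only 5xx is treated as retryable here.
  return make("E_INTERNAL", upstreamMessage || `Upstream error (HTTP ${status}).`, status >= 500);
}
```

Because the function only ever returns the three whitelisted fields plus type, nothing else from the upstream response can leak into the SSE stream by accident.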


Timeouts: Hard and Soft

Hard timeout (server): apply AbortSignal.timeout(120_000) (or setTimeout + abort) to the entire upstream fetch so the connection cannot hang indefinitely.
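Since the upstream fetch should stop on either a client disconnect or the hard timeout, the two conditions need to be merged into one signal. Below is a minimal sketch of the setTimeout + abort variant; withHardTimeout is a hypothetical helper name. Where the runtime supports it, AbortSignal.any([req.signal, AbortSignal.timeout(120_000)]) achieves the same effect in one line.

```typescript
// Hypothetical helper: derives a signal that aborts when the parent aborts
// or when `ms` milliseconds elapse, whichever comes first.
export function withHardTimeout(
  parent: AbortSignal,
  ms: number,
): { signal: AbortSignal; dispose: () => void } {
  const ac = new AbortController();
  const timer = setTimeout(
    () => ac.abort(new DOMException("upstream timed out", "TimeoutError")),
    ms,
  );
  const onParentAbort = () => ac.abort(parent.reason);
  // dispose() clears the timer and listener; call it when the stream ends normally.
  const dispose = () => {
    clearTimeout(timer);
    parent.removeEventListener("abort", onParentAbort);
  };
  // Any abort, from either source, also cleans up.
  ac.signal.addEventListener("abort", dispose, { once: true });

  if (parent.aborted) onParentAbort();
  else parent.addEventListener("abort", onParentAbort, { once: true });

  return { signal: ac.signal, dispose };
}
```

In the handler this would replace the bare req.signal: create the derived signal with withHardTimeout(req.signal, 120_000), pass it to the upstream stream, and call dispose() in the finally block. The abort reason then tells the two cases apart: a TimeoutError can be reported as E_TIMEOUT, anything else as a client abort.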

Soft timeout (client): if no delta arrives for N seconds (e.g. 30 s), treat the stream as stalled, call abort() locally, and patchMessage(..., { error: { code:'E_TIMEOUT', ... } }).

let last = Date.now();
const STALL_MS = 30_000;
const tick = window.setInterval(() => {
  if (Date.now() - last > STALL_MS) {
    acRef.current?.abort("stall");
    clearInterval(tick);
  }
}, 1000);
// On every delta: last = Date.now();
// On done, error, or abort: clearInterval(tick), so the timer does not outlive the request.

Debouncing, Concurrency Lock, and Idempotency

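  1. Two-state button: while phase !== 'idle', disable Send, or show "Stop" bound to stop().
  2. Function-level debounce: send returns immediately if a stream is already in progress (see useChatController).
  3. Idempotent request_id: every request carries a uuid(), and the database column is messages.request_id TEXT UNIQUE. Insert the placeholder with ON CONFLICT (request_id) DO NOTHING, or check for duplicates first, so that rapid repeated clicks or network retries do not create duplicate user/assistant rows. For example:
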
INSERT INTO messages (id, session_id, role, content, status, request_id)
VALUES ($1, $2, 'assistant', '', 'streaming', $3)
ON CONFLICT (request_id) DO NOTHING
RETURNING id;

If RETURNING yields no row, the request is a duplicate: end it immediately, or return an "already processing" SSE event.
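On the server, the empty-RETURNING check can be wrapped in a small helper. This is a sketch under assumptions: the Db interface below is hypothetical (shaped like a typical Postgres client whose query resolves to an object with rows), and claimRequest is an illustrative name.

```typescript
// Hypothetical minimal DB interface; substitute the client your project uses.
export interface Db {
  query(sql: string, params: unknown[]): Promise<{ rows: Array<{ id: string }> }>;
}

const CLAIM_SQL = `
  INSERT INTO messages (id, session_id, role, content, status, request_id)
  VALUES ($1, $2, 'assistant', '', 'streaming', $3)
  ON CONFLICT (request_id) DO NOTHING
  RETURNING id`;

// Resolves to true when this call inserted the placeholder (first time the
// request_id is seen) and false when the row already existed (duplicate).
export async function claimRequest(
  db: Db,
  messageId: string,
  sessionId: string,
  requestId: string,
): Promise<boolean> {
  const { rows } = await db.query(CLAIM_SQL, [messageId, sessionId, requestId]);
  return rows.length > 0;
}
```

The handler would call claimRequest before contacting the upstream model and skip the upstream call entirely when it resolves to false, so a duplicate request costs one cheap query and no tokens.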


Continuing a Reply (Continue)

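When a reply was stopped or truncated for length, offer a "Continue generating" action.

The key is the system prompt constraint: the continuation must not repeat large portions of the earlier text.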
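One possible way to build the continuation request is sketched below. The helper name buildContinueMessages, the wording of the rule, and the choice to send the partial reply back as an assistant turn followed by a short user turn are assumptions, not the approach confirmed by this series.

```typescript
type Role = "system" | "user" | "assistant";

export interface ChatMessage {
  role: Role;
  content: string;
}

// Assumed wording for the constraint; tune it for your model.
const CONTINUE_RULE =
  "The previous assistant reply was cut off. Continue exactly where it stopped. " +
  "Do not repeat or summarize text that was already written.";

// `history` is the conversation up to, but not including, the partial reply.
export function buildContinueMessages(
  history: ChatMessage[],
  partialReply: string,
): ChatMessage[] {
  return [
    { role: "system", content: CONTINUE_RULE },
    ...history,
    { role: "assistant", content: partialReply },
    { role: "user", content: "Continue." },
  ];
}
```

On the client, the deltas of the continuation would be appended to the existing assistant message rather than to a new one, so the user sees a single uninterrupted reply.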


Page Refresh and Zombie Streaming Rows

When the user refreshes the page, rows with status='streaming' may remain in the DB. A scheduled server-side job cleans them up:

-- Only the status changes; the partial content is left untouched.
UPDATE messages
SET status = 'aborted'
WHERE status = 'streaming' AND created_at < now() - interval '10 minutes';

After loading the history, the frontend shows "Interrupted" plus "Continue generating" for these messages.


Advanced: Offline Send Queue (Brief)

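When navigator.onLine is false, write the send request to IndexedDB and replay the queue in order once the connection returns; each entry keeps its original requestId to guarantee idempotency.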
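The replay logic can be sketched independently of the storage layer. In the sketch below, QueueStore is a hypothetical interface that an IndexedDB-backed class would implement; MemoryQueueStore stands in for it so the example stays self-contained, and flushQueue is an illustrative name.

```typescript
export interface PendingSend {
  requestId: string; // reused on replay so the server can deduplicate
  content: string;
  queuedAt: number;
}

// Hypothetical persistence interface; an IndexedDB implementation would satisfy it.
export interface QueueStore {
  add(item: PendingSend): Promise<void>;
  all(): Promise<PendingSend[]>;
  remove(requestId: string): Promise<void>;
}

// In-memory stand-in for IndexedDB, used here only for illustration.
export class MemoryQueueStore implements QueueStore {
  private items = new Map<string, PendingSend>();
  async add(item: PendingSend): Promise<void> {
    if (!this.items.has(item.requestId)) this.items.set(item.requestId, item);
  }
  async all(): Promise<PendingSend[]> {
    return Array.from(this.items.values());
  }
  async remove(requestId: string): Promise<void> {
    this.items.delete(requestId);
  }
}

// Replays queued sends oldest first and stops at the first failure so that
// ordering is preserved. Resolves to the number of entries sent.
export async function flushQueue(
  store: QueueStore,
  sendFn: (item: PendingSend) => Promise<void>,
): Promise<number> {
  const items = (await store.all()).sort((a, b) => a.queuedAt - b.queuedAt);
  let sent = 0;
  for (const item of items) {
    try {
      await sendFn(item);
    } catch {
      break; // still offline or failing: keep this entry and everything after it
    }
    await store.remove(item.requestId);
    sent++;
  }
  return sent;
}
```

In the browser, flushQueue would typically be triggered from a window "online" event listener. An entry is removed only after its send succeeds, so a crash mid-replay leads to a resend, which the requestId uniqueness constraint makes harmless.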


Summary


