
Building an AI Assistant from Scratch - Message Structure and State Management (Part 2)




Data flow overview

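  1. The user types into the input box and presses Enter or clicks Send.
  2. The frontend ChatInput calls sendMessage, which writes the user message into the store.
  3. The store immediately appends an assistant placeholder message with status streaming.
  4. The frontend sends an SSE request via POST /api/chat, passing an AbortSignal so the request can be cancelled.
  5. The backend first writes the user message and the assistant placeholder into PostgreSQL.
  6. The backend then sends a meta event telling the frontend the ID of this assistant message.
  7. The backend calls the DeepSeek streaming API and keeps receiving incremental text (delta).
  8. The backend forwards each delta over SSE; the frontend appends it to the content of the message with the matching ID.
  9. When the stream ends, the backend updates the assistant message's status in the database to completed and records the token usage.
  10. The backend sends a done event (optionally preceded by a usage event), and the frontend marks the message as completed.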
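The event order in steps 6 to 10 can be stated as a small checker over ServerStreamEvent. This is a minimal sketch. It assumes a well-formed stream starts with exactly one meta frame, carries any number of delta frames, has at most one usage frame, and ends with done or error. isValidEventSequence is a hypothetical helper, and the union type is copied locally so the snippet runs on its own.

```typescript
// Local copy of the ServerStreamEvent union from packages/shared/types/chat.ts
type ServerStreamEvent =
  | { type: "meta"; assistantMessageId: string; requestId: string }
  | { type: "delta"; text: string }
  | { type: "usage"; totalTokens?: number }
  | { type: "done" }
  | { type: "error"; code: string; message: string; retryable?: boolean };

// Hypothetical checker for the assumed order: meta, delta*, usage?, then done | error.
// An error frame may arrive at any point after meta and must be the last frame.
function isValidEventSequence(events: ServerStreamEvent[]): boolean {
  if (events.length < 2 || events[0].type !== "meta") return false;
  let seenUsage = false;
  for (let i = 1; i < events.length; i++) {
    const e = events[i];
    const isLast = i === events.length - 1;
    switch (e.type) {
      case "meta":
        return false; // exactly one meta, and only as the first frame
      case "delta":
        if (seenUsage) return false; // no text after usage has been reported
        break;
      case "usage":
        if (seenUsage) return false; // at most one usage frame
        seenUsage = true;
        break;
      case "done":
      case "error":
        return isLast; // terminal frames must close the stream
    }
  }
  return false; // ran out of frames without done or error
}
```

A checker like this is mostly useful in tests or as a development-only assertion on the client.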

Shared types: packages/shared/types/chat.ts

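Difference from the overview document: the example in ai_assistant.md (same directory) used both messages[] and a separate streamingContent field to represent streaming content. That creates two sources of truth that can drift apart, for example when streamingContent is not merged back into the message on completion. Recommended approach: while streaming, update only the content and status of the corresponding assistant entry in messages, and do not maintain a parallel global streamingContent.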
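With messages as the single source of truth, anything the UI used to read from streamingContent can be derived on demand. This is a minimal sketch over a trimmed-down Message shape. selectStreamingMessage is a hypothetical selector, not part of the store shown later.

```typescript
type MessageStatus = "pending" | "streaming" | "completed" | "error" | "aborted";

// Trimmed-down Message: only the fields this selector needs
interface Message {
  id: string;
  role: "user" | "assistant" | "system";
  content: string;
  status: MessageStatus;
}

// Derive the in-flight assistant message instead of storing a parallel
// streamingContent field; returns undefined when nothing is streaming.
function selectStreamingMessage(messages: Message[]): Message | undefined {
  for (let i = messages.length - 1; i >= 0; i--) {
    const m = messages[i];
    if (m.role === "assistant" && m.status === "streaming") return m;
  }
  return undefined;
}
```

In a component this could be used as a zustand selector, for example useChatStore((s) => selectStreamingMessage(s.messages)). Once the message turns completed, the selector simply stops returning it, so there is nothing to merge back.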

export type MessageRole = "user" | "assistant" | "system";

export type MessageStatus =
  | "pending"
  | "streaming"
  | "completed"
  | "error"
  | "aborted";

export interface Message {
  id: string;
  sessionId: string;
  role: MessageRole;
  content: string;
  /** Optional: plain-text version for search/summaries, e.g. markdown stripped with marked, or a cached plain-text copy */
  contentText?: string;
  createdAt: number;
  status: MessageStatus;
  tokenUsage?: number;
  /** Regeneration version tree: points to the previous assistant message this one replaces */
  parentId?: string | null;
  /** Idempotency key: matches the backend's request_id */
  requestId?: string;
  error?: { code: string; message: string; retryable?: boolean };
  abortReason?: string;
}

export interface SessionMeta {
  tokenUsage: number;
  messageCount: number;
  lastMessageRole: MessageRole;
}

export interface Session {
  id: string;
  title: string;
  model: string;
  createdAt: number;
  updatedAt: number;
  messages: Message[];
  meta: SessionMeta;
}

/** A single backend SSE frame (v2 protocol, designed to be extensible) */
export type ServerStreamEvent =
  | { type: "meta"; assistantMessageId: string; requestId: string }
  | { type: "delta"; text: string }
  | { type: "usage"; totalTokens?: number }
  | { type: "done" }
  | { type: "error"; code: string; message: string; retryable?: boolean };
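Each SSE frame is untrusted JSON. Narrowing it at runtime before treating it as a ServerStreamEvent keeps malformed frames out of the store. This is a minimal hand-written sketch; a schema-validation library would do the same job. parseServerEvent is a hypothetical helper, and the union type is copied locally. Unknown type values return null, so older clients ignore frames added in later protocol versions.

```typescript
type ServerStreamEvent =
  | { type: "meta"; assistantMessageId: string; requestId: string }
  | { type: "delta"; text: string }
  | { type: "usage"; totalTokens?: number }
  | { type: "done" }
  | { type: "error"; code: string; message: string; retryable?: boolean };

// Narrow an unknown JSON value to ServerStreamEvent; returns null for anything malformed.
function parseServerEvent(raw: unknown): ServerStreamEvent | null {
  if (typeof raw !== "object" || raw === null) return null;
  const o = raw as Record<string, unknown>;
  switch (o.type) {
    case "meta": {
      const { assistantMessageId: id, requestId } = o;
      return typeof id === "string" && typeof requestId === "string"
        ? { type: "meta", assistantMessageId: id, requestId }
        : null;
    }
    case "delta": {
      const text = o.text;
      return typeof text === "string" ? { type: "delta", text } : null;
    }
    case "usage": {
      const t = o.totalTokens;
      if (t === undefined) return { type: "usage" };
      return typeof t === "number" ? { type: "usage", totalTokens: t } : null;
    }
    case "done":
      return { type: "done" };
    case "error": {
      const { code, message, retryable } = o;
      if (typeof code !== "string" || typeof message !== "string") return null;
      return typeof retryable === "boolean"
        ? { type: "error", code, message, retryable }
        : { type: "error", code, message };
    }
    default:
      return null; // unknown type: ignore so the protocol can grow
  }
}
```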

Request body protocol: ChatRequest

Sent by the frontend with POST /api/chat

export interface ChatRequest {
  sessionId: string;
  messages: Array<{ role: MessageRole; content: string }>;
  stream: true;
  model: string;
  temperature: number;
  max_tokens?: number;
  /** For idempotency and deduplication; passed through end to end */
  requestId: string;
  /** regenerate: id of the target assistant message; the backend can use it to delete the placeholder or update the row with the same id */
  regenerateAssistantId?: string;
}

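Backend validation: sessionId must exist, the last entry in messages must have role user (or whatever "tool result" role you have agreed on), and requestId must be unique.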
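The three checks can be sketched as a pure function that returns a rejection reason or null. This sketch assumes sessionExists and isDuplicateRequestId are supplied by the caller. Both are hypothetical stand-ins; in practice they would be a sessions lookup and a request_id lookup. The sketch also leaves out the optional tool-result role.

```typescript
type MessageRole = "user" | "assistant" | "system";

// Local copy of ChatRequest from the shared types
interface ChatRequest {
  sessionId: string;
  messages: Array<{ role: MessageRole; content: string }>;
  stream: true;
  model: string;
  temperature: number;
  max_tokens?: number;
  requestId: string;
  regenerateAssistantId?: string;
}

// Returns the reason for rejecting the request, or null when all checks pass.
// sessionExists / isDuplicateRequestId are hypothetical stand-ins for database lookups.
function validateChatRequest(
  req: ChatRequest,
  sessionExists: (sessionId: string) => boolean,
  isDuplicateRequestId: (requestId: string) => boolean
): string | null {
  if (!sessionExists(req.sessionId)) return "unknown sessionId";
  const last = req.messages[req.messages.length - 1];
  // With an empty messages array, last is undefined at runtime
  if (last === undefined || last.role !== "user") return "last message must have role user";
  if (isDuplicateRequestId(req.requestId)) return "duplicate requestId";
  return null;
}
```

A check-then-insert on request ids is racy under concurrent requests. A UNIQUE constraint on request_id lets the database act as the final arbiter.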


Frontend state machine

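At the session level you can additionally keep phase: 'idle' | 'submitting' | 'streaming'. At the message level, Message.status describes the lifecycle of each individual assistant message more precisely.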
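The per-message lifecycle can be made explicit with a transition table. The allowed moves below are an assumption inferred from this article's flow: placeholder, then streaming, then a terminal state, with regenerate reopening a finished message. canTransition and nextStatus are hypothetical helpers.

```typescript
type MessageStatus = "pending" | "streaming" | "completed" | "error" | "aborted";

// Assumed transitions: pending -> streaming, streaming -> a terminal state,
// and terminal states -> streaming again when the message is regenerated.
const transitions: Record<MessageStatus, readonly MessageStatus[]> = {
  pending: ["streaming", "error", "aborted"],
  streaming: ["completed", "error", "aborted"],
  completed: ["streaming"],
  error: ["streaming"],
  aborted: ["streaming"],
};

function canTransition(from: MessageStatus, to: MessageStatus): boolean {
  return transitions[from].includes(to);
}

// Guarded update: returns the new status, or keeps the old one for an illegal move
function nextStatus(from: MessageStatus, to: MessageStatus): MessageStatus {
  return canTransition(from, to) ? to : from;
}
```

Guarding updates this way keeps a late delta or done frame from reviving a message the user has already aborted.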


zustand Store

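apps/web/src/store/chatStore.ts. The core idea is shown below; splitting it into slice files also works, but everything is kept in one file here for readability.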

import { create } from "zustand";
import type { Message, MessageRole, Session } from "@ai-chat/shared";

export interface ChatSettings {
  model: string;
  temperature: number;
  maxTokens: number;
  theme: "light" | "dark" | "auto";
}

interface ChatState {
  sessions: Session[];
  currentSessionId: string | null;
  messages: Message[];
  settings: ChatSettings;
  phase: "idle" | "submitting" | "streaming";

  setPhase: (p: ChatState["phase"]) => void;
  appendUserMessage: (m: Omit<Message, "status"> & { status?: Message["status"] }) => void;
  appendAssistantPlaceholder: (m: Message) => void;
  patchMessage: (id: string, patch: Partial<Message>) => void;
  setCurrentSession: (id: string | null) => void;
}

export const useChatStore = create<ChatState>((set, get) => ({
  sessions: [],
  currentSessionId: null,
  messages: [],
  phase: "idle",
  settings: {
    model: "deepseek-chat",
    temperature: 0.7,
    maxTokens: 4096,
    theme: "dark",
  },

  setPhase: (phase) => set({ phase }),

  appendUserMessage: (m) =>
    set((s) => ({
      messages: [...s.messages, { ...m, status: m.status ?? "completed" }],
    })),

  appendAssistantPlaceholder: (m) =>
    set((s) => ({
      messages: [...s.messages, { ...m, status: "streaming", content: "" }],
    })),

  patchMessage: (id, patch) =>
    set((s) => ({
      messages: s.messages.map((x) => (x.id === id ? { ...x, ...patch } : x)),
    })),

  setCurrentSession: (currentSessionId) => set({ currentSessionId }),
}));
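Step 5 of the send sequence needs the message's latest content. Doing the concatenation inside set's updater guarantees it reads the current state. This is a minimal sketch of a hypothetical appendDelta helper, which is not part of the store above. It is written as a pure function so it can be tested without zustand. Wired into the store it would look like appendDelta: (id, text) => set((s) => ({ messages: appendDelta(s.messages, id, text) })).

```typescript
interface Message {
  id: string;
  content: string;
  status: "pending" | "streaming" | "completed" | "error" | "aborted";
}

// Pure reducer: appends a streamed text fragment to one message without mutating input.
// Returns the same array reference when nothing changes, so selectors on messages see no change.
function appendDelta<M extends Message>(messages: M[], id: string, text: string): M[] {
  const idx = messages.findIndex((m) => m.id === id);
  if (idx === -1 || text === "") return messages;
  const next = messages.slice();
  next[idx] = { ...messages[idx], content: messages[idx].content + text };
  return next;
}
```

If prev comes from a stale component render instead, fragments can be lost when several deltas arrive before the next re-render.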

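Sequence for sending a message (pseudocode; the real fetch and parser come in Part 3):

  1. setPhase('submitting')
  2. appendUserMessage({ id, sessionId, role: 'user', content, createdAt, status: 'completed' })
  3. appendAssistantPlaceholder({ id: assistantId, sessionId, role: 'assistant', ... })
  4. setPhase('streaming'). When meta arrives, if the backend's assistantMessageId differs from the local placeholder id, realign it with patchMessage. This is rare: usually the frontend generates the id up front and the backend INSERTs the row with the same id.
  5. On each delta: patchMessage(assistantId, { content: prev + text }), where prev is the message's current content. Only messages changes; there is no second state field.
  6. On done: patchMessage(assistantId, { status: 'completed', tokenUsage }), then setPhase('idle')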
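Steps 4 to 6 can be folded into a single onEvent callback for streamChat. This minimal sketch assumes the store actions are passed in. patchMessage and setPhase mirror the store above. getMessage is a hypothetical reader of the latest state, for example via useChatStore.getState().messages. Keeping assistantId in a local variable lets the meta frame realign it. The usage frame is held until done so the message is finalized in one patch.

```typescript
type MessageStatus = "pending" | "streaming" | "completed" | "error" | "aborted";

// Trimmed-down Message with the fields the handler touches
interface Message {
  id: string;
  content: string;
  status: MessageStatus;
  tokenUsage?: number;
  error?: { code: string; message: string; retryable?: boolean };
}

type ServerStreamEvent =
  | { type: "meta"; assistantMessageId: string; requestId: string }
  | { type: "delta"; text: string }
  | { type: "usage"; totalTokens?: number }
  | { type: "done" }
  | { type: "error"; code: string; message: string; retryable?: boolean };

interface StreamActions {
  patchMessage: (id: string, patch: Partial<Message>) => void;
  setPhase: (phase: "idle" | "submitting" | "streaming") => void;
  // Hypothetical: reads the latest state, e.g. via useChatStore.getState().messages
  getMessage: (id: string) => Message | undefined;
}

// Returns an onEvent callback bound to one assistant placeholder
function createStreamHandler(localAssistantId: string, actions: StreamActions) {
  let assistantId = localAssistantId;
  let tokenUsage: number | undefined;
  return (e: ServerStreamEvent): void => {
    switch (e.type) {
      case "meta":
        if (e.assistantMessageId !== assistantId) {
          // Rare: the backend chose a different id, so rename the placeholder
          actions.patchMessage(assistantId, { id: e.assistantMessageId });
          assistantId = e.assistantMessageId;
        }
        break;
      case "delta": {
        // Read the current content right before appending, never a cached copy
        const prev = actions.getMessage(assistantId)?.content ?? "";
        actions.patchMessage(assistantId, { content: prev + e.text });
        break;
      }
      case "usage":
        tokenUsage = e.totalTokens; // applied together with the done frame
        break;
      case "done":
        actions.patchMessage(assistantId, { status: "completed", tokenUsage });
        actions.setPhase("idle");
        break;
      case "error":
        actions.patchMessage(assistantId, {
          status: "error",
          error: { code: e.code, message: e.message, retryable: e.retryable },
        });
        actions.setPhase("idle");
        break;
    }
  };
}
```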

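chatService.ts: establishing the SSE request

Uses fetch + AbortController (stop and retry are covered in Part 4). Below is the skeleton for setting up the request and dispatching events: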

import { v4 as uuid } from "uuid";
import type { ChatRequest, ServerStreamEvent } from "@ai-chat/shared";

export async function streamChat(
  body: Omit<ChatRequest, "requestId"> & { requestId?: string },
  opts: {
    signal: AbortSignal;
    onEvent: (e: ServerStreamEvent) => void;
  }
) {
  const requestId = body.requestId ?? uuid();
  const res = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    signal: opts.signal,
    body: JSON.stringify({ ...body, requestId, stream: true }),
  });
  if (!res.ok) throw new Error(`HTTP ${res.status}`);
  if (!res.body) throw new Error("No body");

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buf = "";

  // Handle one SSE event block: join its data: lines, parse the JSON, dispatch it
  const dispatch = (block: string) => {
    const data = block
      .split("\n")
      .filter((line) => line.startsWith("data:")) // also skips ": ping" comments
      .map((line) => line.slice(5).trim())
      .join("\n");
    if (!data) return;
    let event: ServerStreamEvent;
    try {
      event = JSON.parse(data) as ServerStreamEvent;
    } catch {
      return; // non-JSON payload: log it if needed
    }
    opts.onEvent(event); // outside try, so errors thrown by the handler are not swallowed
  };

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Normalize CRLF so a blank line between events is always "\n\n"
    buf = (buf + decoder.decode(value, { stream: true })).replace(/\r\n/g, "\n");
    const parts = buf.split("\n\n");
    buf = parts.pop() ?? ""; // keep the incomplete tail for the next chunk
    parts.forEach(dispatch);
  }
  // Flush the decoder and handle a last event that lacked a trailing blank line
  buf += decoder.decode();
  if (buf.trim()) dispatch(buf.replace(/\r\n/g, "\n"));
}
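Separating SSE framing from fetch makes it testable with hand-written chunks, including events split across reads. This is a minimal sketch of a hypothetical createSSEFramer that follows the basic Server-Sent Events rules. A blank line ends an event. data: lines are joined with a newline, with one optional leading space removed. Lines starting with : are comments, such as keep-alive pings. Other fields (event:, id:, retry:) are ignored here.

```typescript
// Incremental SSE framer: feed raw text chunks, get back complete data payloads.
function createSSEFramer() {
  let buf = "";

  // Extract the joined data payload of one event block, or null if it has none
  const parseBlock = (block: string): string | null => {
    const data: string[] = [];
    for (const line of block.split("\n")) {
      if (line.startsWith(":")) continue; // comment line
      if (line.startsWith("data:")) data.push(line.slice(line.startsWith("data: ") ? 6 : 5));
    }
    return data.length > 0 ? data.join("\n") : null;
  };

  return {
    // Returns payloads of all events completed by this chunk
    push(chunk: string): string[] {
      // Re-normalize the whole buffer so a CRLF split across chunks is still caught
      buf = (buf + chunk).replace(/\r\n/g, "\n");
      const blocks = buf.split("\n\n");
      buf = blocks.pop() ?? ""; // incomplete tail waits for more input
      return blocks.map(parseBlock).filter((d): d is string => d !== null);
    },
    // Call once the stream ends, to emit a final event without a trailing blank line
    flush(): string[] {
      const rest = buf;
      buf = "";
      const d = rest.trim() ? parseBlock(rest) : null;
      return d === null ? [] : [d];
    },
  };
}
```

Each returned string would then go through JSON.parse before reaching onEvent.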

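regenerate: find the target assistant message in messages, truncate every message after it, take the content of the preceding user message, and call the same endpoint again with regenerateAssistantId set. The frontend can reuse the original assistant id and first call patchMessage(id, { content: '', status: 'streaming' }).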
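The regenerate preparation can be written as a pure function over messages. This minimal sketch makes two assumptions. The user message to resend is the nearest earlier message with role user. The target assistant entry is kept (same id) but reset. prepareRegenerate and RegeneratePlan are hypothetical names.

```typescript
type MessageRole = "user" | "assistant" | "system";
type MessageStatus = "pending" | "streaming" | "completed" | "error" | "aborted";

interface Message {
  id: string;
  role: MessageRole;
  content: string;
  status: MessageStatus;
}

interface RegeneratePlan {
  messages: Message[]; // history up to and including the reset assistant entry
  userContent: string; // content of the user message to resend
}

// Returns null if the id is not an assistant message or has no preceding user message.
function prepareRegenerate(messages: Message[], assistantId: string): RegeneratePlan | null {
  const idx = messages.findIndex((m) => m.id === assistantId && m.role === "assistant");
  if (idx === -1) return null;
  let userIdx = idx - 1;
  while (userIdx >= 0 && messages[userIdx].role !== "user") userIdx--;
  if (userIdx < 0) return null;
  // Drop the target and everything after it, then re-add the target reset to streaming
  const kept = messages.slice(0, idx);
  kept.push({ ...messages[idx], content: "", status: "streaming" });
  return { messages: kept, userContent: messages[userIdx].content };
}
```

The request body's messages would then be plan.messages without the trailing assistant entry, mapped to { role, content }, with regenerateAssistantId set to the reused id.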


Backend: placeholder messages and database timing

Recommended order

  1. Validate the ChatRequest, then INSERT the user message (if it has not been persisted yet).
  2. INSERT the assistant placeholder row: status='streaming', content='', request_id=requestId.
  3. Open the SSE stream and send the first frame:
     {"type":"meta","assistantMessageId":"<uuid>","requestId":"<same>"}
  4. Loop over deepseekChatStream, sending {"type":"delta","text":"..."} for each streamed chunk.
  5. When the stream ends: UPDATE messages SET content=$1, status='completed', token_usage=$2 WHERE id=$3
  6. Send {"type":"usage",...} followed by {"type":"done"}.
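The recommended order, plus the flushPartial fallback, can be sketched as one handler. To stay self-contained, it assumes three hypothetical dependencies rather than a real driver or SDK:

- a Db with insertMessage/updateMessage;
- an llm function returning an async iterable of chunks, standing in for deepseekChatStream;
- a send callback that writes one SSE frame.

The error code upstream_error is also made up. For brevity, the user message is always inserted here.

```typescript
// Hypothetical persistence interface standing in for the PostgreSQL access layer
interface Db {
  insertMessage(row: {
    id: string; sessionId: string; role: "user" | "assistant";
    content: string; status: string; requestId: string;
  }): Promise<void>;
  updateMessage(id: string, patch: { content: string; status: string; tokenUsage?: number }): Promise<void>;
}

// Hypothetical upstream chunk shape (stand-in for deepseekChatStream output)
interface LlmChunk { text?: string; totalTokens?: number }

// Simplified request: ids are generated by the frontend and reused by the backend
interface TurnRequest {
  sessionId: string; requestId: string;
  userMessageId: string; userContent: string; assistantId: string;
}

async function handleChatTurn(
  req: TurnRequest,
  db: Db,
  llm: () => AsyncIterable<LlmChunk>,
  send: (frame: Record<string, unknown>) => void, // writes one SSE data frame
  client: { aborted: boolean } // becomes true when the client disconnects
): Promise<void> {
  // Steps 1-2: persist the user message and the empty assistant placeholder
  await db.insertMessage({ id: req.userMessageId, sessionId: req.sessionId, role: "user",
    content: req.userContent, status: "completed", requestId: req.requestId });
  await db.insertMessage({ id: req.assistantId, sessionId: req.sessionId, role: "assistant",
    content: "", status: "streaming", requestId: req.requestId });
  // Step 3: the first frame tells the client which row it is streaming into
  send({ type: "meta", assistantMessageId: req.assistantId, requestId: req.requestId });

  let content = "";
  let totalTokens: number | undefined;
  try {
    // Step 4: forward each upstream chunk as a delta frame
    for await (const chunk of llm()) {
      if (client.aborted) throw new Error("client disconnected");
      if (chunk.text) {
        content += chunk.text;
        send({ type: "delta", text: chunk.text });
      }
      if (chunk.totalTokens !== undefined) totalTokens = chunk.totalTokens;
    }
    // Steps 5-6: finalize the row, then report usage and done
    await db.updateMessage(req.assistantId, { content, status: "completed", tokenUsage: totalTokens });
    send({ type: "usage", totalTokens });
    send({ type: "done" });
  } catch (err) {
    // flushPartial: keep the partial text; aborted if the client left, error otherwise
    await db.updateMessage(req.assistantId, { content, status: client.aborted ? "aborted" : "error" });
    if (!client.aborted) send({ type: "error", code: "upstream_error", message: String(err) });
  }
}
```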

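On failure: flushPartial

When the client disconnects or the upstream call fails, write back the partial content accumulated so far ($1). $2 is true for an upstream error and false for a client disconnect, and $3 is the assistant message id: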

UPDATE messages
SET content = $1,
    status = CASE WHEN $2 THEN 'error' ELSE 'aborted' END
WHERE id = $3;

This prevents zombie rows that stay in streaming forever. A scheduled repair job, covered in Part 4, also works.
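A scheduled repair job comes down to finding rows still in streaming after a cutoff and closing them. This is a minimal in-memory sketch. It assumes each row carries an updatedAt timestamp; no such column appears in this article's messages schema. The 5-minute threshold is hypothetical, and so is the choice of aborted rather than error.

```typescript
// Assumed row shape: updatedAt is milliseconds since epoch (hypothetical column)
interface MessageRow { id: string; status: string; updatedAt: number }

const ZOMBIE_AFTER_MS = 5 * 60 * 1000; // hypothetical threshold

// Returns the rows stuck in streaming past the threshold, already marked aborted
function sweepZombies(rows: MessageRow[], now: number, maxAgeMs = ZOMBIE_AFTER_MS): MessageRow[] {
  return rows
    .filter((r) => r.status === "streaming" && now - r.updatedAt > maxAgeMs)
    .map((r) => ({ ...r, status: "aborted", updatedAt: now }));
}
```

In PostgreSQL the same idea would be a single UPDATE ... SET status = 'aborted' WHERE status = 'streaming' AND updated_at < now() - interval '5 minutes'. A long but healthy stream would also match unless the row is touched periodically while streaming.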


Session history list and pagination

GET /api/sessions?limit=20&cursor=<updated_at_iso>

SELECT id, title, model, updated_at
FROM sessions
WHERE ($1::timestamptz IS NULL OR updated_at < $1::timestamptz)
ORDER BY updated_at DESC
LIMIT $2;

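cursor is the updated_at of the last item on the previous page. On first load the frontend fetches the first page. When a session is opened, it calls GET /api/sessions/:id/messages?before=<message_created_at> to page upward through older messages.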
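The keyset query above can be mirrored in memory to check how cursors chain from page to page. This minimal sketch assumes updatedAt is an ISO 8601 string, as in the cursor parameter. pageSessions is a hypothetical helper. nextCursor is only returned when a full page came back.

```typescript
interface SessionRow { id: string; title: string; model: string; updatedAt: string } // ISO 8601

interface Page { items: SessionRow[]; nextCursor: string | null }

// In-memory mirror of: WHERE updated_at < cursor ORDER BY updated_at DESC LIMIT n
function pageSessions(all: SessionRow[], limit: number, cursor: string | null): Page {
  const cutoff = cursor === null ? Infinity : Date.parse(cursor);
  const items = all
    .filter((s) => Date.parse(s.updatedAt) < cutoff) // filter copies, so sort won't mutate input
    .sort((a, b) => Date.parse(b.updatedAt) - Date.parse(a.updatedAt))
    .slice(0, limit);
  // A full page means there may be more; its last updated_at becomes the next cursor
  const nextCursor = limit > 0 && items.length === limit ? items[items.length - 1].updatedAt : null;
  return { items, nextCursor };
}
```

Because the filter is a strict updated_at < cursor, two sessions with the same updated_at can be skipped at a page boundary. A common remedy is a composite cursor such as (updated_at, id), compared with a PostgreSQL row comparison, (updated_at, id) < ($1, $2), and ordered by updated_at DESC, id DESC.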


Client-side caching strategy (optional)


Summary

Next: Streaming Rendering and Incremental Parsing


