潜江市网站建设_网站建设公司_Python_seo优化
2026/1/5 16:25:25 网站建设 项目流程

24. 手搓一个“ChatGPT”

  • 上一节的聊天机器人并不具备对话功能,这一节,我们真正来实现一个自制的“ChatGPT”
  • 对话数据 + 停止规则 + 采样 + 分词
  • 对话格式的数据集应该长什么样
  • 怎么从文件读取成训练用样本
  • 用 jieba 做中文分词,改成“词级 token”
  • 设计特殊 token:<bos>、<eos>、<user>、<assistant>
  • 训练思路(仍然是 LM)(不多展开)
  • 推理端:temperature + top-k + top-p + EOS 停止 + 对话解码函数

24.1 对话格式的数据集:建议用 role + 文本

 你好
 你好,我是一个简易对话机器人,可以和你聊天。 
  • 多轮对话可以拼起来(以后再扩,这里先一问一答)
  • 文件格式(推荐):JSON Lines,一行一个 dict,如 dialogue.jsonl
{"user": "你好", "assistant": "你好,我是一个简易对话机器人,可以和你聊天。"}
{"user": "你能做什么?", "assistant": "我可以回答你的问题,帮你解释概念,或者陪你聊聊天。"}
{"user": "解释一下什么是Transformer。", "assistant": "Transformer是一种基于自注意力机制的神经网络结构,常用于NLP任务,比如翻译和对话。"}
{"user": "用一句话安慰一个加班到凌晨的人。", "assistant": "辛苦了,你已经做得很好了,剩下的交给时间和明天的太阳。"}
  • 根据你下载的对话语料,自行编写处理脚本到指定格式,我这里用的数据是:https://github.com/PlexPt/chatgpt-corpus/releases/tag/3
  • 提供一个处理脚本:
import json
import random
from pathlib import Path
def load_json_list(path: Path):
"""
从文件中读取一个 JSON 列表。
"""
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
data = [data]
if not isinstance(data, list):
raise ValueError("输入 JSON 顶层应为列表。")
return data
def sample_qa(entries, num, seed=None):
"""
从 entries 中随机抽取 num 条。
"""
if seed is not None:
random.seed(seed)
if num >= len(entries):
sampled = entries[:]
random.shuffle(sampled)
return sampled
return random.sample(entries, num)
def save_as_jsonl(entries, out_path: Path):
"""
将条目按 JSON Lines 格式保存到文件。
"""
with out_path.open("w", encoding="utf-8") as f:
for item in entries:
q = item.get("q", "")
a = item.get("a", "")
rec = {"user": q, "assistant": a}
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
def sample_json_to_jsonl(input_path, output_path, num, seed=None):
"""
Jupyter Notebook 直接使用的主函数。
示例:
sample_json_to_jsonl("source.json", "out.jsonl", num=100)
"""
in_path = Path(input_path)
out_path = Path(output_path)
entries = load_json_list(in_path)
sampled = sample_qa(entries, num, seed=seed)
save_as_jsonl(sampled, out_path)
print(f"读取 {len(entries)} 条记录,抽取 {len(sampled)} 条,已写入 {out_path}")
print("函数 sample_json_to_jsonl 已加载,可直接调用。")
函数 sample_json_to_jsonl 已加载,可直接调用。
  • 运行:
sample_json_to_jsonl("./QA_DATA/qa.json", "./QA_DATA/qa_sampled.jsonl", num=1000, seed=12)
读取 3208736 条记录,抽取 1000 条,已写入 QA_DATA\qa_sampled.jsonl
  • 工具函数,把每一行转换成一个训练样本文本
def format_dialog_sample(user_text, assistant_text):
# 注意空格,自己约定格式,尽量统一
return f"<user> {user_text} <assistant> {assistant_text} <EOS>"

24.2 从文件读取对话数据

import json
DATA_PATH = "./QA_DATA/qa_sampled.jsonl"
raw_samples = []
with open(DATA_PATH, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
obj = json.loads(line)
user = obj["user"]
assistant = obj["assistant"]
text = format_dialog_sample(user, assistant)
raw_samples.append(text)
print("样本数:", len(raw_samples))
print("第一个样本:")
print(raw_samples[0])
样本数: 1000
第一个样本:
 请评价中国足球在亚洲和世界上的竞争力以及发展前景。  作为一个 AI,我没有感情和立场,只能从数据和事实出发评价中国足球的竞争力和发展前景。
在亚洲范围内来看,中国足球在过去几年有了很大的进步。中国足协采取了一系列的改革措施,投入大量的资金和资源进行青训和俱乐部建设,引进了一批优秀的外籍教练和球员,提升了联赛水平和国家队实力。中国国家男子足球队在2019年亚洲杯上进入了8强,这是中国男足自2004年以来的最好成绩。此外,在亚洲足球俱乐部比赛中,中超球队也有了不俗的表现,多次进入亚冠联赛淘汰赛阶段。可以说,中国足球在亚洲范围内已经具备了一定的竞争力。
但是,在世界范围内来看,中国足球与欧洲和南美洲等足球强国相比还有一定的差距。根据FIFA排名,中国男足在2021年3月排名第77位,而世界杯冠军法国队则排名第二。中国足球在技战术、战术理念、球员素质等方面都需要进一步提升,才能与世界强队竞争。
未来的发展前景,需要中国足球继续推进改革措施,加强青训和俱乐部建设,提升联赛水平和国家队实力。此外,中国足球还需要加强与国际足球的交流与合作,学习借鉴先进的足球理念和技术,提升竞争力。总之,中国足球的发展前景还需要进一步努力和探索。 
  • raw_samples 就是一堆字符串,每个字符串包含一条对话:<user> ... <assistant> ... <EOS>

24.3 用 jieba 分词:从“字符级”升级到“词级 token”

  • 用“词级 token”要比“字符级”更语义一些,但工业界更常用的是 BPE / SentencePiece 的“子词(subword)”。
  • 不过我们现在教学 +中文,jieba 分词已经很够玩了。
  1. 安装 & 引入 jieba
!pip install jieba
import jieba
  1. 设计一个简单的 tokenizer(词级 + 特殊 token)
  • 我们要支持:特殊 token:<PAD>, <BOS>, <EOS>, <user>, <assistant>
  • 其他的中文内容就用 jieba.lcut 分词
SPECIAL_TOKENS = ["<PAD>", "<BOS>", "<EOS>", "<user>", "<assistant>"]PAD = 0BOS = 1EOS = 2USER = 3ASSISTANT = 4
  • 构建词表:
from collections import Counter
# 统计所有词(特殊 token + 分词结果)
counter = Counter()
for text in raw_samples:
# 先把 role token 单独切开
# 示例 text: "<user> 你好 <assistant> 今天天气不错。 <eos>"# 简单做法:直接按空格 split,再对非特殊 token 用 jieba 分parts = text.split()tokens = []for part in parts:if part in ["<user>", "<assistant>", "<EOS>"]:tokens.append(part)else:# 对普通句子分词tokens.extend(jieba.lcut(part))counter.update(tokens)# 把 SPECIAL_TOKENS 放在最前面vocab = list(SPECIAL_TOKENS)# 其他词按频率添加for token, freq in counter.most_common():if token in SPECIAL_TOKENS:continuevocab.append(token)id2token = vocabtoken2id = {tok: idx for idx, tok in enumerate(id2token)}vocab_size = len(id2token)print("vocab_size:", vocab_size)print("前 50 个 token:", id2token[:50])
vocab_size: 13042
前 50 个 token: ['', '', '', '', '', '的', ',', '。', '和', '.', ':', '、', '可以', '等', '是', '在', '中', '进行', '使用', '?', '1', '将', '数据', '3', '需要', '2', '了', '技术', '模型', '4', '对', '学习', '或', '一个', '用户', '-', '5', '有', '通过', '并', '提高', '以下', '我', '提供', '为', '一些', '包括', '如', '来', '人工智能']
这段代码用于保存词表
import json
VOCAB_PATH = "./checkpoints/vocab.json"
# 保存
with open(VOCAB_PATH, "w", encoding="utf-8") as f:
json.dump({
"id2token": id2token,
"token2id": token2id,
"PAD": PAD,
"BOS": BOS,
"EOS": EOS,
"USER": USER,
"ASSISTANT": ASSISTANT,
"SEQ_LEN": SEQ_LEN,
}, f, ensure_ascii=False, indent=2)
print("词表与配置已保存:", VOCAB_PATH)
下面的代码用于加载词表
import json
VOCAB_PATH = "./checkpoints/vocab.json"
# 保存
with open(VOCAB_PATH, "w", encoding="utf-8") as f:
json.dump({
"id2token": id2token,
"token2id": token2id,
"PAD": PAD,
"BOS": BOS,
"EOS": EOS,
"USER": USER,
"ASSISTANT": ASSISTANT,
"SEQ_LEN": SEQ_LEN,
}, f, ensure_ascii=False, indent=2)
print("词表与配置已保存:", VOCAB_PATH)
  1. encode / decode 函数(词级)
def tokenize_text(text):
"""
把一条对话样本字符串 -> 词级 token 列表
示例 text: "<user> 你好 <assistant> 今天天气不错。 <EOS>""""parts = text.split()tokens = []for part in parts:if part in ["<user>", "<assistant>", "<EOS>"]:tokens.append(part)else:tokens.extend(jieba.lcut(part))return tokensdef encode_tokens(tokens):return [token2id.get(tok, token2id["<PAD>"]) for tok in tokens]def decode_ids(id_list):return "".join(id2token[i] for i in id_list if i < len(id2token))
  • 这里 decode 比较粗糙,会把 token 直接拼起来(中文还好一点),可以以后再在 decode 里加空格或规则美化输出。

24.4 构造训练数据:LM 任务不变,只是换成“对话词序列”

  • 其实仍然是做 语言模型(next token prediction)
    输入:[BOS, t0, t1, …, t_{L-1}]
    目标:[t0, t1, …, t_{L-1}, t_L]
  • 把所有样本拼接起来,或者逐条 sample。为了简单,我们沿用之前“滑窗 LM”的写法,不过输入是词 id
import torch
from torch.utils.data import Dataset, DataLoader
# 把所有样本的 token id 接到一个长序列里(简单粗暴版)
all_ids = []
for text in raw_samples:
toks = tokenize_text(text)
ids = encode_tokens(toks)
all_ids.extend(ids)
all_ids = torch.tensor(all_ids, dtype=torch.long)
print("all_ids length:", len(all_ids))
class DialogLMDataset(Dataset):
def __init__(self, ids, seq_len):
self.ids = ids
self.seq_len = seq_len
self.num_samples = (len(ids) - 1) // seq_len
def __len__(self):
return self.num_samples
def __getitem__(self, idx):
start = idx * self.seq_len
end = start + self.seq_len + 1
chunk = self.ids[start:end]
if len(chunk) < self.seq_len + 1:
pad_len = self.seq_len + 1 - len(chunk)
chunk = torch.cat([chunk, torch.full((pad_len,), PAD, dtype=torch.long)])
input_ids = torch.empty(self.seq_len + 1, dtype=torch.long)
input_ids[0] = BOS
input_ids[1:] = chunk[:-1]
target_ids = chunk
return input_ids, target_ids
SEQ_LEN = 64
dataset = DialogLMDataset(all_ids, seq_len=SEQ_LEN)
print("dataset size:", len(dataset))
loader = DataLoader(dataset, batch_size=16, shuffle=True, drop_last=False)
all_ids length: 188649
dataset size: 2947

24.5 构造模型和训练循环

  • 和之前一样,复制和导入基本模块
# 第21节的方式导入
from MyTransformer import MultiHeadSelfAttention, PositionwiseFeedForward, PositionalEncoding, TokenEmbedding
import torch.nn as nn
class TransformerLM(nn.Module):
def __init__(self,
vocab_size,
d_model=256,
num_heads=4,
d_ff=512,
num_layers=4,
max_len=2048,
pad_id=PAD,
dropout=0.1):
super().__init__()
self.d_model = d_model
self.pad_id = pad_id
self.tok_embed = TokenEmbedding(vocab_size, d_model, pad_id=pad_id)
self.pos_encoding = PositionalEncoding(d_model, max_len=max_len)
self.dropout = nn.Dropout(dropout)
self.layers = nn.ModuleList([
DecoderOnlyLayer(d_model, num_heads, d_ff, dropout=dropout)
for _ in range(num_layers)
])
self.output_proj = nn.Linear(d_model, vocab_size)
def make_pad_mask(self, ids):
return (ids == self.pad_id).int()  # (B, L)
def forward(self, input_ids):
"""
input_ids: (B, L) —— 已有上下文(含 BOS)
返回:
logits: (B, L, vocab_size)
"""
B, L = input_ids.shape
pad_mask = self.make_pad_mask(input_ids)
x = self.tok_embed(input_ids)  # (B,L,d_model)
x = x * math.sqrt(self.d_model)
pos = self.pos_encoding(x)     # (B,L,d_model)
x = x + pos
x = self.dropout(x)
attn_maps = []
for layer in self.layers:
x, attn = layer(x, pad_mask=pad_mask)
attn_maps.append(attn)
logits = self.output_proj(x)
return logits, attn_maps
class DecoderOnlyLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super().__init__()
self.self_attn = MultiHeadSelfAttention(d_model, num_heads)
self.ffn = PositionwiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
def forward(self, x, pad_mask=None):
"""
x: (B, L, d_model)
pad_mask: (B, L)  —— 1 表示 PAD
"""
B, L, _ = x.shape
device = x.device
# 生成因果 mask(不能看未来)
subsequent_mask = torch.triu(
torch.ones(L, L, device=device), diagonal=1
)  # (L, L)
subsequent_mask = subsequent_mask.unsqueeze(0).unsqueeze(0)  # (1,1,L,L)
# Self-Attn
_attn_out, self_attn_map = self.self_attn(
x,
pad_mask=pad_mask,      # 屏蔽 PAD
attn_mask=subsequent_mask  # 屏蔽未来
)
x = x + self.dropout1(_attn_out)
x = self.norm1(x)
# FFN
_ffn_out = self.ffn(x)
x = x + self.dropout2(_ffn_out)
x = self.norm2(x)
return x, self_attn_map
  • 构造模型
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerLM(
vocab_size=vocab_size,
d_model=256,
num_heads=4,
d_ff=512,
num_layers=4,
max_len=SEQ_LEN + 1,
pad_id=PAD,
dropout=0.1,
).to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = torch.nn.CrossEntropyLoss(ignore_index=PAD)
下面这段代码可以加载之前保存的模型
SAVE_PATH = "./checkpoints/chatgpt_small.pt"
model = TransformerLM(
vocab_size=vocab_size,
d_model=256,
num_heads=4,
d_ff=512,
num_layers=4,
max_len=SEQ_LEN + 1,
pad_id=PAD,
dropout=0.1,
).to(DEVICE)
model.load_state_dict(torch.load(SAVE_PATH, map_location=DEVICE))
model.eval()
print("模型已加载")
  • 注意:模型结构参数必须和当初训练时一样,否则 load 会报错。

  • 确保使用上面的加载词表加载了之前模型保存的词表,否则会映射出错

  • 训练循环基本可以沿用之前的的 LM 训练代码,只是 vocab_size 换成新的,数据换成这个 loader

import matplotlib.pyplot as plt
from IPython.display import clear_output
import math
import os
# 自动保存最优的模型到路径
os.makedirs("./checkpoints", exist_ok=True)
BEST_PATH = "./checkpoints/chatgpt_small_best.pt"
best_loss = float("inf")
EPOCHS = 650  # 对话数据多,电脑跑得慢,演示就少一点了
train_losses = []
for epoch in range(1, EPOCHS + 1):
model.train()
total_loss = 0.0
for batch_idx, (input_ids, target_ids) in enumerate(loader):
input_ids = input_ids.to(DEVICE)
target_ids = target_ids.to(DEVICE)
logits, _ = model(input_ids)  # (B, L, vocab_size)
B, L, V = logits.shape
logits_flat = logits.view(B * L, V)
target_flat = target_ids.view(B * L)
loss = criterion(logits_flat, target_flat)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / (batch_idx + 1)
train_losses.append(avg_loss)
# 如果本轮比历史最优还低,就保存
if avg_loss < best_loss:
best_loss = avg_loss
torch.save(model.state_dict(), BEST_PATH)
print(f"Epoch {epoch}: loss={avg_loss:.4f} (NEW BEST, 已保存)")
if epoch % 1 == 0 or epoch == 1:
clear_output(wait=True)
plt.figure(figsize=(6,4))
plt.plot(range(1, len(train_losses)+1), train_losses, marker="o")
plt.xlabel("Epoch")
plt.ylabel("Avg Loss")
plt.title("Chat-GPT Style LM Training Loss")
plt.grid(True)
plt.show()
print(f"Epoch {epoch}/{EPOCHS}, avg_loss = {avg_loss:.4f}")
Epoch 650/650, avg_loss = 0.2085

24.6 停止条件 + 采样策略:temperature / top-k / top-p

  1. logits → 采样函数
  • 写一个通用的采样函数:
import torch
def sample_from_logits(logits, temperature=1.0, top_k=None, top_p=None):
"""
logits: (vocab_size,)
返回: 采样出的 token id (int)
"""
# 1) temperature
if temperature is not None and temperature > 0:
logits = logits / temperature
# 2) top-k
if top_k is not None and top_k > 0:
v, ix = torch.topk(logits, top_k)
probs = torch.softmax(v, dim=-1)
# 在 top-k 中采样
idx_in_topk = torch.multinomial(probs, num_samples=1).item()
return ix[idx_in_topk].item()
# 3) top-p (nucleus sampling)
if top_p is not None and 0 < top_p < 1.0:
sorted_logits, sorted_indices = torch.sort(logits, descending=True)
sorted_probs = torch.softmax(sorted_logits, dim=-1)
cumulative_probs = torch.cumsum(sorted_probs, dim=-1)
# 保留累积概率 <= top_p 的部分
mask = cumulative_probs <= top_p
# 至少保留一个
mask[0] = True
filtered_logits = sorted_logits[mask]
filtered_indices = sorted_indices[mask]
filtered_probs = torch.softmax(filtered_logits, dim=-1)
idx_in_filtered = torch.multinomial(filtered_probs, num_samples=1).item()
return filtered_indices[idx_in_filtered].item()
# 4) 默认:整部分 softmax + multinomial
probs = torch.softmax(logits, dim=-1)
idx = torch.multinomial(probs, num_samples=1).item()
return idx
  • 注意:可以选择只用 temperature 或 top-k + temperature 或 top-p + temperature,不要三种全开,一般用 “temperature + top_p” 就挺好

24.7 重写对话推理函数:带 EOS 停止 + 采样

  • 实现完整的chat_generate(model, user_input, …)
  • prompt 要转成: … 的 token 序列
  • 生成时,一直 sample token,直到生成 (id2token[idx] == “”),或到达 max_new_tokens
def build_chat_prompt(user_input):
# 这里用和训练数据一样的格式
return f"<user> {user_input} <assistant>"def generate_chat_reply(model,user_input,max_new_tokens=64,temperature=0.8,top_k=None,top_p=0.9,):model.eval()with torch.no_grad():prompt = build_chat_prompt(user_input)tokens = tokenize_text(prompt)          # 词级ids = encode_tokens(tokens)input_ids = [BOS] + idsinput_ids = torch.tensor(input_ids, dtype=torch.long, device=DEVICE).unsqueeze(0)generated_ids = []for _ in range(max_new_tokens):# 截断到模型支持的最长长度if input_ids.size(1) > SEQ_LEN:input_chunk = input_ids[:, -SEQ_LEN:]else:input_chunk = input_idslogits, _ = model(input_chunk)next_logits = logits[0, -1, :]   # (vocab_size,)next_id = sample_from_logits(next_logits,temperature=temperature,top_k=top_k,top_p=top_p)# 停止条件:EOSif next_id == EOS:break# 追加到整个输入next_token_tensor = torch.tensor([[next_id]], dtype=torch.long, device=DEVICE)input_ids = torch.cat([input_ids, next_token_tensor], dim=1)generated_ids.append(next_id)# 解码:只解码生成部分reply_text = decode_ids(generated_ids)return reply_text.strip()

TSET(不加载保存的模型和词表):

reply = generate_chat_reply(
model,
"中国足球",
max_new_tokens=2000,
temperature=0.8,
top_k=15,
top_p=None,   # 或 top_p=0.9, top_k=None
)
print("模型回复:", reply)
模型回复: 作为一个AI语言模型,我没有喜欢,但我可以为您提供以下答案:由于其美丽的推荐:由于其美丽的记录,我是两个严重的最重要的文化和最重要的文化信息之一。因此,在社交场合中,您可以主动最大限度,有着其功能。3.改善睡眠的文化和建议:您可以提高您可以提高您的文化和投篮的文化和独特的文化和语言。4.保持练习可以提高语言能力。您可以提高语言速度,可以提高自己的英语口语班或者上传的写作的写作过程。5.的文化的文化意义。6.的文化:有技巧,游客可能只是一些激发因素的文化和习惯,提高语言交流能力。综上所述,要有效的成绩的姿态的成绩手机度,及时度,及时度。
Temperature —— 控制“随机性”的总开关
  • 原始 logits → softmax 的概率分布是模型算出来的“下一字概率”。
  • 但我们通常不直接用 softmax,而是先除以 temperature:
logits = logits / temperature
  • 温度如何影响输出?
temperature效果举例(模型下一字预测)
< 1(如 0.7)让高概率词更高、低概率词更低 → 更稳、更保守、更像 ChatGPT“你好,我…”
= 1原始概率原样
> 1(如 1.3)让分布更均匀 → 更随机、容易乱飞“你好,天空漂…”
趋近 0直接 argmax → 复读机模式“你已经做得很好,你已经做得很好…”
Top-k —— 只在“最有可能”候选词里随机抽
  • 只保留概率最高的 k 个 token,其他全部丢掉。
v, ix = topk(logits, k)
  • 模型真正可能想说的是从 10 个词里挑一个但 softmax 会给全部 vocab(几千词)概率,有些很奇怪的词依然有微弱概率
  • top-k 会直接阻止模型选到垃圾 token,明显减少胡言乱语,加强句子连贯性
  • 下一步最多从概率最高的 top-k 个词中随机抽一个,避免模型突然选到非常稀奇的 token(比如奇怪符号)
Top-p(Nucleus Sampling)—— 控制“概率累计阈值”
  • top-p 是一种比 top-k 更“智能”的策略
  • 把 token 按概率从高到低排序,从上往下累加概率,累加到 p(比如 0.9)就停止,只在这些 token 中采样
  • 比如 top_p = 0.9,如果当前分布很尖锐(少数几个词概率很高)可能只保留 5 个词,如果当前分布很平(很多词 probability 差不多)可能保留几十个词,top-p 动态适应模型当前的预测

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询