Getting Started with Circuit Simulation Software from Scratch: An AC Circuit Simulation Example
2025/12/30 1:17:09
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

# --- 1. Positional Encoding ---
# Algorithm: the Transformer drops the sequential structure of RNNs, so the model has no
# built-in notion of token order. Sine and cosine functions of different frequencies are
# used here to encode absolute position information.
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        # Create a [max_len, d_model] matrix to hold the position vectors
        pe = torch.zeros(max_len, d_model)
        # Position indices [0, 1, 2, ..., max_len-1], unsqueezed into a column vector
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Scaling frequencies. Formula: 1 / 10000^(2i/d_model)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Syntax: 0::2 means start at index 0 with step 2 (even columns use sin)
        pe[:, 0::2] = torch.sin(position * div_term)
        # Syntax: 1::2 means start at index 1 with step 2 (odd columns use cos)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Syntax: register_buffer defines a tensor that receives no gradient updates
        # but is saved to the state_dict together with the model
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        # Algorithm: inject position information by adding the positional encoding to the embeddings
        # x shape: [batch_size, seq_len, d_model]
        return x + self.pe[:, :x.size(1)]

# --- 2. Multi-Head Attention ---
# Algorithm: let the model attend to relevance information from different representation
# subspaces and different positions simultaneously.
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0  # the model dimension must be divisible by the number of heads
        self.d_k = d_model // num_heads  # dimension per head
        self.h = num_heads
        # Linear layers that project the input to Q, K, V
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)
        # Syntax: view reshapes the tensor, transpose(1, 2) moves the "head" dimension forward
        # Shape change: [batch, seq, d_model] -> [batch, seq, h, d_k] -> [batch, h, seq, d_k]
        q = self.w_q(q).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        # Algorithm: scaled dot-product attention, softmax(QK^T / sqrt(d_k)) * V
        # k.transpose(-2, -1) transposes the last two dimensions of K (seq_len and d_k)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        # Syntax: masked_fill writes a very large negative value (-1e9) into positions where
        # mask is 0, so that after softmax those positions get weights close to 0
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        out = torch.matmul(attn, v)
        # Syntax: contiguous() guarantees contiguous memory, otherwise the following view would
        # fail. The heads are then merged back together.
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)
        return self.fc_out(out)

# --- 3. Encoder Layer ---
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.norm1 = nn.LayerNorm(d_model)  # layer normalization
        self.norm2 = nn.LayerNorm(d_model)
        self.attn = MultiHeadAttention(d_model, num_heads)
        # Position-wise feed-forward network: adds non-linear modeling capacity
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Algorithm: residual connection (x + sublayer(x)). Compute attention, apply dropout,
        # add the original input, then normalize.
        attn_out = self.attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_out))
        ff_out = self.ff(x)
        x = self.norm2(x + self.dropout(ff_out))
        return x
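# A quick shape sanity check for the pieces defined so far (a minimal sketch; the helper
# name and the hyperparameter values below are illustrative, not part of the original
# script). Calling it confirms that PositionalEncoding and EncoderLayer both preserve the
# [batch, seq_len, d_model] shape, which is what allows the layers to be stacked freely.
def _encoder_shape_check(batch_size=2, seq_len=10, d_model=512, num_heads=8, d_ff=2048):
    x = torch.randn(batch_size, seq_len, d_model)
    x = PositionalEncoding(d_model)(x)            # shape unchanged: [batch, seq, d_model]
    mask = torch.ones(batch_size, 1, 1, seq_len)  # dummy "no padding" mask
    y = EncoderLayer(d_model, num_heads, d_ff)(x, mask)
    assert y.shape == x.shape                     # residual sub-layers keep the shape constant
    return y.shape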
# --- 4. Decoder Layer ---
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask, trg_mask):
        # 1. Self-attention: the decoder attends to the tokens it has already generated
        self_attn_out = self.self_attn(x, x, x, trg_mask)
        x = self.norm1(x + self.dropout(self_attn_out))
        # 2. Cross-attention: Query comes from the decoder, Key/Value from the encoder output (enc_out)
        # Algorithm: this step lets the decoder access the context of the input sequence.
        cross_attn_out = self.cross_attn(x, enc_out, enc_out, src_mask)
        x = self.norm2(x + self.dropout(cross_attn_out))
        # 3. Feed-forward network
        ff_out = self.ff(x)
        x = self.norm3(x + self.dropout(ff_out))
        return x

# --- 5. Full Transformer Model ---
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, trg_vocab_size, d_model=512, num_layers=6,
                 num_heads=8, d_ff=2048, max_len=100, dropout=0.1):
        super().__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.trg_embedding = nn.Embedding(trg_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        # Syntax: ModuleList behaves like a regular Python list, but correctly registers the
        # parameters of its submodules with the parent model
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc_out = nn.Linear(d_model, trg_vocab_size)
        self.dropout = nn.Dropout(dropout)

    # Algorithm: the padding mask marks which positions in the input sequence are [PAD],
    # so that attention does not attend to these invalid positions
    def make_src_mask(self, src):
        return (src != 0).unsqueeze(1).unsqueeze(2)  # [batch, 1, 1, src_len]

    # Algorithm: combines the padding mask with the causal mask (lower-triangular matrix),
    # ensuring that when predicting token t the decoder cannot see token t+1.
    def make_trg_mask(self, trg):
        batch_size, trg_len = trg.shape
        trg_mask = (trg != 0).unsqueeze(1).unsqueeze(2)
        # Syntax: torch.tril produces a lower-triangular matrix
        lookup_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(batch_size, 1, trg_len, trg_len).to(trg.device)
        return trg_mask & lookup_mask.bool()

    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        # Encoder pass
        enc_out = self.dropout(self.pos_encoding(self.src_embedding(src)))
        for layer in self.encoder_layers:
            enc_out = layer(enc_out, src_mask)
        # Decoder pass
        dec_out = self.dropout(self.pos_encoding(self.trg_embedding(trg)))
        for layer in self.decoder_layers:
            dec_out = layer(dec_out, enc_out, src_mask, trg_mask)
        return self.fc_out(dec_out)

# --- Verification script ---
if __name__ == "__main__":
    # Select the device to run on
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Running on device: {device}")

    # Hyperparameters
    src_vocab_size = 100
    trg_vocab_size = 100

    # Instantiate the model
    model = Transformer(src_vocab_size, trg_vocab_size).to(device)

    # Build dummy inputs (batch_size=2, seq_len=10)
    # Both the source and the target sequences contain some 0s as padding
    src = torch.tensor([[1, 2, 3, 4, 5, 0, 0, 0, 0, 0],
                        [4, 3, 2, 9, 0, 0, 0, 0, 0, 0]]).to(device)
    trg = torch.tensor([[1, 2, 3, 4, 0, 0, 0, 0, 0, 0],
                        [5, 6, 7, 0, 0, 0, 0, 0, 0, 0]]).to(device)

    # Forward pass: compute the predictions
    model.eval()  # switch to evaluation mode (disables dropout)
    with torch.no_grad():  # no gradients during inference, saving memory
        output = model(src, trg)

    print("-" * 30)
    print(f"Source (src) shape: {src.shape}")
    print(f"Target (trg) shape: {trg.shape}")
    print(f"Model output shape: {output.shape}")
    # Expected output shape: [batch_size, target_seq_len, vocab_size] -> [2, 10, 100]

    # Check that the output dimensions are correct
    if output.shape == (2, 10, 100):
        print("✅ Check passed: the Transformer's shape arithmetic is correct!")
    else:
        print("❌ Check failed: output shape does not match.")
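The verification script above only checks shapes with teacher forcing, i.e. the whole target sequence is fed in at once. For actual inference the decoder has to run autoregressively. Below is a minimal greedy-decoding sketch that reuses the Transformer class defined above; the function name greedy_decode, the start-token id 1 and the padding id 0 are illustrative assumptions, not part of the original script.

import torch

@torch.no_grad()
def greedy_decode(model, src, max_len=20, sos_id=1):
    """Generate a target sequence one token at a time (greedy search)."""
    model.eval()
    device = src.device
    # Start every sequence with the assumed <sos> token (id 1 here is an assumption)
    trg = torch.full((src.size(0), 1), sos_id, dtype=torch.long, device=device)
    for _ in range(max_len - 1):
        logits = model(src, trg)                                     # [batch, cur_len, vocab]
        next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)   # most likely next token
        trg = torch.cat([trg, next_token], dim=1)                    # append and feed back in
    return trg

# Example usage (reusing the dummy `model` and `src` from the verification script above):
# generated = greedy_decode(model, src, max_len=10)
# print(generated.shape)  # [2, 10]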
import torch
import torch.nn.functional as F

def manual_flash_attention(q, k, v, block_size=128):
    """
    A simplified FlashAttention implementation in Python/PyTorch.
    q, k, v: [Batch, Head, Seq_len, Dim]
    """
    B, H, N, D = q.shape
    scale = D ** -0.5

    # Initialize the output O, the running row-max m and the running row-sum l
    out = torch.zeros_like(q)
    l = torch.zeros((B, H, N, 1), device=q.device)
    m = torch.full((B, H, N, 1), -float('inf'), device=q.device)

    # Number of blocks
    # Tr: row blocks (Q), Tc: column blocks (K, V)
    Tr = (N + block_size - 1) // block_size
    Tc = (N + block_size - 1) // block_size

    for j in range(Tc):
        # Load the K, V blocks into "SRAM"
        k_block = k[:, :, j*block_size : (j+1)*block_size, :]
        v_block = v[:, :, j*block_size : (j+1)*block_size, :]

        for i in range(Tr):
            # Load the Q block
            q_block = q[:, :, i*block_size : (i+1)*block_size, :]

            # Attention scores of the current block
            # S_ij = Q_i @ K_j^T
            attn_weights = torch.matmul(q_block, k_block.transpose(-1, -2)) * scale

            # Block-local statistics
            m_block = torch.max(attn_weights, dim=-1, keepdim=True)[0]
            p_block = torch.exp(attn_weights - m_block)
            l_block = torch.sum(p_block, dim=-1, keepdim=True)

            # Update the global statistics
            m_old = m[:, :, i*block_size : (i+1)*block_size, :]
            l_old = l[:, :, i*block_size : (i+1)*block_size, :]
            m_new = torch.max(m_old, m_block)

            # Key step: rescale old and new contributions onto the same running maximum
            alpha = torch.exp(m_old - m_new)
            beta = torch.exp(m_block - m_new)
            l_new = alpha * l_old + beta * l_block

            # Update the output O
            # O_i = (O_i * alpha * l_old + beta * P_ij @ V_j) / l_new
            out_block = out[:, :, i*block_size : (i+1)*block_size, :]
            p_v = torch.matmul(p_block, v_block)
            out[:, :, i*block_size : (i+1)*block_size, :] = (
                out_block * (alpha * l_old) + beta * p_v
            ) / l_new

            # Store the updated statistics
            m[:, :, i*block_size : (i+1)*block_size, :] = m_new
            l[:, :, i*block_size : (i+1)*block_size, :] = l_new

    return out

# Standard attention, used as the reference implementation
def standard_attention(q, k, v):
    d = q.size(-1)  # read the head dimension from the input instead of relying on a global
    attn = torch.matmul(q, k.transpose(-1, -2)) * (d ** -0.5)
    attn = F.softmax(attn, dim=-1)
    return torch.matmul(attn, v)

if __name__ == "__main__":
    # Test code
    B, H, N, D = 2, 4, 512, 64
    q, k, v = torch.randn(B, H, N, D), torch.randn(B, H, N, D), torch.randn(B, H, N, D)

    out_std = standard_attention(q, k, v)
    out_flash = manual_flash_attention(q, k, v, block_size=128)

    print(f"Max absolute error: {torch.max(torch.abs(out_std - out_flash)).item():.2e}")
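As a further cross-check, PyTorch 2.x ships a fused kernel, F.scaled_dot_product_attention, whose fastest backends use the same tiling and online-softmax idea as FlashAttention. The short sketch below compares it against the manual implementation; it assumes PyTorch >= 2.0 and that manual_flash_attention from the script above is defined in the same file (or importable).

import torch
import torch.nn.functional as F

torch.manual_seed(0)
B, H, N, D = 2, 4, 512, 64
q = torch.randn(B, H, N, D)
k = torch.randn(B, H, N, D)
v = torch.randn(B, H, N, D)

# Built-in fused attention (PyTorch >= 2.0); no attention mask and non-causal,
# so it computes the same quantity as manual_flash_attention above.
out_sdpa = F.scaled_dot_product_attention(q, k, v)

out_flash = manual_flash_attention(q, k, v, block_size=128)
print(f"Max |sdpa - manual|: {torch.max(torch.abs(out_sdpa - out_flash)).item():.2e}")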