基于全同态加密的逻辑回归心脏病预测示例详解

张开发
2026/4/8 16:10:28 15 分钟阅读

分享文章

基于全同态加密的逻辑回归心脏病预测示例详解
1人口统计学Demographic性别Sex男性或女性分类变量年龄Age患者年龄连续变量——虽然记录为整数但年龄本质上是连续的教育Education教育类型分为4类2行为因素Behavioral当前是否吸烟Current Smoker当前是否为吸烟者分类变量每日吸烟量Cigs Per Day每天平均吸烟数量可视为连续变量因为可以是任意数量3医学史Medical - history是否服用降压药BP Meds是否服用降压药分类变量是否曾中风Prevalent Stroke是否有中风史分类变量是否高血压Prevalent Hyp是否患有高血压分类变量是否糖尿病Diabetes是否患有糖尿病分类变量4当前医学指标Medical - current总胆固醇Tot Chol连续变量收缩压Sys BP连续变量舒张压Dia BP连续变量体重指数BMI连续变量心率Heart Rate连续变量虽然本质离散但通常视为连续变量血糖Glucose连续变量5预测变量目标变量10 年冠心病风险CHD二分类变量1 表示“是”0 表示“否”。1.2 数据处理在原文中提到的P-valueP值指的是显著性水平显著性水平是假设检验中的一个概念是指当原假设为正确时人们却把它拒绝了的概率或风险这里它用于判断某个特征如“抽烟”对结果的影响是否是真实存在的还是仅仅因为随机巧合。显著性水平是假设检验中的一个概念是指当原假设为正确时人们却把它拒绝了的概率或风险。它是公认的小概率事件的概率值必须在每一次统计检验之前确定通常取α0.05或α0.01。在该实例中P值量化了“虚假关联”的风险即P值α则认为风险低要把特征收入预测模型里P值α风险高这个特征可能是“骗子”可以把它丢弃。文中纸用向后剔除法Backward Elimination剔除P值0.05的特征最后只保留P值小于0.05的有效特征如性别男性患病几率比女性高78.8%年龄年龄每增加1岁风险增加 7%等。2 理论基础2.1 线性回归Linear Regression)1 概念首先看下线性、非线性和回归的概念线性两个变量之间的关系是一次函数关系的——图象是直线叫做线性。非线性两个变量之间的关系不是一次函数关系的——图象不是直线叫做非线性。回归人们在测量事物的时候因为客观条件所限求得的都是测量值而不是事物真实的值为了能够得到真实值无限次的进行测量最后通过这些测量数据计算回归到真实值这就是回归的由来。所以线性回归就是用一条直线或超平面去拟合数据通过已知的输入 X预测一个连续的输出 Y。2 解决的问题对大量的观测数据进行处理从而得到比较符合事物内部规律的数学表达式。也就是说寻找到数据与数据之间的规律所在从而就可以模拟出结果也就是对结果进行预测。解决的就是通过已知的数据得到未知的结果例如对房价的预测、判断信用评价、电影票房预估等。3 模型及公式最简单的线性回归可以用公式y wx b表示这里y是预测结果x是输入特征w是权重斜率b是偏置截距这就是一条直线线性回归的任务就是找到最好的w和b让这条直线最贴合数据。在此基础上扩展到多个特征时有公式这时y仍为预测值x1~xn是n个特征w1~wn是对用的系数权重b仍为截距该公式还可以表示为更简洁的向量形式此时就可通过损失函数来得到预测的损失如基于MSEMean Squared Error均方误差基于RMSERoot Mean Squared Error均方根误差基于MAEMean Absolute Error平均绝对值误差以上m是样本数量yi是实际值^yi是预测值由线性回归公式计算得到之后即可由梯度下降法通过对损失函数对各个参数求偏导计算梯度并沿梯度方向反方向迭代更新参数使得Loss损失最小。2.2 逻辑回归Logistic Regression逻辑回归是一种统计中的回归分析方法用于根据一组自变量预测分类因变量的结果。逻辑回归主要用于预测同时也可以用于计算某事件发生的概率。Logistic回归的因变量可以是二分类的也可以是多分类的但是二分类的更为常用也更加容易解释。逻辑回归常用于数据挖掘疾病自动诊断经济预测等领域。逻辑回归和线性回归关系密切简单来说逻辑回归 线性回归 Sigmoid激活函数逻辑回归的核心计算部分本质就是线性回归只是在输出层套了一层Sigmoid把线性输出压缩到0~1的概率区间从而实现二分类。Sigmoid函数其图形如下意义是将连续的(-∞, ∞)映射到[0, 1]这里[0, 1]正好可以对应概率p取值范围即psigmoid(x) 1/(1e-x)将之前预测结果表示为logit(p) WTX b则有同时可知即logit(p)正是线性回归的线性部分同时它是概率的对数几率把0~1的概率线性映射到整个实数域它正是线性模型和概率之间的桥梁。在进行逻辑回归时不直接预测概率而是预测概率的logit值再由logit值通过Sigmoid函数还原概率完美解决了“线性模型预测概率”的问题。对应到心脏病预测实例通过以下公式完成预测2.3 Sigmoid函数近似表示在全同态加密时不能简单地在加密数据上计算sigmoid需要使用低次多项式来近似该而且受限于同态加密乘法运算深度限制次数越低越好所以目标是执行尽可能少的乘法以便能够使用较小的参数从而优化计算。参考3中的文章给出两个接近Sigmoid函数的近似函数这里选取在[-5, 5]范围内更接近Sigmoid函数的σ(x) 0.5 0.197x - 0.004x3。2.4 Z-Score标准化目的是将不同量纲单位的特征——比如“年龄”20-80岁和“胆固醇”150-400mg/dL——缩放到同一个“起跑线”上即将特征取值标准化为均值为0、标准差为1的分布对应的数学公式是xi特征的取值μ特征取值的平均值特征值减去它能让数据的中心点移动到0σ特征取值的标准差除以它能让数据的波动范围缩放比例变为1执行该操作出于以下原因1梯度爆炸与难以收敛在训练逻辑回归或神经网络时如果“胆固醇”数值是 300而“是否抽烟”是1模型会给胆固醇分配极小的权重给抽烟分配极大的权重。这会导致梯度下降时像在“深谷”中反复震荡很难找到最优解。2特征权重不公平如果一个特征的数值范围是0-1000另一个是0-1模型会下意识地认为数值大的特征更重要。标准化确保了每个特征对预测结果的贡献是基于其变化规律而不是数值大小。3 源码分析3.1 Setup程序最一开始需要import所有依赖的相关模块请确保各模块已经按照到系统中import torch import tenseal as ts import pandas as pd import random from time import time # those are optional and are not necessary for training import numpy as np import matplotlib.pyplot as plt接下来程序会对之前Kaggle网站上下载的程序进行处理如删除无效数据及无关特征列并按相同的数量随机采样患病和不患病数据集这样做的目的是解决类不平衡问题Class Imbalance。因为在心脏病预测Framingham数据集中患病1的人数远少于未患病0的人数如果不处理模型会倾向于预测所有人都不患病。程序还提供random_data()函数该函数生成随机的、线性可分的点对于那些只想看看事情是如何运作的人来说可以使用它来代替Kaggle的数据集。这部分程序完整源码如下torch.random.manual_seed(73) random.seed(73) # 将原始数据进行随机洗牌后按7:3的比例分割分别做为训练集和测试集 def split_train_test(x, y, test_ratio0.3): idxs [i for i in range(len(x))] random.shuffle(idxs) # delimiter between test and train data delim int(len(x) * test_ratio) test_idxs, train_idxs idxs[:delim], idxs[delim:] return x[train_idxs], y[train_idxs], x[test_idxs], y[test_idxs] def heart_disease_data(): data pd.read_csv(./data/framingham.csv) # drop rows with missing values data data.dropna() # drop some features data data.drop(columns[education, currentSmoker, BPMeds, diabetes, diaBP, BMI]) print(save cleaned data to data/framingham_cleaned.csv) data.to_csv(./data/framingham_cleaned.csv, indexFalse) # 根据TenYearCHD十年内心脏病风险0或1这一列将原始数据集拆分为两个子集 grouped data.groupby(TenYearCHD) #data grouped.apply(lambda x: x.sample(grouped.size().min(), random_state73).reset_index(dropTrue)) # 1. 执行分组并采样使用数量少的那个类别个数进行采样每个组0 和 1都随机抽取等量的样本 data grouped.apply(lambda x: x.sample(grouped.size().min(), random_state73), include_groupsFalse) #data.to_csv(./data/twogroupdata1.csv) # 2. 核心步骤恢复被排除的分组列 # 因为 include_groupsFalse 把它变成了索引我们需要把它变回列使数据结构回归到普通的表格形式 data data.reset_index(level0).reset_index(dropTrue) #data.to_csv(./data/twogroupdata2.csv) # extract labels y torch.tensor(data[TenYearCHD].values).float().unsqueeze(1) # 丢弃TenYearCHD列 data data.drop(columnsTenYearCHD) # standardize data标准化确保了每个特征对预测结果的贡献是基于其变化规律而不是数值大小 data (data - data.mean()) / data.std() #data.to_csv(./data/last.csv) #print(have save data/last.csv) x torch.tensor(data.values).float() return split_train_test(x, y) def random_data(m1024, n2): # data separable by the line y x x_train torch.randn(m, n) x_test torch.randn(m // 2, n) y_train (x_train[:, 0] x_train[:, 1]).float().unsqueeze(0).t() y_test (x_test[:, 0] x_test[:, 1]).float().unsqueeze(0).t() return x_train, y_train, x_test, y_test # You can use whatever data you want without modification to the tutorial # x_train, y_train, x_test, y_test random_data() x_train, y_train, x_test, y_test heart_disease_data() print(############# Data summary #############) print(fx_train has shape: {x_train.shape}) print(fy_train has shape: {y_train.shape}) print(fx_test has shape: {x_test.shape}) print(fy_test has shape: {y_test.shape}) print(#######################################)程序运行后输出如下3.2 训练逻辑回归模型将首先训练一个逻辑回归模型没有任何加密它可以被视为一个具有单个节点的单层神经网络后续将使用此模型作为与加密训练和评估进行比较的手段。class LR(torch.nn.Module): def __init__(self, n_features): super(LR, self).__init__() self.lr torch.nn.Linear(n_features, 1) def forward(self, x): out torch.sigmoid(self.lr(x)) return out n_features x_train.shape[1] model LR(n_features) # use gradient descent with a learning_rate1 optim torch.optim.SGD(model.parameters(), lr1) # use Binary Cross Entropy Loss criterion torch.nn.BCELoss() # define the number of epochs for both plain and encrypted training EPOCHS 5 def train(model, optim, criterion, x, y, epochsEPOCHS): for e in range(1, epochs 1): optim.zero_grad() out model(x) loss criterion(out, y) loss.backward() optim.step() print(fLoss at epoch {e}: {loss.data}) return model model train(model, optim, criterion, x_train, y_train) def accuracy(model, x, y): out model(x) correct torch.abs(y - out) 0.5 return correct.float().mean() plain_accuracy accuracy(model, x_test, y_test) print(fAccuracy on plain test_set: {plain_accuracy})该段程序输出如下正如原文所说高精度不是该程序的目标这里只是想看看加密数据的训练不会影响最终结果所以将比较加密数据的准确性和在这里得到的plain_accuracy。3.3 基于明文参数的加密数据评估在这一部分中将只关注在加密测试集上使用明文参数可选加密参数评估逻辑回归模型。首先创建一个类似PyTorch的LR模型可以评估加密数据class EncryptedLR: def __init__(self, torch_lr): # TenSEAL processes lists and not torch tensors, # so we take out the parameters from the PyTorch model self.weight torch_lr.lr.weight.data.tolist()[0] self.bias torch_lr.lr.bias.data.tolist() def forward(self, enc_x): # We dont need to perform sigmoid as this model # will only be used for evaluation, and the label # can be deduced without applying sigmoid enc_out enc_x.dot(self.weight) self.bias return enc_out def __call__(self, *args, **kwargs): return self.forward(*args, **kwargs) ################################################ ## You can use the functions below to perform ## ## the evaluation with an encrypted model ## ################################################ def encrypt(self, context): self.weight ts.ckks_vector(context, self.weight) self.bias ts.ckks_vector(context, self.bias) def decrypt(self, context): self.weight self.weight.decrypt() self.bias self.bias.decrypt() eelr EncryptedLR(model)之后创建一个TenSEAL Context用于指定要使用的方案和参数。在这里选择小而安全的参数允许进行一次乘法。这足以评估逻辑回归模型之后会发现在对加密数据进行训练时会需要更大的参数。# parameters poly_mod_degree 4096 coeff_mod_bit_sizes [40, 20, 40] # create TenSEALContext ctx_eval ts.context(ts.SCHEME_TYPE.CKKS, poly_mod_degree, -1, coeff_mod_bit_sizes) # scale of ciphertext to use ctx_eval.global_scale 2 ** 20 # this key is needed for doing dot-product operations ctx_eval.generate_galois_keys() t_start time() enc_x_test [ts.ckks_vector(ctx_eval, x.tolist()) for x in x_test] t_end time() print(fEncryption of the test-set took {int(t_end - t_start)} seconds)在代码中会在评估前加密测试数据集。接下来在构建EncryptedLR类时不会在线性层的加密输出上计算sigmoid函数仅仅是因为它不是必需的在加密数据上计算sigmic会增加计算时间并需要更大的加密参数但是在之后的加密训练中会使用sigmoid。当前直接进行加密测试集上的评估并将其准确性与普通测试集进行比较。def encrypted_evaluation(model, enc_x_test, y_test): t_start time() correct 0 for enc_x, y in zip(enc_x_test, y_test): # encrypted evaluation enc_out model(enc_x) # plain comparison out enc_out.decrypt() out torch.tensor(out) out torch.sigmoid(out) if torch.abs(out - y) 0.5: correct 1 t_end time() print(fEvaluated test_set of {len(x_test)} entries in {int(t_end - t_start)} seconds) print(fAccuracy: {correct}/{len(x_test)} {correct / len(x_test)}) return correct / len(x_test) encrypted_accuracy encrypted_evaluation(eelr, enc_x_test, y_test) diff_accuracy plain_accuracy - encrypted_accuracy print(fDifference between plain and encrypted accuracies: {diff_accuracy}) if diff_accuracy 0: print(Oh! We got a better accuracy on the encrypted test-set! The noise was on our side...)程序运行结果如下不仅比明文直接评估的精度有所下降而且对比原文章加密评估的精度0.6736526946107785还要低一些只有0.6167664670658682这里不知是什么原因。3.4 基于加密数据训练的加密逻辑回归模型在这一部分中将重新定义一个类似PyTorch的模型该模型既可以向前传播加密数据也可以反向传播以更新权重从而在加密数据上训练加密的逻辑回归模型以下是关于训练的更多细节。1 损失函数这里使用带有正则化的二元交叉熵损失函数y(i)是第i个预期标签^y(i)是是逻辑回归模型的第i个输出θ是n维权重向量损失函数如下上面公式中m是样本数量n是特征数量损失可以分成两部分前半部分是交叉熵损失当真实值y0和y1时以上公式可分别简化为以真实值y0为例预测^y越接近0则损失值越小越接近零反之预测越接近1损失值越大同样对于真实值y1时有相同的规律而且通过响应函数的图形也能直观的看到该规律公式中的后半部分是正则化项Regularizationθj权重向量中的第j个值λ正则化系数该部分意义在于惩罚过大的权重以保证模型不能过度依赖某一个特征必须保持权重相对“温和”。2 参数更新为了进行参数更新使用如下规则这里x(i)是第i个输入数据然而由于同态加密约束这里使用α1以减少乘法并使用λ/m0.05从而得出以下更新规则3 同态加密参数从输入数据到参数更新密文需要深度为6的乘法运算1用于点积运算2用于sigmoid算法近似计算3用于反向传播阶段其中1个隐藏在backward函数中的self._delta_w enc_x * out_minus_y运算中该运算将1维度的向量与n维度的向量相乘需要掩码提取第一个槽位的值并依次复制到n个槽位的其他位置。对于大约20位的缩放我们需要6个与缩放具有相同比特大小的系数模数加上最后一个需要更多比特的系数我们已经超出了4096个多项式模数如果我们考虑128位的安全性这需要系数模数的总比特数109所以将使用8192。这将允许我们在单个密文中批量处理多达4096个值。# Training an Encrypted Logistic Regression Model on Encrypted Data class EncryptedLR: def __init__(self, torch_lr): self.weight torch_lr.lr.weight.data.tolist()[0] self.bias torch_lr.lr.bias.data.tolist() # we accumulate gradients and counts the number of iterations self._delta_w 0 self._delta_b 0 self._count 0 def forward(self, enc_x): enc_out enc_x.dot(self.weight) self.bias enc_out EncryptedLR.sigmoid(enc_out) return enc_out def backward(self, enc_x, enc_out, enc_y): out_minus_y (enc_out - enc_y) self._delta_w enc_x * out_minus_y self._delta_b out_minus_y self._count 1 def update_parameters(self): if self._count 0: raise RuntimeError(You should at least run one forward iteration) # update weights # We use a small regularization term to keep the output # of the linear layer in the range of the sigmoid approximation self.weight - self._delta_w * (1 / self._count) self.weight * 0.05 self.bias - self._delta_b * (1 / self._count) # reset gradient accumulators and iterations count self._delta_w 0 self._delta_b 0 self._count 0 staticmethod def sigmoid(enc_x): # We use the polynomial approximation of degree 3 # sigmoid(x) 0.5 0.197 * x - 0.004 * x^3 # from https://eprint.iacr.org/2018/462.pdf # which fits the function pretty well in the range [-5,5] return enc_x.polyval([0.5, 0.197, 0, -0.004]) def plain_accuracy(self, x_test, y_test): # evaluate accuracy of the model on # the plain (x_test, y_test) dataset w torch.tensor(self.weight) b torch.tensor(self.bias) out torch.sigmoid(x_test.matmul(w) b).reshape(-1, 1) correct torch.abs(y_test - out) 0.5 return correct.float().mean() def encrypt(self, context): self.weight ts.ckks_vector(context, self.weight) self.bias ts.ckks_vector(context, self.bias) def decrypt(self): self.weight self.weight.decrypt() self.bias self.bias.decrypt() def __call__(self, *args, **kwargs): return self.forward(*args, **kwargs) # parameters poly_mod_degree 8192 coeff_mod_bit_sizes [40, 21, 21, 21, 21, 21, 21, 40] # create TenSEALContext ctx_training ts.context(ts.SCHEME_TYPE.CKKS, poly_mod_degree, -1, coeff_mod_bit_sizes) ctx_training.global_scale 2 ** 21 ctx_training.generate_galois_keys() t_start time() enc_x_train [ts.ckks_vector(ctx_training, x.tolist()) for x in x_train] enc_y_train [ts.ckks_vector(ctx_training, y.tolist()) for y in y_train] t_end time() print(fEncryption of the training_set took {int(t_end - t_start)} seconds)下面结合源码详细分析乘法深度在代码forward函数中密文enc_x会和密文self.weight进行点积运算会消耗第1层深度Level 1接下来调用sigmoid多项式近似计算x2需要一次乘法消耗第2层计算x3会消耗第3层这里其实是0.004·x3操作过程中会行“常数合并”在进行系数相乘时会借用x3生成过程中本来就消耗调的层并不会在系数乘消耗额外的层所以这里总共消耗2层正如原文所述在接下来进行self._delta_w enc_x * out_minus_y时除了enc_x*out_minus_y乘法本身会消耗一层外这里还有一个隐藏操作会额外消耗一层易知out_minus_y是通过enc_out - enc_y得到的虽然逻辑上它是一个误差值但是在TenSEAL的内存布局中它依然是一个向量是一个继承自enc_out的Level 3密文内部构造类似[err, noise1, noise2, ..., noisek]除了第一个槽位有有效值外其他本是“空”的槽位上充满了不可控的随机噪声如果直接进行“旋转并累加”完成enc_x*out_minus_y操作会把后面槽位上的噪声也一起叠加进来所以TenSEAL中先执行了out_minus_y*[1, 0, 0, ...]即掩码操作最终产生[err, 0, 0, ...]清空了其他位置的干扰正是这里的乘法操作导致Scale变大为了接下来的“旋转累加”操作这里密文的Scale必须出于标准状态通常是global_scale所以这里会执行一次rescale从而消耗了1层深度接下来即可正常进行旋转与加法如将out_minus_y向右旋转1位得到y_rot1 [0, err, 0, 0, ...]y y y_rot1得到y [err, err, 0, 0, ...]继续旋转加最终得到y [err, err, err, ...]之后即可真正的执行enc_x * out_minus_y操作所以这里总共消耗2层深度此时共消耗5层深度接下来循环完成后的update_parameters函数调用中self.weight - self._delta_w * (1 / self._count) self.weight * 0.05操作中的乘法还会消耗1层深度所以最终需要的乘法深度是6。运行结果如下接下来研究在明文域和加密域内x.dot(weight) bias的分布确保它们落在[-5, 5]区间内这正是sigmoid近似比较准确的位置我们不希望给它提供超出此范围的数据这样就不会得到错误的输出错误的输出会使得我们的训练不可预测。但是权重会在训练过程中发生变化我们应该在学习的同时尽量保持它们尽可能小。有一种经常用于逻辑回归的技术正是这样做的但服务于另一个目的即泛化被称为正则化你可能已经在update_parameters函数中发现了附加项self.weight*0.05这是正则化的结果。简而言之由于我们的sigmoid近似仅在[-5, 5]范围内有效我们希望它的所有输入都在这个范围内为了做到这一点需要保持逻辑回归参数尽可能小因此我们应用正则化。注意保持参数较小肯定会降低输出的幅度但如果数据没有标准化我们也可能超出范围。你可能已经发现我们将数据标准化为均值为0标准差为1这既是为了提高性能也是为了将sigmoid的输入保持在所需的范围内。normal_dist lambda x, mean, var: np.exp(- np.square(x - mean) / (2 * var)) / np.sqrt(2 * np.pi * var) def plot_normal_dist(mean, var, rmin-10, rmax10): x np.arange(rmin, rmax, 0.01) y normal_dist(x, mean, var) fig plt.plot(x, y) # plain distribution lr LR(n_features) data lr.lr(x_test) mean, var map(float, [data.mean(), data.std() ** 2]) plot_normal_dist(mean, var) print(Distribution on plain data:) plt.show() # encrypted distribution def encrypted_out_distribution(eelr, enc_x_test): w eelr.weight b eelr.bias data [] for enc_x in enc_x_test: enc_out enc_x.dot(w) b data.append(enc_out.decrypt()) data torch.tensor(data) mean, var map(float, [data.mean(), data.std() ** 2]) plot_normal_dist(mean, var) print(Distribution on encrypted data:) plt.show() eelr EncryptedLR(lr) eelr.encrypt(ctx_training) encrypted_out_distribution(eelr, enc_x_train)明文数据分布密文数据分布大部分数据属于[-5, 5]sigmoid近似应该足够好4 基于加密数据的逻辑回归模型终于到达了最后一部分这是关于在加密数据上训练加密逻辑回归模型你可以看到我们解密权重并在每个历元后再次加密它们这是必要的因为在历元结束时更新权重后我们不能再使用它们来执行足够的乘法所以我们需要将它们恢复到初始密文级别。在真实场景中这将转化为将权重发送回密钥持有者进行解密和重新加密。在这种情况下每个轮次只会产生几千字节的通信。eelr EncryptedLR(LR(n_features)) accuracy eelr.plain_accuracy(x_test, y_test) print(fAccuracy at epoch #0 is {accuracy}) times [] for epoch in range(EPOCHS): eelr.encrypt(ctx_training) # if you want to keep an eye on the distribution to make sure # the function approximation is still working fine # WARNING: this operation is time consuming # encrypted_out_distribution(eelr, enc_x_train) t_start time() for enc_x, enc_y in zip(enc_x_train, enc_y_train): enc_out eelr.forward(enc_x) eelr.backward(enc_x, enc_out, enc_y) eelr.update_parameters() t_end time() times.append(t_end - t_start) eelr.decrypt() accuracy eelr.plain_accuracy(x_test, y_test) print(fAccuracy at epoch #{epoch 1} is {accuracy}) print(f\nAverage time per epoch: {int(sum(times) / len(times))} seconds) print(fFinal accuracy is {accuracy}) diff_accuracy plain_accuracy - accuracy print(fDifference between plain and encrypted accuracies: {diff_accuracy}) if diff_accuracy 0: print(Oh! We got a better accuracy when training on encrypted data! The noise was on our side...)

更多文章