主页 > 开源代码  > 

回归实战详细代码+解析:预测新冠感染人数

回归实战详细代码+解析:预测新冠感染人数
回归实战:预测新冠感染人数 先回顾下回归是个啥玩意

首先需要一组训练集,说人话就是通过一系列x[x1,x2…xn]通过神秘计算得到y的过程,当然人和机器现在都不知道什么计算是什么,这是一个黑箱。 黑箱比喻:把模型想象成自动售货机,投币(输入特征x)→ 内部神秘机制(模型计算)→ 吐出饮料(预测值y^)。核心任务:通过不断调整内部零件(参数w),让售货机吐出的饮料尽可能接近真实需求(真实值y)。

然后我们先随机的选定一系列参数,然后把参数和x带入神秘公式,计算出预测值y^

将y与实际的y进行计算,得到误差loss,预测y与实际y相聚越远,loss显然越大,所以我们可以通过loss来评价一个模型的好坏

光知道这模型不准还没用,我们需要让预测值越来越接近,具体来说,就要使用梯度下降来将误差反馈给参数w

for example:

​ w = w - d(loss)/d(w) * lr

在这循环往复的过程中,实现了机器的自主学习(额额。。参数不调好,也会越学越垃圾的,就像人学新知识也常常伴随踩雷和反复)

训练过程

随机初始化:给售货机随便装一堆零件(随机初始参数w)预测试错:投币测试,记录误差(计算预测y^与真实y的Loss)梯度下降:根据误差反向调整零件(w = w - 梯度×学习率)循环迭代:重复投币→调整→测试,直到误差最小


实战代码主要部分解析

样例所属的项目kaggle地址

import time import matplotlib.pyplot as plt import torch import numpy as np import csv import pandas as pd from sklearn.feature_selection import SelectKBest, chi2 from torch.utils.data import DataLoader, Dataset import torch.nn as nn from torch import optim def get_feature_importance(feature_data, label_data, k =4,column = None): """ 特征重要性选择函数 Parameters: feature_data : 特征数据矩阵 label_data : 对应标签数据 k : 选择的最佳特征数量 column : 特征名称列表(可选) Returns: X_new : 选择后的特征数据 indices : 被选特征的列索引 """ # 使用卡方检验选择特征 model = SelectKBest(chi2, k=k) #定义一个选择k个最佳特征的函数 feature_data = np.array(feature_data, dtype=np.float64) # 确保数据类型为float64以满足sklearn要求 # label_data = np.array(label_data, dtype=np.float64) X_new = model.fit_transform(feature_data, label_data) #用这个函数选择k个最佳特征 #feature_data是特征数据,label_data是标签数据,该函数可以选择出k个特征 print('x_new', X_new) scores = model.scores_ # scores即每一列与结果的相关性 # 按重要性排序,选出最重要的 k 个 indices = np.argsort(scores)[::-1] #[::-1]表示反转一个列表或者矩阵。 # argsort这个函数, 可以矩阵排序后的下标。 比如 indices[0]表示的是,scores中最小值的下标。 if column: # 如果需要打印选中的列 k_best_features = [column[i+1] for i in indices[0:k].tolist()] # 选中这些列 打印 print('k best features are: ',k_best_features) return X_new, indices[0:k] # 返回选中列的特征和他们的下标。 """COVID数据加载器""" class CovidDataset(Dataset): """ Parameters: file_path : 数据文件路径 mode : 数据集模式(train/val/test) all_feature : 是否使用全部特征 feature_dim : 选择特征维度 """ # 数据预处理:给模型喂“干净粮食” def __init__(self, file_path, mode="train", all_feature=False, feature_dim=6): with open(file_path, "r") as f: ori_data = list(csv.reader(f)) column = ori_data[0] csv_data = np.array(ori_data[1:])[:, 1:].astype(float) feature = np.array(ori_data[1:])[:, 1:-1] label_data = np.array(ori_data[1:])[:, -1] if all_feature: col = np.array([i for i in range(0, 93)]) else: _, col = get_feature_importance(feature, label_data, feature_dim, column) col = col.tolist() if mode == "train": # 80%训练集 indices = [i for i in range(len(csv_data)) if i % 5 != 0] data = torch.tensor(csv_data[indices, :-1]) self.y = torch.tensor(csv_data[indices, -1]) elif mode == "val": # 20%验证集 indices = [i for i in range(len(csv_data)) if i % 5 == 0] data = torch.tensor(csv_data[indices, :-1]) self.y = torch.tensor(csv_data[indices, -1]) else: # test模式 indices = [i for i in range(len(csv_data))] data = torch.tensor(csv_data[indices]) # 数据标准化处理(将不同尺度的数据变为同一尺度) data = data[:, col] self.data = (data - data.mean(dim=0, keepdim=True)) / data.std(dim=0, keepdim=True) self.mode = mode """获取单条数据""" def __getitem__(self, idx): if self.mode != "test": return self.data[idx].float(), self.y[idx].float() else: return self.data[idx].float() def __len__(self): return len(self.data) # 以上是数据装载部分 class MyModel(nn.Module): """自定义全连接神经网络""" def __init__(self, inDim): """ Parameters: inDim : 输入特征维度 """ super(MyModel, self).__init__() self.fc1 = nn.Linear(inDim, 64) self.relu1 = nn.ReLU() self.fc2 = nn.Linear(64, 1) def forward(self, x): # 模型前向过程 """前向传播""" x = self.fc1(x) x = self.relu1(x) x = self.fc2(x) if len(x.size()) > 1: return x.squeeze(1) return x def train_val(model, train_loader, val_loader, device, epochs, optimizer, loss, save_path): """ 模型训练与验证函数 Parameters: model : 待训练模型 train_loader: 训练数据加载器 val_loader : 验证数据加载器 device : 计算设备(CPU/GPU) epochs : 训练轮数 optimizer : 优化器 loss : 损失函数 save_path : 模型保存路径 """ model = model.to(device) plt_train_loss = [] # 记录所有轮次的训练loss plt_val_loss = [] # 验证loss记录 min_val_loss = 9999999999999999 # 最佳验证损失初始化 for epoch in range(epochs): # 开始训练 train_loss = 0.0 val_loss = 0.0 start_time = time.time() model.train() # 模型调整为训练模式 for batch_x, batch_y in train_loader: x, target = batch_x.to(device), batch_y.to(device) pred = model(x) # 前向传播 train_bat_loss = loss(pred, target, model) train_bat_loss.backward() # 反向传播 optimizer.step() # 更新模型 optimizer.zero_grad() train_loss += train_bat_loss.cpu().item() plt_train_loss.append(train_loss / train_loader.__len__()) # 验证阶段 model.eval() with torch.no_grad(): for batch_x, batch_y in val_loader: x, target = batch_x.to(device), batch_y.to(device) pred = model(x) val_bat_loss = loss(pred, target, model) val_loss += val_bat_loss.cpu().item() plt_val_loss.append(val_loss / val_loader.__len__()) # 保存最佳模型 if val_loss < min_val_loss: torch.save(model, save_path) min_val_loss = val_loss print("[%03d/%03d] %2.2f sec(s) Trainloss: %.6f | Valloss: %.6f" % \ (epoch, epochs, time.time() - start_time, plt_train_loss[-1], plt_val_loss[-1])) # 损失曲线可视化 plt.plot(plt_train_loss) plt.plot(plt_val_loss) plt.title("loss") plt.legend(["train", "val"]) plt.show() def evaluate(save_path, test_loader, device, rel_path): # 得出测试结果文件 # 加载最佳模型 model = torch.load(save_path).to(device) rel = [] # 预测结果 with torch.no_grad(): for x in test_loader: pred = model(x.to(device)) rel.append(pred.cpu().item()) print(rel) # 保存CSV结果 with open(rel_path, "w", newline='') as f: csvWriter = csv.writer(f) csvWriter.writerow(["id", "tested_positive"]) for i, value in enumerate(rel): csvWriter.writerow([str(i), str(value)]) print("文件已保存到{}".format(rel_path)) # 配置参数 all_feature = False # 是否使用全部特征 feature_dim = 6 # 特征维度 if all_feature: feature_dim = 93 else: feature_dim = 6 config = { "lr": 0.001, # 学习率 "epochs": 20, # 训练轮数 "momentum": 0.9, # 动量系数 "save_path": "model_save/best_model.pth", # 模型保存路径 "rel_path": "pred.csv" # 预测结果路径 } # 设备检测 device = "cuda" if torch.cuda.is_available() else "cpu" print(device) # 数据加载 train_file = "covid.train.csv" test_file = "covid.test.csv" train_dataset = CovidDataset(train_file, "train", all_feature, feature_dim) val_dataset = CovidDataset(train_file, "val", all_feature, feature_dim) test_dataset = CovidDataset(test_file, "test", all_feature, feature_dim) # for data in train_dataset: # print(data) # 创建数据加载器 batch_size = 16 train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) # 随机梯度下降 val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True) # 随机梯度下降 test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False) # 随机梯度下降 # for batch_x, batch_y in train_loader: # print(batch_x, batch_y) def mseLoss_with_reg(pred, target, model): loss = nn.MSELoss(reduction='mean') ''' Calculate loss ''' regularization_loss = 0 # 正则项 for param in model.parameters(): # TODO: you may implement L1/L2 regularization here # 使用L2正则项 # regularization_loss += torch.sum(abs(param)) regularization_loss += torch.sum(param ** 2) # 计算所有参数平方 return loss(pred, target) + 0.00075 * regularization_loss # 返回损失。 model = MyModel(inDim=feature_dim).to(device) # 向硬件挂载任务 # loss = nn.MSELoss() # Loss函数 loss = mseLoss_with_reg optimizer = optim.SGD(model.parameters(), lr=config["lr"], momentum=config["momentum"]) # 优化器 train_val(model, train_loader, val_loader, device, config["epochs"], optimizer, loss, config["save_path"]) evaluate(config["save_path"], test_loader, device, config["rel_path"]) 1. 数据读取

其实对基本的模型来说,训练过程都是一样的,而最麻烦的是数据的输入,我们在输入过程中有时可以剔除部分不需要的数据,来更好的构建模型,但哪些重要哪些不重要,又是一个问题。。。

看看代码吧

def get_feature_importance(feature_data, label_data, k =4,column = None): """ 特征重要性选择函数 Parameters: feature_data : 特征数据矩阵 label_data : 对应标签数据 k : 选择的最佳特征数量 column : 特征名称列表(可选) Returns: X_new : 选择后的特征数据 indices : 被选特征的列索引 """ # 使用卡方检验选择特征 model = SelectKBest(chi2, k=k) #定义一个选择k个最佳特征的函数 feature_data = np.array(feature_data, dtype=np.float64) # 确保数据类型为float64以满足sklearn要求 # label_data = np.array(label_data, dtype=np.float64) X_new = model.fit_transform(feature_data, label_data) #用这个函数选择k个最佳特征 #feature_data是特征数据,label_data是标签数据,该函数可以选择出k个特征 print('x_new', X_new) scores = model.scores_ # scores即每一列与结果的相关性 # 按重要性排序,选出最重要的 k 个 indices = np.argsort(scores)[::-1] #[::-1]表示反转一个列表或者矩阵。 # argsort这个函数, 可以矩阵排序后的下标。 比如 indices[0]表示的是,scores中最小值的下标。 if column: # 如果需要打印选中的列 k_best_features = [column[i+1] for i in indices[0:k].tolist()] # 选中这些列 打印 print('k best features are: ',k_best_features) return X_new, indices[0:k] # 返回选中列的特征和他们的下标。

get_feature_importance()在所有的特征中 通过SelectKBest算法来找到K个影响最大的特征,借此排除无效计算

"""COVID数据加载器""" class CovidDataset(Dataset): """ Parameters: file_path : 数据文件路径 mode : 数据集模式(train/val/test) all_feature : 是否使用全部特征 feature_dim : 选择特征维度 """ # 数据预处理:给模型喂“干净粮食” def __init__(self, file_path, mode="train", all_feature=False, feature_dim=6): with open(file_path, "r") as f: ori_data = list(csv.reader(f)) column = ori_data[0] csv_data = np.array(ori_data[1:])[:, 1:].astype(float) feature = np.array(ori_data[1:])[:, 1:-1] label_data = np.array(ori_data[1:])[:, -1] if all_feature: col = np.array([i for i in range(0, 93)]) else: _, col = get_feature_importance(feature, label_data, feature_dim, column) col = col.tolist() if mode == "train": # 80%训练集 indices = [i for i in range(len(csv_data)) if i % 5 != 0] data = torch.tensor(csv_data[indices, :-1]) self.y = torch.tensor(csv_data[indices, -1]) elif mode == "val": # 20%验证集 indices = [i for i in range(len(csv_data)) if i % 5 == 0] data = torch.tensor(csv_data[indices, :-1]) self.y = torch.tensor(csv_data[indices, -1]) else: # test模式 indices = [i for i in range(len(csv_data))] data = torch.tensor(csv_data[indices]) # 数据标准化处理(将不同尺度的数据变为同一尺度) data = data[:, col] self.data = (data - data.mean(dim=0, keepdim=True)) / data.std(dim=0, keepdim=True) self.mode = mode """获取单条数据""" def __getitem__(self, idx): if self.mode != "test": return self.data[idx].float(), self.y[idx].float() else: return self.data[idx].float() def __len__(self): return len(self.data)

CovidDataset类是数据装载需要用到的

__init__函数对CovidDataset进行了初始化,将文件读入,并排除无用的行列之后,转化为张量的形式,同时根据训练的模式来选择传出全部数据还是部分关键数据,并且自动分割训练集和测试集。

说实话读数据的代码看着还不算难,但是自己写还真是一次写不出来。。

入门没有练度的时候,看这些东西都不知道为什么要设计这个环节

数据标准化:公平对待每个特征

为什么要做:身高(170cm)和体重(70kg)单位不同,直接比较会扭曲模型判断。操作方法:对每个特征列,减去均值、除以标准差 → 数据服从标准正态分布(代码中的(data - data.mean)/datastd)。 2. 核心模型 class MyModel(nn.Module): """自定义全连接神经网络""" def __init__(self, inDim): """ Parameters: inDim : 输入特征维度 """ super(MyModel, self).__init__() self.fc1 = nn.Linear(inDim, 64) self.relu1 = nn.ReLU() self.fc2 = nn.Linear(64, 1) def forward(self, x): # 模型前向过程 """前向传播""" x = self.fc1(x) x = self.relu1(x) x = self.fc2(x) if len(x.size()) > 1: return x.squeeze(1) return x

这是模型本身的算法类,这里直接使用nn现成的算法,不用再自己造轮子了

使用了Linear来线性预测,ReLu作为激活函数

先从输入数据个参数,降到64个,最后直接降到1个,即输出本身

3. 训练模块 def train_val(model, train_loader, val_loader, device, epochs, optimizer, loss, save_path): """ 模型训练与验证函数 Parameters: model : 待训练模型 train_loader: 训练数据加载器 val_loader : 验证数据加载器 device : 计算设备(CPU/GPU) epochs : 训练轮数 optimizer : 优化器 loss : 损失函数 save_path : 模型保存路径 """ model = model.to(device) plt_train_loss = [] # 记录所有轮次的训练loss plt_val_loss = [] # 验证loss记录 min_val_loss = 9999999999999999 # 最佳验证损失初始化 for epoch in range(epochs): # 开始训练 train_loss = 0.0 val_loss = 0.0 start_time = time.time() model.train() # 模型调整为训练模式 for batch_x, batch_y in train_loader: x, target = batch_x.to(device), batch_y.to(device) pred = model(x) # 前向传播 train_bat_loss = loss(pred, target, model) train_bat_loss.backward() # 反向传播 optimizer.step() # 更新模型 optimizer.zero_grad() train_loss += train_bat_loss.cpu().item() plt_train_loss.append(train_loss / train_loader.__len__()) # 验证阶段 model.eval() with torch.no_grad(): for batch_x, batch_y in val_loader: x, target = batch_x.to(device), batch_y.to(device) pred = model(x) val_bat_loss = loss(pred, target, model) val_loss += val_bat_loss.cpu().item() plt_val_loss.append(val_loss / val_loader.__len__()) # 保存最佳模型 if val_loss < min_val_loss: torch.save(model, save_path) min_val_loss = val_loss print("[%03d/%03d] %2.2f sec(s) Trainloss: %.6f | Valloss: %.6f" % \ (epoch, epochs, time.time() - start_time, plt_train_loss[-1], plt_val_loss[-1])) # 损失曲线可视化 plt.plot(plt_train_loss) plt.plot(plt_val_loss) plt.title("loss") plt.legend(["train", "val"]) plt.show() 4. 输出模型结果 def evaluate(save_path, test_loader, device, rel_path): # 得出测试结果文件 # 加载最佳模型 model = torch.load(save_path).to(device) rel = [] # 预测结果 with torch.no_grad(): for x in test_loader: pred = model(x.to(device)) rel.append(pred.cpu().item()) print(rel) # 保存CSV结果 with open(rel_path, "w", newline='') as f: csvWriter = csv.writer(f) csvWriter.writerow(["id", "tested_positive"]) for i, value in enumerate(rel): csvWriter.writerow([str(i), str(value)]) print("文件已保存到{}".format(rel_path)) 5. 优化:正则化 loss = loss+ W*W def mseLoss_with_reg(pred, target, model): loss = nn.MSELoss(reduction='mean') ''' Calculate loss ''' regularization_loss = 0 # 正则项 for param in model.parameters(): # TODO: you may implement L1/L2 regularization here # 使用L2正则项 # regularization_loss += torch.sum(abs(param)) regularization_loss += torch.sum(param ** 2) # 计算所有参数平方 return loss(pred, target) + 0.00075 * regularization_loss # 返回损失。

我们如果直接使用MSELoss来计算loss,容易造成过拟合

这是因为MSE的计算公式loss(xi,yi)=(xi−yi)^2,如果出现了一个非常离谱的噪声y,就会产生巨大的loss,模型就会努力的扭曲函数,让他勾到这个奇怪的噪声点,造成曲线的失真

相对而言的:

通过正则化的MSE:loss = loss+ W*W,(W为参数)能使曲线更为平滑,能避免过拟合。

我们想想,模型的目标是追求更低的loss,如果模型为了去抓任性的噪声而随意地变更参数w,由于此时w对loss造成的影响是指数上升的,所以会抑制w的无端突变,从而达成平滑曲线的目的


避坑指南:新手常见错误 特征未标准化 → 模型被大范围特征(如人口数)主导,忽视小范围特征(如温度)。忽略验证集 → 模型在训练集表现完美,实际预测一塌糊涂。学习率过大 → Loss剧烈震荡无法收敛(如下右)。


总结:回归实战四步曲 数据预处理:清洗 → 特征选择 → 标准化(给模型喂干净数据)模型设计:输入层 → 隐藏层(+ReLU) → 输出层(搭积木式构建)训练调参:Loss监控 → 梯度下降 → 早停机制(防止过拟合,这里还没有写)结果分析:Loss曲线 → 正则化效果 → 模型推理测试
标签:

回归实战详细代码+解析:预测新冠感染人数由讯客互联开源代码栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“回归实战详细代码+解析:预测新冠感染人数