主页 > 开源代码  > 

基于LSTM的情感分析

基于LSTM的情感分析
导包

根据代码需要进行相关的导包

import os import torch import random import numpy as np from sklearn.model_selection import train_test_split from torch.utils.data import TensorDataset, DataLoader import torch.nn as nn from tqdm import tqdm 相关路径文件的准备 # 原始数据路径 data_file = os.path.relpath(os.path.join(os.path.dirname(__file__),'data/hotel_discuss2.csv')) #判断路径是否存在 if not os.path.exists(data_file): print(f"{data_file}数据文件不存在!") exit() # 字典路径 dict_file = os.path.relpath(os.path.join(os.path.dirname(__file__),'data/dict.txt')) # 编码后存放的路径 encoding_file = os.path.relpath(os.path.join(os.path.dirname(__file__),'data/encoding.txt')) 设备的切换 # 设备切换 device = "cuda" if torch.cuda.is_available() else "cpu" 数据准备(数据清洗) # 需要过滤的字符(例如标点符号) puncts = '。,' # 词向量编码 with open(data_file, 'r', encoding='utf-8-sig') as f: data = f.readlines() for line in data: # line = line.strip() 是一个在编程中常见的操作,主要用于去除字符串开头和结尾的空白字符。 # 这里的空白字符包括空格、制表符(tab)、换行符等。 line = line.strip() # line字符串可进行遍历循环 for word in line: # 先过滤掉不需要的字符 if word not in puncts: # 判断遍历的词是否在字典中,不在,则加入字典中 # 达到一个词去重的目的 if word not in mydict: # 从1开始编码 mydict[word] = code code += 1 # 未知字符的编码 mydict['<unk>'] = code # 将数据字典存到本地 with open(dict_file, 'w', encoding='utf-8-sig') as f: f.write(str(mydict)) 加载词向量文件 #加载字典内容 def load_dict(dict_file): with open(dict_file, 'r', encoding='utf-8-sig') as file: """ file.read():这个方法会读取文件中的所有内容,并将其作为一个字符串返回。如果文件很大,这样做可能会消耗大量内存。 eval():这是一个内置函数,它会解析传入的字符串表达式,并在当前上下文中执行它。 这意味着如果文件中包含的是有效的Python表达式或语句,eval() 将执行这些代码并返回执行的结果。 重点 例如,如果文件中包含字典、列表或任何其他Python表达式,eval() 可以将它们转换成相应的对象。 """ return eval(file.read()) new_dict = load_dict(dict_file) 定义词向量字典的长度 # 定义获取字典大小的函数 def get_dict_size(new_dict): return len(new_dict) 打开原始数据,构建训练数据集 # 打开原始数据 with open(data_file, 'r', encoding='utf-8-sig') as f1: # 写入编码文件 with open(encoding_file, 'w', encoding='utf-8-sig') as f2: #将原始数据的每行数据进行编码 for line in f1.readlines(): # 去字符串两头的空格 line = line.strip() # 获取标签和内容 label = line[0] # 从2取是因为索引1的位置是, content = line[2:] # 对内容进行遍历 for ch in content: # 剔除在puncts中的字符 if ch not in puncts: # 写入每个词对应的编码 f2.write(str(new_dict[ch])) # 每个词之间用逗号隔开 f2.write(',') # 写入标签:用空格和内容进行分隔,并对下一行1数据进行换行 f2.write('\t'+str(label)+'\n') 数据长度的填充 #存放训练数据 values = [] #存放真实数据 labels = [] #预设每次输入的长度是256,不够就补0,超过的部分则不要 max_lens = 256 # 读取编码数据 with open(encoding_file, 'r', encoding='utf-8-sig') as f: lines = f.readlines() # random.shuffle(lines) 是 Python 中用于将列表 lines 的元素随机打乱顺序的方法。 random.shuffle(lines) for line in lines: # 根据空格来划分训练数据和标签 data, label = line.split('\t') # 根据逗号进行分割,并转换为int类型,存入列表中 val = [int(i) for i in data[0:-1].split(',')] # 如果i 大于 max_lens,则将val列表中的元素填充为0,直到 val 的长度等于 max_lens。 for i in range(max_lens - len(val)): val.append(0) val = val[:max_lens] values.append(val) labels.append(int(label)) # 将列表转换为numpy数组,并转换为int64类型 values = np.array(values, dtype='int64') labels = np.array(labels, dtype='int64') 数据集的划分 # 数据集的划分 train_x, test_x, train_y, test_y = train_test_split(values, labels, test_size=0.2, random_state=1) 转为Tensor数据集

本项目使用的pytorch的模型框架进行训练,所以数据集最终要转为tensor类型的数据集

# 将数据转换为tensor train_data = TensorDataset(torch.from_numpy(train_x), torch.from_numpy(train_y)) test_data = TensorDataset(torch.from_numpy(test_x), torch.from_numpy(test_y)) 数据集批次的划分 train_data = DataLoader(dataset=train_data, batch_size=64, shuffle=True) test_data = DataLoader(dataset=test_data, batch_size=64, shuffle=True) 模型搭建 # 模型搭建 class LSTM(nn.Module): def __init__(self, input_size, output_size, embedding_dim=512,hidden_size=512): # 继承父类 super(LSTM, self).__init__() # 定义隐藏层大小 self.hidden_size = hidden_size # 词嵌入 # input_size:字典的大小 # embedding_dim:词向量的维度 # device:设备 self.embedding = nn.Embedding(input_size, embedding_dim, device=device) # 定义LSTM层 # embedding_dim:词向量的维度 # hidden_size:隐藏层的维度 # num_layers:LSTM层的数量 self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers=4) # 定义全连接层,输出层 # hidden_size:隐藏层的维度 # output_size:输出维度 self.fc = nn.Linear(hidden_size, output_size) def forward(self, x): # 词嵌入 embed = self.embedding(x) # 数据形状:(seq_len, batch, input_size),因为batch_first=True,所以需要转置一下 permute_ = embed.permute(1, 0, 2) # h0 = torch.zeros(1, permute_.size(0), self.hidden_size).to(device) # c0 = torch.zeros(1, permute_.size(0), self.hidden_size).to(device) # LSTM层,输出:(seq_len, batch, hidden_size) # out: 输出,h_n:最后一个隐藏层的状态,c_n:最后一个cell的状态 out, _ = self.lstm(permute_) # 输出层 """ out[-1, :, :]:这个索引操作选择了out张量的最后一个时间步的所有批次和所有隐藏单元的数据。具体来说: -1 表示选择序列中的最后一个时间步的输出。 : 表示选择该时间步下所有批次的数据。 第二个 : 表示选择这些批次数据中的所有隐藏单元。 """ out = self.fc(out[-1, :, :]) return out 训练参数定义 # 学习率 lr = 1e-4 # 训练轮次 epochs = 20 模型、损坏函数、优化器的创建 model = LSTM(get_dict_size(new_dict),2) # model.train() model.to(device) optimizer = torch.optim.AdamW(model.parameters(), lr=0.001) loss_fc = nn.CrossEntropyLoss() loss_old = 100 模型训练与测试 # 初始化保存训练和测试结果的列表 train_result = [] test_result = [] # 初始化保存训练和测试损失、精度的列表 train_losses = [] train_accuracies = [] test_losses = [] test_accuracies = [] # 循环训练每个周期 for epoch in range(1, epochs + 1): # 创建一个进度条以显示训练数据的处理进度 pbar = tqdm(train_data) # 初始化每个周期的总损失和总准确度 loss_all = 0 acc_all = 0 # 遍历训练数据集的每一个批次 for step, (x, y) in enumerate(pbar): # 将标签数据移动到指定的设备(如GPU) y = y.to(device) # 将输入数据移动到指定的设备(如GPU) x = x.to(device) # 计算每个序列的实际长度(非零元素的数量) # lengths = torch.sum(x != 0, dim=-1).cpu().long() # 使用模型进行前向传播,得到输出 out = model(x) # 计算损失 loss = loss_fc(out, y) # 反向传播计算梯度 loss.backward() # 更新模型参数 optimizer.step() # 将梯度归零 optimizer.zero_grad() # 累加损失 loss_all += loss.item() # 计算当前批次的平均损失 loss_time = loss_all / (step + 1) # 计算当前批次的准确度 # 使用 torch.mean() 计算这些 1.0 和 0.0 值的平均数,也就是所有预测正确的样本占总样本的比例,这实际上就是模型的准确率。 acc = torch.mean((y == torch.argmax(out, dim=-1)).float()) # 累加准确度 acc_all += acc # 计算当前批次的平均准确度 acc_time = acc_all / (step + 1) # 构建进度条的描述信息 s = "train => epoch:{} - step:{} - loss:{:.4f} - loss_time:{:.4f} - acc:{:.4f} - acc_time:{:.4f}".format(epoch, step, loss, loss_time, acc, acc_time) # 设置进度条的描述 pbar.set_description(s) # 将描述信息添加到训练结果列表中 train_result.append(s + "\n") # 记录当前epoch的训练损失和精度 train_losses.append(loss_time) train_accuracies.append(acc_time.item()) # 禁用梯度计算,因为我们只需要前向传播进行预测 with torch.no_grad(): # 创建一个进度条以显示测试数据的处理进度 pbar = tqdm(test_data) # 初始化每个周期的总损失和总准确度 loss_all = 0 acc_all = 0 # 遍历测试数据集的每一个批次 for step, (x, y) in enumerate(pbar): # 将标签数据移动到指定的设备(如GPU) y = y.to(device) # 将输入数据移动到指定的设备(如GPU) x = x.to(device) # 计算每个序列的实际长度(非零元素的数量) lengths = torch.sum(x != 0, dim=-1).cpu().long() # 使用模型进行前向传播,得到输出 out = model(x) # 计算损失 loss = loss_fc(out, y) # 累加损失 loss_all += loss.item() # 计算当前批次的平均损失 test_loss_time = loss_all / (step + 1) # 计算当前批次的准确度 acc = torch.mean((y == torch.argmax(out, dim=-1)).float()) # 累加准确度 acc_all += acc # 计算当前批次的平均准确度 acc_time = acc_all / (step + 1) # 构建进度条的描述信息 s = "test => epoch:{} - step:{} - loss:{:.4f} - loss_time:{:.4f} -acc:{:.4f} - acc_time:{:.4f}".format(epoch, step, loss, test_loss_time, acc, acc_time) # 设置进度条的描述 pbar.set_description(s) # 将描述信息添加到测试结果列表中 test_result.append(s + "\n") # 记录当前epoch的测试损失和精度 test_losses.append(test_loss_time) test_accuracies.append(acc_time.item()) # 将训练结果写入文件 with open(f"train_result.txt", "w") as f: f.writelines(train_result) # 将测试结果写入文件 with open(f"test_result.txt", "w") as f: f.writelines(test_result) # 保存模型,如果当前测试损失小于之前记录的最小损失 if loss_old > test_loss_time: loss_old = test_loss_time torch.save(model.state_dict(), f"model.pkl") 应用 import torch from data_deal import * from model import LSTM # 加载字典 new_dict = load_dict(dict_file) # 设备切换 device = "cuda" if torch.cuda.is_available() else "cpu" # 模型加载 model = LSTM(get_dict_size(new_dict), 2) model.load_state_dict(torch.load("model.pkl")) model.to(device) model.eval() # 设置为评估模式 # 用户输入 while True: text = input("请输入文本(输入'退出'以结束):") if text == "退出": break # 词编码 text = [new_dict[ch] for ch in sen] text = torch.tensor(text).unsqueeze(0).to(device) prediction = model(text) prediction = torch.argmax(prediction, dim=1).item() print(f"预测结果: {'正面' if prediction == 1 else '负面'}")

标签:

基于LSTM的情感分析由讯客互联开源代码栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“基于LSTM的情感分析