GRU is a variant of the LSTM network that works well in practice: its structure is simpler than LSTM's while still performing well, which makes it a very popular network today. Since GRU is a variant of LSTM, it can likewise handle the long-term dependency problem of plain RNNs.
GRU has fewer parameters, so it trains faster, and it can also reduce the risk of overfitting.
LSTM introduces three gates, the input gate, the forget gate, and the output gate, to control the input value, the memory value, and the output value. The GRU model has only two gates: the update gate and the reset gate. Its structure is shown in the figure below:
[Figure: structure of a GRU cell, showing the update gate z_t and the reset gate r_t]
In the figure, z_t and r_t denote the update gate and the reset gate, respectively. The update gate controls how much of the previous time step's state information is carried into the current state: the larger the update gate's value, the more of the previous state is carried over. The reset gate controls how much of the previous state is written into the current candidate state h~t.
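For reference (these equations are not part of the original figure), the GRU can be written out as follows, using the common convention, also used by PyTorch's nn.GRU, in which z_t controls how much of the previous hidden state is kept (some formulations swap the roles of z_t and 1 - z_t):

r_t = sigmoid(W_r x_t + U_r h_{t-1} + b_r)          (reset gate)
z_t = sigmoid(W_z x_t + U_z h_{t-1} + b_z)          (update gate)
h~t = tanh(W_h x_t + U_h (r_t ⊙ h_{t-1}) + b_h)     (candidate state)
h_t = (1 - z_t) ⊙ h~t + z_t ⊙ h_{t-1}               (new hidden state)

Here ⊙ denotes element-wise multiplication; a larger z_t keeps more of h_{t-1}, which matches the description above.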
rnn = nn.GRU(input_size, hidden_size, num_layers, bias, batch_first, dropout, bidirectional)
Initialization:
input_size: feature dimension of the input
hidden_size: width of the hidden layer (number of hidden units)
num_layers: number of stacked layers, default 1. A value of 2 means two GRUs are stacked on top of each other and used as a single GRU module.
bias: True or False, whether to use bias terms; used by default
batch_first: True or False. By default the input has three dimensions, (seq, batch, feature): the first dimension is the time sequence, the second is the batch, and the third is the feature. If set to True, the layout becomes (batch, seq, feature), i.e. batch, then time sequence, then the features of each time step.
dropout: dropout probability applied between stacked GRU layers, default 0 (disabled)
bidirectional: True or False, default False. Whether to use a bidirectional GRU; if enabled, the sequence is automatically fed in once in forward order and once in reverse.
Input:
rnn(input, h_0)
Output:
output, hn = rnn(input, h0)
The shapes are much the same as for LSTM, and the bidirectional case works the same way.
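As a quick check of these shapes, here is a minimal sketch with made-up sizes (the sizes are illustration values only, not taken from the example below):

import torch
import torch.nn as nn

# toy sizes, chosen only for illustration
input_size, hidden_size, num_layers = 10, 20, 2
seq_len, batch_size = 5, 3

rnn = nn.GRU(input_size, hidden_size, num_layers)      # batch_first defaults to False
input = torch.randn(seq_len, batch_size, input_size)   # (seq, batch, feature)
h0 = torch.zeros(num_layers, batch_size, hidden_size)  # (num_layers * num_directions, batch, hidden_size)
output, hn = rnn(input, h0)
print(output.shape)  # torch.Size([5, 3, 20]) -> (seq, batch, num_directions * hidden_size)
print(hn.shape)      # torch.Size([2, 3, 20]) -> (num_layers * num_directions, batch, hidden_size)

With bidirectional=True, the last dimension of output becomes 2 * hidden_size and the first dimension of hn becomes num_layers * 2.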
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import os
import re
import pickle
import numpy as np
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
dataset_path = r'C:\Users\ci21615\Downloads\aclImdb_v1\aclImdb'
MAX_LEN = 500
def tokenize(text):
"""
Tokenize and clean up the raw text
:param text:
:return:
"""
fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>', '\?', '@'
, '\[', '\\\\', '\]', '\^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '」', '「', ]  # note: '\\\\' and '\^' keep the joined regex below well-formed (a lone '\\' would swallow the following '|')
text = re.sub("<.*?>", " ", text, flags=re.S)
text = re.sub("|".join(fileters), " ", text, flags=re.S)
return [i.strip() for i in text.split()]
class ImdbDataset(Dataset):
"""
Prepare the IMDB dataset
"""
def __init__(self, mode):
super(ImdbDataset, self).__init__()
if mode == 'train':
text_path = [os.path.join(dataset_path, i) for i in ['train/neg', 'train/pos']]
else:
text_path = [os.path.join(dataset_path, i) for i in ['test/neg', 'test/pos']]
self.total_file_path_list = []
for i in text_path:
self.total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
def __getitem__(self, item):
cur_path = self.total_file_path_list[item]
cur_filename = os.path.basename(cur_path)
# get the label from the filename (ratings <= 4 count as negative -> 0, higher ratings as positive -> 1)
label_temp = int(cur_filename.split('_')[-1].split('.')[0]) - 1
label = 0 if label_temp < 4 else 1
text = tokenize(open(cur_path, encoding='utf-8').read().strip())
return label, text
def __len__(self):
return len(self.total_file_path_list)
class Word2Sequence():
UNK_TAG = 'UNK'
PAD_TAG = 'PAD'
UNK = 0
PAD = 1
def __init__(self):
self.dict = {
self.UNK_TAG: self.UNK,
self.PAD_TAG: self.PAD
}
self.count = {} # word frequency counts
def fit(self, sentence):
"""
Count the words of a single sentence
:return:
"""
for word in sentence:
self.count[word] = self.count.get(word, 0) + 1
def build_vocab(self, min=5, max=None, max_feature=None):
"""
Build the vocabulary
:param min: minimum number of times a word must appear
:param max: maximum number of times a word may appear
:param max_feature: total number of words to keep
:return:
"""
# drop words whose frequency is not above min
if min is not None:
self.count = {word:value for word,value in self.count.items() if value > min}
# drop words whose frequency is not below max
if max is not None:
self.count = {word:value for word,value in self.count.items() if value < max}
# limit the total number of words kept
if max_feature is not None:
temp = sorted(self.count.items(), key=lambda x:x[-1],reverse=True)[:max_feature]
self.count = dict(temp)
for word in self.count:
self.dict[word] = len(self.dict)
# build the reverse mapping (index -> word)
self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))
def transform(self, sentence, max_len=None):
"""
Convert a sentence into a sequence of indices
:param sentence: [word1, word2...]
:param max_len: pad or truncate the sentence to this length
:return:
"""
if max_len is not None:
if max_len > len(sentence):
sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence)) # pad
if max_len < len(sentence):
sentence = sentence[:max_len] # truncate
return [self.dict.get(word, self.UNK) for word in sentence]
def inverse_transform(self, indices):
"""
Convert a sequence of indices back into a sentence
:param indices: [1,2,3,4...]
:return:
"""
return [self.inverse_dict.get(idx) for idx in indices]
def __len__(self):
return len(self.dict)
def fit_save_word_sequence():
"""
Build the vocabulary from the training set
:return:
"""
ws = Word2Sequence()
train_path = [os.path.join(dataset_path, i) for i in ['train/neg', 'train/pos']]
total_file_path_list = []
for i in train_path:
total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
for cur_path in tqdm(total_file_path_list, desc='fitting'):
sentence = open(cur_path, encoding='utf-8').read().strip()
res = tokenize(sentence)
ws.fit(res)
# save the Word2Sequence object (optional)
ws.build_vocab(min=10)
# pickle.dump(ws, open('./lstm_model/ws.pkl', 'wb'))
return ws
def get_dataloader(mode='train', batch_size=20, ws=None):
"""
Get the dataset, converted into sequences of word indices
:param mode:
:return:
"""
# load the vocabulary
# ws = pickle.load(open('./model/ws.pkl', 'rb'))
# custom collate_fn
def collate_fn(batch):
"""
batch is a list of tuples; each tuple is the result of the dataset's __getitem__
:param batch:
:return:
"""
batch = list(zip(*batch))
labels = torch.LongTensor(batch[0])
texts = batch[1]
# get the length of each text (capped at MAX_LEN)
lengths = [len(i) if len(i) < MAX_LEN else MAX_LEN for i in texts]
# each text is converted into a fixed-length vector of word indices (MAX_LEN = 500)
temp = [ws.transform(i, MAX_LEN) for i in texts]
texts = torch.LongTensor(temp)
del batch
return labels, texts, lengths
dataset = ImdbDataset(mode)
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
return dataloader
class ImdbLstmModel(nn.Module):
def __init__(self, ws):
super(ImdbLstmModel, self).__init__()
self.hidden_size = 64 # number of hidden units per layer, i.e. how many GRU units each layer has
self.embedding_dim = 200 # dimension of the vector used to represent each word
self.num_layer = 1 # number of stacked recurrent layers
self.bidriectional = True # whether to use a bidirectional GRU (the nn.GRU default is False, i.e. unidirectional); bidirectional means the sequence is processed once left-to-right and once right-to-left, doubling the output size
self.num_directions = 2 if self.bidriectional else 1 # 2 for bidirectional, 1 for unidirectional
self.dropout = 0.5 # dropout ratio (nn.GRU default is 0). Dropout randomly zeroes part of the activations during training to reduce overfitting; nn.GRU applies it between stacked layers, so it has no effect with a single layer and is not passed to nn.GRU below
# each sentence is padded/truncated to length 500
# ws = pickle.load(open('./model/ws.pkl', 'rb'))
print(len(ws))
self.embedding = nn.Embedding(len(ws), self.embedding_dim)
# self.lstm = nn.LSTM(self.embedding_dim,self.hidden_size,self.num_layer,bidirectional=self.bidriectional,dropout=self.dropout)
self.gru = nn.GRU(input_size=self.embedding_dim, hidden_size=self.hidden_size, bidirectional=self.bidriectional)
self.fc = nn.Linear(self.hidden_size * self.num_directions, 20)
self.fc2 = nn.Linear(20, 2)
def init_hidden_state(self, batch_size):
"""
Initialize h_0, the initial hidden state (unlike LSTM, a GRU has no separate cell state c_0)
:param batch_size:
:return:
"""
h_0 = torch.rand(self.num_layer * self.num_directions, batch_size, self.hidden_size)
return h_0
def forward(self, input):
# convert word indices into embeddings
x = self.embedding(input)
# batch_first is False, so swap the seq_len and batch_size dimensions
x = x.permute(1,0,2) # [seq_len, batch_size, embedding_num]
# initialize h_0, the initial hidden state (no c_0 is needed for a GRU)
h_0 = self.init_hidden_state(x.size(1)) # [num_layers * num_directions, batch, hidden_size]
output, h_n = self.gru(x, h_0)
# keep only the result of the last GRU step: concatenate the final hidden states of the forward and backward directions
out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
out = self.fc(out)
out = F.relu(out)
out = self.fc2(out)
return F.log_softmax(out, dim=-1)
train_batch_size = 64
test_batch_size = 5000
def train(epoch, ws):
"""
Train the model
:param epoch: number of epochs
:param ws: vocabulary
:return:
"""
mode = 'train'
imdb_lstm_model = ImdbLstmModel(ws)
optimizer = optim.Adam(imdb_lstm_model.parameters())
for i in range(epoch):
train_dataloader = get_dataloader(mode=mode, batch_size=train_batch_size, ws=ws)
for idx, (target, input, input_length) in enumerate(train_dataloader):
optimizer.zero_grad()
output = imdb_lstm_model(input)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
pred = torch.max(output, dim=-1, keepdim=False)[-1]
acc = pred.eq(target.data).numpy().mean() * 100.
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t ACC: {:.6f}'.format(i, idx * len(input), len(train_dataloader.dataset),
100. * idx / len(train_dataloader), loss.item(), acc))
torch.save(imdb_lstm_model.state_dict(), 'model/gru_model.pkl')
torch.save(optimizer.state_dict(), 'model/gru_optimizer.pkl')
def test(ws):
mode = 'test'
# load the model and optimizer state saved by train()
lstm_model = ImdbLstmModel(ws)
lstm_model.load_state_dict(torch.load('model/gru_model.pkl'))
optimizer = optim.Adam(lstm_model.parameters())
optimizer.load_state_dict(torch.load('model/gru_optimizer.pkl'))
lstm_model.eval()
test_dataloader = get_dataloader(mode=mode, batch_size=test_batch_size, ws=ws)
with torch.no_grad():
for idx, (target, input, input_length) in enumerate(test_dataloader):
output = lstm_model(input)
test_loss = F.nll_loss(output, target, reduction='mean')
pred = torch.max(output, dim=-1, keepdim=False)[-1]
correct = pred.eq(target.data).sum()
acc = 100. * pred.eq(target.data).cpu().numpy().mean()
print('idx: {} Test set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(idx, test_loss, correct, target.size(0), acc))
if __name__ == '__main__':
# build the vocabulary
ws = fit_save_word_sequence()
# train
train(10, ws)
# test
# test(ws)
Results: