-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathlstm.py
129 lines (118 loc) · 4.63 KB
/
lstm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import torch
from torch.autograd import Variable
import torch.nn as nn
import pandas as pd
from pandas import DataFrame
import matplotlib.pyplot as plt
import numpy as np
class lstm_reg(nn.Module):
def __init__(self, input_size, hidden_size, output_size=1, num_layers=2):
'''
:param input_size:输入样本的特征维度
:param hidden_size:LSTM层的神经元个数
:param output_size:输出的特征维度
:param num_layers:LSTM网络的层数
'''
super(lstm_reg, self).__init__()
self.rnn = nn.LSTM(input_size, hidden_size, num_layers)#lstm层
self.reg = nn.Linear(hidden_size, output_size)#线性层
def forward(self, x):
x, _ = self.rnn(x)
s, b, h = x.shape
x = x.view(s*b, h)
x = self.reg(x)
x = x.view(s, b, -1)
return x
#创建预测输入和输出数据集
def create_dataset(dataset,look_back=2):
'''
:param dataset: 数据集
:param look_back: 滑动窗口大小
:return: 预测的输入data_X,输出data_Y
'''
data_X, data_Y = [], []
for i in range(len(dataset)-look_back):
a = dataset[i: i+look_back]
data_X.append(a)
data_Y.append(dataset[i+look_back])
return np.array(data_X), np.array(data_Y)
#划分测试集和训练集,默认分割系数为0.7,然后改变数据集的形状使其符合lstm的输入
def split_reshape_dataset(data_X, data_Y, factor=0.7,look_back=2):
train_size = int(len(data_X) * factor)
train_X = data_X[:train_size]
train_Y = data_Y[:train_size]
test_X = data_X[train_size:]
test_Y = data_Y[train_size:]
#我们需要将数据改变一下形状,因为 lstm 读入的数据维度是 (seq, batch, feature),
# 所以要重新改变一下数据的维度,这里只有一个序列,所以 batch 是 1,
# 而输入的 feature 就是我们希望依据的特征,这里默认的是根据前两天的输入来进行预测(也就是滑动窗口的大小),所以 feature 就是 2.
train_X = train_X.reshape(-1, 1, look_back)
train_Y = train_Y.reshape(-1, 1, 1)
test_X = test_X.reshape(-1, 1, look_back)
test_Y = test_Y.reshape(-1, 1, 1)
return train_X, train_Y, test_X, test_Y
#数据预处理
def data_preprocessing(dataset):
"""
:param dataset: 数据集
:return: 归一化后的数据,归一化的尺度
"""
dataset = dataset.dropna() # 丢弃空值
if isinstance(dataset, pd.Series):
data = dataset.values
elif isinstance(dataset, list):
data = np.array(dataset)
elif isinstance(dataset, np.ndarray):
data = dataset
else:
data = dataset.values
data = data.astype('float32')
#0-1归一化
scalar = np.max(data) - np.min(data)
data = list(map(lambda x: x / scalar, data))
return data, scalar
if __name__ == "__main__":
dataset = pd.read_csv('data.csv', usecols=[1])
data, scalar = data_preprocessing(dataset)
data_X, data_Y = create_dataset(data)
train_X, train_Y, test_X, test_Y = split_reshape_dataset(data_X, data_Y)
#构建lstm网络
net = lstm_reg(2, 4)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-2)
#开始训练
# print("=================开始训练====================")
# print(train_X)
# for e in range(10000):
# train_X = torch.as_tensor(torch.Tensor(train_X), dtype=torch.float32)
# train_Y = torch.as_tensor(torch.Tensor(train_Y), dtype=torch.float32)
# out = net.forward(train_X)
# loss = criterion(out, train_Y)
# optimizer.zero_grad()
# loss.backward()
# optimizer.step()
# if (e+1) % 100 == 0:
# print('Epoch: {}, Loss:{:.5f}'.format(e+1, loss.item()))
# #保存训练好的模型
# torch.save(net.state_dict(), "data.net_params.pkl")
#加载模型
net.load_state_dict(torch.load('data.net_params.pkl'))
#预测值
test_X = torch.as_tensor(torch.Tensor(test_X), dtype=torch.float32)
pred_test = net.forward(test_X)
# 乘以原来归一化的刻度放缩回到原来的值域
origin_test_Y = test_Y * scalar
origin_pred_test = pred_test * scalar
#画图
plt.plot(origin_pred_test.data.numpy().reshape(-1), 'r', label='prediction')
plt.plot(origin_test_Y.reshape(-1), 'b', label='real')
plt.legend(loc='best')
plt.show()
#计算MSE
true_data = origin_test_Y.data
true_data = np.array(true_data)
true_data = np.squeeze(true_data) # 从二维变成一维
MSE = true_data - origin_pred_test.data.numpy().reshape(-1)
MSE = MSE * MSE
MSE_loss = sum(MSE) / len(MSE)
print("MSE_loss =",MSE_loss)