來自IJCAI 2018的一篇論文:《Spatio-Temporal Graph Convolutional Networks: A Deep Learning Framework for Traffic Forecasting 》
使用Kipf & Welling 2017的近似譜圖折積得到的圖折積作為空間上的折積操作,時間上使用一維折積TCN對所有頂點進行折積,兩者交替進行,組成了時空折積塊,在加州PeMS和北京市的兩個資料集上做了驗證。論文中圖的構建方法並不是基於實際路網,而是通過數學方法構建了一個基於距離關係的網路。
在交通研究中,交通流的基本變數,也就是速度、流量和密度(實際中,還有排隊長度,時間佔有率,空間佔有率,車頭時距等多個變數),這些變數通常作為監控當前交通狀態以及未來預測的指示指標。根據預測的長度,主要是指預測時間視窗的大小,交通預測大體分為兩個尺度:短期(5~30min),中和長期預測(超過30min)。大多數流行的統計方法(比如,線性迴歸)可以在短期預測上表現的很好。然而,由於交通流的不確定性和複雜性,這些方法在相對長期的預測上不是很有效。
中長期交通預測上的研究可以分為兩類:動態建模和資料驅動的方法。
深度學習方法:深度學習已經廣泛且成功地應用於各式各樣的交通任務中,並取得了很顯著的成果,比如層疊自編碼器(SAE)。然而,這些全連線神經網路很難從輸入中提取空間和時間特徵。而且,空間屬性的嚴格限制甚至完全缺失,這些網路的表示能力被限制的很嚴重。為了充分利用空間特徵,一些研究者使用了折積神經網路來捕獲交通網路中的臨近資訊,同時也在時間軸上部署了迴圈神經網路。通過組合LSTM和1維折積,比如特徵層面融合的架構CLTFP來預測短期交通狀況。CLTFP是第一個嘗試對時間和空間規律性對齊的方法。後來,有學者提出帶有嵌入折積層的全連線網路:ConvLSTM。但是常規折積操作只適合處理規則化的網路結構,比如影象或者視訊,而大多數領域是非結構化的,比如社群網路,交通路網等。此外,RNN對於序列的學習需要迭代訓練,這會導致誤差的積累。並且還有難以訓練和耗時的缺點。
針對以上問題和缺陷:該文引入了一些策略來有效的對交通流的時間動態和空間依賴進行建模。為了完全利用空間資訊,利用廣義圖對交通網路建模,而不是將交通流看成各個離散的部分(比如網格或碎塊)。為了處理迴圈神經網路的缺陷,我們在時間軸上部署了一個全折積結構來加速模型的訓練過程。綜上所述,該文提出了一個新的神經網路架構-時空圖折積網路,來預測交通狀況。這個架構由多個時空圖折積塊組成。
主要貢獻:
文章首先將網格資料改為圖資料作為輸入,圖可以用鄰接矩陣來表示,圖中的W就是圖的鄰接矩陣,實驗中使用的資料集PeMSD7(M)共有228個資料點,相當於一個具有228個頂點的圖,因為這個模型主要是對速度進行預測,所以每個頂點只有一個特徵就是:速度。
網路架構是本文的重點部分。如下圖所示,STGCN有多個時空折積塊組成,每一個都是像一個「三明治」結構的組成,有兩個門序列折積層和一個空間圖折積層在中間。
組成結構:STGCN 有兩個ST-Conv Block(淡藍色部分)快和一個全連線輸出layer(綠色部分),其中每個ST-Conv Block塊有包括兩個時間折積塊(橙色部分)和一個空間折積塊(淺黃色部分)
提取時間特徵的圖折積神經網路,即對應網路結構中的Spatial Graph-Conv 模組。
交通路網是非結構化的影象,為了捕獲空間上的相關性,本篇論文采用的是切比雪夫近似與一階近似後的圖折積公式。只需要看最終的那個折積公式,其中D為圖的度矩陣,A_hat為圖的鄰接矩陣+單位矩陣,為的是在折積過程中不僅考慮鄰居節點的狀態,也考慮自身的狀態。
提取空間特徵的圖折積神經網路,即對應網路結構中的Temporal Gated-Conv 模組。
在時間維度上,本文采用門控折積來捕獲時間依賴性,而且與傳統的折積方法不同,由於還要考慮時間序列的問題,所以這裡採用的是因果折積(TCN)。使用折積操作,就不在像採用RNN的方法依賴於之前的輸出,並且還可以對資料進行並行處理,這樣使得模型訓練速度更快。
此外,採用還採用了GLU操作,GLU是在這篇論文中提出的:Language Modeling with Gated Convolutional Networks。在STGCN這篇論文,文章只是簡單提到採用這種操作可以緩解梯度消失等現象還可以保留模型的非線效能力。
將以上的圖折積和門控CNN組合成如圖所示的結構,其中使用了瓶頸策略來實現尺度壓縮和特徵壓縮。並在每層之後接歸一化層來防止過擬合。
ST-Conv Block的公式就是圖的另一個解釋,輸入資料先做時間維度折積,輸出結果再做圖折積,圖折積的輸出結果經過一個RELU,在進行一個時間維度折積,就是整個ST-Conv Block的輸出。
最後的模型是堆疊兩個St-Conv Block之後接一個輸出層,其中輸出層首先用時間維度的折積將之前的輸出資料的時間維度進行合併,合併之後在經過一個折積輸出最終的預測資料,預測資料就是下一個時間維度的一張圖[1,228,1]。模型採用的是L2損失。
原始碼以Pytorch為例
原始資料時間視窗長度是5min一個資料片,即:每5分鐘記錄該時間視窗內,各個路網節點的資料資訊,每條資料資訊包含2個維度資訊。
讀取資料,並且利用 Z-score method進行歸一化:
X = X - means.reshape(1, -1, 1)和 X = X / stds.reshape(1, -1, 1),最終返回資料格式為:X:[207, 2, 34272],表示有圖中有207個節點,34272個時間片,每個時間片對應的節點資料維度是2
def load_metr_la_data():
if (not os.path.isfile("data/adj_mat.npy")
or not os.path.isfile("data/node_values.npy")):
with zipfile.ZipFile("data/METR-LA.zip", 'r') as zip_ref:
zip_ref.extractall("data/")
A = np.load("data/adj_mat.npy")
X = np.load("data/node_values.npy").transpose((1, 2, 0))
X = X.astype(np.float32)
# Normalization using Z-score method
means = np.mean(X, axis=(0, 2))
X = X - means.reshape(1, -1, 1)
stds = np.std(X, axis=(0, 2))
X = X / stds.reshape(1, -1, 1)
return A, X, means, stds
讀取鄰接矩陣,並返回度資訊
def get_normalized_adj(A):
"""
Returns the degree normalized adjacency matrix.
"""
A = A + np.diag(np.ones(A.shape[0], dtype=np.float32))
D = np.array(np.sum(A, axis=1)).reshape((-1,))
D[D <= 10e-5] = 10e-5 # Prevent infs
diag = np.reciprocal(np.sqrt(D))
A_wave = np.multiply(np.multiply(diag.reshape((-1, 1)), A),
diag.reshape((1, -1)))
return A_wave
def generate_dataset(X, num_timesteps_input, num_timesteps_output):
"""
Takes node features for the graph and divides them into multiple samples
along the time-axis by sliding a window of size (num_timesteps_input+
num_timesteps_output) across it in steps of 1.
:param X: Node features of shape (num_vertices, num_features,
num_timesteps)
:return:
- Node features divided into multiple samples. Shape is
(num_samples, num_vertices, num_features, num_timesteps_input).
- Node targets for the samples. Shape is
(num_samples, num_vertices, num_features, num_timesteps_output).
"""
# Generate the beginning index and the ending index of a sample, which
# contains (num_points_for_training + num_points_for_predicting) points
indices = [(i, i + (num_timesteps_input + num_timesteps_output)) for i
in range(X.shape[2] - (
num_timesteps_input + num_timesteps_output) + 1)]
# Save samples
features, target = [], []
for i, j in indices:
features.append(
X[:, :, i: i + num_timesteps_input].transpose(
(0, 2, 1)))
target.append(X[:, 0, i + num_timesteps_input: j])
return torch.from_numpy(np.array(features)), \
torch.from_numpy(np.array(target))
這裡batch_size設定為50,同時為了比較快速清晰的檢視各個模組的輸出,我們就以一個batch為例,即:輸入的總時間片長度為50
STGCN包含三個模型,其中兩個時空STGCNBlock模組和一個TimeBlock模組和最後一個全連線層,而一個STGCNBlock又包含兩個temporal(temporal)模組,(也就是TimeBlock模組模組)和 一個Theta(spatial)模組,一個TimeBlock模組包含因果折積操作目的就是為了提取時間資訊(主要是W維度上的)TimeBlock模組主要是為了提取時間資訊,利用的是一維折積,kernel_size=3,折積核大小為(1,kernel_size=3,分別對應H,W)。
TimeBlock模組:
class TimeBlock(nn.Module):
"""
Neural network block that applies a temporal convolution to each node of
a graph in isolation.
"""
def __init__(self, in_channels, out_channels, kernel_size=3):
"""
:param in_channels: Number of input features at each node in each time
step.
:param out_channels: Desired number of output channels at each node in
each time step.
:param kernel_size: Size of the 1D temporal kernel.
"""
super(TimeBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, (1, kernel_size))
self.conv2 = nn.Conv2d(in_channels, out_channels, (1, kernel_size))
self.conv3 = nn.Conv2d(in_channels, out_channels, (1, kernel_size))
def forward(self, X):
"""
:param X: Input data of shape (batch_size, num_nodes, num_timesteps,
num_features=in_channels)
:return: Output data of shape (batch_size, num_nodes,
num_timesteps_out, num_features_out=out_channels)
"""
# Convert into NCHW format for pytorch to perform convolutions.
X = X.permute(0, 3, 1, 2)
temp = self.conv1(X) + torch.sigmoid(self.conv2(X))
out = F.relu(temp + self.conv3(X))
# Convert back from NCHW to NHWC
out = out.permute(0, 2, 3, 1)
return out
STGCNBlock模組
class STGCNBlock(nn.Module):
"""
Neural network block that applies a temporal convolution on each node in
isolation, followed by a graph convolution, followed by another temporal
convolution on each node.
"""
def __init__(self, in_channels, spatial_channels, out_channels,
num_nodes):
"""
:param in_channels: Number of input features at each node in each time
step.
:param spatial_channels: Number of output channels of the graph
convolutional, spatial sub-block.
:param out_channels: Desired number of output features at each node in
each time step.
:param num_nodes: Number of nodes in the graph.
"""
super(STGCNBlock, self).__init__()
self.temporal1 = TimeBlock(in_channels=in_channels,
out_channels=out_channels)
self.Theta1 = nn.Parameter(torch.FloatTensor(out_channels,
spatial_channels))
self.temporal2 = TimeBlock(in_channels=spatial_channels,
out_channels=out_channels)
self.batch_norm = nn.BatchNorm2d(num_nodes)
self.reset_parameters()
def reset_parameters(self):
stdv = 1. / math.sqrt(self.Theta1.shape[1])
self.Theta1.data.uniform_(-stdv, stdv)
def forward(self, X, A_hat):
"""
:param X: Input data of shape (batch_size, num_nodes, num_timesteps,
num_features=in_channels).
:param A_hat: Normalized adjacency matrix.
:return: Output data of shape (batch_size, num_nodes,
num_timesteps_out, num_features=out_channels).
"""
t = self.temporal1(X)
lfs = torch.einsum("ij,jklm->kilm", [A_hat, t.permute(1, 0, 2, 3)])
# t2 = F.relu(torch.einsum("ijkl,lp->ijkp", [lfs, self.Theta1]))
t2 = F.relu(torch.matmul(lfs, self.Theta1))
t3 = self.temporal2(t2)
return self.batch_norm(t3)
# return t3
STGCN模組:
class STGCN(nn.Module):
"""
Spatio-temporal graph convolutional network as described in
https://arxiv.org/abs/1709.04875v3 by Yu et al.
Input should have shape (batch_size, num_nodes, num_input_time_steps,
num_features).
"""
def __init__(self, num_nodes, num_features, num_timesteps_input,
num_timesteps_output):
"""
:param num_nodes: Number of nodes in the graph.
:param num_features: Number of features at each node in each time step.
:param num_timesteps_input: Number of past time steps fed into the
network.
:param num_timesteps_output: Desired number of future time steps
output by the network.
"""
super(STGCN, self).__init__()
self.block1 = STGCNBlock(in_channels=num_features, out_channels=64,
spatial_channels=16, num_nodes=num_nodes)
self.block2 = STGCNBlock(in_channels=64, out_channels=64,
spatial_channels=16, num_nodes=num_nodes)
self.last_temporal = TimeBlock(in_channels=64, out_channels=64)
self.fully = nn.Linear((num_timesteps_input - 2 * 5) * 64,
num_timesteps_output)
def forward(self, A_hat, X):
"""
:param X: Input data of shape (batch_size, num_nodes, num_timesteps,
num_features=in_channels).
:param A_hat: Normalized adjacency matrix.
"""
out1 = self.block1(X, A_hat)
out2 = self.block2(out1, A_hat)
out3 = self.last_temporal(out2)
out4 = self.fully(out3.reshape((out3.shape[0], out3.shape[1], -1)))
return out4