
Botnet Detection with Graph Neural Networks: Source Code Walkthrough


import torch_scatter  # torch_scatter 1.x API (positional fill_value), as used below


def scatter_(name, src, index, dim_size=None, out=None):
    r"""Aggregates all values from the :attr:`src` tensor at the indices
    specified in the :attr:`index` tensor along the first dimension.
    If multiple indices reference the same location, their contributions
    are aggregated according to :attr:`name` (either :obj:`"add"`,
    :obj:`"mean"` or :obj:`"max"`).

    Args:
        name (string): The aggregation to use (:obj:`"add"`, :obj:`"mean"`,
            :obj:`"max"`).
        src (Tensor): The source tensor.
        index (LongTensor): The indices of elements to scatter.
        dim_size (int, optional): Automatically create output tensor with size
            :attr:`dim_size` in the first dimension. If set to :attr:`None`, a
            minimal sized output tensor is returned. (default: :obj:`None`)

    :rtype: :class:`Tensor`
    """
    assert name in ['add', 'mean', 'max']

    op = getattr(torch_scatter, 'scatter_{}'.format(name))
    # 'max' needs a very small initial fill so that untouched slots can be
    # recognized (and zeroed) afterwards; 'add'/'mean' start from 0
    fill_value = -1e38 if name == 'max' else 0
    out = op(src, index, 0, out, dim_size, fill_value)

    if isinstance(out, tuple):
        out = out[0]  # scatter_max returns a (values, argmax) tuple

    if name == 'max':
        out[out == fill_value] = 0  # reset slots that received no value

    return out
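For intuition, here is a minimal usage sketch of scatter_; the toy tensors are illustrative, not from the repo.

import torch

# 4 per-edge messages aggregated into 3 target slots
src = torch.tensor([[1.], [2.], [3.], [4.]])
index = torch.tensor([0, 1, 0, 2])

out_add = scatter_('add', src, index, dim_size=3)
# tensor([[4.], [2.], [4.]])  -- rows 0 and 2 of src both land in slot 0

out_max = scatter_('max', src, index, dim_size=3)
# tensor([[3.], [2.], [4.]])  -- per-slot maximum; empty slots would be reset to 0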


NodeModelBase

import torch
import torch.nn as nn
from torch_scatter import scatter_add

# EdgeGateProj and EdgeGateFree are edge-gating modules defined elsewhere in the repo.


class NodeModelBase(nn.Module):
    """
    Base class for models that update node features based on node features and edge weights.
    Note: messages are aggregated non-linearly, by default with 'add'.

    Args:
        in_channels (int): number of input channels
        out_channels (int): number of output channels
        in_edgedim (int, optional): dimension of the input edge features
        deg_norm (str, optional): method for computing the degree-based normalization constants.
            Choose from ['none', 'sm', 'rw'].
        edge_gate (str, optional): method of applying the edge gating mechanism. Choose from ['none', 'proj', 'free'].
            Note that when set to 'free', `num_edges` should also be provided as an argument (but then the
            model only works on a graph with a fixed set of edges).
        aggr (str, optional): message aggregation method, one of ['add', 'mean', 'max']. Default: 'add'.
        **kwargs: could include `num_edges`, etc.

    Input:
        - x (torch.Tensor): node feature matrix, size (N, C_in)
        - edge_index (torch.LongTensor): edge index in COO format, size (2, E)
        - edge_attr (torch.Tensor, optional): edge feature matrix, size (E, D_in)

    Output:
        - xo (torch.Tensor): updated node features, size (N, C_out)

    where
        N: number of input nodes
        E: number of edges
        C_in/C_out: dimension of the input/output node features
        D_in: dimension of the input edge features
    """

    def __init__(self, in_channels, out_channels, in_edgedim=None, deg_norm='none', edge_gate='none', aggr='add',
                 *args, **kwargs):
        assert deg_norm in ['none', 'sm', 'rw']
        assert edge_gate in ['none', 'proj', 'free']
        assert aggr in ['add', 'mean', 'max']

        super(NodeModelBase, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.in_edgedim = in_edgedim
        self.deg_norm = deg_norm
        self.aggr = aggr

        if edge_gate == 'proj':
            self.edge_gate = EdgeGateProj(out_channels, in_edgedim=in_edgedim, bias=True)
        elif edge_gate == 'free':
            assert 'num_edges' in kwargs  # note: this restricts the model to a fixed number of edges
            self.edge_gate = EdgeGateFree(kwargs['num_edges'])  # so don't use this unless necessary
        else:
            self.register_parameter('edge_gate', None)

    @staticmethod
    def degnorm_const(edge_index=None, num_nodes=None, deg=None, edge_weight=None, method='sm', device=None):
        """
        Calculate the normalization constants based on out-degrees for a graph.
        'sm' uses symmetric normalization, better suited for undirected graphs.
        'rw' uses random-walk normalization (mean), better suited for directed graphs.

        Procedure:
            - Check edge_weight. If it is not None, both edge_index and num_nodes must be provided,
              and the degrees of all nodes are computed.
            - If edge_weight is None, check whether deg (the node degree vector) is already given:
                - If deg is given, edge_index and num_nodes are ignored.
                - If deg is not given, edge_index and num_nodes must be provided, and the degrees
                  of all nodes are computed.

        Input:
            - edge_index (torch.Tensor): graph connectivity in COO format, size (2, E), long
            - num_nodes (int): number of nodes
            - deg (torch.Tensor): node degrees, size (N,), float
            - edge_weight (torch.Tensor): edge weights, size (E,), float
            - method (str): degree normalization method, choose from ['sm', 'rw']
            - device (str or torch.device): device to compute on

        Output:
            - norm (torch.Tensor): normalization constants based on node degrees and edge weights.
              If `method` == 'sm', size (E,);
              if `method` == 'rw' and `edge_weight` != None, size (E,);
              if `method` == 'rw' and `edge_weight` == None, size (N,).

        where
            N: number of nodes
            E: number of edges
        """
        assert method in ['sm', 'rw']

        if device is None and edge_index is not None:
            device = edge_index.device

        if edge_weight is not None:
            assert edge_index is not None, 'edge_index must be provided when edge_weight is not None'
            assert num_nodes is not None, 'num_nodes must be provided when edge_weight is not None'

            edge_weight = edge_weight.view(-1)
            assert edge_weight.size(0) == edge_index.size(1)

            calculate_deg = True  # whether the node degrees need to be computed
            edge_weight_equal = False
        else:
            if deg is None:
                assert edge_index is not None, 'edge_index must be provided when edge_weight is None ' \
                                               'but deg not provided'
                assert num_nodes is not None, 'num_nodes must be provided when edge_weight is None ' \
                                              'but deg not provided'
                edge_weight = torch.ones((edge_index.size(1),), device=device)
                calculate_deg = True
            else:
                # node degrees are provided
                calculate_deg = False
            edge_weight_equal = True

        row, col = edge_index
        # compute the node degrees
        if calculate_deg:
            deg = scatter_add(edge_weight, row, dim=0, dim_size=num_nodes)
        # degree normalization
        if method == 'sm':
            deg_inv_sqrt = deg.pow(-0.5)
        elif method == 'rw':
            deg_inv_sqrt = deg.pow(-1)
        else:
            raise ValueError

        deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0

        if method == 'sm':
            # symmetric normalization; the result has size (E,). Note that deg_inv_sqrt alone is
            # not used directly because the edge weights must be multiplied in
            norm = (deg_inv_sqrt[row] * edge_weight * deg_inv_sqrt[col] if not edge_weight_equal
                    else deg_inv_sqrt[row] * deg_inv_sqrt[col])  # size (E,)
        elif method == 'rw':
            # random-walk normalization; without edge weights the inverse degrees are returned
            # directly, size (N,); otherwise the result has size (E,) as above
            norm = (deg_inv_sqrt[row] * edge_weight if not edge_weight_equal  # size (E,)
                    else deg_inv_sqrt)  # size (N,)
        else:
            raise ValueError

        return norm

    def forward(self, x, edge_index, edge_attr=None, deg=None, edge_weight=None, *args, **kwargs):
        return x

    def num_parameters(self):
        if not hasattr(self, 'num_para'):
            self.num_para = sum([p.nelement() for p in self.parameters()])
        return self.num_para

    def __repr__(self):
        return '{} (in_channels: {}, out_channels: {}, in_edgedim: {}, deg_norm: {}, edge_gate: {}, ' \
               'aggr: {} | number of parameters: {})'.format(
                   self.__class__.__name__, self.in_channels, self.out_channels, self.in_edgedim,
                   self.deg_norm, self.edge_gate.__class__.__name__, self.aggr, self.num_parameters())
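To make the two normalization modes concrete, here is a small hand-checked sketch on a toy directed graph (the graph is illustrative only):

import torch

edge_index = torch.tensor([[0, 1, 1, 2],
                           [1, 0, 2, 1]])  # 4 directed edges over 3 nodes

# 'sm': one constant per edge, size (E,)
norm_sm = NodeModelBase.degnorm_const(edge_index, num_nodes=3, method='sm')
# out-degrees deg = [1, 2, 1], so deg^(-1/2) = [1.0000, 0.7071, 1.0000]
# norm_sm = deg^(-1/2)[row] * deg^(-1/2)[col] = [0.7071, 0.7071, 0.7071, 0.7071]

# 'rw' without edge weights: one constant per node, size (N,)
norm_rw = NodeModelBase.degnorm_const(edge_index, num_nodes=3, method='rw')
# norm_rw = deg^(-1) = [1.0000, 0.5000, 1.0000]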

NodeModelAdditive

from torch.nn import Parameter

# glorot and zeros are the Glorot-uniform / zero parameter initializers defined elsewhere in the repo.


class NodeModelAdditive(NodeModelBase):
    """
    Updates node features from the node and edge features of the neighboring nodes,
    with normalization based on node (out-)degrees.
    """

    def __init__(self, in_channels, out_channels, in_edgedim=None, deg_norm='sm', edge_gate='none', aggr='add',
                 bias=True, **kwargs):
        super(NodeModelAdditive, self).__init__(in_channels, out_channels, in_edgedim, deg_norm, edge_gate, aggr,
                                                **kwargs)
        # node weight matrix
        self.weight_node = Parameter(torch.Tensor(in_channels, out_channels))
        # edge weight matrix (only when edge features are used)
        if in_edgedim is not None:
            self.weight_edge = Parameter(torch.Tensor(in_edgedim, out_channels))

        if bias:
            self.bias = Parameter(torch.Tensor(out_channels))
        else:
            self.register_parameter('bias', None)

        self.reset_parameters()

    def reset_parameters(self):
        glorot(self.weight_node)
        if self.in_edgedim is not None:
            glorot(self.weight_edge)
        if self.bias is not None:
            zeros(self.bias)

    def forward(self, x, edge_index, edge_attr=None, deg=None, edge_weight=None, **kwargs):
        # project the node features into the output space, size (N, C_out)
        x = torch.matmul(x, self.weight_node)

        # build the edge feature vectors, if present, size (E, C_out)
        if edge_attr is not None:
            assert self.in_edgedim is not None
            x_je = torch.matmul(edge_attr, self.weight_edge)

        # prepare the node features for message passing: normalization, then (optionally) merging edge features
        if self.deg_norm == 'none':
            # lift the source-node features directly into an (E, C_out) matrix
            x_j = torch.index_select(x, 0, edge_index[0])
        else:
            # compute the normalization constants from node degrees and edge weights, size (E,) or (N,)
            norm = self.degnorm_const(edge_index, num_nodes=x.size(0), deg=deg,
                                      edge_weight=edge_weight, method=self.deg_norm, device=x.device)
            if self.deg_norm == 'rw' and edge_weight is None:
                x_j = x * norm.view(-1, 1)  # this saves much memory when N << E
                # lift the features to the source nodes, resulting size (E, C_out)
                x_j = torch.index_select(x_j, 0, edge_index[0])
            else:
                # lift the features to the source nodes, resulting size (E, C_out)
                x_j = torch.index_select(x, 0, edge_index[0])
                x_j = x_j * norm.view(-1, 1)  # second dim set to 1 for broadcasting

        # ---------- aggregate the node + edge features into the new node features ----------
        # final per-edge feature vectors to aggregate, with or without edge features
        x_j = x_j + x_je if edge_attr is not None else x_j

        # apply the edge gates
        if self.edge_gate is not None:
            eg = self.edge_gate(x, edge_index, edge_attr=edge_attr, edge_weight=edge_weight)
            x_j = eg * x_j

        # aggregate the messages into the target nodes (the key step), resulting size (N, C_out)
        x = scatter_(self.aggr, x_j, edge_index[1], dim_size=x.size(0))

        # add the bias
        if self.bias is not None:
            x = x + self.bias

        return x
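A minimal forward-pass sketch (assuming glorot/zeros and the classes above are importable from the repo):

import torch

model = NodeModelAdditive(in_channels=4, out_channels=8, deg_norm='sm', aggr='add')

x = torch.randn(3, 4)                      # 3 nodes, 4 input features
edge_index = torch.tensor([[0, 1, 1, 2],
                           [1, 0, 2, 1]])  # 4 directed edges

out = model(x, edge_index)                 # no edge_attr, so the x_je branch is skipped
print(out.shape)                           # torch.Size([3, 8])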

GCNLayer

# NodeModelMLP and NodeModelAttention are the other node models defined in the repo;
# activation(name) resolves an activation module from its name.


class GCNLayer(nn.Module):
    """
    Graph convolution layer. A wrapper around the various node-update models, e.g. the basic additive model,
    the MLP model, or the attention model. It can also be extended with edge-update models and extra
    readout operations.

    Args:
        in_channels (int): input channels
        out_channels (int): output channels
        in_edgedim (int, optional): dimension of the input edge features
        deg_norm (str, optional): out-degree normalization method, one of ['none', 'sm', 'rw']. Default: 'sm'.
            'sm': symmetric, better suited for undirected graphs. 'rw': random walk, better suited for
            directed graphs.
            Note: when 'sm' is used on a directed graph, an error will occur if some node has no out-degree.
        edge_gate (str, optional): method of applying the edge gating mechanism, one of ['none', 'proj', 'free'].
            Note that when set to 'free', `num_edges` should also be provided as an argument (but then it can
            only work with a fixed edge graph).
        aggr (str, optional): method of aggregating the neighbor features, one of ['add', 'mean', 'max'].
            Default: 'add'.
        bias (bool, optional): whether to use a bias. Default: True.
        nodemodel (str, optional): name of the node model to wrap, one of ['additive', 'mlp', 'attention'].
        non_linear (str, optional): name of the non-linear activation function.
        **kwargs: could include `num_edges`, etc.
    """
    nodemodel_dict = {'additive': NodeModelAdditive,
                      'mlp': NodeModelMLP,
                      'attention': NodeModelAttention}

    def __init__(self, in_channels, out_channels, in_edgedim=None, deg_norm='sm', edge_gate='none', aggr='add',
                 bias=True, nodemodel='additive', non_linear='relu', **kwargs):
        assert nodemodel in ['additive', 'mlp', 'attention']
        super().__init__()
        self.gcn = self.nodemodel_dict[nodemodel](in_channels,
                                                  out_channels,
                                                  in_edgedim,
                                                  deg_norm=deg_norm,
                                                  edge_gate=edge_gate,
                                                  aggr=aggr,
                                                  bias=bias,
                                                  **kwargs)

        self.non_linear = activation(non_linear)

    def forward(self, x, edge_index, edge_attr=None, deg=None, edge_weight=None, **kwargs):
        xo = self.gcn(x, edge_index, edge_attr, deg, edge_weight, **kwargs)
        xo = self.non_linear(xo)
        return xo
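Wrapping the additive node model then looks like this (a sketch, reusing x and edge_index from the previous example; `activation('relu')` is assumed to resolve to an nn.ReLU-style module, as elsewhere in the repo):

layer = GCNLayer(in_channels=4, out_channels=8, nodemodel='additive', non_linear='relu')
xo = layer(x, edge_index)  # x: (N, 4) -> xo: (N, 8), ReLU applied after message passing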

GCNModel

from torch.nn.utils.rnn import pad_sequence


class GCNModel(nn.Module):
    """
    Graph neural network model, consisting of stacked GCN layers, residual connections, and a final output layer.

    Args:
        in_channels (int): number of input channels
        enc_sizes (List[int]): output channels of each layer, e.g. [32, 64, 64, 32]
        num_classes (int): number of classes for the final prediction
        non_linear (str): non-linear activation function
        non_linear_layer_wise (str): non-linear activation applied inside each layer, before the residual sum.
            Default: 'none'; usually left unchanged.
        residual_hop (int): hop length between residual connections. When the dimensions match, the output is
            the direct sum with the earlier layer's output; otherwise a bias-free linear projection is applied
            before the sum.
        dropout (float): dropout rate applied to the hidden node features (not to the initial input features).
        final_layer_config (dict): configuration of the last layer, if it is different from the previous layers.
            This is useful when the last layer is the direct output layer, and you want to change some setup,
            such as the attention heads, etc.
        final_type (str): final layer type for the predicted scores. Default: 'none'.
        pred_on (str): whether the prediction task is on nodes or on the whole graph. Default: 'node'.
        **kwargs: could include other configuration arguments for each layer, such as for graph attention layers.

    Input:
        - x (torch.Tensor): node features of size (B * N, C_in)
        - edge_index (torch.LongTensor): COO format edge index of size (2, E)
        - edge_attr (torch.Tensor, optional): edge attributes/features of size (E, D_in)
        - deg (torch.Tensor, optional): node degrees of size (B * N,); this could save computation and memory for
          computing the node degrees every forward pass when message normalization is dependent on degrees.
        - edge_weight (torch.Tensor, optional): currently not used in most cases.

    Output:
        - x (torch.Tensor): updated node features of size (B * N, num_classes) for node prediction, or
          (B, num_classes) for graph level prediction

    where
        B: number of graphs in a batch (batch size)
        N: number of nodes
        E: number of edges
        C_in: dimension of input node features
        num_classes: number of classes to predict
        D_in: dimension of input edge features
    """

    def __init__(self, in_channels, enc_sizes, num_classes, non_linear='relu', non_linear_layer_wise='none',
                 residual_hop=None, dropout=0.0, final_layer_config=None, final_type='none', pred_on='node',
                 **kwargs):
        assert final_type in ['none', 'proj']
        assert pred_on in ['node', 'graph']
        super().__init__()

        self.in_channels = in_channels
        self.enc_sizes = [in_channels, *enc_sizes]
        self.num_layers = len(self.enc_sizes) - 1
        self.num_classes = num_classes
        self.residual_hop = residual_hop
        self.non_linear_layer_wise = non_linear_layer_wise
        self.final_type = final_type
        self.pred_on = pred_on

        # allow different layers to have different numbers of attention heads; in particular the output of the
        # last attention layer feeds the output layer directly. `nheads` can be declared either as a single int
        # or as a per-layer list
        if 'nheads' in kwargs:
            if isinstance(kwargs['nheads'], int):
                self.nheads = [kwargs['nheads']] * self.num_layers
            elif isinstance(kwargs['nheads'], list):
                self.nheads = kwargs['nheads']
                assert len(self.nheads) == self.num_layers
            else:
                raise ValueError
            del kwargs['nheads']
        else:
            # otherwise just a placeholder for 'nheads'
            self.nheads = [1] * self.num_layers

        # without a separate configuration for the final layer, simply stack identical GCNLayers;
        # otherwise configure the last layer separately
        if final_layer_config is None:
            self.gcn_net = nn.ModuleList([GCNLayer(in_c, out_c, nheads=nh, non_linear=non_linear_layer_wise,
                                                   **kwargs)
                                          for in_c, out_c, nh in zip(self.enc_sizes, self.enc_sizes[1:],
                                                                     self.nheads)])
        else:
            assert isinstance(final_layer_config, dict)
            self.gcn_net = nn.ModuleList([GCNLayer(in_c, out_c, nheads=nh, non_linear=non_linear_layer_wise,
                                                   **kwargs)
                                          for in_c, out_c, nh in zip(self.enc_sizes[:-2], self.enc_sizes[1:-1],
                                                                     self.nheads[:-1])])
            kwargs.update(final_layer_config)  # this will update with the new values in final_layer_config
            self.gcn_net.append(GCNLayer(self.enc_sizes[-2],
                                         self.enc_sizes[-1],
                                         nheads=self.nheads[-1],
                                         non_linear=non_linear_layer_wise,
                                         **kwargs))

        self.dropout = nn.Dropout(dropout)

        if residual_hop is not None and residual_hop > 0:
            self.residuals = nn.ModuleList([nn.Linear(self.enc_sizes[i], self.enc_sizes[j], bias=False)
                                            if self.enc_sizes[i] != self.enc_sizes[j]
                                            else nn.Identity()
                                            for i, j in zip(range(0, len(self.enc_sizes), residual_hop),
                                                            range(residual_hop, len(self.enc_sizes), residual_hop))])
            self.num_residuals = len(self.residuals)

        self.non_linear = activation(non_linear)

        if self.final_type == 'none':
            self.final = nn.Identity()
        elif self.final_type == 'proj':
            self.final = nn.Linear(self.enc_sizes[-1], num_classes)
        else:
            raise ValueError

    def reset_parameters(self):
        for net in self.gcn_net:
            net.reset_parameters()
        if self.residual_hop is not None:
            for net in self.residuals:
                if not isinstance(net, nn.Identity):  # nn.Identity has no parameters to reset
                    net.reset_parameters()
        if self.final_type != 'none':
            self.final.reset_parameters()

    def forward(self, x, edge_index, edge_attr=None, deg=None, edge_weight=None, **kwargs):
        xr = None
        add_xr_at = -1

        for n, net in enumerate(self.gcn_net):
            # pass through a GCN layer (with its layer-wise non-linear activation)
            xo = net(x, edge_index, edge_attr, deg, edge_weight, **kwargs)
            xo = self.dropout(xo)
            # deal with residual connections
            if self.residual_hop is not None and self.residual_hop > 0:
                if n % self.residual_hop == 0 and (n // self.residual_hop) < self.num_residuals:
                    xr = self.residuals[n // self.residual_hop](x)
                    add_xr_at = n + self.residual_hop - 1
                if n == add_xr_at:
                    if n < self.num_layers - 1:  # before the last layer
                        # non_linear is applied both after each layer (by default: 'none') and after the residual sum
                        xo = self.non_linear(xo + xr)
                    else:  # the last layer (potentially the output layer)
                        if self.final_type == 'none':
                            # no non_linear here: this matters for binary classification, since the scores are
                            # passed to a sigmoid to compute the loss, and a ReLU would kill all the negative parts
                            xo = xo + xr
                        else:
                            xo = self.non_linear(xo + xr)
            else:
                if n < self.num_layers - 1:  # before the last layer
                    xo = self.non_linear(xo)
                else:
                    if self.final_type == 'none':
                        pass
                    else:
                        xo = self.non_linear(xo)

            x = xo
        # size of x: (B * N, self.enc_sizes[-1]) -> (B * N, num_classes)
        x = self.final(x)

        # graph level pooling for graph classification (mean pooling here)
        if self.pred_on == 'graph':
            assert 'batch_slices_x' in kwargs
            batch_slices_x = kwargs['batch_slices_x']
            if len(batch_slices_x) == 2:
                # only one graph in the batch
                x = x.mean(dim=0, keepdim=True)  # size (1, num_classes)
            else:
                # more than one graph in the batch
                x_batch, lengths = zip(*[(x[i:j], j - i) for (i, j) in zip(batch_slices_x, batch_slices_x[1:])])
                x_batch = pad_sequence(x_batch, batch_first=True,
                                       padding_value=0)  # size (batch_size, max_num_nodes, num_classes)
                # divide each graph's sum by its node count; view(-1, 1) for correct broadcasting
                x = x_batch.sum(dim=1) / x_batch.new_tensor(lengths).view(-1, 1)  # size (batch_size, num_classes)

        return x
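Putting it together, a sketch of a node-level classifier built from these pieces, reusing x and edge_index from the earlier examples (the hyperparameters are illustrative):

model = GCNModel(in_channels=4, enc_sizes=[32, 32, 32], num_classes=2,
                 residual_hop=1, final_type='proj')

logits = model(x, edge_index)  # (N, 4) -> (N, 2) per-node class scores
# With residual_hop=1, the first residual is a bias-free Linear(4, 32) projection
# (the dimensions differ), and the later ones are identity shortcuts.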
scatter_add

The scatter_add function sums the elements of the src matrix whose positions share the same value in index. Its parameters are:

scatter_add(src: torch.Tensor,
            index: torch.Tensor,
            dim: int = -1,
            out: Optional[torch.Tensor] = None,
            dim_size: Optional[int] = None) -> torch.Tensor

Note that in newer versions of torch_scatter, scatter_add no longer has a fill_value parameter.
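For reference, a minimal sketch of the corresponding call in torch_scatter >= 2.0, where the generic scatter entry point takes a reduce argument instead:

import torch
import torch_scatter

src = torch.tensor([1., 2., 3., 4.])
index = torch.tensor([0, 1, 0, 2])
out = torch_scatter.scatter(src, index, dim=0, dim_size=3, reduce='sum')
# tensor([4., 2., 4.])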

How scatter_add works is illustrated below.

[Figure: scatter_add aggregating src values into output positions given by index]

>>> index = torch.tensor([[2, 1], [1, 3], [0, 2], [3, 0], [3, 1], [3, 2]])
tensor([[2, 1],
        [1, 3],
        [0, 2],
        [3, 0],
        [3, 1],
        [3, 2]])
>>> src = torch.tensor([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]]).float()
tensor([[ 1.,  2.],
        [ 3.,  4.],
        [ 5.,  6.],
        [ 7.,  8.],
        [ 9., 10.],
        [11., 12.]])
>>> output = torch.zeros((4, 4))
tensor([[0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.]])
>>> torch_scatter.scatter_add(src, index, 0)
tensor([[ 5.,  8.],
        [ 3., 12.],
        [ 1., 18.],
        [27.,  4.]])
>>> torch_scatter.scatter_add(src, index, 0, out=output)
tensor([[ 5.,  8.,  0.,  0.],
        [ 3., 12.,  0.,  0.],
        [ 1., 18.,  0.,  0.],
        [27.,  4.,  0.,  0.]])