Tokenizer

Given the input features (numerical plus categorical, a tensor of shape $\text{batch\_size}\times (\text{d\_numerical}+\text{categories})$ in total), the tokenizer produces $x \in \mathbb{R}^{\text{batch\_size}\times (1+\text{d\_numerical}+\text{categories})\times d_{token}}$.

class Tokenizer(nn.Module):
    def __init__(
        self,
        d_numerical: int,
        categories: ty.Optional[ty.List[int]],
        d_token: int,
        bias: bool,
    ) -> None:
        super().__init__()
        if categories is None:
            d_bias = d_numerical
            self.category_offsets = None
            self.category_embeddings = None
        else:
            d_bias = d_numerical + len(categories)
            category_offsets = torch.tensor([0] + categories[:-1]).cumsum(0)
            self.register_buffer('category_offsets', category_offsets)
            self.category_embeddings = nn.Embedding(sum(categories), d_token)
            nn_init.kaiming_uniform_(self.category_embeddings.weight, a=math.sqrt(5))
            print(f'{self.category_embeddings.weight.shape}')

        # take [Cross-level Readout Node] into account
        self.weight = nn.Parameter(Tensor(d_numerical + 1, d_token))
        self.bias = nn.Parameter(Tensor(d_bias, d_token)) if bias else None
        # The initialization is inspired by nn.Linear
        nn_init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        if self.bias is not None:
            nn_init.kaiming_uniform_(self.bias, a=math.sqrt(5))

Initialization method
Args:
d_numerical (int): number of numerical features.
categories (Optional[List[int]]): list of category counts for the categorical features; None if there are none.
d_token (int): dimensionality of each token embedding.
bias (bool): whether to use a bias term.
Attributes:
d_bias: number of bias entries, i.e. the number of numerical features plus the number of categorical features.
category_offsets (tensor): starting index of each categorical feature. For example, with categories=[2,4,6], category_offsets is [0,2,6]: all categorical features are stacked into one shared embedding table, so we need to know where each feature's categories start. The offsets are registered as a buffer, so they are not updated during training.
category_embeddings: the mapping from categories to category embeddings; effectively a matrix in $\mathbb{R}^{\sum\text{categories}\times d_{token}}$, where looking up a category index maps it to a vector in $\mathbb{R}^{d_{token}}$.
self.weight: weights for the numerical feature tokens plus the Cross-level Readout node, $\mathbb{R}^{(\text{d\_numerical}+1)\times d_{token}}$.
self.bias: bias matrix, $\mathbb{R}^{\text{d\_bias}\times d_{token}}$.
Notes:
The category embedding table and its offset index are created here, and the table is initialized with Kaiming initialization:
$$W_{ij} \sim \mathcal{U}(-\mathrm{bound},\mathrm{bound}),\qquad \mathrm{bound}=\sqrt{\frac{6}{(1+a^2)\cdot\mathrm{fan\_in}}}$$
where $a=\sqrt{5}$ is the negative slope and $\mathrm{fan\_in}$ is the number of input features.
All parameters use Kaiming initialization, consistent with nn.Linear.
Returns:
None
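
To make the offset trick concrete, here is a minimal sketch (hypothetical values; only torch is assumed) of how category_offsets shifts each feature's raw category index into the shared embedding table:

import torch
import torch.nn as nn

categories = [2, 4, 6]                                    # three categorical features
offsets = torch.tensor([0] + categories[:-1]).cumsum(0)   # tensor([0, 2, 6])
emb = nn.Embedding(sum(categories), 8)                    # one shared table with 2+4+6 = 12 rows, d_token = 8

x_cat = torch.tensor([[1, 3, 5]])                         # one sample, raw per-feature category indices
rows = x_cat + offsets                                    # tensor([[ 1,  5, 11]]) -> rows in the shared table
tokens = emb(rows)                                        # shape (1, 3, 8)
print(rows, tokens.shape)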

    @property
    def n_tokens(self) -> int:
        return len(self.weight) + (
            0 if self.category_offsets is None else len(self.category_offsets)
        )

Get the total number of tokens (features).
Args: self
Attributes: None
Returns:
The total number of features. For example, with 10 numerical features, 5 categorical features, and one Cross-level Readout node, it returns 16.
Notes:
@property registers the function as a read-only attribute.

    def forward(self, x_num: Tensor, x_cat: ty.Optional[Tensor]) -> Tensor:
        x_some = x_num if x_cat is None else x_cat
        assert x_some is not None
        x_num = torch.cat(
            [torch.ones(len(x_some), 1, device=x_some.device)]  # [CLS]
            + ([] if x_num is None else [x_num]),
            dim=1,
        )
        x = self.weight[None] * x_num[:, :, None]
        if x_cat is not None:
            x = torch.cat(
                [x, self.category_embeddings(x_cat + self.category_offsets[None])],
                dim=1,
            )
        if self.bias is not None:
            bias = torch.cat(
                [
                    torch.zeros(1, self.bias.shape[1], device=x.device),
                    self.bias,
                ]
            )
            x = x + bias[None]
        return x

Forward method
Args:
x_num (tensor): matrix of numerical features, $\mathbb{R}^{\text{batch\_size}\times \text{d\_numerical}}$.
x_cat (ty.Optional[Tensor]): matrix of categorical features (optional), $\mathbb{R}^{\text{batch\_size}\times \text{categories}}$.
Attributes:
x_num (tensor): numerical features with the [CLS] column prepended, $\mathbb{R}^{\text{batch\_size}\times (1+\text{d\_numerical})}$.
x (tensor): computed by broadcasting and element-wise multiplication, $\mathbb{R}^{\text{batch\_size}\times (1+\text{d\_numerical})\times d_{token}}$. If categorical features are present, their embeddings (looked up with the category offsets added) are concatenated onto x, giving $\mathbb{R}^{\text{batch\_size}\times (1+\text{d\_numerical}+\text{categories})\times d_{token}}$; adding the bias does not change the shape.
bias (tensor): a zero row is stacked on top to form $\mathbb{R}^{(1+\text{d\_bias})\times d_{token}}$, then a leading dimension is added to give $\mathbb{R}^{1\times(1+\text{d\_bias})\times d_{token}}$ before it is added to x.
Notes:
Indexing a tensor with None adds a new dimension: self.weight goes from $\mathbb{R}^{(\text{d\_numerical}+1)\times d_{token}}$ to $\mathbb{R}^{1\times(\text{d\_numerical}+1)\times d_{token}}$, and x_num[:, :, None] has shape $\mathbb{R}^{\text{batch\_size}\times (1+\text{d\_numerical})\times 1}$.
The broadcasting here mainly repeats the weights across the batch dimension.
Returns:
x (tensor): $\mathbb{R}^{\text{batch\_size}\times (1+\text{d\_numerical}+\text{categories})\times d_{token}}$
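
A minimal shape check of the tokenizer (sizes are illustrative; assumes the Tokenizer class above and its imports are in scope):

import torch

tok = Tokenizer(d_numerical=3, categories=[2, 4], d_token=8, bias=True)

x_num = torch.randn(32, 3)                     # 32 samples, 3 numerical features
x_cat = torch.stack(                           # 32 samples, 2 categorical features
    [torch.randint(0, 2, (32,)), torch.randint(0, 4, (32,))], dim=1
)
x = tok(x_num, x_cat)
print(x.shape)                                 # torch.Size([32, 6, 8]) = (batch, 1 + 3 + 2, d_token)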

MultiheadGEAttention

Multi-head GE attention module: it takes the input features and returns both the features transformed by this graph layer and the learned graph structure (FR-Graph).

class MultiheadGEAttention(nn.Module):
    """
    FR-Graph integrated attention
    ---
    Learn relations among features and feature selection strategy in data-driven manner.
    """
    def __init__(
        # Normal Attention Args
        self, d: int, n_heads: int, dropout: float, initialization: str,
        # FR-Graph Args
        n: int, sym_weight: bool = True, sym_topology: bool = False, nsi: bool = True,
    ) -> None:
        if n_heads > 1:
            assert d % n_heads == 0
        assert initialization in ['xavier', 'kaiming']

        super().__init__()
        self.W_v = nn.Linear(d, d)
        self.W_out = nn.Linear(d, d) if n_heads > 1 else None
        self.n_heads = n_heads
        self.dropout = nn.Dropout(dropout) if dropout else None

        """FR-Graph Params: Edge weights"""
        # head and tail transformation
        self.W_head = nn.Linear(d, d)
        if sym_weight:
            self.W_tail = self.W_head  # symmetric weights
        else:
            self.W_tail = nn.Linear(d, d)  # ASYM
        # relation embedding: learnable diagonal matrix
        self.rel_emb = nn.Parameter(torch.ones(n_heads, d // self.n_heads))

        for m in [self.W_head, self.W_tail, self.W_v]:
            if initialization == 'xavier' and (n_heads > 1 or m is not self.W_v):
                nn_init.xavier_uniform_(m.weight, gain=1 / math.sqrt(2))
            nn_init.zeros_(m.bias)
        if self.W_out is not None:
            nn_init.zeros_(self.W_out.bias)

        """FR-Graph Params: Graph topology (column = node = feature)"""
        self.n_cols = n + 1  # Num of Nodes: input feature nodes + [Cross-level Readout]
        self.nsi = nsi  # no self-interaction

        # column embeddings: semantics for each column
        d_col = math.ceil(2 * math.log2(self.n_cols))  # dim for column header embedding -> d_header += d
        self.col_head = nn.Parameter(Tensor(self.n_heads, self.n_cols, d_col))
        if not sym_topology:
            self.col_tail = nn.Parameter(Tensor(self.n_heads, self.n_cols, d_col))
        else:
            self.col_tail = self.col_head  # share the parameter
        for W in [self.col_head, self.col_tail]:
            if W is not None:
                # correspond to Tokenizer initialization
                nn_init.kaiming_uniform_(W, a=math.sqrt(5))

        # Learnable bias and fixed threshold for topology
        self.bias = nn.Parameter(torch.zeros(1))
        self.threshold = 0.5

        """Frozen topology"""
        # for some sensitive datasets set to `True`
        # after training several epoch, which helps
        # stability and better performance
        self.frozen = False

Initialize the module
Args:
d (int): input feature (token) dimension
n_heads (int): number of attention heads
dropout (float): dropout rate
initialization (str, one of ['xavier', 'kaiming']): initialization method
n (int): number of input feature nodes
sym_weight (bool): whether the head and tail transformations share weights
sym_topology (bool): whether the head and tail topology embeddings share parameters
nsi (bool): whether to disable self-interaction (self-loops, i.e. the diagonal of the adjacency matrix)
Attributes:
Edge weights:
self.W_head (torch.nn.Linear): a $d\to d$ fully connected layer
self.rel_emb (torch.Tensor): learnable diagonal relation embedding of shape $\mathbb{R}^{\text{n\_heads}\times (d/\text{n\_heads})}$; it is created with torch.ones, i.e. initialized as an all-ones matrix.
Graph topology:
self.n_cols (int): n + 1, the number of feature nodes plus one Cross-level Readout node
self.nsi (bool): nsi.
d_col (int): column embedding dimension, $\lceil 2\log_{2}(\text{n\_cols}) \rceil$
self.col_head (torch.Tensor): shape n_heads × n_cols × d_col.
self.bias (torch.Tensor of shape [1]): learnable bias of the gating function in the graph topology.
self.threshold (float = 0.5): fixed threshold for the graph topology
self.frozen (bool): whether the topology is frozen; for some sensitive datasets it is set to True after a few training epochs (see froze_topology).
Notes:
Edge-weight parameters use Xavier initialization; graph-topology parameters use Kaiming initialization.
Returns:
None
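
A small illustration (hypothetical sizes) of two quantities above: the column-embedding dimension d_col, and how rel_emb is turned into one diagonal matrix per head inside forward via torch.diag_embed:

import math
import torch

n_cols = 15                                   # e.g. 14 feature nodes + 1 Readout node
d_col = math.ceil(2 * math.log2(n_cols))      # ceil(2 * 3.91) = 8

n_heads, d_head = 4, 16
rel_emb = torch.ones(n_heads, d_head)         # how rel_emb is initialized
R = torch.diag_embed(rel_emb)                 # shape (4, 16, 16): one diagonal matrix per head
print(d_col, R.shape, torch.equal(R[0], torch.eye(d_head)))  # 8 torch.Size([4, 16, 16]) True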

    def _reshape(self, x: Tensor) -> Tensor:
        batch_size, n_tokens, d = x.shape
        d_head = d // self.n_heads
        return (
            x.reshape(batch_size, n_tokens, self.n_heads, d_head)
            .transpose(1, 2)
        )

Splits the tensor into heads for the attention computation: a batch_size × n_tokens × d tensor is reshaped into batch_size × n_heads × n_tokens × d_head, where d_head = d // n_heads.
Args:
x (torch.Tensor)
Returns:
x (torch.Tensor)
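
A quick shape check (hypothetical sizes) of the head split described above:

import torch

x = torch.randn(32, 6, 64)                    # batch of 32, 6 tokens, d = 64
n_heads = 8
y = x.reshape(32, 6, n_heads, 64 // n_heads).transpose(1, 2)
print(y.shape)                                # torch.Size([32, 8, 6, 8]) = (batch, heads, tokens, d_head)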

    def _no_self_interaction(self, x):
        if x.shape[-2] == 1:  # only [Readout Node]
            return x
        assert x.shape[-1] == x.shape[-2] == self.n_cols
        # mask diagonal interaction
        nsi_mask = 1.0 - torch.diag_embed(torch.ones(self.n_cols, device=x.device))
        return x * nsi_mask

Remove self-interaction
Args:
x (torch.Tensor)
Notes:
1. If the second-to-last dimension equals 1, only the Readout node is present and x is returned unchanged.
2. Otherwise x must be a square matrix whose size equals the number of features + 1 (i.e. including the Cross-level Readout node).
3. An n_cols-dimensional identity matrix is subtracted from an all-ones matrix to build the mask.
4. The mask is multiplied element-wise with the input, giving an adjacency matrix with a zero diagonal.
Returns:
Adjacency matrix with zero diagonal
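
A tiny illustration (hypothetical 3-node graph) of the diagonal mask:

import torch

n_cols = 3
adj = torch.ones(n_cols, n_cols)                          # fully connected probabilities
nsi_mask = 1.0 - torch.diag_embed(torch.ones(n_cols))     # zeros on the diagonal, ones elsewhere
print(adj * nsi_mask)
# tensor([[0., 1., 1.],
#         [1., 0., 1.],
#         [1., 1., 0.]])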

    def _prune_to_readout(self, x):
        """Prune edges from any features to [Readout Node]"""
        assert x.shape[-1] == self.n_cols
        mask = torch.ones(self.n_cols, device=x.device)
        mask[0] = 0  # zero out interactions from features to [Readout]
        return x * mask

Prune the in-edges of the Readout node
Args:
x (torch.Tensor)
Notes:
Creates an all-ones vector of length n_cols, sets its first entry to 0, then broadcasts it and multiplies element-wise with x.
Returns:
x * mask (torch.Tensor): an adjacency matrix whose first column is zero, meaning no feature points to the Readout node.

    def _get_topology(self, top_score, elewise_func=torch.sigmoid):
        """
        Learning static knowledge topology (adjacency matrix)
        ---
        top_score: N x N tensor, relation topology score
        adj: adjacency matrix A of FR-Graph
        """
        adj_probs = elewise_func(top_score + self.bias)  # choose `sigmoid` as element-wise activation (sigma1)
        if self.nsi:
            adj_probs = self._no_self_interaction(adj_probs)  # apply `nsi` function
        adj_probs = self._prune_to_readout(adj_probs)  # cut edges from features to [Readout]

        if not self.frozen:
            # using `Straight-through` trick for non-differentiable operation
            adj = (adj_probs > 0.5).float() - adj_probs.detach() + adj_probs
        else:
            # frozen graph topology: no gradient
            adj = (adj_probs > 0.5).float()
        return adj

Learn the adjacency matrix of the static knowledge graph (FR-Graph topology)
Args:
top_score (torch.Tensor): relation topology score
elewise_func (function): element-wise activation, torch.sigmoid by default
Attributes:
adj_probs (torch.Tensor): the gated edge probabilities
Notes:
A straight-through estimator is used for backpropagation: when the forward pass contains a non-differentiable operation, a surrogate gradient is constructed for the backward pass.
adj_probs.detach() blocks the gradient and returns a new tensor without gradient tracking.
(adj_probs > 0.5) is the value used in the forward pass, while the gradient is taken from adj_probs. The strategy can be summarized by the line below; when the topology is frozen, no gradient flows through adj_probs at all.

adj = hard - soft.detach() + soft

Returns:
adj (torch.Tensor): adjacency matrix of the knowledge graph.
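
A minimal standalone sketch (assumed shapes) showing why the straight-through form gives a hard 0/1 value in the forward pass but the sigmoid's gradient in the backward pass:

import torch

top_score = torch.randn(4, 4, requires_grad=True)
soft = torch.sigmoid(top_score)                 # differentiable probabilities
hard = (soft > 0.5).float()                     # non-differentiable 0/1 adjacency

adj = hard - soft.detach() + soft               # forward value == hard, gradient flows through `soft`
adj.sum().backward()

print(adj)                                      # entries are exactly 0.0 or 1.0
print(top_score.grad is not None)               # True: the gradient reached top_score via `soft`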

    def forward(
        self,
        x_head: Tensor,
        x_tail: Tensor,
        key_compression: ty.Optional[nn.Linear],
        value_compression: ty.Optional[nn.Linear],
        elewise_func = torch.sigmoid,
        comp_func = torch.softmax,
    ) -> Tensor:
        f_head, f_tail, f_v = self.W_head(x_head), self.W_tail(x_tail), self.W_v(x_tail)
        for tensor in [f_head, f_tail, f_v]:
            # check multi-head
            assert tensor.shape[-1] % self.n_heads == 0
        if key_compression is not None:
            assert value_compression is not None
            f_tail = key_compression(f_tail.transpose(1, 2)).transpose(1, 2)
            f_v = value_compression(f_v.transpose(1, 2)).transpose(1, 2)
        else:
            assert value_compression is None

        batch_size = len(f_head)
        d_head_tail = f_tail.shape[-1] // self.n_heads
        d_value = f_v.shape[-1] // self.n_heads
        n_head_nodes = f_head.shape[1]

        # reshape to multi-head view
        f_head = self._reshape(f_head)
        f_tail = self._reshape(f_tail)

        # edge weight scores (Gw)
        weight_score = f_head @ torch.diag_embed(self.rel_emb) @ f_tail.transpose(-1, -2) / math.sqrt(d_head_tail)

        col_emb_head = F.normalize(self.col_head, p=2, dim=-1)  # L2 normalized column embeddings
        col_emb_tail = F.normalize(self.col_tail, p=2, dim=-1)
        # topology score (Gt)
        top_score = col_emb_head @ col_emb_tail.transpose(-1, -2)
        # graph topology (A)
        adj = self._get_topology(top_score, elewise_func)
        if n_head_nodes == 1:  # only [Cross-level Readout]
            adj = adj[:, :1]

        # graph assembling: apply FR-Graph on interaction like attention mask
        adj_mask = (1.0 - adj) * -10000  # analogous to attention mask

        # FR-Graph of this layer
        # Can be used for visualization on Feature Relation and Readout Collection
        fr_graph = comp_func(weight_score + adj_mask, dim=-1)  # choose `softmax` as competitive function

        if self.dropout is not None:
            fr_graph = self.dropout(fr_graph)
        x = fr_graph @ self._reshape(f_v)
        x = (
            x.transpose(1, 2)
            .reshape(batch_size, n_head_nodes, self.n_heads * d_value)
        )
        if self.W_out is not None:
            x = self.W_out(x)
        return x, fr_graph.detach()

The forward method of the nn.Module
Args:
x_head (torch.Tensor):
x_tail (Tensor):
key_compression (ty.Optional[nn.Linear]):
value_compression (ty.Optional[nn.Linear]):
elewise_func (function, default torch.sigmoid):
comp_func (function, default torch.softmax):
Attributes:
f_head, f_tail, f_v: graph embeddings obtained by transforming x_head and x_tail (f_v is also computed from x_tail).
weight_score: edge-weight matrix, computed as f_head @ diag(rel_emb) @ f_tail^T / sqrt(d_head_tail).
top_score: topology score matrix, computed as col_emb_head @ col_emb_tail^T.
adj_mask: mask matrix; entries between unconnected node pairs are set to -10000, entries between connected pairs to 0.
fr_graph: the final graph adjacency (attention) matrix, comp_func(weight_score + adj_mask), where comp_func is softmax. Connected pairs keep their scores, while unconnected pairs are pushed towards -10000 and effectively receive zero weight after the softmax.
self.W_out: the final linear transformation (fully connected layer) feeding the next layer.
x: the output passed to the next layer, obtained as fr_graph @ f_v, where f_v is x_tail transformed by W_v ($\mathbb{R}^{d\times d}$).
Returns:
x: output to the next layer; the shape stays batch_size × n_head_nodes (number of nodes) × d (embedding dimension); a final $d\to d$ fully connected layer (W_out) is applied when there is more than one head.
fr_graph: the graph adjacency matrix, detached (no gradient).
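
A small numeric illustration (hypothetical scores) of how the -10000 mask removes unconnected edges after the softmax:

import torch

weight_score = torch.tensor([[1.0, 2.0, 0.5]])     # scores of one node towards three neighbours
adj = torch.tensor([[1.0, 0.0, 1.0]])              # the middle edge is absent from the FR-Graph
adj_mask = (1.0 - adj) * -10000                    # tensor([[    -0., -10000.,     -0.]])

fr_graph = torch.softmax(weight_score + adj_mask, dim=-1)
print(fr_graph)                                    # ~tensor([[0.6225, 0.0000, 0.3775]])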

T2GFormer

class T2GFormer(nn.Module):
    """T2G-Former

    References:
    - FT-Transformer: https://github.com/Yura52/tabular-dl-revisiting-models/blob/main/bin/ft_transformer.py#L151
    """
    def __init__(
        self,
        *,
        # tokenizer
        d_numerical: int,
        categories: ty.Optional[ty.List[int]],
        token_bias: bool,
        # transformer
        n_layers: int,
        d_token: int,
        n_heads: int,
        d_ffn_factor: float,
        attention_dropout: float,
        ffn_dropout: float,
        residual_dropout: float,
        activation: str,
        prenormalization: bool,
        initialization: str,
        # linformer
        kv_compression: ty.Optional[float],
        kv_compression_sharing: ty.Optional[str],
        # graph estimator
        sym_weight: bool = True,
        sym_topology: bool = False,
        nsi: bool = True,
        #
        d_out: int,
    ) -> None:
        assert (kv_compression is None) ^ (kv_compression_sharing is not None)

        super().__init__()
        self.tokenizer = Tokenizer(d_numerical, categories, d_token, token_bias)
        n_tokens = self.tokenizer.n_tokens

Initialize T2GFormer
Args:
tokenizer:
d_numerical (int): number of numerical features
categories (ty.List[int]): number of categories per categorical feature; e.g. gender (male/female) contributes 2, and a feature with 4 occupations contributes 4
token_bias (bool): whether the tokenizer uses a bias term
transformer:
n_layers (int): how many GE-attention + FFN blocks are stacked
d_token (int): dimensionality each feature is embedded into, e.g. each value becomes an 8-dimensional vector
n_heads (int): number of attention heads
d_ffn_factor (float): ratio of the FFN hidden size to d_token; the hidden layer has d_ffn_factor * d_token units
attention_dropout (float): dropout probability in the attention layer
ffn_dropout (float): dropout probability in the FFN
residual_dropout (float): dropout probability on the residual branch
activation (str): name of the activation function
prenormalization (bool): whether to normalize before the residual branch (pre-norm) or after it (post-norm), as shown in the code below
initialization (str): initialization method
linformer:
kv_compression (ty.Optional[float]): compression ratio for the number of tokens in the K and V matrices
kv_compression_sharing (ty.Optional[str], e.g. 'layerwise'): with 'layerwise', a single compression layer is created and shared by all layers
graph estimator:
sym_weight (bool): whether the head and tail weight matrices are shared (symmetric)
sym_topology (bool): whether the head and tail topology structures are shared (symmetric)
d_out (int): output dimension

# prenormalization
x = x + attention(norm0(x))
x = x + linear(norm1(x))

# postnormalization
x = norm0(x + attention(x))
x = norm1(x + linear(x))

Attributes:
self.tokenizer: the tokenizer; d_numerical numerical features are embedded, categories is the List[int] of category counts, d_token is the embedding dimension the tokenizer produces, token_bias toggles its bias.
n_tokens (int): the total number of features plus 1. (This part of the code is a bit awkward: n_tokens is reassigned later and subsequently equals just the number of features.)
self.shared_kv_compression (nn.Linear): set to make_kv_compression() when kv compression is used and kv_compression_sharing == 'layerwise'; otherwise None.
d_hidden (int): d_token * d_ffn_factor
self.activation (function): the activation function.
self.last_activation (function): the activation function, but with the GLU variant replaced by its plain counterpart, so the last layer avoids a GLU activation.
self.last_normalization (ty.Optional[make_normalization]): the final layer normalization; present only when prenormalization is used, otherwise None.
self.head (nn.Linear): the prediction head, d_token -> d_out.
self.layers (nn.ModuleList[nn.ModuleDict]):
holds the per-layer modules; each layer consists of:
attention (MultiheadGEAttention): d_token dimensions, n_heads heads, n_tokens features (no "+1" here; the "+1" is added inside MultiheadGEAttention to obtain self.n_cols)
linear0 (nn.Linear): first FFN layer, d_token -> d_hidden
linear1 (nn.Linear): second FFN layer, d_hidden -> d_token
norm1 (nn.LayerNorm = make_normalization): layer normalization
norm0 (ty.Optional[nn.LayerNorm] = make_normalization): present unless prenormalization is used and layer_idx == 0; i.e. only the first layer differs, and only with pre-normalization can the first layer omit norm0.
key_compression (nn.Linear = make_kv_compression): created only when kv_compression is set and shared_kv_compression does not exist.
value_compression (nn.Linear = make_kv_compression): created only when kv_compression is set, shared_kv_compression does not exist, and kv_compression_sharing == 'headwise'.
The logic is: if kv compression is wanted but not shared globally, each layer gets its own compression; if key and value should be compressed separately ('headwise'), an extra value_compression is created, otherwise key and value share one compression module ('key-value'). A sketch of the compression itself follows after the function list below.

function:
make_kv_compression (nn.Linear): builds the kv compression layer, n_tokens -> n_tokens * kv_compression
make_normalization (nn.LayerNorm): layer normalization; here n_tokens still includes the "+1".
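
A minimal sketch of the Linformer-style token compression (sizes are hypothetical; bias=False follows the FT-Transformer reference implementation and is an assumption here). It mirrors how key_compression is applied in MultiheadGEAttention.forward: a Linear layer acts on the token dimension after a transpose:

import torch
import torch.nn as nn

n_tokens, d_token, kv_compression = 17, 64, 0.5
key_compression = nn.Linear(n_tokens, int(n_tokens * kv_compression), bias=False)  # 17 -> 8 tokens

f_tail = torch.randn(32, n_tokens, d_token)                        # (batch, tokens, d)
f_tail_c = key_compression(f_tail.transpose(1, 2)).transpose(1, 2)
print(f_tail_c.shape)                                              # torch.Size([32, 8, 64])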

    def _get_kv_compressions(self, layer):
        return (
            (self.shared_kv_compression, self.shared_kv_compression)
            if self.shared_kv_compression is not None
            else (layer['key_compression'], layer['value_compression'])
            if 'key_compression' in layer and 'value_compression' in layer
            else (layer['key_compression'], layer['key_compression'])
            if 'key_compression' in layer
            else (None, None)
        )

Get the Key and Value compression modules used by the current layer.
Depending on the KV-compression sharing strategy, the function returns:
- if a globally shared compressor (self.shared_kv_compression) is set, both Key and Value use that shared module;
- if the current layer has its own 'key_compression' and 'value_compression', they are returned separately;
- if the current layer only has 'key_compression', Key and Value share that single module;
- otherwise (None, None) is returned, i.e. no compression is used.
Attributes:
layer (nn.ModuleDict): the current Transformer layer, which may contain Key/Value compression modules.
Returns:
Tuple[Optional[nn.Module], Optional[nn.Module]]:
a tuple with the Key and Value compression modules, respectively.

    def _start_residual(self, x, layer, norm_idx):
        x_residual = x
        if self.prenormalization:
            norm_key = f'norm{norm_idx}'
            if norm_key in layer:
                x_residual = layer[norm_key](x_residual)
        return x_residual

Handle the start of a residual connection (applies the normalization first when pre-normalization is used).
Attributes:
x (Tensor): current input tensor
layer (nn.ModuleDict): current Transformer layer, which may contain the norm layers
norm_idx (int): index of the normalization layer to use (norm0 or norm1)
Returns:
Tensor: input of the residual branch (possibly normalized)

    def _end_residual(self, x, x_residual, layer, norm_idx):
        if self.residual_dropout:
            x_residual = F.dropout(x_residual, self.residual_dropout, self.training)
        x = x + x_residual
        if not self.prenormalization:
            x = layer[f'norm{norm_idx}'](x)
        return x

Handle the end of a residual connection: apply dropout, add the residual, and post-normalize when pre-normalization is not used.
Args:
x (Tensor): current backbone output
x_residual (Tensor): input of the residual branch (from _start_residual)
layer (nn.ModuleDict): current Transformer layer
norm_idx (int): index of the normalization layer to use
Returns:
Tensor: output tensor after the residual addition and normalization

    def forward(self, x_num: Tensor, x_cat: ty.Optional[Tensor], return_fr: bool = False) -> Tensor:
        fr_graphs = []  # FR-Graph of each layer
        x = self.tokenizer(x_num, x_cat)

        for layer_idx, layer in enumerate(self.layers):
            is_last_layer = layer_idx + 1 == len(self.layers)
            layer = ty.cast(ty.Dict[str, nn.Module], layer)

            x_residual = self._start_residual(x, layer, 0)
            x_residual, fr_graph = layer['attention'](
                # for the last attention, it is enough to process only [CLS]
                (x_residual[:, :1] if is_last_layer else x_residual),
                x_residual,
                *self._get_kv_compressions(layer),
            )
            fr_graphs.append(fr_graph)
            if is_last_layer:
                x = x[:, : x_residual.shape[1]]
            x = self._end_residual(x, x_residual, layer, 0)

            x_residual = self._start_residual(x, layer, 1)
            x_residual = layer['linear0'](x_residual)
            x_residual = self.activation(x_residual)
            if self.ffn_dropout:
                x_residual = F.dropout(x_residual, self.ffn_dropout, self.training)
            x_residual = layer['linear1'](x_residual)
            x = self._end_residual(x, x_residual, layer, 1)

        assert x.shape[1] == 1
        x = x[:, 0]
        if self.last_normalization is not None:
            x = self.last_normalization(x)
        x = self.last_activation(x)
        x = self.head(x)
        x = x.squeeze(-1)
        return x if not return_fr else (x, fr_graphs)

Forward pass
Args:
x_num (Tensor): matrix of numerical features
x_cat (ty.Optional[Tensor]): matrix of categorical features
return_fr (bool = False): whether to also return the graph structures
Attributes:
x: the features; the processing pipeline is:
1. self.tokenizer(x_num, x_cat): tokenize the inputs
2. self._start_residual(x, layer, 0): start the norm0 residual branch (pre- or post-normalization)
3. layer['attention']: compute the graph topology and the input of the next step
4. (last layer only) keep just the first row of each sample ([CLS]) for the final prediction
5. self._end_residual: counterpart of _start_residual
6. self._start_residual(x, layer, 1): start the norm1 residual branch
7. layer['linear0']: linear layer
8. self.activation: activation function
9. dropout: apply dropout if ffn_dropout is set
10. layer['linear1']: linear layer
11. self._end_residual: as above
12. self.last_normalization / self.last_activation: final normalization and activation
13. self.head: prediction head
Notes:
An interesting line is used here:

layer = ty.cast(ty.Dict[str, nn.Module], layer)

It lets the type checker treat the nn.ModuleDict as a plain dict, avoiding type-checking errors (it has no runtime effect).
Returns:
Tensor or Tuple[Tensor, List[Tensor]]:
if return_fr is False, the tensor x, i.e. the model output;
if return_fr is True, a tuple (x, fr_graphs),
where fr_graphs is the list of per-layer FR-Graphs (edge-weight matrices).
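
A usage sketch of the full model. All hyperparameter values below are illustrative assumptions (whether e.g. 'reglu' is an accepted activation name depends on the surrounding lib code not shown in this post):

import torch

model = T2GFormer(
    d_numerical=3, categories=[2, 4], token_bias=True,
    n_layers=2, d_token=64, n_heads=8, d_ffn_factor=4/3,
    attention_dropout=0.1, ffn_dropout=0.1, residual_dropout=0.0,
    activation='reglu', prenormalization=True, initialization='kaiming',
    kv_compression=None, kv_compression_sharing=None,
    d_out=1,
)
x_num = torch.randn(32, 3)
x_cat = torch.stack([torch.randint(0, 2, (32,)), torch.randint(0, 4, (32,))], dim=1)

logits, fr_graphs = model(x_num, x_cat, return_fr=True)
print(logits.shape, len(fr_graphs))            # torch.Size([32]) 2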

    def froze_topology(self):
        """API to froze FR-Graph topology in training"""
        for layer in self.layers:
            layer = ty.cast(ty.Dict[str, nn.Module], layer)
            layer['attention'].frozen = True

Freeze the FR-Graph topology during training. Setting the frozen attribute of every layer's attention module to True stops the FR-Graph topology from being updated, so the current topology is kept fixed.
Useful when the graph structure should stay fixed and not be modified by further training.
Args:
None
Returns:
None
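
A hypothetical training-loop fragment showing when one might call froze_topology (warmup_epochs, train_one_epoch, and train_loader are placeholders, not part of the repository):

warmup_epochs, n_epochs = 20, 100              # assumed schedule
for epoch in range(n_epochs):
    train_one_epoch(model, train_loader)       # hypothetical helper
    if epoch + 1 == warmup_epochs:
        model.froze_topology()                 # from now on _get_topology returns a hard 0/1 graph with no gradient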


Cover image icon by Dewi Sari from Flaticon