T2G代码阅读
Tokenizer
输入数据(tensor: $\text{batch_size}\times \text{categories}(\text{d_numerical})$)后tokenizer获得$x \in \mathbb{R}^{ \text{batch_size}\times (1+ \text{d_numerical}+\text{categories})\times {d_{token}}}$
1 | class Tokenizer(nn.Module): |
初始化方法
Args:
d_numerical(int):数值特征的维度数量。
categories(Optional[List[int]]):分类特征的类别数量列表,若无则为None
d_token(int):词向量的维度大小
bias(bool):是否使用偏置项
Attributes:
d_bias:为偏置项的数目,对应categories特征和numerical特征的数目之和。
category_offsets(tensor):不同特征的index,例如categories=[2,4,6],则category_offsets为[0,2,6],这是因为对于category的处理是将不同的category堆叠为一个矩阵,所以我们需要知道不同的特征对应的index。然后将category_index注册为一个buffer,此时category_index将无法进行训练更新。
category_embeddings:创建从特征向特征嵌入的映射,其实就是一个$\mathbb{R}^{categories\times d_{token}}$的矩阵,然后通过矩阵乘法将维度$\mathbb{R}^{categories}$转为$\mathbb{R}^{d_{token}}$
self.weight:数值特征与Cross-level Readout的节点权重,$\mathbb{R}^{(d_numerical+1)\times d_token}$。有亿些下划线渲染失败了,不高兴改了,大概就是 $\text{d_numerical} = d_numerical$
self.bias:偏置矩阵,$\mathbb{R}^{d_bias\times d_token}$
Notes:
创建类别的映射矩阵并构建类别映射的index,使用kaiming初始化对映射矩阵初始化。kaiming初始化如下$$W_{ij} \sim \mathcal{U}(-\mathrm{bound},\mathrm{bound})$$$$\mathrm{bound}=\sqrt{ \frac{6}{(1+a^2) \cdot\mathrm{fan_in}} }$$
其中a在这里定义为sqrt(5),为负斜率(slope),fan_in为输入的特征数量
所有初始化均采用kaiming初始化,与nn.Linear保持一致。
Returns:
None
1 |
|
获取特征的数目
Args:self
Attributes:None
returns:
获取特征的总数,如果有10个数值特征,5个类别特征,再加上一个Cross-level Readout,则返回16
Notes:
将函数注册为一个不可修改的方法。
1 | def forward(self, x_num: Tensor, x_cat: ty.Optional[Tensor]) -> Tensor: |
前向传播方法
Args:
x_num(tensor): 数值特征的矩阵,$\mathbb{R}^{batch_size\times d_numerical}$。
x_cat(ty.Optional[Tensor]): 类别特征的矩阵,可选,$\mathbb{R}^{batch_size\times categories}$。
Attributes:
x_num(tensor): 带有[CLS]的矩阵,$\mathbb{R}^{batch_size\times (1+d_numerical)}$
x(tensor): 通过自动广播计算,广播后逐元素相乘,$\mathbb{R}^{batch_size\times (1+d_numerical)\times {d_{token}}}$,如果有类别变量,则将类别变量与广播后的位置矩阵相加并x堆叠起来,得到$\mathbb{R}^{batch_size\times (1+d_numerical+categories)\times {d_{token}}}$,与bias相加后大小不变
bias(tensor): 先堆叠为$\mathbb{R}^{(1+d_bias)\times d_token}$,然后增加维度为$\mathbb{R}^{1\times(1+d_bias)\times d_token}$,与x相加得到x
Notes:
tensor索引中使用None索引是对tensor新加一个维度,例如self.weight就是$\mathbb{R}^{(d_numerical+1)\times d_token}\to \mathbb{R}^{1\times(d_numerical+1)\times d_token}$,x_num就是$\mathbb{R}^{batch_size\times (1+d_numerical)\times {1}}$。
这里使用的自动广播功能主要是对于batch进行重复操作。
Returns:
x(tensor): $\mathbb{R}^{batch_size\times (1+d_numerical+categories)\times {d_{token}}}$
MultiheadGEAttention
多头GE注意力模块:输入原数据,然后得到一个经过该图神经网络的数据和图神经网络的架构。
1 | class MultiheadGEAttention(nn.Module): |
初始化模块
Args:
d(int): 输入的特征维度
n_heads(int): 特征头的数量
dropout(float): dropout率
initialization(str: [xavier, kaiming]): 初始化方法
n(int): 输入的特征数
sym_weight(bool): 是否共享头尾转换的权重
sym_topology(bool): 是否共享拓扑嵌入的参数
nsi(bool): 是否禁用自交互(圈,在邻接矩阵中就是对角线)
Attributes:
边权重:
self.W_head(torch.nn.Linear): $d\to d$的全连接层
self.rel_emb(torch.tensor): 对角矩阵,可学习的,$\mathbb{R}^{n_heads\times (d/n_heads)}$,但是在最初创建的时候,其使用torch.ones进行创建,创建的是一个全1矩阵。
图拓扑结构:
self.n_cols(int): n+1,为特征数+1(也就是多了一个跨层读取)
self.nsi(bool): nsi.
d_col(int): 列的嵌入维度,通过$\lceil 2\times \log_{2}(self.n_{cols}) \rceil$确定
self.col_head(torch.tensor): self.n_heads * self.n_cols * d_col.
self.bias(torch.tensor=[1]): 可学习参数,是图拓扑结构的门控结构中的bias。
self.threshold(int=0.5): 固定参数,是图拓扑结构的阈值
self.frozen(bool): 对于部分敏感的数据集,将会自动停止。
Notes:
边权重的初始化采用xavier,图拓扑结构的初始化采用kaiming初始化。
Returns:
None
1 | def _reshape(self, x: Tensor) -> Tensor: |
用于注意力机制中的”分头”。将batch_size * n_tokens * d的矩阵转化为 batch_size * n_heads * n_tokens * d_heads,其中 d_heads = d // n_heads。
Args:
x(torch.tensor)
Returns:
x(torch.tensor)
1 | def _no_self_interaction(self, x): |
去除自我交互
Args:
x(torch.Tensor)
Notes:
1. 输入的第二个维度应该大于一,若等于则为仅存在一个Readout Node
2. 应该是方阵,且维度等于特征数+1(也就是图拓扑结构内的跨层读取)
3. 减去一个n_cols维的单位矩阵
4. 最后与输入逐元素相乘,得到一个对角线为0的adjacent matrix
Returns:
对角线为0的adjacent matrix
1 | def _prune_to_readout(self, x): |
修建Readout节点的入度
Args:
x(torch.Tensor)
Notes:
创建一个n_cols的全1向量,然后设置第一个数字为0后广播逐元素相乘
Returns:
x * mask(torch.Tensor): 第一列为0的邻接矩阵,代表没有特征指向Readout节点。
1 | def _get_topology(self, top_score, elewise_func=torch.sigmoid): |
学习静态知识图谱的邻接矩阵
Args:
top_score(int): 相关性拓扑得分
elewise_func(func): 默认使用torch.sigmoid,是一种逐元素运算
Attributes:
adj_probs(torch.Tensor): 这是门控函数
Notes:
这里使用了straight-through的反向传播策略,这是一种当操作过程中设计不可导操作时进行的反向传播策略,基本原理就是构造一个导数用于反向传播。
adj_probs.detach()用于阻断当前梯度,其可以返回一个全新的,没有梯度的矩阵。
adj_probs > 0.5 是前向传播的输出值,梯度值则由adj_probs的梯度得出,如此的策略可以用以下式子概括。如果没有梯度就不传入adj_probs的梯度。
1 | adj = hard - soft.detach() + soft |
Returns:
adj(torch.Tensor): 知识图谱的邻接矩阵。
1 | def forward( |
nn.module特有的前向传播函数
Args:
x_head(torch.Tensor):
x_tail(Tensor):
key_compression(type.Optional[nn.Linear]):
value_compression(type.Optional[nn.Linear]):
elewise_func(function: torch.sigmoid):
comp_func(function: torch.softmax):
Attributes:
f_head, f_tail, f_v: x_head, x_tail以及x_v经过变换后得到的图嵌入
weight_score: 边的权重矩阵,由f_head @ rel_emb @ f_tail^T/sqrt(d) 得到
top_score: 拓扑邻接矩阵,由col_emb_head @ col_emb_tail^T 得到
adj_mask: 遮掩矩阵,对于没有连接的两个节点的交点,设置为-10000,其余为1
fr_graph: 最终的图邻接矩阵,comp_func(weight_score + adj_mask)得到,其中comp_func为softmax函数。这样就可以得到一个邻接矩阵,相连的权重为正,不相连的则接近-10000
self.W_out: 输入到下一层最后的线性变换,是一个全连接层。
x: 是输出向下一层的值,由W_v($\mathbb{R}^{d\times d}$)与x_tail相乘得到。
Returns:
x: 输出向下一层的值,矩阵大小不变,依旧是batch_size * n_head_nodes(节点总数) * d(特征嵌入的维度),最后还需经过一个$d\to d$的全连接层
fr_graph: 图邻接矩阵,无梯度
T2GFormer
1 | class T2GFormer(nn.Module): |
初始化T2GFormer
Args:
tokenizer:
d_numerical(int): 特征的维度
categories(type.List[int]): 类别特征的类别数,例如性别为男女,则类别数为2,如果有4种职业,那类别数为4
token_bias(bool): 是否使用tokenizer的偏置项
transformer:
n_layer(int): 堆叠几个GE+transformer的架构
d_token(int): 每个特征在嵌入时会转化为几维的向量,例如将数值转化为8维的向量
n_heads(int): attention模块有几个头
d_ffn_factor(float): 全连接层的隐藏层神经元占d_token的比例,具体就是d_ffn_factor * d_token等于隐藏层的神经元个数
attention_dropout(float): attention层中出现dropout的概率
ffn_dropout(float): 全连接层出现dropout的概率
residual_dropout(float): 同上
activation(str): 激活函数的名字。
prenormalization(bool): 预归一化,是在残差层前归一化还是在残差层后归一化,体现在代码如下。
initialization(str): 初始化方法
linformer:
kv_compression(type.Optional[float]): 压缩k,v矩阵的token数
kv_compression_sharing(type.Optional[str] = layerwise): 代表专门创建一个压缩层,所有层均共用这个层 。
graph estimator:
sym_weight(bool): 是否保证头尾的权重矩阵相同(是否对称)
sym_topology(bool): 是否保证头尾的拓扑结构图相同(是否对称)
d_out(int): 输出维度
1 | # prenormalization |
Attributes:
self.tokenizer: 分词器,输入的数值特征嵌入为d_numerical维的向量,categories为类别的数量,为List[int],d_token分词器处理后产生的维度,token_bias:是否启用bias.
n_tokens(int): 是所有特征加起来的数量+1,这段代码写得并不好,因为n_tokens被反复赋值,后续直接等于所有特征的数量。
self.shared_kv_compression(nn.Linear): 当使用kv压缩且kv_compression_sharing = layerwise则赋值为make_kv_compression,反之则赋值为None
d_hidden(int): d_token * d_ffn_factor
self.activation(function): 激活函数。
self.last_activation(function): 激活函数,但是glu版本换成正常版本,避免最后一层进行glu激活。
self.last_normalization(type.Option[make_normalization]): 最后一层层归一化操作,如果有预归一化则为None.
self.head(nn.Linear): 分类头, d_token->d_out.
self.layers(nn.ModuleList[nn.ModuleDict]):
存放网络的层信息,每层由以下组分构成:
attention(MultiheadGEAttention): d_token维度,n_heads个头,n_tokens个特征(这里的n_tokens没有”+1”,其实是在MultiheadGEAttention里面+1获得self.n_cols)
linear0(nn.Linear): 1->2层神经网络,d_token -> d_hidden
linear1(nn.Linear): 2->3层神经网络,d_hidden -> d_token
norm1(nn.LayerNorm = make_normalization): 层归一化
norm0(type.Option[nn.LayerNorm] = make_normalization): 如果不进行prenormalization或者layer_idx != 0,就存在norm0,反之则不存在。也就是只有第一行有区别,只有在进行前归一化时,第一行可以不加norm0.
key_compression(nn.Linear = make_kv_compression): 只有当kv_compression存在且不为False, shared_kv_compression不存在,设置为make_kv_compression
value_compression(nn.Linear = make_kv_compression): 当kv_compression当kv_compression存在且不为False, shared_kv_compression不存在,且kv_compression_sharing == headwise时设置为make_kv_compression。
这里逻辑上就是:如果需要使用kv_compression,但是不共享,那么就给每层都创造一个对于key与value的compression,如果我们要区分key与value的压缩(也就是headwise),那么就再独立创建一个value_compression,反之则key与value共用压缩(也就是key-value)
function:
make_kv_compression(nn.Linear): 构建kv压缩层,n_tokens -> n_tokens * kv_compression
make_normalization(nn.LayerNorm): 层归一化,这里n_tokens依旧是”+1”
1 | def _get_kv_compressions(self, layer): |
获取当前层使用的 Key 和 Value 压缩模块。
根据 KV 压缩共享策略,该函数返回对应的压缩模块:
- 如果设置了全局共享的压缩器(self.shared_kv_compression),则 Key 和 Value 都使用该共享模块;
- 如果当前层包含独立的 ‘key_compression’ 和 ‘value_compression’,则分别返回;
- 如果当前层只包含 ‘key_compression’,则认为 Key 和 Value 共用一个压缩模块;
- 如果都没有设置,返回 (None, None),表示不使用压缩。
Attributes:
layer (nn.ModuleDict): 当前 Transformer 层,可能包含 Key/Value 压缩模块。
Returns:
Tuple[Optional[nn.Module], Optional[nn.Module]]:
一个元组,分别表示 Key 和 Value 的压缩模块。
1 | def _start_residual(self, x, layer, norm_idx): |
处理残差连接起始部分(如果使用前置归一化)
Attributes:
x (Tensor): 当前输入张量
layer (nn.ModuleDict): 当前 Transformer 层,可能包含 norm 层
norm_idx (int): 使用的归一化层索引(如 norm0, norm1)
Returns:
Tensor: 用于残差分支的输入(可能已归一化)
1 | def _end_residual(self, x, x_residual, layer, norm_idx): |
处理残差连接结束部分:加上残差、dropout、后归一化(如果不使用前置归一化)
参数:
x (Tensor): 当前主干输出
x_residual (Tensor): 残差分支输入(来自 _start_residual)
layer (nn.ModuleDict): 当前 Transformer 层
norm_idx (int): 使用的归一化层索引
返回:
Tensor: 应用残差和归一化后的输出张量
1 | def forward(self, x_num: Tensor, x_cat: ty.Optional[Tensor], return_fr: bool = False) -> Tensor: |
前向传播
Args:
x_num(Tensor): 数值特征的矩阵
x_cat(type.Option[Tensor]): 类别特征的矩阵
return_fr(bool = False): 是否返回图结构
Attributes:
x: 特征,流程:
1. self.tokenizer(x_num, x_cat): 进行“分词“
2. self._start_residual(x, layer, 0): 计算norm0的残差(分为预归一化和后归一化)
3. layer[‘attention’]: 计算图拓扑结构并返回下一层的输入
4. (如果是最后一层)取出每个样本的第一行([CLS])进行最后的分类
5. self._end_residual: 同_start_resudual
6. self._start_residual(x, layer, 1): 计算norm1的残差
7. lay[‘linear0’]: 线性层
8. self.activation: 激活函数
9. dropout: 如果需要ffn_drop,进行dropout
10. lay[‘linear1’]: 线性层
11. end_residual: 同上
12. self.last_normalization: 最终的激活函数
13. self.head: 分类头
Notes:
这里用到了一个有趣的代码
1 | layer = ty.cast(ty.Dict[str, nn.Module], layer) |
它可以让layer从nn.ModuleDict伪装成dict,避免类型检查时出错
Returns:
Tensor or Tuple[Tensor, SomeType]:
如果 return_fr
为 False,则返回张量 x
,表示模型的输出;
如果 return_fr
为 True,则返回一个元组 (x, fr_graphs)
,
其中 fr_graphs
是边权重图的数据结构(具体类型视实现而定)。
1 | def froze_topology(self): |
在训练过程中冻结 FR-Graph 拓扑结构。通过将每个层的 attention 模块的 frozen
属性设置为 True,禁止 FR-Graph 拓扑的更新,从而保持当前拓扑结构不变。
适用于需要固定图结构、不希望训练时修改拓扑的场景。
Args:
无
Returns:
None