Triplet-Loss原理及其实现、应用
- 本文个人博客地址: 点击查看
- 欢迎下面留言交流
一、 Triplet loss
1、介绍
-
Triplet loss
最初是在 FaceNet: A Unified Embedding for Face Recognition and Clustering 论文中提出的,可以学到较好的人脸的embedding
- 为什么不适用
softmax
函数呢,softmax
最终的类别数是确定的,而Triplet loss
学到的是一个好的embedding
,相似的图像在embedding
空间里是相近的,可以判断是否是同一个人脸。
2、原理
- 输入是一个三元组
<a, p, n>
a: anchor
-
p: positive
, 与a
是同一类别的样本 -
n: negative
, 与a
是不同类别的样本
- 公式是:
- 所以最终的优化目标是拉近
a, p
的距离, 拉远a, n
的距离 -
easy triplets
: 即 ,这种情况不需要优化,天然a, p
的距离很近,a, n
的距离远 -
hard triplets
: , 即a, p
的距离远 -
semi-hard triplets
: , 即a, n
的距离靠的很近,但是有一个margin
- 所以最终的优化目标是拉近
-
FaceNet
中是随机选取semi-hard triplets
进行训练的, (也可以选择hard triplets
或者两者一起进行训练)
3、训练方法
3.1 offline
- 训练集所有数据经过计算得到对应的
embeddings
, 可以得到 很多<i, j, k>
的三元组,然后再计算triplet loss
- 效率不高,因为需要过一遍所有的数据得到三元组,然后训练反向更新网络
3.2 online
- 从训练集中抽取
B
个样本,然后计算B
个embeddings
,可以产生 个triplets
(当然其中有不合法的,因为需要的是<a, p, n>
)
- 实际使用中采用此方法,又分为两种策略 (是在一篇行人重识别的论文中提到的 In Defense of the Triplet Loss for Person Re-Identification),假设 , 其中
P
个身份的人,每个身份的人K
张图片(一般K
取4
)-
Batch All
: 计算batch_size
中所有valid
的的hard triplet
和semi-hard triplet
, 然后取平均得到Loss
- 注意因为很多
easy triplets
的情况,所以平均会导致Loss
很小,所以是对所有 valid 的所有求平均 (下面代码中会介绍) - 可以产生 个
triplets
-
PK
个anchor
-
K-1
个positive
-
PK-K
个negative
-
- 注意因为很多
-
Batch Hard
: 对于每一个anchor
, 选择距离最大的d(a, p)
和 距离最大的d(a, n)
- 所以公有 个 三元组
triplets
- 所以公有 个 三元组
-
二、 Tensorflow 中的实现
1、Batch All
1.1 计算两两embeddings
的距离
-
numpy
中的实现,便于调试理解, 点击查看 - 输入大小是
(batch_size, vector_size)
大小的embeddings
向量 - 因为 , 矩阵相乘 中包含
a*b
的值,对象线上是向量平方的值,所以可以直接使用矩阵计算 - 如果不使用平方,就开根号,
- 注意根号下不能为
0
,0
开根号是没有问题的,但是Tensorflow
梯度反向传播是就会导致无穷大,所以加上一个平滑项1e-16
,最后再修改回来。
- 注意根号下不能为
def _pairwise_distance(embeddings, squared=False):
'''
计算两两embedding的距离
------------------------------------------
Args:
embedding: 特征向量, 大小(batch_size, vector_size)
squared: 是否距离的平方,即欧式距离
Returns:
distances: 两两embeddings的距离矩阵,大小 (batch_size, batch_size)
'''
# 矩阵相乘,得到(batch_size, batch_size),因为计算欧式距离|a-b|^2 = a^2 -2ab + b^2,
# 其中 ab 可以用矩阵乘表示
dot_product = tf.matmul(embeddings, tf.transpose(embeddings))
# dot_product对角线部分就是 每个embedding的平方
square_norm = tf.diag_part(dot_product)
# |a-b|^2 = a^2 - 2ab + b^2
# tf.expand_dims(square_norm, axis=1)是(batch_size, 1)大小的矩阵,减去 (batch_size, batch_size)大小的矩阵,相当于每一列操作
distances = tf.expand_dims(square_norm, axis=1) - 2.0 * dot_product + tf.expand_dims(square_norm, axis=0)
distances = tf.maximum(distances, 0.0) # 小于0的距离置为0
if not squared: # 如果不平方,就开根号,但是注意有0元素,所以0的位置加上 1e*-16
distances = distances + mask * 1e-16
distances = tf.sqrt(distances)
distances = distances * (1.0 - mask) # 0的部分仍然置为0
return distances
1.2 计算valid mask
-
numpy
中的实现, 点击查看 - 上面得到了
(batch_size, batch_size)
大小的距离矩阵,然后就可以计算所有embeddings
组成的三元组<i, j, k>
损失 - 但是不是所有的三元组都是
valid
的, 要是<a, p, n>
的形式,所以计算一个3D
的mask
,然后乘上得到的(batch_size, batch_size, batch_size)
的所有三元组的损失即可,如何得到mask
呢 -
<i, j, k>
要满足-
i, j, k
不相等 labels[i] == labels[j] and labels[i] != labels[k]
-
def _get_triplet_mask(labels):
'''
得到一个3D的mask [a, p, n], 对应triplet(a, p, n)是valid的位置是True
----------------------------------
Args:
labels: 对应训练数据的labels, shape = (batch_size,)
Returns:
mask: 3D,shape = (batch_size, batch_size, batch_size)
'''
# 初始化一个二维矩阵,坐标(i, j)不相等置为1,得到indices_not_equal
indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
indices_not_equal = tf.logical_not(indices_equal)
# 因为最后得到一个3D的mask矩阵(i, j, k),增加一个维度,则 i_not_equal_j 在第三个维度增加一个即,(batch_size, batch_size, 1), 其他同理
i_not_equal_j = tf.expand_dims(indices_not_equal, 2)
i_not_equal_k = tf.expand_dims(indices_not_equal, 1)
j_not_equal_k = tf.expand_dims(indices_not_equal, 0)
# 想得到i!=j!=k, 三个不等取and即可, 最后可以得到当下标(i, j, k)不相等时才取True
distinct_indices = tf.logical_and(tf.logical_and(i_not_equal_j, i_not_equal_k), j_not_equal_k)
# 同样根据labels得到对应i=j, i!=k
label_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
i_equal_j = tf.expand_dims(label_equal, 2)
i_equal_k = tf.expand_dims(label_equal, 1)
valid_labels = tf.logical_and(i_equal_j, tf.logical_not(i_equal_k))
# mask即为满足上面两个约束,所以两个3D取and
mask = tf.logical_and(distinct_indices, valid_labels)
return mask
1.3 计算triplet loss
-
numpy
中的实现, 点击查看 -
1.1
中计算得到了两两embeddings
的距离,大小(batch_size, batch_size)
, 需要得到所有三元组的triplet loss
, 即(batch_size, batch_size, batch_size)
大小 - 为什么
triplet_loss = anchor_positive_dist - anchor_negative_dist + margin
可以得到所有(i, j, k)
的triplet loss
,- 如下图,
x0y
平面的是anchor_positive_dist
的距离矩阵(其实是3D
的, 想象一下) -
x0z
平面是anchor_negative_dist
的距离矩阵(也是3D
的) - 两个相减, 比如
0-0 = 0
就相当于i=0, j=0
的距离,减去j=0, k=0
的距离 - 以此类推,得到所有三元组的
loss
- 如下图,
def batch_all_triplet_loss(labels, embeddings, margin, squared=False):
'''
triplet loss of a batch
-------------------------------
Args:
labels: 标签数据,shape = (batch_size,)
embeddings: 提取的特征向量, shape = (batch_size, vector_size)
margin: margin大小, scalar
Returns:
triplet_loss: scalar, 一个batch的损失值
fraction_postive_triplets : valid的triplets占的比例
'''
# 得到每两两embeddings的距离,然后增加一个维度,一维需要得到(batch_size, batch_size, batch_size)大小的3D矩阵
# 然后再点乘上valid 的 mask即可
pairwise_dis = _pairwise_distance(embeddings, squared=squared)
anchor_positive_dist = tf.expand_dims(pairwise_dis, 2)
assert anchor_positive_dist.shape[2] == 1, "{}".format(anchor_positive_dist.shape)
anchor_negative_dist = tf.expand_dims(pairwise_dis, 1)
assert anchor_negative_dist.shape[1] == 1, "{}".format(anchor_negative_dist.shape)
triplet_loss = anchor_positive_dist - anchor_negative_dist + margin
mask = _get_triplet_mask(labels)
mask = tf.to_float(mask)
triplet_loss = tf.multiply(mask, triplet_loss)
triplet_loss = tf.maximum(triplet_loss, 0.0)
# 计算valid的triplet的个数,然后对所有的triplet loss求平均
valid_triplets = tf.to_float(tf.greater(triplet_loss, 1e-16))
num_positive_triplets = tf.reduce_sum(valid_triplets)
num_valid_triplets = tf.reduce_sum(mask)
fraction_postive_triplets = num_positive_triplets / (num_valid_triplets + 1e-16)
triplet_loss = tf.reduce_sum(triplet_loss) / (num_positive_triplets + 1e-16)
return triplet_loss, fraction_postive_triplets
2、Batch Hard
-
numpy
中的实现,点击查看 - 因为最后只有个
triplet
, 从positive
中选择距离最大的,从negative
中选择距离最小的即可
2.1 计算positive mask
- 满足
a!=p and a, p label一致即可
- 之后用
mask
乘上计算的pairwice_distances
, 然后取每行最大值即为每个样本对应positive
的最大距离
def _get_anchor_positive_triplet_mask(labels):
'''
得到合法的positive的mask, 即2D的矩阵,[a, p], a!=p and a和p相同labels
------------------------------------------------
Args:
labels: 标签数据,shape = (batch_size, )
Returns:
mask: 合法的positive mask, shape = (batch_size, batch_size)
'''
indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
indices_not_equal = tf.logical_not(indices_equal) # (i, j)不相等
labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1)) # labels相等,
mask = tf.logical_and(indices_not_equal, labels_equal) # 取and即可
return mask
2.2 计算negative mask
- 只需
[a, n]
对应的labels
不一致即可
def _get_anchor_negative_triplet_mask(labels):
'''
得到negative的2D mask, [a, n] 只需a, n不同且有不同的labels
------------------------------------------------
Args:
labels: 标签数据,shape = (batch_size, )
Returns:
mask: negative mask, shape = (batch_size, batch_size)
'''
labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
mask = tf.logical_not(labels_equal)
return mask
2.3 batch hard loss
- 计算最大
positive
距离时直接取valid
的每一行的最大值即可 - 计算最小
negative
距离时不能直接取每一行的最小值,因为invalid
位置的值为0
,所以可以在invalid
位置加上每一行的最大值,然后就可以取每一行的最小值了
def batch_hard_triplet_loss(labels, embeddings, margin, squared=False):
'''
batch hard triplet loss of a batch, 每个样本最大的positive距离 - 对应样本最小的negative距离
------------------------------------
Args:
labels: 标签数据,shape = (batch_size,)
embeddings: 提取的特征向量, shape = (batch_size, vector_size)
margin: margin大小, scalar
Returns:
triplet_loss: scalar, 一个batch的损失值
'''
pairwise_distances = _pairwise_distance(embeddings)
mask_anchor_positive = _get_anchor_positive_triplet_mask(labels)
mask_anchor_positive = tf.to_float(mask_anchor_positive)
anchor_positive_dist = tf.multiply(mask_anchor_positive, pairwise_distances)
hardest_positive_dist = tf.reduce_max(anchor_positive_dist, axis=1, keepdims=True) # 取每一行最大的值即为最大positive距离
tf.summary.scalar("hardest_positive_dis", tf.reduce_mean(hardest_positive_dist))
'''取每一行最小值得时候,因为invalid [a, n]置为了0, 所以不能直接取,这里对应invalid位置加上每一行的最大值即可,然后再取最小的值'''
mask_anchor_negative = _get_anchor_negative_triplet_mask(labels)
mask_anchor_negative = tf.to_float(mask_anchor_negative)
max_anchor_negative_dist = tf.reduce_max(pairwise_distances, axis=1, keepdims=True) # 每一样最大值
anchor_negative_dist = pairwise_distances + max_anchor_negative_dist * (1.0 - mask_anchor_negative) # (1.0 - mask_anchor_negative)即为invalid位置
hardest_negative_dist = tf.reduce_min(anchor_negative_dist, axis=1, keepdims=True)
tf.summary.scalar("hardest_negative_dist", tf.reduce_mean(hardest_negative_dist))
triplet_loss = tf.maximum(hardest_positive_dist - hardest_negative_dist + margin, 0.0)
triplet_loss = tf.reduce_mean(triplet_loss)
return triplet_loss
三、具体使用
- 使用
mnist
数据集和triplet loss
训练,最后得到的embeddings
应该是同一类别的靠在一起 - 因为只有
10
个类别,所以直接随机取batch
大小的数据,这里batch_size=64
,- 注意如果类别很多时,就不能随机构建
batch
了, 需要选P
个类别,然后每个类别选K
张图
- 注意如果类别很多时,就不能随机构建
3.1 构建模型
- 上一篇介绍了 tensorflow的高级API, 这里使用
Estimator
构建模型 - 全部代码:点击查看
3.1.1 使用Estimator
-
params
指定超参数, 这里保存为json
格式的文件,- 配置为:
{
"learning_rate": 1e-3,
"batch_size": 64,
"num_epochs": 20,
"num_channels": 32,
"use_batch_norm": false,
"bn_momentum": 0.9,
"margin": 0.5,
"embedding_size": 64,
"triplet_strategy": "batch_all",
"squared": false,
"image_size": 28,
"num_labels": 10,
"train_size": 50000,
"eval_size": 10000,
"num_parallel_calls": 4,
"save_summary_steps": 50
}
def main(argv):
args = parser.parse_args(argv[1:])
tf.logging.info("创建模型....")
with open(args.model_config) as f:
params = json.load(f)
config = tf.estimator.RunConfig(model_dir=args.model_dir, tf_random_seed=100) # config
cls = tf.estimator.Estimator(model_fn=my_model, config=config, params=params) # 建立模型
tf.logging.info("开始训练模型,共{} epochs....".format(params['num_epochs']))
cls.train(input_fn = lambda: train_input_fn(args.data_dir, params)) # 训练模型,指定输入
tf.logging.info("测试集评价模型....")
res = cls.evaluate(input_fn = lambda: test_input_fn(args.data_dir, params)) # 测试模型,指定输入
for key in res:
print("评价---{} : {}".format(key, res[key]))
3.1.2 model_fn函数
- 下面都有对应注释
- 计算
embedding_mean_norm
中每一行embeding
公式为: , 然后再取均值
def my_model(features, labels, mode, params):
'''
model_fn指定函数,构建模型,训练等
---------------------------------
Args:
features: 输入,shape = (batch_size, 784)
labels: 输出,shape = (batch_size, )
mode: str, 阶段
params: dict, 超参数
'''
is_training = (mode == tf.estimator.ModeKeys.TRAIN)
images = features
images = tf.reshape(images, shape=[-1, params['image_size'], params['image_size'], 1]) # reshape (batch_size, img_size, img_size, 1)
with tf.variable_scope("model"):
embeddings = build_model(is_training, images, params) # 简历模型
if mode == tf.estimator.ModeKeys.PREDICT: # 如果是预测阶段,直接返回得到embeddings
predictions = {'embeddings': embeddings}
return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
'''调用对应的triplet loss'''
labels = tf.cast(labels, tf.int64)
if params['triplet_strategy'] == 'batch_all':
loss, fraction = batch_all_triplet_loss(labels, embeddings, margin=params['margin'], squared=params['squared'])
elif params['triplet_strategy'] == 'batch_hard':
loss = batch_hard_triplet_loss(labels, embeddings, margin=params['margin'], squared=params['squared'])
else:
raise ValueError("triplet_strategy 配置不正确: {}".format(params['triplet_strategy']))
embedding_mean_norm = tf.reduce_mean(tf.norm(embeddings, axis=1)) # 这里计算了embeddings的二范数的均值
tf.summary.scalar("embedding_mean_norm", embedding_mean_norm)
with tf.variable_scope("metrics"):
eval_metric_ops = {'embedding_mean_norm': tf.metrics.mean(embedding_mean_norm)}
if params['triplet_strategy'] == 'batch_all':
eval_metric_ops['fraction_positive_triplets'] = tf.metrics.mean(fraction)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=eval_metric_ops)
tf.summary.scalar('loss', loss)
if params['triplet_strategy'] == "batch_all":
tf.summary.scalar('fraction_positive_triplets', fraction)
tf.summary.image('train_image', images, max_outputs=1) # 1代表1个channel
optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])
global_step = tf.train.get_global_step()
if params['use_batch_norm']:
'''如果使用BN,需要估计batch上的均值和方差,tf.get_collection(tf.GraphKeys.UPDATE_OPS)就可以得到
tf.control_dependencies计算完之后再进行里面的操作
'''
with tf.control_dependencies(tf.get_collection(tf.GraphKeys.UPDATE_OPS)):
train_op = optimizer.minimize(loss, global_step=global_step)
else:
train_op = optimizer.minimize(loss, global_step=global_step)
return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
3.1.3 构建模型,得到embeddings
def build_model(is_training, images, params):
'''
建立模型
----------------------------
Args:
is_training: bool, 是否是训练阶段,可以从mode中判断
images: (batch_size, 28*28*1), 输入mnist数据
params: dict, 一些超参数
Returns:
out: 输出的embeddings, shape = (batch_size, 64)
'''
num_channel = params['num_channels']
bn_momentum = params['bn_momentum']
channels = [num_channel, num_channel * 2]
out = images
for i, c in enumerate(channels):
with tf.variable_scope("block_{}".format(i)):
out = tf.layers.conv2d(out, c, 3, padding='same')
if params['use_batch_norm']:
out = tf.layers.batch_normalization(out, momentum=bn_momentum, training=is_training)
out = tf.nn.relu(out)
out = tf.layers.max_pooling2d(out, 2, 2)
assert out.shape[1:] == [7, 7, num_channel * 2]
out = tf.reshape(out, [-1, 7*7*num_channel*2])
with tf.variable_scope("fc_1"):
out = tf.layers.dense(out, params['embedding_size'])
return out
3.2 训练结果
3.2.1 batch all
-
python train_with_triplet_loss.py
-
可以在
tensorboard
中查看tensorboard --logdir experiment/model/
-
embeddings_mean_norm
- [ 可以看到是上升的,因为我们要学到可分性好的
embeddings
, 那么其方差应该是偏大的,均值应该是变大的 ]
- [ 可以看到是上升的,因为我们要学到可分性好的
-
fraction positive
- 这个是收敛的,因为随着优化占的比例是越来越少
- 这个是收敛的,因为随着优化占的比例是越来越少
-
loss
- 注意这里的
loss
一般不是收敛的,因为是计算的semi-hard
和hard
的距离均值,因为每次是先选择出semi-hard
和hard
的triplet
, 那么上次优化后的可能就选择不到了,所以loss
并不会收敛,但是fraction_postive_triplets
是收敛的,因为随着优化占的比例是越来越少的
- 注意这里的
3.2.2 batch hard
- embeddings mean norm
- positive and negative distance
- 这里我原以为应该是
negative
应该是增大的,positive
应该是减小的,但实际结果是positive
也是增大的,因为我们计算loss
是triplet_loss = tf.maximum(hardest_positive_dist - hardest_negative_dist + margin, 0.0)
, 只要negative
的距离大于positive + margin
就是0
了,所以只要满足就行, 用BN
训练的效果可能更好一点。(有什么其他看法的可以交流一下)
- 这里我原以为应该是
- loss
-
batch hard
的loss
就应该是收敛的了
-
3.3 可视化embedding
- 全部代码: 点击查看
- 之前在
tensorflow
工具中使用过: 点击查看 - 这里将可视化
embeddings
的训练数据都放在experiment/log
文件夹下-
另外我使用
tensorflow 1.11
出现问题,这里使用的版本是tensorflow 1.10
-
另外我使用
- 加载训练的模型,预测得到embeddings
args = parser.parse_args(argv[1:])
with open(args.model_config) as f:
params = json.load(f)
tf.logging.info("创建模型....")
config = tf.estimator.RunConfig(model_dir=args.model_dir, tf_random_seed=100) # config
cls = tf.estimator.Estimator(model_fn=my_model, config=config, params=params) # 建立模型
tf.logging.info("预测....")
predictions = cls.predict(input_fn=lambda: test_input_fn(args.data_dir, params))
embeddings = np.zeros((10000, params['embedding_size']))
for i, p in enumerate(predictions):
embeddings[i] = p['embeddings']
tf.logging.info("embeddings shape: {}".format(embeddings.shape))
- 获得label数据,保存为
metadata.tsv
文件
with tf.Session() as sess:
# Obtain the test labels
dataset = mnist_dataset.test(args.data_dir)
dataset = dataset.map(lambda img, lab: lab)
dataset = dataset.batch(10000)
labels_tensor = dataset.make_one_shot_iterator().get_next()
labels = sess.run(labels_tensor)
np.savetxt(os.path.join(args.log_dir, 'metadata.tsv'), labels, fmt='%d')
- 可视化
embedding
shutil.copy(args.sprite_filename, args.log_dir)
'''可视化embeddings'''
with tf.Session() as sess:
# 1. Variable
embedding_var = tf.Variable(embeddings, name="mnist_embeddings")
#tf.global_variables_initializer().run() # 不需要
# 2. 保存到文件中,embeddings.ckpt
saver = tf.train.Saver()
sess.run(embedding_var.initializer)
saver.save(sess, os.path.join(args.log_dir, 'embeddings.ckpt'))
# 3. 关联metadata.tsv, 和mnist_10k_sprite.png
summary_writer = tf.summary.FileWriter(args.log_dir)
config = projector.ProjectorConfig()
embedding = config.embeddings.add()
embedding.tensor_name = embedding_var.name
embedding.metadata_path = 'metadata.tsv'
embedding.sprite.image_path = 'mnist_10k_sprite.png'
embedding.sprite.single_image_dim.extend([28, 28])
projector.visualize_embeddings(summary_writer, config)
3.3.1 batch all
-
PCA
结果