本章继续分析BiFPN的实现部分。各种FPN结构如下:
如图,单个BiFPN有8个Node。P3-P7共5个层级,其中前3个(P3-P5)来自backbone的后3层,后2个(P6、P7)由其后连接的两层ResampleFeatureMap产生。在EfficientDet模型中,BiFPN由如下类组成:
FPNCells:按fpn_cell_repeats参数重复生成多个BiFPN。
FPNCell:代表单个BiFPN,包含8个Node。
FNode:将当前Node的多个输入融合,产生一个输出。
代码结构如下:
FPNCells代码如下:
class FPNCells(tf.keras.layers.Layer):
    """Sequence of repeated FPN cells applied one after another.

    ``config.fpn_cell_repeats`` instances of ``FPNCell`` are created, named
    ``cell_0``, ``cell_1``, ... The output of each cell is reduced to one
    feature per level before being fed to the next cell.
    """

    def __init__(self, config, name='fpn_cells'):
        """Builds the list of cells.

        Args:
          config: configuration object. This class reads ``fpn_config``,
            ``fpn_name``, ``min_level``, ``max_level``, ``fpn_weight_method``
            and ``fpn_cell_repeats`` from it.
          name: name passed to the Keras ``Layer`` constructor.
        """
        super().__init__(name=name)
        self.config = config

        # Use the explicit FPN config when one is supplied (truthy); otherwise
        # derive it from the name, level range and weight method.
        # Article author's note: fpn_config, fpn_name and fpn_weight_method
        # were all None in their run.
        self.fpn_config = config.fpn_config or fpn_configs.get_fpn_config(
            config.fpn_name, config.min_level, config.max_level,
            config.fpn_weight_method)

        cells = []
        for rep in range(self.config.fpn_cell_repeats):
            cells.append(FPNCell(self.config, name='cell_%d' % rep))
        self.cells = cells

    def call(self, feats, training):
        """Runs every cell in order and returns the last cell's level outputs.

        Args:
          feats: list of input features, passed to the first cell.
            (presumably one tensor per pyramid level -- TODO confirm)
          training: training flag forwarded to each cell.

        Returns:
          A list holding, for each level in ``[min_level, max_level]`` that
          has a matching node, the output of the LAST node configured for that
          level. If there are no cells, ``feats`` is returned unchanged.
        """
        for cell in self.cells:
            # The cell returns the incoming features with each node's output
            # appended after them.
            cell_feats = cell(feats, training)
            first_level = self.config.min_level
            last_level = self.config.max_level

            # Start from an empty list for every cell: only the per-level
            # outputs selected below are handed to the next cell.
            feats = []
            for level in range(first_level, last_level + 1):
                # Scan the node configs back to front so the first hit is the
                # last node producing this level; position k from the end of
                # the config list maps to position k from the end of the
                # cell output.
                for k, node_cfg in enumerate(reversed(self.fpn_config.nodes)):
                    if node_cfg['feat_level'] == level:
                        feats.append(cell_feats[-1 - k])
                        break

        # After the loop this is the selection made from the final cell.
        return feats


# (article text) The FPNCell code follows:
class FPNCell(tf.keras.layers.Layer):
    """A single FPN cell: one ``FNode`` per entry of ``fpn_config.nodes``."""

    def __init__(self, config, name='fpn_cell'):
        """Creates the cell's nodes.

        Args:
          config: configuration object. This class reads ``fpn_config``,
            ``fpn_name``, ``min_level``, ``max_level``, ``fpn_weight_method``,
            ``fpn_num_filters``, ``apply_bn_for_resampling``,
            ``is_training_bn``, ``conv_after_downsample``,
            ``conv_bn_act_pattern``, ``separable_conv``, ``act_type``,
            ``strategy`` and ``data_format`` from it.
          name: name passed to the Keras ``Layer`` constructor.
        """
        super().__init__(name=name)
        self.config = config

        # An explicit (truthy) FPN config wins; otherwise derive one.
        self.fpn_config = config.fpn_config or fpn_configs.get_fpn_config(
            config.fpn_name, config.min_level, config.max_level,
            config.fpn_weight_method)

        self.fnodes = []
        for i, fnode_cfg in enumerate(self.fpn_config.nodes):
            logging.info('fnode %d : %s', i, fnode_cfg)
            # The "author's run" remarks below are the values the article
            # author recorded next to each argument; they are not enforced
            # by this code.
            node = FNode(
                # Level index relative to the lowest configured level.
                feat_level=fnode_cfg['feat_level'] - self.config.min_level,
                inputs_offsets=fnode_cfg['inputs_offsets'],
                fpn_num_filters=config.fpn_num_filters,
                # author's run: True
                apply_bn_for_resampling=config.apply_bn_for_resampling,
                # author's run: True
                is_training_bn=config.is_training_bn,
                # author's run: False
                conv_after_downsample=config.conv_after_downsample,
                # author's run: False
                conv_bn_act_pattern=config.conv_bn_act_pattern,
                # author's run: True
                separable_conv=config.separable_conv,
                act_type=config.act_type,
                # author's run: None
                strategy=config.strategy,
                # author's run: fastattn
                weight_method=self.fpn_config.weight_method,
                data_format=config.data_format,
                name='fnode%d' % i)
            self.fnodes.append(node)

    def call(self, feats, training):
        """Feeds the features through every node in configuration order.

        Args:
          feats: list of input features.
          training: training flag forwarded to each node.

        Returns:
          Whatever the last node returns; each node receives the previous
          node's return value as its ``feats``.
        """
        current = feats
        for node in self.fnodes:
            current = node(current, training)
        return current


# (article text) The FNode code follows:
class FNode(tf.keras.layers.Layer):
    """A Keras Layer implementing a BiFPN node.

    A node picks several entries of the incoming feature list (by offset),
    resamples each one, fuses them according to ``weight_method``, applies
    ``OpAfterCombine`` and appends the result to the feature list.
    """

    def __init__(self,
                 feat_level,
                 inputs_offsets,
                 fpn_num_filters,
                 apply_bn_for_resampling,
                 is_training_bn,
                 conv_after_downsample,
                 conv_bn_act_pattern,
                 separable_conv,
                 act_type,
                 strategy,
                 weight_method,
                 data_format,
                 name='fnode'):
        """Stores the node settings; sub-layers are created in ``build``.

        Args:
          feat_level: level value forwarded to each ``ResampleFeatureMap``.
          inputs_offsets: indices into the incoming feature list selecting
            this node's inputs.
          fpn_num_filters: filter count forwarded to the sub-layers; also the
            length of the per-channel weights for the ``channel_*`` methods.
          apply_bn_for_resampling: forwarded to ``ResampleFeatureMap``.
          is_training_bn: forwarded to the sub-layers; also used as the
            ``trainable`` flag of the fusion weights.
          conv_after_downsample: forwarded to ``ResampleFeatureMap``.
          conv_bn_act_pattern: forwarded to ``OpAfterCombine``.
          separable_conv: forwarded to ``OpAfterCombine``.
          act_type: forwarded to ``OpAfterCombine``.
          strategy: forwarded to the sub-layers.
          weight_method: one of 'attn', 'fastattn', 'channel_attn',
            'channel_fastattn' or 'sum'.
          data_format: forwarded to the sub-layers.
          name: name passed to the Keras ``Layer`` constructor.
        """
        super().__init__(name=name)
        # Input selection and fusion.
        self.feat_level = feat_level
        self.inputs_offsets = inputs_offsets
        self.weight_method = weight_method
        # Settings forwarded to the sub-layers.
        self.fpn_num_filters = fpn_num_filters
        self.apply_bn_for_resampling = apply_bn_for_resampling
        self.is_training_bn = is_training_bn
        self.conv_after_downsample = conv_after_downsample
        self.conv_bn_act_pattern = conv_bn_act_pattern
        self.separable_conv = separable_conv
        self.act_type = act_type
        self.strategy = strategy
        self.data_format = data_format
        # Filled in by build().
        self.resample_layers = []
        self.vars = []

    def fuse_features(self, nodes):
        """Fuse features from different resolutions and return a weighted sum.

        Args:
          nodes: a list of tensorflow features at different levels.

        Returns:
          A tensor denoting the fused feature.

        Raises:
          ValueError: if ``self.weight_method`` is not a known method.
        """
        dtype = nodes[0].dtype
        method = self.weight_method

        def cast_weights():
            # Fusion weights cast to the dtype of the features.
            return [tf.cast(weight, dtype=dtype) for weight in self.vars]

        if method == 'attn':
            edge_weights = cast_weights()
            normalized = tf.nn.softmax(tf.stack(edge_weights))
            stacked = tf.stack(nodes, axis=-1)
            return tf.reduce_sum(stacked * normalized, -1)

        if method in ('fastattn', 'channel_fastattn'):
            # Both variants share the same arithmetic: each input is scaled
            # by its weight divided by the sum of all weights; 0.0001 keeps
            # the denominator away from zero.
            # NOTE(review): the weights are used as-is (no ReLU), so nothing
            # here keeps them non-negative -- confirm this is intended.
            edge_weights = cast_weights()
            weights_sum = tf.add_n(edge_weights)
            scaled = [
                node * edge_weights[i] / (weights_sum + 0.0001)
                for i, node in enumerate(nodes)
            ]
            return tf.add_n(scaled)

        if method == 'channel_attn':
            edge_weights = cast_weights()
            # Weights are stacked along a new last axis and the softmax is
            # taken over that axis.
            normalized = tf.nn.softmax(tf.stack(edge_weights, -1), axis=-1)
            stacked = tf.stack(nodes, axis=-1)
            return tf.reduce_sum(stacked * normalized, -1)

        if method == 'sum':
            return tf.add_n(nodes)

        raise ValueError('unknown weight_method %s' % self.weight_method)

    def _add_wsm(self, initializer):
        """Creates one fusion weight per input offset and stores it in vars.

        Weights are named 'WSM', 'WSM_1', 'WSM_2', ... and their ``trainable``
        flag is taken from ``self.is_training_bn``.

        Args:
          initializer: initializer forwarded to ``add_weight``.
        """
        for idx in range(len(self.inputs_offsets)):
            suffix = '_' + str(idx) if idx != 0 else ''
            # No shape is passed to add_weight; the article author notes that
            # the weight is a scalar here.
            weight = self.add_weight(
                initializer=initializer,
                name='WSM' + suffix,
                trainable=self.is_training_bn)
            self.vars.append(weight)

    def build(self, feats_shape):
        """Creates the resample layers, fusion weights and post-fusion op.

        Args:
          feats_shape: shape information for the incoming feature list; only
            its length is used here (as a suffix in sub-layer names).
        """
        num_feats = len(feats_shape)

        # One resample layer per selected input.
        for i, input_offset in enumerate(self.inputs_offsets):
            resample = ResampleFeatureMap(
                self.feat_level,
                self.fpn_num_filters,
                self.apply_bn_for_resampling,
                self.is_training_bn,
                self.conv_after_downsample,
                strategy=self.strategy,
                data_format=self.data_format,
                name='resample_{}_{}_{}'.format(i, input_offset, num_feats))
            self.resample_layers.append(resample)

        # The 'sum' method (and any unknown one) gets no fusion weights.
        method = self.weight_method
        if method in ('attn', 'fastattn'):
            self._add_wsm('ones')
        elif method in ('channel_attn', 'channel_fastattn'):
            num_filters = int(self.fpn_num_filters)
            self._add_wsm(lambda: tf.ones([num_filters]))

        self.op_after_combine = OpAfterCombine(
            self.is_training_bn,
            self.conv_bn_act_pattern,
            self.separable_conv,
            self.fpn_num_filters,
            self.act_type,
            self.data_format,
            self.strategy,
            name='op_after_combine{}'.format(num_feats))
        self.built = True
        super().build(feats_shape)

    def call(self, feats, training):
        """Computes this node's output and appends it to the feature list.

        Args:
          feats: list of features produced so far; ``inputs_offsets`` index
            into it.
          training: training flag forwarded to the resample layers.

        Returns:
          A new list: ``feats`` followed by this node's output.
        """
        # Bring every selected input through its own resample layer.
        nodes = [
            self.resample_layers[i](feats[input_offset], training, feats)
            for i, input_offset in enumerate(self.inputs_offsets)
        ]

        # Combine all resampled inputs into one tensor.
        fused = self.fuse_features(nodes)
        # Article author's note: activation first, then conv + BN
        # (OpAfterCombine is defined elsewhere -- verify there).
        fused = self.op_after_combine(fused)

        # Original features + the new feature, so later nodes can reach both.
        return feats + [fused]


# (article text) This concludes the chapter. The next chapter analyzes the
# details of Box and Class regression -- stay tuned!!!
