# 稠密连接网络（DenseNet）

DenseNet121/169/201模型代码，参照DenseNet原文和torchvision源码实现，供参考

``````import re
import torch
import torchvision
import torch.utils.checkpoint as cp
from torch import nn, Tensor
from typing import List, Tuple
from collections import OrderedDict

class _DenseLayer(nn.Module):
    """One DenseNet layer: BN-ReLU-Conv(1x1) bottleneck, then BN-ReLU-Conv(3x3).

    Args:
        in_channels: channel count of the concatenated input features.
        growth_rate: number of channels this layer produces.
        bn_size: the bottleneck emits ``bn_size * growth_rate`` channels.
        dropout_rate: dropout probability applied to the output; 0 disables it.
        momery_efficient: when true, the bottleneck is run through
            ``torch.utils.checkpoint`` (parameter spelling kept for callers).

    NOTE(review): the layer-registration lines were truncated in the source;
    they are rebuilt here following the torchvision layout (1x1 bottleneck,
    3x3 conv with padding 1, no conv bias) — confirm against the original.
    """

    def __init__(self, in_channels, growth_rate, bn_size,
                 dropout_rate, momery_efficient: bool):
        super(_DenseLayer, self).__init__()
        self.momery_efficient = momery_efficient
        self.dropout_rate = dropout_rate
        bottleneck_channels = bn_size * growth_rate
        # Bottleneck: the part that may be recomputed via checkpointing.
        self.norm1 = nn.BatchNorm2d(in_channels)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_channels, bottleneck_channels,
                               kernel_size=1, stride=1, bias=False)
        # 3x3 convolution producing ``growth_rate`` new feature maps.
        self.norm2 = nn.BatchNorm2d(bottleneck_channels)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(bottleneck_channels, growth_rate,
                               kernel_size=3, stride=1, padding=1,
                               bias=False)

    def bottleneck(self, inputs: List[Tensor]):
        """Concatenate ``inputs`` on the channel axis and apply norm1-relu1-conv1."""
        concated_features = torch.cat(inputs, 1)
        return self.conv1(self.relu1(self.norm1(concated_features)))

    @torch.jit.unused
    def call_checkpoints_bottleneck(self, inputs: List[Tensor]):
        """Run :meth:`bottleneck` through ``torch.utils.checkpoint``."""
        def closure(*inputs):
            return self.bottleneck(inputs)
        return cp.checkpoint(closure, *inputs)

    def forward(self, input: Tensor):
        """Return the new feature maps for a tensor or a list of tensors."""
        # A bare tensor is wrapped so both call styles share one code path.
        prev_features = [input] if isinstance(input, Tensor) else input
        if self.momery_efficient:
            bottleneck_ouputs = self.call_checkpoints_bottleneck(prev_features)
        else:
            bottleneck_ouputs = self.bottleneck(prev_features)
        new_features = self.conv2(self.relu2(self.norm2(bottleneck_ouputs)))
        if self.dropout_rate > 0:
            # ``nn.functional`` is used because the file never imports ``F``.
            new_features = nn.functional.dropout(
                new_features, p=self.dropout_rate, training=self.training)
        return new_features

class _DenseBlock(nn.ModuleDict):
    """A stack of ``num_layers`` dense layers with dense connectivity.

    Layer ``i`` (0-based) receives ``in_channels + growth_rate * i`` channels.
    The layers are stored as ``denselayer1`` .. ``denselayerN``.
    """

    def __init__(self, num_layers, in_channels, growth_rate, bn_size,
                 dropout_rate, momery_efficient):
        super(_DenseBlock, self).__init__()
        for i in range(num_layers):
            layer = _DenseLayer(in_channels + growth_rate * i, growth_rate,
                                bn_size, dropout_rate, momery_efficient)
            self.add_module('denselayer%d' % (i + 1), layer)

    def forward(self, x):
        """Return the block input and every layer output, joined on dim 1."""
        # Start with the block input; each layer sees all features so far.
        features = [x]
        for layer in self.values():
            features.append(layer(features))
        # Hand a single tensor (not the list) to the next module.
        return torch.cat(features, 1)

class _Transition(nn.Module):
    """Transition between dense blocks: BN-ReLU-Conv(1x1)-AvgPool(2x2).

    Args:
        in_channels: channels entering the transition.
        out_channels: channels after the 1x1 convolution.

    NOTE(review): the layer-registration lines were truncated in the source;
    norm/relu/pool are rebuilt following torchvision — confirm.
    """

    def __init__(self, in_channels, out_channels):
        super(_Transition, self).__init__()
        self.norm = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        # 1x1 convolution maps the channel count to ``out_channels``.
        self.conv = nn.Conv2d(in_channels, out_channels,
                              kernel_size=1, stride=1, bias=False)
        # Halve the spatial size of the feature maps.
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Apply norm, ReLU, 1x1 conv and average pooling in that order."""
        return self.pool(self.conv(self.relu(self.norm(x))))

class DenseNet(nn.Module):
    """DenseNet image classifier for 3-channel input.

    Args:
        block_config: number of dense layers in each dense block.
        num_classes: size of the classifier output.
        in_channels: channels produced by the initial convolution.
        growth_rate: channels added by every dense layer.
        bn_size: bottleneck width multiplier passed to each dense layer.
        dropout_rate: dropout probability inside each dense layer.
        momery_efficient: enable checkpointed bottlenecks (spelling kept
            for callers).

    NOTE(review): the ``conv0`` continuation line, any ``pool0`` entry and the
    final ``norm5`` registration were truncated in the source; they are
    rebuilt following torchvision's DenseNet — confirm against the original.
    """

    def __init__(self, block_config: Tuple[int, int, int, int],
                 num_classes: int = 1000,
                 in_channels: int = 64,
                 growth_rate: int = 32,
                 bn_size: int = 4,
                 dropout_rate: float = 0.,
                 momery_efficient: bool = False):
        super(DenseNet, self).__init__()
        # Stem.
        self.features = nn.Sequential(OrderedDict([
            ('conv0', nn.Conv2d(3, in_channels, kernel_size=7, stride=2,
                                padding=3, bias=False)),
            ('norm0', nn.BatchNorm2d(in_channels)),
            ('relu0', nn.ReLU(inplace=True)),
            ('pool0', nn.MaxPool2d(kernel_size=3, stride=2, padding=1)),
        ]))
        # Dense blocks, with a transition after every block but the last.
        num_features = in_channels
        for i, num_layers in enumerate(block_config):
            denseblock = _DenseBlock(num_layers, num_features, growth_rate,
                                     bn_size, dropout_rate, momery_efficient)
            self.features.add_module('denseblock%d' % (i + 1), denseblock)
            num_features += growth_rate * num_layers
            if i < len(block_config) - 1:
                trans = _Transition(num_features, num_features // 2)
                self.features.add_module('transition%d' % (i + 1), trans)
                # The transition halves the channel count.
                num_features = num_features // 2
        # Final batch norm before the classifier head.
        self.features.add_module('norm5', nn.BatchNorm2d(num_features))

        self.classifier = nn.Linear(num_features, num_classes)

        # Parameter initialisation.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``."""
        out = self.features(x)
        # ``nn.functional`` is used because the file never imports ``F``.
        out = nn.functional.relu(out, inplace=True)
        # Pool to 1x1 so the flattened width equals the classifier input.
        out = nn.functional.adaptive_avg_pool2d(out, (1, 1))
        out = torch.flatten(out, 1)
        return self.classifier(out)

class Constructor:
    """Factory for the DenseNet-121/169/201 configurations.

    Args:
        num_classes: classifier output size for built models.
        momery_efficient: forwarded to ``DenseNet`` (spelling kept for callers).
        progress: show a progress bar when downloading weights.

    NOTE(review): the ``model_urls`` entries were truncated in the source; the
    URLs below are the torchvision DenseNet checkpoints — confirm.
    """

    def __init__(self, num_classes: int = 1000,
                 momery_efficient: bool = False,
                 progress: bool = True):
        self.num_classes = num_classes
        self.momery_efficient = momery_efficient
        self.progress = progress
        self.model_urls = {
            'densenet121': 'https://download.pytorch.org/models/densenet121-a639ec97.pth',
            'densenet169': 'https://download.pytorch.org/models/densenet169-b2777c0a.pth',
            'densenet201': 'https://download.pytorch.org/models/densenet201-c1103571.pth',
        }

    @staticmethod
    def _remap_legacy_keys(state_dict):
        """Rename keys such as ``...denselayer1.norm.1.weight`` to ``...norm1.weight``.

        The mapping is edited in place and also returned.
        """
        # The pattern must end with the ``$`` anchor; an escaped ``\$`` would
        # require a literal dollar sign and never match a real key.
        pattern = re.compile(
            r'^(.*denselayer\d+\.(?:norm|relu|conv))\.((?:[12])\.(?:weight|bias|running_mean|running_var))$')
        for key in list(state_dict.keys()):
            res = pattern.match(key)
            if res:
                new_key = res.group(1) + res.group(2)
                state_dict[new_key] = state_dict[key]
                del state_dict[key]
        return state_dict

    def _load_state_dict(self, model: nn.Module, model_url: str):
        """Download the checkpoint at ``model_url`` and load it into ``model``.

        ``model.load_state_dict`` raises if the checkpoint does not match the
        model (for example when ``num_classes`` is not 1000).
        """
        # Local import: the file's top-level imports do not provide this name.
        from torch.hub import load_state_dict_from_url
        state_dict = load_state_dict_from_url(model_url,
                                              progress=self.progress)
        model.load_state_dict(self._remap_legacy_keys(state_dict))

    def _build_model(self, block_config, moder_url=None):
        """Build a randomly initialised DenseNet for ``block_config``.

        ``moder_url`` is accepted for call compatibility, but no weights are
        loaded here; call :meth:`_load_state_dict` explicitly to load them.
        """
        model = DenseNet(block_config=block_config,
                         num_classes=self.num_classes,
                         momery_efficient=self.momery_efficient)
        return model

    def DenseNet121(self):
        """Build DenseNet-121."""
        return self._build_model((6, 12, 24, 16), self.model_urls['densenet121'])

    def DenseNet169(self):
        """Build DenseNet-169."""
        return self._build_model((6, 12, 32, 32), self.model_urls['densenet169'])

    def DenseNet201(self):
        """Build DenseNet-201."""
        return self._build_model((6, 12, 48, 32), self.model_urls['densenet201'])

if __name__ == '__main__':
    # Demo: build a DenseNet-169 with checkpointed bottlenecks.
    num_classes = 1000
    momery_efficient = True
    progress = True
    # The start of this statement was truncated in the source.
    densenet169 = Constructor(num_classes, momery_efficient,
                              progress).DenseNet169()
print(densenet169)``````
1. 主要原因在于过渡层在Pooling之前已经做了通道维度的压缩，为了更完整地保留和传递信息，因而采用了AvgPooling

2. 教材中DenseNet与ResNet的卷积层层数及全连接层层数都是一样的，而网络的参数量也主要来自这两个部分。造成差距的主要原因在于卷积层和全连接层在通道数目上的差异。DenseNet通过过渡层不断控制通道数量，每个卷积的输入、输出通道数都没有超过256；反观ResNet，block3中5个卷积层输出通道均为256，block4中5个卷积层输出通道均为512，导致卷积层的参数量大幅增加；此外，在单层参数量最大的全连接层中，DenseNet输入通道数为248，远小于ResNet的512，因此在这一部分也获得了巨大的优势，最终使得DenseNet总体参数量比ResNet有了显著的下降

3. 在输入大小同为96的情况下，DenseNet的显存占用为 3161MiB / 4096MiB，ResNet为 2725MiB / 4096MiB，可见DenseNet的显存占用确实更大；可以通过引入bottleneck结构来降低显存占用

# 在稠密块之间添加一个转换层，使通道数量减半

``````if i != len(num_convs_in_dense_blocks) - 1:
blks.append(transition_block(num_channels, num_channels // 2))
num_channels = num_channels // 2
``````

``````from torch.hub import load_state_dict_from_url
``````

1. `_Transition`类继承写错了，应该是继承`nn.Sequential`。还有一种不改继承的思路，即补上`forward`函数。

2. `_DenseBlock`类的`forward`方法中的`return`语句错了，应该返回一个`Tensor`而不是`list`，正确写法应该为`return torch.cat(features, 1)`

``````import re
from matplotlib.pyplot import isinteractive
import torch
import torchvision
from torch import nn
import torch.utils.checkpoint as cp
from typing import List, Tuple
from collections import OrderedDict

class _DenseLayer(nn.Module):
    """One DenseNet layer: BN-ReLU-Conv(1x1) bottleneck, then BN-ReLU-Conv(3x3).

    Args:
        in_channels: channel count of the concatenated input features.
        growth_rate: number of channels this layer produces.
        bn_size: the bottleneck emits ``bn_size * growth_rate`` channels.
        dropout_rate: ``Dropout2d`` probability on the output; 0 disables it.
        memory_efficient: when true, the bottleneck is run through
            ``torch.utils.checkpoint``.

    NOTE(review): most layer-registration lines were truncated in the source;
    they are rebuilt following the torchvision layout — confirm.
    """

    def __init__(self, in_channels, growth_rate, bn_size, dropout_rate, memory_efficient: bool):
        super().__init__()
        self.memory_efficient = memory_efficient
        self.dropout_rate = dropout_rate
        bottleneck_channels = bn_size * growth_rate
        # Bottleneck: the part that may be recomputed via checkpointing.
        self.norm1 = nn.BatchNorm2d(in_channels)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(
            in_channels, bn_size * growth_rate, kernel_size=1, stride=1, bias=False)
        # 3x3 convolution producing ``growth_rate`` new feature maps.
        self.norm2 = nn.BatchNorm2d(bottleneck_channels)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            bottleneck_channels, growth_rate, kernel_size=3, stride=1, padding=1, bias=False)
        # The dropout module exists only when dropout is enabled.
        if self.dropout_rate > 0:
            self.drop = nn.Dropout2d(self.dropout_rate)

    def bottleneck(self, input: List[torch.Tensor]):
        """Concatenate ``input`` on the channel axis and apply norm1-relu1-conv1."""
        concated_features = torch.cat(input, dim=1)
        return self.conv1(self.relu1(self.norm1(concated_features)))

    @torch.jit.unused
    def call_checkpoints_bottleneck(self, input: List[torch.Tensor]):
        """Run :meth:`bottleneck` through ``torch.utils.checkpoint``."""
        def closure(*inputs):
            return self.bottleneck(inputs)
        return cp.checkpoint(closure, *input)

    def forward(self, input: torch.Tensor):
        """Return the new feature maps for a tensor or a list of tensors."""
        # Wrap a bare tensor so both call styles share one code path.
        prev_features = [input] if isinstance(input, torch.Tensor) else input
        if self.memory_efficient:
            bottleneck_output = self.call_checkpoints_bottleneck(prev_features)
        else:
            bottleneck_output = self.bottleneck(prev_features)
        new_features = self.conv2(self.relu2(self.norm2(bottleneck_output)))
        if self.dropout_rate > 0:
            new_features = self.drop(new_features)
        return new_features

class _DenseBlock(nn.ModuleDict):
    """Stacked dense layers forming one dense block.

    Layer ``i`` (0-based) receives ``in_channels + growth_rate * i`` channels.
    The layers are stored as ``denselayer1`` .. ``denselayerN``.
    """

    def __init__(self, num_layers, in_channels, growth_rate, bn_size, dropout_rate, memory_efficient) -> None:
        super().__init__()
        for i in range(num_layers):
            layer = _DenseLayer(in_channels + growth_rate * i,
                                growth_rate, bn_size, dropout_rate, memory_efficient)
            # Layer names are 1-based; the 0-based ``i`` is used for sizing.
            self.add_module('denselayer%d' % (i + 1), layer)

    def forward(self, x: torch.Tensor):
        """Return the block input and every layer output, joined on dim 1."""
        # Keep the features as a list: each dense layer concatenates them
        # inside its bottleneck, which the checkpointed path can recompute.
        features = [x]
        for layer in self.values():
            features.append(layer(features))
        # Hand a single tensor (not the list) to the next module.
        return torch.cat(features, 1)

class _Transition(nn.Module):
    """Transition layer: BN-ReLU-Conv(1x1)-AvgPool(2x2).

    Args:
        in_channels: channels entering the transition.
        out_channels: channels after the 1x1 convolution.

    NOTE(review): the registrations of ``norm``, ``relu`` and ``pool`` were
    truncated in the source; they are rebuilt following torchvision — confirm.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.norm = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        # 1x1 convolution adjusts the channel count.
        self.conv = nn.Conv2d(
            in_channels, out_channels, kernel_size=1, stride=1, bias=False)
        # Average pooling halves the feature-map height and width.
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)

    def forward(self, x: torch.Tensor):
        """Apply norm, ReLU, 1x1 conv and average pooling in that order."""
        out = self.norm(x)
        out = self.relu(out)
        out = self.conv(out)
        out = self.pool(out)
        return out

class Densenet(nn.Module):
    """DenseNet classifier adapted to single-channel input (Fashion-MNIST).

    Args:
        block_config: number of dense layers in each dense block.
        num_classes: size of the classifier output.
        in_channels: channels produced by the initial convolution.
        growth_rate: channels added by every dense layer.
        bn_size: bottleneck width multiplier passed to each dense layer.
        dropout_rate: dropout probability inside each dense layer.
        memory_efficient: enable checkpointed bottlenecks.

    NOTE(review): the ``conv0`` arguments, any ``pool0`` entry, the block and
    transition registrations, ``norm5`` and the pooling steps in ``forward``
    were truncated in the source; they are rebuilt following torchvision's
    DenseNet — confirm against the original.
    """

    def __init__(self, block_config: Tuple[int, int, int, int],
                 num_classes: int = 1000,
                 in_channels: int = 64,
                 growth_rate: int = 32,
                 bn_size: int = 4,
                 dropout_rate: float = 0,
                 memory_efficient: bool = False):
        super().__init__()
        # Stage 1: initial convolution, single input channel.
        self.features = nn.Sequential(OrderedDict([
            ('conv0', nn.Conv2d(1, in_channels,
                                kernel_size=7, stride=2, padding=3, bias=False)),
            ('norm0', nn.BatchNorm2d(in_channels)),
            ('relu0', nn.ReLU(inplace=True)),
            ('pool0', nn.MaxPool2d(kernel_size=3, stride=2, padding=1)),
        ]))
        # Stage 2: dense blocks.
        num_features = in_channels
        for i, num_layers in enumerate(block_config):
            denseblock = _DenseBlock(
                num_layers, num_features, growth_rate, bn_size, dropout_rate, memory_efficient)
            self.features.add_module('denseblock%d' % (i + 1), denseblock)
            num_features += num_layers * growth_rate
            # The last block feeds the classifier head, so it gets no
            # transition layer.
            if i != len(block_config) - 1:
                # Halve the channel count (and the feature-map H and W).
                trans = _Transition(num_features, num_features // 2)
                self.features.add_module('transition%d' % (i + 1), trans)
                num_features = num_features // 2
        # Final batch norm before the classifier head.
        self.features.add_module('norm5', nn.BatchNorm2d(num_features))

        self.classifier = nn.Linear(num_features, num_classes)

        # Parameter initialisation.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``."""
        out = self.features(x)
        out = nn.functional.relu(out, inplace=True)
        # Pool to 1x1 and flatten so the width matches ``classifier``.
        out = nn.functional.adaptive_avg_pool2d(out, (1, 1))
        out = torch.flatten(out, 1)
        out = self.classifier(out)
        return out

class Constructor:
    """Factory for the DenseNet-121/169/201 configurations.

    Args:
        num_classes: classifier output size for built models.
        memory_efficient: forwarded to ``Densenet``.
        progress: show a progress bar when downloading weights.

    The official checkpoints cannot be used directly because this model
    differs from the official one in some details; the loading code is kept
    only to illustrate how the official loader works.

    NOTE(review): the ``model_urls`` entries were truncated in the source; the
    URLs below are the torchvision DenseNet checkpoints — confirm.
    """

    def __init__(self, num_classes: int = 1000,
                 memory_efficient: bool = False,
                 progress: bool = True):
        self.num_classes = num_classes
        self.memory_efficient = memory_efficient
        self.progress = progress
        self.model_urls = {
            'densenet121': 'https://download.pytorch.org/models/densenet121-a639ec97.pth',
            'densenet169': 'https://download.pytorch.org/models/densenet169-b2777c0a.pth',
            'densenet201': 'https://download.pytorch.org/models/densenet201-c1103571.pth',
        }

    @staticmethod
    def _remap_legacy_keys(state_dict):
        """Rename keys such as ``...denselayer1.norm.1.weight`` to ``...norm1.weight``.

        The mapping is edited in place and also returned.
        """
        # The pattern must end with the ``$`` anchor; an escaped ``\$`` would
        # require a literal dollar sign and never match a real key.
        pattern = re.compile(
            r'^(.*denselayer\d+\.(?:norm|relu|conv))\.((?:[12])\.(?:weight|bias|running_mean|running_var))$')
        for key in list(state_dict.keys()):
            res = pattern.match(key)
            if res:
                new_key = res.group(1) + res.group(2)
                state_dict[new_key] = state_dict[key]
                del state_dict[key]
        return state_dict

    def _load_state_dict(self, model: nn.Module, model_url: str):
        """Download the checkpoint at ``model_url`` and load it into ``model``.

        ``model.load_state_dict`` raises if the checkpoint does not match the
        model (for example the single-channel ``conv0`` of ``Densenet``).
        """
        # Local import: the file's top-level imports do not provide this name.
        from torch.hub import load_state_dict_from_url
        state_dict = load_state_dict_from_url(
            model_url, progress=self.progress)
        model.load_state_dict(self._remap_legacy_keys(state_dict))

    def _build_model(self, block_config, model_url=None):
        """Build a randomly initialised ``Densenet`` for ``block_config``.

        ``model_url`` is accepted for call compatibility, but no weights are
        loaded here.
        """
        model = Densenet(block_config, self.num_classes,
                         memory_efficient=self.memory_efficient)
        return model

    def densenet121(self):
        """Build DenseNet-121."""
        return self._build_model((6, 12, 24, 16), self.model_urls['densenet121'])

    def densenet169(self):
        """Build DenseNet-169."""
        return self._build_model((6, 12, 32, 32), self.model_urls['densenet169'])

    def densenet201(self):
        """Build DenseNet-201."""
        return self._build_model((6, 12, 48, 32), self.model_urls['densenet201'])

from torchinfo import summary
from d2l import torch as d2l

# Demo: build DenseNet-121 for 10 classes and print a layer summary.
num_classes = 10
memory_efficient = True
progress = True

# The start of this statement was truncated in the source. The builder is
# densenet121() so that it matches the name it is bound to.
densenet121 = Constructor(num_classes, memory_efficient,
                          progress).densenet121().to('cpu')
# The model's first convolution takes one input channel, so the summary
# input must be single-channel as well.
summary(densenet121, input_size=(256, 1, 224, 224), device='cpu')
# X = torch.randn(1, 1, 224, 224).to('cpu')
# print(densenet121(X).shape)
``````
def conv_block_mlp(input_channels, num_channels):
    """Return a BatchNorm1d -> ReLU -> Linear block.

    Maps ``input_channels`` features to ``num_channels`` features.
    """
    return nn.Sequential(
        nn.BatchNorm1d(input_channels), nn.ReLU(),
        nn.Linear(input_channels, num_channels)
    )

class DenseBlock_mlp(nn.Module):
    """Dense block built from fully connected ``conv_block_mlp`` layers.

    Layer ``i`` (0-based) takes ``num_channels * i + input_channels`` features
    and emits ``num_channels`` features, which are appended to its input.
    """

    def __init__(self, num_convs, input_channels, num_channels) -> None:
        super().__init__()
        layer = []
        for i in range(num_convs):
            layer.append(conv_block_mlp(
                num_channels * i + input_channels, num_channels
            ))
        self.net = nn.Sequential(*layer)

    def forward(self, X):
        """Return ``X`` with every layer's output concatenated on dim 1."""
        for blk in self.net:
            Y = blk(X)
            # Each layer's output is appended to the running feature tensor.
            X = torch.cat((X, Y), dim=1)
        return X

def transition_block_mlp(input_channels, num_channels):
    """Return a BatchNorm1d -> ReLU -> Linear -> ReLU transition block.

    Maps ``input_channels`` features to ``num_channels`` features.
    """
    return nn.Sequential(
        nn.BatchNorm1d(input_channels), nn.ReLU(),
        nn.Linear(input_channels, num_channels), nn.ReLU()
    )

# b1: input stem lifting the 330 input features to 1024.
b1 = nn.Sequential(
    nn.Linear(330, 512), nn.ReLU(),
    nn.Linear(512, 1024), nn.ReLU()
)
# b2: dense blocks separated by transition layers.
# ``num_channels`` tracks the current feature width.
num_channels, growth_rate = 1024, 128
num_convs_in_dense_blocks = [2, 2, 2, 2]
blks = []
for i, num_convs in enumerate(num_convs_in_dense_blocks):
    blks.append(DenseBlock_mlp(num_convs, num_channels, growth_rate))
    # Output width of the dense block just added.
    num_channels += num_convs * growth_rate
    # Between dense blocks, add a transition layer that halves the width.
    if i != len(num_convs_in_dense_blocks) - 1:
        blks.append(transition_block_mlp(num_channels, num_channels // 2))
        num_channels = num_channels // 2

# Full regressor: stem, dense/transition stack, then a single-output head.
# The head width follows ``num_channels`` rather than a hard-coded 608
# (the value it reaches for the configuration above), so it stays correct
# if the block configuration changes.
net_ = nn.Sequential(
    b1, *blks,
    nn.BatchNorm1d(num_channels), nn.ReLU(),
    nn.Linear(num_channels, 1))

k, num_epochs, lr, weight_decay, batch_size = 5, 10, 0.05, 0, 64

5-折验证: 平均训练log rmse: 0.041683, 平均验证log rmse: 0.053562

model, optimizer, dataset, resize = densenet121, adam, fashion_mnist, 96
lr, num_epochs, batch_size = 0.001, 10, 64
epoch 9, loss 0.119, train acc 0.956, test acc 0.931