-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathu_net_resnet_50_encoder.py
136 lines (111 loc) · 4.7 KB
/
u_net_resnet_50_encoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import torch
import torch.nn as nn
import torchvision
# Module-level pretrained ResNet-50, instantiated as soon as the module loads.
# NOTE(review): the classes below build their own local ``resnet`` and never
# read this global -- confirm whether any external code depends on it.
resnet = torchvision.models.resnet50(pretrained=True)
class ConvBlock(nn.Module):
    """Convolution followed by batch normalisation and an optional ReLU.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by the convolution.
        padding: padding forwarded to ``nn.Conv2d``.
        kernel_size: kernel size forwarded to ``nn.Conv2d``.
        stride: stride forwarded to ``nn.Conv2d``.
        with_nonlinearity: when false, ``forward`` skips the ReLU and
            returns the batch-normalised activations directly.
    """

    def __init__(self, in_channels, out_channels, padding=1, kernel_size=3, stride=1, with_nonlinearity=True):
        super().__init__()
        self.with_nonlinearity = with_nonlinearity
        # Submodules are registered in the order conv -> bn -> relu.
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply conv and batch norm to ``x``, then ReLU if it is enabled."""
        normalised = self.bn(self.conv(x))
        if not self.with_nonlinearity:
            return normalised
        return self.relu(normalised)
class Bridge(nn.Module):
    """Middle layer of the UNet: two ``ConvBlock`` layers applied in sequence.

    Args:
        in_channels: number of channels entering the first block.
        out_channels: number of channels produced by both blocks.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        first = ConvBlock(in_channels, out_channels)
        second = ConvBlock(out_channels, out_channels)
        self.bridge = nn.Sequential(first, second)

    def forward(self, x):
        """Run ``x`` through both conv blocks and return the result."""
        bridged = self.bridge(x)
        return bridged
class UpBlockForUNetWithResNet50(nn.Module):
    """One up-sampling step: Upsample -> concat skip -> ConvBlock -> ConvBlock.

    Args:
        in_channels: channels entering the first ``ConvBlock``, i.e. the
            channel count after the upsampled map and the skip map have
            been concatenated.
        out_channels: channels produced by the block.
        up_conv_in_channels: channels of the tensor fed to the upsampling
            layer; defaults to ``in_channels``.
        up_conv_out_channels: channels produced by the upsampling layer;
            defaults to ``out_channels``.
        upsampling_method: ``"conv_transpose"`` (stride-2 transposed
            convolution) or ``"bilinear"`` (bilinear x2 upsampling followed
            by a 1x1 convolution).

    Raises:
        ValueError: if ``upsampling_method`` is not a supported name.
    """

    def __init__(self, in_channels, out_channels, up_conv_in_channels=None, up_conv_out_channels=None,
                 upsampling_method="conv_transpose"):
        super().__init__()

        if up_conv_in_channels is None:
            up_conv_in_channels = in_channels
        if up_conv_out_channels is None:
            up_conv_out_channels = out_channels

        if upsampling_method == "conv_transpose":
            self.upsample = nn.ConvTranspose2d(up_conv_in_channels, up_conv_out_channels, kernel_size=2, stride=2)
        elif upsampling_method == "bilinear":
            # The 1x1 conv must use the up-conv channel counts, exactly like
            # the transposed-conv branch; otherwise blocks that override
            # them receive a tensor with the wrong number of channels.
            self.upsample = nn.Sequential(
                nn.Upsample(mode='bilinear', scale_factor=2),
                nn.Conv2d(up_conv_in_channels, up_conv_out_channels, kernel_size=1, stride=1),
            )
        else:
            # Fail at construction time instead of leaving ``self.upsample``
            # undefined and crashing later inside ``forward``.
            raise ValueError(
                f"Unsupported upsampling_method {upsampling_method!r}; "
                "expected 'conv_transpose' or 'bilinear'"
            )

        self.conv_block_1 = ConvBlock(in_channels, out_channels)
        self.conv_block_2 = ConvBlock(out_channels, out_channels)

    def forward(self, up_x, down_x):
        """Upsample ``up_x``, concatenate ``down_x`` and refine the result.

        :param up_x: this is the output from the previous up block
        :param down_x: this is the output from the down block
        :return: upsampled feature map
        """
        x = self.upsample(up_x)
        # Concatenate along dim 1 (the channel axis for Conv2d inputs).
        x = torch.cat([x, down_x], dim=1)
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        return x
class UNetWithResnet50Encoder(nn.Module):
    """UNet whose down-sampling path is a torchvision ResNet-50.

    The encoder reuses the ResNet's child modules, a ``Bridge`` sits in the
    middle, and five ``UpBlockForUNetWithResNet50`` blocks consume the stored
    encoder outputs (and the raw input) as skip connections.

    Args:
        n_classes: number of output channels of the final 1x1 convolution.
        pretrained: forwarded to ``torchvision.models.resnet.resnet50``;
            pass ``False`` to skip loading pretrained weights.
    """

    # Used in ``forward`` to decide which encoder outputs are stored and
    # which stored output each up block reads.
    DEPTH = 6

    def __init__(self, n_classes=2, pretrained=True):
        super().__init__()
        resnet = torchvision.models.resnet.resnet50(pretrained=pretrained)
        # Materialise the child list once instead of re-walking the model.
        encoder_children = list(resnet.children())

        # First three children form the input block, the fourth is the pool.
        # NOTE(review): in torchvision's ResNet these are presumably
        # conv1/bn1/relu and maxpool -- confirm against the installed version.
        self.input_block = nn.Sequential(*encoder_children[:3])
        self.input_pool = encoder_children[3]
        # Every ``nn.Sequential`` child of the ResNet becomes a down block.
        self.down_blocks = nn.ModuleList(
            [child for child in encoder_children if isinstance(child, nn.Sequential)]
        )

        self.bridge = Bridge(2048, 2048)

        self.up_blocks = nn.ModuleList([
            UpBlockForUNetWithResNet50(2048, 1024),
            UpBlockForUNetWithResNet50(1024, 512),
            UpBlockForUNetWithResNet50(512, 256),
            UpBlockForUNetWithResNet50(in_channels=128 + 64, out_channels=128,
                                       up_conv_in_channels=256, up_conv_out_channels=128),
            # The last skip connection is the raw network input ("layer_0");
            # the ``+ 3`` assumes a 3-channel input image.
            UpBlockForUNetWithResNet50(in_channels=64 + 3, out_channels=64,
                                       up_conv_in_channels=128, up_conv_out_channels=64),
        ])

        self.out = nn.Conv2d(64, n_classes, kernel_size=1, stride=1)

    def forward(self, x, with_output_feature_map=False):
        """Run the encoder, bridge and decoder on ``x``.

        NOTE(review): skip tensors are concatenated without any resizing, so
        the input height/width presumably must survive every down/up-sampling
        step unchanged (e.g. multiples of 32) -- confirm.

        :param x: input batch
        :param with_output_feature_map: when true, also return the feature
            map that is fed into the final 1x1 convolution
        :return: the output of ``self.out``, or a tuple
            ``(output, output_feature_map)`` if requested
        """
        # Skip connections keyed "layer_<k>"; "layer_0" is the raw input.
        pre_pools = {"layer_0": x}
        x = self.input_block(x)
        pre_pools["layer_1"] = x
        x = self.input_pool(x)

        deepest = UNetWithResnet50Encoder.DEPTH - 1
        for i, block in enumerate(self.down_blocks, 2):
            x = block(x)
            # The deepest encoder output goes straight into the bridge and
            # is never used as a skip connection, so it is not stored.
            if i != deepest:
                pre_pools[f"layer_{i}"] = x

        x = self.bridge(x)

        # Up block ``i`` reads the skip stored ``i`` levels from the deepest.
        for i, block in enumerate(self.up_blocks, 1):
            key = f"layer_{deepest - i}"
            x = block(x, pre_pools[key])

        output_feature_map = x
        x = self.out(x)

        if with_output_feature_map:
            return x, output_feature_map
        return x
# Smoke test: build the model and push one random batch through it.
# Use CUDA when it is available, otherwise fall back to the CPU so the script
# does not crash on machines without a GPU.
# NOTE(review): this runs at import time; consider an
# ``if __name__ == "__main__":`` guard if nothing relies on these globals.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNetWithResnet50Encoder().to(device)
inp = torch.rand((2, 3, 512, 512)).to(device)
out = model(inp)