医学图像分割 unet实现(一)
这次实验用来学习unet网络实现图像分割(keras, backend: tensorflow)。
数据集DRIVE:为眼部图像,目的是分割出眼部血管。
数据集结构:
上面分别是训练的原始图片images、first_manual、mask
整体流程:
- 1、前期准备:将所有图片写入h5文件,(为了后期训练网络时减少io时间)
- 2、训练网络
- 2.1、获取训练图片和groundtruth图片
- 2.1.1、 读取hdf5文件,获取img array
- 2.1.2、 预处理所有的img
- 2.1.3、 提取patches
- 2.2、构建model,并训练
- 2.3、模型保存(包括architecture 和 weights)
- 2.1、获取训练图片和groundtruth图片
- 3、测试
- 3.1、获取测试图片
- 3.2、model加载
- 3.3、model predict
- 3.4、测试结果重建,可视化显示分割结果(patches->full image)
- 3.5、用不同的metrics测评model结果
文档结构
注意,lib文件夹里的加上__init__.py
很重要,加上后,这个文件夹才能被python识别为一个package,从而,可以在run_training.py
中写from lib.help_functions import *
第一步
python prepare_dataset_DRIVE.py
将图片读取为ndarray格式,并保存至hdf5文件
# -*- coding: utf-8 -*-
#==========================================================
#
# This prepare the hdf5 datasets of the DRIVE database
#
#============================================================
import os
import h5py
import numpy as np
from PIL import Image
"""
PIL 读取的图片为RGB通道的Image格式,
cv2 读取的图片为BGR通道的ndarray格式
skimage 读取的图片为RGB通道的ndarray格式
"""
def write_hdf5(arr, outfile):
    """Persist *arr* into *outfile* as an HDF5 dataset named "image"."""
    # The context manager guarantees the file is closed even on error.
    with h5py.File(outfile, "w") as h5f:
        h5f.create_dataset("image", data=arr, dtype=arr.dtype)
#------------Path of the images --------------------------------------------------------------
#train
original_imgs_train = "./DRIVE/training/images/"
groundTruth_imgs_train = "./DRIVE/training/1st_manual/"
borderMasks_imgs_train = "./DRIVE/training/mask/"
#test
original_imgs_test = "./DRIVE/test/images/"
groundTruth_imgs_test = "./DRIVE/test/1st_manual/"
borderMasks_imgs_test = "./DRIVE/test/mask/"
#---------------------------------------------------------------------------------------------
# Number of original images, channel count, image height and width
# (the arrays below are allocated with exactly these dimensions)
Nimgs = 20
channels = 3
height = 584
width = 565
# output directory for the generated HDF5 datasets
dataset_path = "./DRIVE_datasets_training_testing/"
def get_datasets(imgs_dir, groundTruth_dir, borderMasks_dir, train_test="null"):
    """Load the DRIVE images, vessel ground truth and FOV border masks.

    Parameters
    ----------
    imgs_dir : str
        Directory containing the original RGB fundus images.
    groundTruth_dir : str
        Directory containing the "XX_manual1.gif" vessel annotations.
    borderMasks_dir : str
        Directory containing the FOV border masks.
    train_test : str
        "train" or "test"; selects the border-mask filename pattern.

    Returns
    -------
    tuple of ndarrays
        (imgs, groundTruth, border_masks) with shapes
        (Nimgs, height, width, channels), (Nimgs, height, width, 1),
        (Nimgs, height, width, 1).

    Raises
    ------
    ValueError
        If *train_test* is neither "train" nor "test".
    """
    # Validate up front instead of print+exit() deep inside the loop.
    if train_test not in ("train", "test"):
        raise ValueError("specify if train or test!!")
    imgs = np.empty((Nimgs, height, width, channels))
    groundTruth = np.empty((Nimgs, height, width))
    border_masks = np.empty((Nimgs, height, width))
    # Sort the listing: directory iteration order is filesystem dependent and
    # a deterministic order keeps the dataset reproducible across machines.
    files = sorted(os.listdir(imgs_dir))
    for i, fname in enumerate(files):
        # original image
        print("original image: ", fname)
        img = Image.open(os.path.join(imgs_dir, fname))
        imgs[i] = np.asarray(img)
        # corresponding ground truth: DRIVE files share a 2-digit image id prefix
        groundTruth_name = fname[0:2] + "_manual1.gif"
        print("ground truth name: ", groundTruth_name)
        g_truth = Image.open(os.path.join(groundTruth_dir, groundTruth_name))
        groundTruth[i] = np.asarray(g_truth)
        # corresponding border mask (naming differs between train and test sets)
        if train_test == "train":
            border_masks_name = fname[0:2] + "_training_mask.gif"
        else:
            border_masks_name = fname[0:2] + "_test_mask.gif"
        print("border masks name: ", border_masks_name)
        b_mask = Image.open(os.path.join(borderMasks_dir, border_masks_name))
        border_masks[i] = np.asarray(b_mask)
    print("imgs max: ", str(np.max(imgs)))
    print("imgs min: ", str(np.min(imgs)))
    # Masks are expected to be binary 0/255 images.
    assert(np.max(groundTruth) == 255 and np.max(border_masks) == 255)
    assert(np.min(groundTruth) == 0 and np.min(border_masks) == 0)
    print("ground truth and border masks are correctly within pixel value range 0-255 (black-white)")
    assert(imgs.shape == (Nimgs, height, width, channels))
    # Add an explicit single-channel axis so every returned array is 4-D.
    groundTruth = np.reshape(groundTruth, (Nimgs, height, width, 1))
    border_masks = np.reshape(border_masks, (Nimgs, height, width, 1))
    assert(groundTruth.shape == (Nimgs, height, width, 1))
    assert(border_masks.shape == (Nimgs, height, width, 1))
    return imgs, groundTruth, border_masks
# Create the output directory for the HDF5 files if it does not exist yet.
if not os.path.exists(dataset_path):
    os.makedirs(dataset_path)
#getting the training datasets
imgs_train, groundTruth_train, border_masks_train = get_datasets(original_imgs_train,groundTruth_imgs_train,borderMasks_imgs_train,"train")
print("saving train datasets")
write_hdf5(imgs_train, dataset_path + "DRIVE_dataset_imgs_train.hdf5")
write_hdf5(groundTruth_train, dataset_path + "DRIVE_dataset_groundTruth_train.hdf5")
write_hdf5(border_masks_train,dataset_path + "DRIVE_dataset_borderMasks_train.hdf5")
#getting the testing datasets
imgs_test, groundTruth_test, border_masks_test = get_datasets(original_imgs_test,groundTruth_imgs_test,borderMasks_imgs_test,"test")
print("saving test datasets")
write_hdf5(imgs_test,dataset_path + "DRIVE_dataset_imgs_test.hdf5")
write_hdf5(groundTruth_test, dataset_path + "DRIVE_dataset_groundTruth_test.hdf5")
write_hdf5(border_masks_test,dataset_path + "DRIVE_dataset_borderMasks_test.hdf5")
配置文件configuration.txt
[data paths]
path_local = ./DRIVE_datasets_training_testing/
train_imgs_original = DRIVE_dataset_imgs_train.hdf5
train_groundTruth = DRIVE_dataset_groundTruth_train.hdf5
train_border_masks = DRIVE_dataset_borderMasks_train.hdf5
test_imgs_original = DRIVE_dataset_imgs_test.hdf5
test_groundTruth = DRIVE_dataset_groundTruth_test.hdf5
test_border_masks = DRIVE_dataset_borderMasks_test.hdf5
[experiment name]
name = test
[data attributes]
#Dimensions of the patches extracted from the full images
patch_height = 48
patch_width = 48
[training settings]
#number of total patches:
N_subimgs = 190000
#if patches are extracted only inside the field of view:
inside_FOV = False
#Number of training epochs
N_epochs = 150
batch_size = 32
#if running with nohup
nohup = True
[testing settings]
#Choose the model to test: best==epoch with min loss, last==last epoch
best_last = best
#number of full images for the test (max 20)
full_images_to_test = 20
#How many original-groundTruth-prediction images are visualized in each image
N_group_visual = 1
#Compute average in the prediction, improve results but require more patches to be predicted
average_mode = True
#Only if average_mode==True. Stride for patch extraction, lower value require more patches to be predicted
stride_height = 5
stride_width = 5
#if running with nohup
nohup = False
第二步
python run_training.py
# -*- coding: utf-8 -*-
###################################################
#
# run_training.py Script to launch the training
#
##################################################
import os, sys
import shutil
# NOTE: the py3 module is "configparser" (py2 used "ConfigParser")
import configparser

# config file to read from
config = configparser.RawConfigParser()
# read_file() replaces readfp(), which was deprecated and removed in Python 3.12;
# "with" also closes the handle the original left open.
with open(r'./configuration.txt') as cfg_file:
    config.read_file(cfg_file)
# ===========================================
# name of the experiment
name_experiment = config.get('experiment name', 'name')
nohup = config.getboolean('training settings', 'nohup')  # std output on log file?
# THEANO GPU flags are only meaningful on non-Windows GPU boxes;
# comment this out when testing on a local machine.
run_GPU = '' if sys.platform == 'win32' else ' THEANO_FLAGS=device=gpu,floatX=float32 '

# create a folder for the results
result_dir = name_experiment
print("\n1. Create directory for the results (if not already existing)")
if os.path.exists(result_dir):
    print("Dir already existing")
else:
    # os.makedirs is portable and replaces the shell "mkdir"/"mkdir -p" calls
    os.makedirs(result_dir)

print("copy the configuration file in the results folder")
# shutil.copy is portable and replaces the platform-specific "copy"/"cp" calls
shutil.copy('configuration.txt',
            os.path.join(result_dir, name_experiment + '_configuration.txt'))

# run the experiment (drop the run_GPU prefix when running on a local machine)
if nohup:
    print("\n2. Run the training on GPU with nohup")
    os.system(run_GPU + 'nohup python -u ./src/retinaNN_training.py > ' +
              './' + name_experiment + '/' + name_experiment + '_training.nohup')
else:
    print("\n2. Run the training on GPU (no nohup)")
    os.system(run_GPU + 'python ./src/retinaNN_training.py')
# -*- coding: utf-8 -*-
# retinaNN_training.py
import numpy as np
import configparser
from keras.models import Model
from keras.layers import Input, concatenate, Conv2D, MaxPooling2D, UpSampling2D, Reshape, core, Dropout
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras import backend as K
from keras.utils.vis_utils import plot_model as plot
from keras.optimizers import SGD
import sys
sys.path.insert(0, './lib/')
from help_functions import *
#function to obtain data for training/testing (validation)
from extract_patches import get_data_training
#Define the neural network
def get_unet(n_ch, patch_height, patch_width):
    """Build the shallow U-Net used for retinal vessel segmentation.

    Parameters
    ----------
    n_ch : int
        Number of input channels (1 after the gray-scale preprocessing).
    patch_height, patch_width : int
        Spatial size of the input patches (channels_last layout).

    Returns
    -------
    keras.Model
        Compiled model whose output has shape (patch_height*patch_width, 2):
        a per-pixel 2-class softmax matching the one-hot masks produced by
        help_functions.masks_Unet.
    """
    inputs = Input(shape=(patch_height, patch_width, n_ch))
    # --- contracting path ---
    conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    conv1 = Dropout(0.2)(conv1)
    conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D((2, 2))(conv1)
    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(pool1)
    conv2 = Dropout(0.2)(conv2)
    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D((2, 2))(conv2)
    # --- bottleneck ---
    conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(pool2)
    conv3 = Dropout(0.2)(conv3)
    conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(conv3)
    # --- expanding path with skip connections (concatenate on channel axis) ---
    up1 = UpSampling2D(size=(2, 2))(conv3)
    up1 = concatenate([conv2, up1], axis=3)
    conv4 = Conv2D(64, (3, 3), activation='relu', padding='same')(up1)
    conv4 = Dropout(0.2)(conv4)
    conv4 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv4)
    up2 = UpSampling2D(size=(2, 2))(conv4)
    up2 = concatenate([conv1, up2], axis=3)
    conv5 = Conv2D(32, (3, 3), activation='relu', padding='same')(up2)
    conv5 = Dropout(0.2)(conv5)
    conv5 = Conv2D(32, (3, 3), activation='relu', padding='same')(conv5)
    # 1x1 conv produces the 2 class maps: shape (patch_h, patch_w, 2).
    conv6 = Conv2D(2, (1, 1), activation='relu', padding='same')(conv5)
    # BUG FIX: with channels_last data the class axis is already last, so the
    # correct flattening is directly (pixels, classes). The original
    # Reshape((2, h*w)) + Permute((2, 1)) — copied from a channels_first
    # implementation — scrambled the pixel/class correspondence.
    conv6 = core.Reshape((patch_height * patch_width, 2))(conv6)
    conv7 = core.Activation('softmax')(conv6)
    model = Model(inputs=inputs, outputs=conv7)
    # categorical_crossentropy over the 2-way softmax; the ground-truth masks
    # must be one-hot encoded via masks_Unet (see help_functions.py).
    model.compile(optimizer='sgd', loss='categorical_crossentropy', metrics=['accuracy'])
    return model
print("read configure")
# ========= Load settings from the config file =========
config = configparser.RawConfigParser()
config.read('configuration.txt')
# path to the datasets
path_data = config.get('data paths', 'path_local')
# experiment name
name_experiment = config.get('experiment name', 'name')
# training settings
N_epochs = int(config.get('training settings', 'N_epochs'))
batch_size = int(config.get('training settings', 'batch_size'))

print("load data")
# ============ Load the data and divide it into patches ============
patches_imgs_train, patches_masks_train = get_data_training(
    DRIVE_train_imgs_original=path_data + config.get('data paths', 'train_imgs_original'),
    DRIVE_train_groudTruth=path_data + config.get('data paths', 'train_groundTruth'),  # masks
    patch_height=int(config.get('data attributes', 'patch_height')),
    patch_width=int(config.get('data attributes', 'patch_width')),
    N_subimgs=int(config.get('training settings', 'N_subimgs')),
    inside_FOV=config.getboolean('training settings', 'inside_FOV')  # patches only inside the FOV (default == True)
)

print("create sample")  # fixed log typo ("sampel")
# ========= Save a sample of what is fed to the neural network =========
N_sample = min(patches_imgs_train.shape[0], 40)
visualize(group_images(patches_imgs_train[0:N_sample, :, :, :], 5), './' + name_experiment + '/' + "sample_input_imgs")
visualize(group_images(patches_masks_train[0:N_sample, :, :, :], 5), './' + name_experiment + '/' + "sample_input_masks")

print("construct model")
# =========== Construct and save the model architecture ===========
n_ch = patches_imgs_train.shape[3]
patch_height = patches_imgs_train.shape[1]
patch_width = patches_imgs_train.shape[2]
model = get_unet(n_ch, patch_height, patch_width)  # the U-net model
print("Check: final output of the network:")
print(model.output_shape)
plot(model, to_file='./' + name_experiment + '/' + name_experiment + '_model.png')  # visualize the architecture
json_string = model.to_json()
# "with" ensures the architecture file is closed (the original leaked the handle)
with open('./' + name_experiment + '/' + name_experiment + '_architecture.json', 'w') as arch_file:
    arch_file.write(json_string)

# ============ Training ============
# Save the weights whenever the validation loss improves.
checkpointer = ModelCheckpoint(filepath='./' + name_experiment + '/' + name_experiment + '_best_weights.h5',
                               verbose=1, monitor='val_loss', mode='auto', save_best_only=True)
# Convert the masks to one-hot (N, h*w, 2), matching the network output.
patches_masks_train = masks_Unet(patches_masks_train)
# "epochs" replaces the Keras 1 "nb_epoch" keyword (removed in Keras 2.x).
model.fit(patches_imgs_train, patches_masks_train, epochs=N_epochs, batch_size=batch_size,
          verbose=1, shuffle=True, validation_split=0.1, callbacks=[checkpointer])
# ========== Save the last model ==========
model.save_weights('./' + name_experiment + '/' + name_experiment + '_last_weights.h5', overwrite=True)
上面出现过的一些函数:
# extract_patches.py
import numpy as np
import random
import configparser
from help_functions import load_hdf5
from help_functions import visualize
from help_functions import group_images
from pre_processing import my_PreProc
#To select the same images
# random.seed(10)
#Load the original data and return the extracted patches for training/testing
def get_data_training(DRIVE_train_imgs_original,
                      DRIVE_train_groudTruth,
                      patch_height,
                      patch_width,
                      N_subimgs,
                      inside_FOV):
    """Load the training images and masks, preprocess them, and extract
    N_subimgs random patches of size patch_height x patch_width."""
    originals = load_hdf5(DRIVE_train_imgs_original)
    gt_masks = load_hdf5(DRIVE_train_groudTruth)  # masks always the same

    imgs = my_PreProc(originals)
    gt_masks = gt_masks / 255.

    # Cut the bottom and top rows so the images become 565x565.
    imgs = imgs[:, 9:574, :, :]
    gt_masks = gt_masks[:, 9:574, :, :]
    data_consistency_check(imgs, gt_masks)

    # Masks must be binary within [0, 1].
    assert(np.min(gt_masks) == 0 and np.max(gt_masks) == 1)

    print("\ntrain images/masks shape:")
    print(imgs.shape)
    print("train images range (min-max): ", str(np.min(imgs)), ' - ', str(np.max(imgs)))
    print("train masks are within 0-1\n")

    # Extract the TRAINING patches from the full images.
    patch_imgs, patch_masks = extract_random(imgs, gt_masks, patch_height,
                                             patch_width, N_subimgs, inside_FOV)
    data_consistency_check(patch_imgs, patch_masks)

    print("\ntrain PATCHES images/masks shape:")
    print(patch_imgs.shape)
    print("train PATCHES images range (min-max): ", str(np.min(patch_imgs)), ' - ', str(np.max(patch_imgs)))
    return patch_imgs, patch_masks
#data consinstency check
def data_consistency_check(imgs, masks):
    """Assert that an image stack and its mask stack line up.

    Both must have the same rank and matching batch/height/width; masks are
    single-channel, images are grayscale (1) or RGB (3).
    """
    assert len(imgs.shape) == len(masks.shape)
    # batch size, height and width must match pairwise
    for axis in range(3):
        assert imgs.shape[axis] == masks.shape[axis]
    assert masks.shape[3] == 1
    assert imgs.shape[3] in (1, 3)
#extract patches randomly in the full training images
# -- Inside OR in full image
def extract_random(full_imgs, full_masks, patch_h, patch_w, N_patches, inside=True):
    """Randomly extract N_patches (image, mask) patch pairs from the full images.

    The patches are split evenly over the images; when *inside* is True a
    candidate center is resampled until the patch lies fully inside the
    circular field of view (see is_patch_inside_FOV).

    Parameters
    ----------
    full_imgs, full_masks : ndarray, shape (N, H, W, C) / (N, H, W, 1)
    patch_h, patch_w : int
        Patch size (expected even; odd sizes are truncated by the int halving).
    N_patches : int
        Total number of patches; must be a multiple of the number of images.
    inside : bool
        Restrict patches to the FOV circle.

    Returns
    -------
    (patches, patches_masks) arrays of shape (N_patches, patch_h, patch_w, C).

    Raises
    ------
    ValueError
        If N_patches is not a multiple of the number of full images.
    """
    if N_patches % full_imgs.shape[0] != 0:
        # Original printed a hard-coded "multiple of 20" and called exit();
        # raise with the actual constraint instead.
        raise ValueError("N_patches: please enter a multiple of the number of full images")
    assert len(full_imgs.shape) == 4 and len(full_masks.shape) == 4  # 4D arrays
    assert full_imgs.shape[3] == 1 or full_imgs.shape[3] == 3  # channel is 1 or 3
    assert full_masks.shape[3] == 1  # masks only black and white
    assert full_imgs.shape[2] == full_masks.shape[2] and full_imgs.shape[1] == full_masks.shape[1]
    patches = np.empty((N_patches, patch_h, patch_w, full_imgs.shape[3]))
    patches_masks = np.empty((N_patches, patch_h, patch_w, full_masks.shape[3]))
    img_h = full_imgs.shape[1]  # height of the full image
    img_w = full_imgs.shape[2]  # width of the full image
    # hoisted loop invariants
    half_h = int(patch_h / 2)
    half_w = int(patch_w / 2)
    patch_per_img = int(N_patches / full_imgs.shape[0])  # N_patches equally divided
    print("patches per full image: ", str(patch_per_img))
    iter_tot = 0  # counter over the total number of patches (N_patches)
    for i in range(full_imgs.shape[0]):  # loop over the full images
        k = 0
        while k < patch_per_img:
            # Sample a center so the patch stays within the image borders.
            x_center = random.randint(0 + half_w, img_w - half_w)
            y_center = random.randint(0 + half_h, img_h - half_h)
            # Resample if the patch is not fully contained in the FOV.
            if inside == True:
                if is_patch_inside_FOV(x_center, y_center, img_w, img_h, patch_h) == False:
                    continue
            patches[iter_tot] = full_imgs[i, y_center - half_h:y_center + half_h,
                                          x_center - half_w:x_center + half_w, :]
            patches_masks[iter_tot] = full_masks[i, y_center - half_h:y_center + half_h,
                                                 x_center - half_w:x_center + half_w, :]
            iter_tot += 1  # total counter
            k += 1  # per full image counter
    return patches, patches_masks
#check if the patch is fully contained in the FOV
def is_patch_inside_FOV(x, y, img_w, img_h, patch_h):
    """Return True if a square patch centered at (x, y) fits in the circular FOV.

    The DRIVE field of view is a circle of radius 270 centered in the image;
    the patch half-diagonal is subtracted so the entire patch fits inside.
    """
    # shift the origin (0,0) to the image center
    dx = x - int(img_w / 2)
    dy = y - int(img_h / 2)
    # limit: FOV radius (270, from the DRIVE db docs) minus the half-diagonal
    # of the (assumed square) patch
    limit = 270 - int(patch_h * np.sqrt(2.0) / 2.0)
    return bool(np.sqrt(dx * dx + dy * dy) < limit)
help_functions.py
# -*- coding: utf-8 -*-
import h5py
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
def load_hdf5(infile):
    """Read and return the "image" dataset stored in the HDF5 file *infile*."""
    # The data is materialized with [()] before the file is closed by "with".
    with h5py.File(infile, "r") as h5f:
        return h5f["image"][()]
def write_hdf5(arr, outfile):
    """Store *arr* in *outfile* under the dataset name "image"."""
    # Counterpart of load_hdf5; "with" closes the file after writing.
    with h5py.File(outfile, "w") as out:
        out.create_dataset("image", data=arr, dtype=arr.dtype)
#convert RGB image in black and white
def rgb2gray(rgb):
    """Convert a stack of RGB images (N, H, W, 3) to grayscale (N, H, W, 1).

    Uses the ITU-R BT.601 luma weights 0.299 R + 0.587 G + 0.114 B.
    """
    assert len(rgb.shape) == 4  # 4D arrays
    assert rgb.shape[3] == 3
    gray = (rgb[:, :, :, 0] * 0.299
            + rgb[:, :, :, 1] * 0.587
            + rgb[:, :, :, 2] * 0.114)
    # restore the (now single) channel axis
    return np.reshape(gray, rgb.shape[:3] + (1,))
#group a set of images row per columns
def group_images(data, per_row):
    """Tile a batch of images (N, H, W, C) into one mosaic with *per_row* per row."""
    assert data.shape[0] % per_row == 0
    assert (data.shape[3] == 1 or data.shape[3] == 3)
    rows = []
    for start in range(0, data.shape[0], per_row):
        # concatenate per_row consecutive images side by side (width axis)
        rows.append(np.concatenate([data[j] for j in range(start, start + per_row)], axis=1))
    # stack the assembled rows on top of each other (height axis)
    return np.concatenate(rows, axis=0)
#visualize image (as PIL image, NOT as matplotlib!)
def visualize(data, filename):
    """Save *data* (H, W, C) as "<filename>.png" and return the PIL image.

    Values above 1 are assumed to already be in the 0-255 range; otherwise
    they are treated as 0-1 and scaled up.
    """
    assert len(data.shape) == 3  # height*width*channels
    if data.shape[2] == 1:
        # single channel: drop the channel axis for PIL
        data = np.reshape(data, (data.shape[0], data.shape[1]))
    if np.max(data) > 1:
        img = Image.fromarray(data.astype(np.uint8))  # already 0-255
    else:
        img = Image.fromarray((data * 255).astype(np.uint8))  # scale 0-1 up
    img.save(filename + '.png')
    return img
#prepare the mask in the right shape for the Unet
def masks_Unet(masks):
    """One-hot encode binary masks for the 2-class softmax output of the U-Net.

    Parameters
    ----------
    masks : ndarray, shape (N, H, W, 1)
        Binary masks (0 = background, nonzero = vessel).

    Returns
    -------
    ndarray, shape (N, H*W, 2)
        Channel 0 is the background indicator, channel 1 the vessel indicator.
    """
    assert (len(masks.shape) == 4)  # 4D arrays
    assert (masks.shape[3] == 1)  # check the channel is 1
    im_h = masks.shape[1]
    im_w = masks.shape[2]
    flat = np.reshape(masks, (masks.shape[0], im_h * im_w))
    new_masks = np.empty((masks.shape[0], im_h * im_w, 2))
    # Vectorized one-hot encoding: the original per-pixel double Python loop
    # was O(N*H*W) interpreted steps and dominated preprocessing time.
    background = (flat == 0)
    new_masks[:, :, 0] = background
    new_masks[:, :, 1] = ~background
    return new_masks
def pred_to_imgs(pred, patch_height, patch_width, mode="original"):
    """Convert network predictions back into single-channel patch images.

    Parameters
    ----------
    pred : ndarray, shape (Npatches, H*W, 2)
        Per-pixel class scores (channel 1 = vessel).
    patch_height, patch_width : int
        Spatial size used to reshape the flat pixel axis.
    mode : str
        "original" keeps the vessel probability; "threshold" binarizes at 0.5.

    Returns
    -------
    ndarray, shape (Npatches, patch_height, patch_width, 1)

    Raises
    ------
    ValueError
        If *mode* is not "original" or "threshold".
    """
    assert (len(pred.shape) == 3)  # 3D array: (Npatches, height*width, 2)
    assert (pred.shape[2] == 2)  # check the classes are 2
    # Vectorized: the original iterated every pixel in Python.
    if mode == "original":
        pred_images = pred[:, :, 1].astype(np.float64)
    elif mode == "threshold":
        pred_images = (pred[:, :, 1] >= 0.5).astype(np.float64)
    else:
        # raise instead of print+exit() so callers can handle the error
        raise ValueError("mode " + str(mode) + " not recognized, it can be 'original' or 'threshold'")
    return np.reshape(pred_images, (pred_images.shape[0], patch_height, patch_width, 1))
pre_processing.py
# -*- coding: utf-8 -*-
###################################################
#
# pre_processing.py Script to pre-process the original imgs
#
##################################################
import numpy as np
from PIL import Image
import cv2
from help_functions import *
#My pre processing (use for both training and testing!)
def my_PreProc(data):
    """Full preprocessing pipeline (used for both training and testing).

    RGB -> grayscale, dataset-wide normalization, CLAHE, gamma correction
    (gamma=1.2), then rescale to the [0, 1] range.
    """
    assert (len(data.shape) == 4)
    assert (data.shape[3] == 3)  # expects the original RGB images
    processed = rgb2gray(data)
    processed = dataset_normalized(processed)
    processed = clahe_equalized(processed)
    processed = adjust_gamma(processed, 1.2)
    # bring the 0-255 values down to the 0-1 range expected by the network
    return processed / 255.
#============================================================
#========= PRE PROCESSING FUNCTIONS ========================#
#============================================================
#==== histogram equalization
def histo_equalized(imgs):
    """Apply global histogram equalization to each single-channel image."""
    assert (len(imgs.shape) == 4)  # 4D arrays
    assert (imgs.shape[3] == 1)  # check the channel is 1
    equalized = np.empty(imgs.shape)
    for idx in range(imgs.shape[0]):
        # OpenCV operates on uint8 single-channel images
        equalized[idx, :, :, 0] = cv2.equalizeHist(np.array(imgs[idx, :, :, 0], dtype=np.uint8))
    return equalized
# CLAHE (Contrast Limited Adaptive Histogram Equalization)
#adaptive histogram equalization is used. In this, image is divided into small blocks called "tiles" (tileSize is 8x8 by default in OpenCV). Then each of these blocks are histogram equalized as usual. So in a small area, histogram would confine to a small region (unless there is noise). If noise is there, it will be amplified. To avoid this, contrast limiting is applied. If any histogram bin is above the specified contrast limit (by default 40 in OpenCV), those pixels are clipped and distributed uniformly to other bins before applying histogram equalization. After equalization, to remove artifacts in tile borders, bilinear interpolation is applied
def clahe_equalized(imgs):
    """Apply CLAHE (Contrast Limited Adaptive Histogram Equalization) per image.

    Uses clipLimit=2.0 on 8x8 tiles; input images are converted to uint8.
    """
    assert (len(imgs.shape) == 4)  # 4D arrays
    assert (imgs.shape[3] == 1)  # check the channel is 1
    # one CLAHE object is reused for all images
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    equalized = np.empty(imgs.shape)
    for idx in range(imgs.shape[0]):
        equalized[idx, :, :, 0] = clahe.apply(np.array(imgs[idx, :, :, 0], dtype=np.uint8))
    return equalized
# ===== normalize over the dataset
def dataset_normalized(imgs):
    """Standardize with the global dataset statistics, then min-max rescale
    each image individually to the 0-255 range."""
    assert (len(imgs.shape) == 4)  # 4D arrays
    assert (imgs.shape[3] == 1)  # check the channel is 1
    # z-score normalization over the whole dataset
    normalized = (imgs - np.mean(imgs)) / np.std(imgs)
    for idx in range(imgs.shape[0]):
        lo = np.min(normalized[idx])
        hi = np.max(normalized[idx])
        # stretch every image individually to the full 0-255 range
        normalized[idx] = (normalized[idx] - lo) / (hi - lo) * 255
    return normalized
def adjust_gamma(imgs, gamma=1.0):
    """Apply gamma correction to each single-channel image via a lookup table."""
    assert (len(imgs.shape) == 4)  # 4D arrays
    assert (imgs.shape[3] == 1)  # check the channel is 1
    # lookup table mapping pixel values [0, 255] to their gamma-adjusted values
    inv_gamma = 1.0 / gamma
    lut = np.array([((v / 255.0) ** inv_gamma) * 255 for v in np.arange(0, 256)]).astype("uint8")
    corrected = np.empty(imgs.shape)
    for idx in range(imgs.shape[0]):
        # apply the gamma correction using the lookup table
        corrected[idx, :, :, 0] = cv2.LUT(np.array(imgs[idx, :, :, 0], dtype=np.uint8), lut)
    return corrected
def pred_to_imgs(pred, patch_height, patch_width, mode="original"):
    """Convert network predictions back into single-channel patch images.

    NOTE(review): this duplicates help_functions.pred_to_imgs — consider
    importing it from there instead of keeping two copies.

    Parameters
    ----------
    pred : ndarray, shape (Npatches, H*W, 2)
        Per-pixel class scores (channel 1 = vessel).
    patch_height, patch_width : int
        Spatial size used to reshape the flat pixel axis.
    mode : str
        "original" keeps the vessel probability; "threshold" binarizes at 0.5.

    Returns
    -------
    ndarray, shape (Npatches, patch_height, patch_width, 1)

    Raises
    ------
    ValueError
        If *mode* is not "original" or "threshold".
    """
    assert (len(pred.shape) == 3)  # 3D array: (Npatches, height*width, 2)
    assert (pred.shape[2] == 2)  # check the classes are 2
    # Vectorized: the original iterated every pixel in Python.
    if mode == "original":
        pred_images = pred[:, :, 1].astype(np.float64)
    elif mode == "threshold":
        pred_images = (pred[:, :, 1] >= 0.5).astype(np.float64)
    else:
        # raise instead of print+exit() so callers can handle the error
        raise ValueError("mode " + str(mode) + " not recognized, it can be 'original' or 'threshold'")
    return np.reshape(pred_images, (pred_images.shape[0], patch_height, patch_width, 1))