《Kaggle Histopathologic Cancer Detection癌症图像分类比赛》之PyTorch实现

it2023-01-22  70

项目地址:https://www.kaggle.com/c/histopathologic-cancer-detection/overview

本文记录了自己使用 PyTorch 以及 PyTorch 标准的 Dataset 数据准备方式对同一问题的实现:

其他实现版本:

Kaggle Histopathologic Cancer Detection之PyTorch实现

Kaggle Histopathologic Cancer Detection之Keras/Generator实现

Kaggle Histopathologic Cancer Detection之Tensorflow2.0实现

# -*- coding: utf-8 -*-
import copy
import csv
import math
import os
import re
import sys
import time
from glob import glob

import cv2 as cv
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import myimageutil as iu

# ====================================================================================
# <<1. Get a first look at the data>>
# ====================================================================================
# Load the label CSV with pandas and inspect it.
# The (optional) sample plot borrows plotting code from this Kaggle kernel:
# https://www.kaggle.com/qitvision/a-complete-ml-pipeline-fast-ai

ROOT_PATH = 'D:/ai_data/histopathologic-cancer-detection'
# Derived from ROOT_PATH so the dataset location only has to be changed once.
CSV_PATH = ROOT_PATH + '/train_labels.csv'
TRAIN_PATH = ROOT_PATH + '/train'
TEST_PATH = ROOT_PATH + '/test'

print(">>>看一下根目录下有哪些东西:")
print(os.listdir(ROOT_PATH))

# Label table; the code below reads its 'id' and 'label' columns.
df = pd.read_csv(CSV_PATH)

# Basic facts about the data set: size, class balance, first rows.
print(">>>这个数据集的大小:")
print(df.shape)
print(">>>这个数据集的样本分布:")
print(df['label'].value_counts())
print(">>>看一下数据:")
print(df.head())

# The file list is taken from the image folders rather than from the CSV, so
# it is worth verifying that both sources describe exactly the same images.
print(">>>list一下训练图片文件夹里的图片:")
train_file_paths = glob(TRAIN_PATH + '/*.tif')
test_file_paths = glob(TEST_PATH + '/*.tif')
print("train_file_paths size:", len(train_file_paths))
print("test_file_paths size:", len(test_file_paths))


def check_valid():
    """Verify that the training images on disk match the ids in the CSV.

    Reads the module-level ``train_file_paths`` and ``df``.  Prints a
    confirmation when both id sets are equal; otherwise prints the ids that
    appear on only one side and terminates the process with exit status 1.

    Raises:
        AssertionError: if the number of image files differs from the number
            of CSV rows.
    """
    assert len(train_file_paths) == len(df['id']), '图片数量不一致'
    # Extract the 40-character id from the file name only, so that directory
    # components of the path can never contribute to the match.
    ids_from_filepath = [
        ''.join(re.findall(r'[a-z0-9]{40}', os.path.basename(filepath)))
        for filepath in train_file_paths
    ]
    # Symmetric difference: ids present on exactly one side.
    dif = sorted(set(ids_from_filepath) ^ set(df['id']))
    if not dif:
        print("文件名匹配正常")
    else:
        print("匹配异常,下列文件名有差异:")
        print(dif)
        # Non-zero status so that callers/shells can detect the failure
        # (the site-provided exit() would have reported success).
        sys.exit(1)


check_valid()

# Optional: plot a few positive and negative samples.
# print(">>>数据没问题的话接下来看一下正负数据样例的图片:")
# iu.plotSamples(df,TRAIN_PATH)
# Note: per the original author, the labels refer to the central 32x32 pixel
# region of each image, which is why the plot highlights that region; the
# classifier does not necessarily have to crop to it.
print(">>>进入正题,我们拆分一下数据,把训练数据分成训练和测试2部分,比例为9:1")
train, val = train_test_split(train_file_paths, test_size=0.1, shuffle=True)
# Uncomment to smoke-test the pipeline on a small subset.
# train = train[:640]
# val = val[:64]

# ====================================================================================
# <<2. Image preprocessing and augmentation>>
# ====================================================================================
# Images are prepared through the standard PyTorch Dataset / transform pipeline.

import inspect

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader, Dataset
import torchvision
from torchvision import models
from torchvision import transforms, utils
import pretrainedmodels

from gpu_mem_track import MemTracker

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('trian_device:{}'.format(device.type))

# Lookup table: image id -> label, built from the CSV loaded above.
id_label_map = dict(zip(df.id.values, df.label.values))


def get_id_from_file_path(file_path):
    """Return the image id for ``file_path``: its file name without '.tif'.

    ``os.path.basename`` is used so that paths with mixed separators (as
    produced by ``glob`` with a '/'-joined pattern on Windows) are handled.
    """
    return os.path.basename(file_path).replace('.tif', '')


class MyDataset(Dataset):
    """Dataset yielding ``{'image': ..., 'label': ...}`` samples from image files."""

    def __init__(self, file_list, id_label_map, transform=None):
        """
        Args:
            file_list (sequence of str): Paths of the image files.
            id_label_map (mapping): Maps an image id (file name without
                extension) to its label.
            transform (callable, optional): Optional transform to be applied
                on a sample dict.
        """
        self.file_list = file_list
        self.transform = transform
        self.id_label_map = id_label_map

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, index):
        """Load the image at ``index`` and return it with its label.

        Raises:
            FileNotFoundError: if OpenCV cannot read the image file.
            KeyError: if the image id has no entry in ``id_label_map``.
        """
        file_path = self.file_list[index]
        label = self.id_label_map[get_id_from_file_path(file_path)]
        image_bgr = cv.imread(file_path)
        if image_bgr is None:
            # cv.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(
                'cannot read image: {}'.format(file_path))
        # OpenCV loads channels as BGR; reorder them to RGB.
        image_rgb = image_bgr[:, :, [2, 1, 0]]
        sample = {'image': image_rgb, 'label': label}
        if self.transform:
            sample = self.transform(sample)
        return sample


class Rescale(object):
    """Rescale the image in a sample to a given size.

    Args:
        output_size (tuple or int): Desired output size. If tuple, output is
            matched to output_size (height, width). If int, the smaller of
            the image edges is matched to output_size keeping the aspect
            ratio the same.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size

    def __call__(self, sample):
        image, label = sample['image'], sample['label']
        h, w = image.shape[:2]
        if isinstance(self.output_size, int):
            if h > w:
                new_h, new_w = self.output_size * h / w, self.output_size
            else:
                new_h, new_w = self.output_size, self.output_size * w / h
        else:
            new_h, new_w = self.output_size
        new_h, new_w = int(new_h), int(new_w)
        # cv.resize takes the target size as (width, height).
        img = cv.resize(image, (new_w, new_h))
        return {'image': img, 'label': label}


class RandomCrop(object):
    """Crop randomly the image in a sample.

    Args:
        output_size (tuple or int): Desired output size (height, width).
            If int, a square crop is made.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        if isinstance(output_size, int):
            self.output_size = (output_size, output_size)
        else:
            assert len(output_size) == 2
            self.output_size = output_size

    def __call__(self, sample):
        """Return the sample with its image cropped at a random offset.

        Raises:
            ValueError: if the requested crop is larger than the image.
        """
        image, label = sample['image'], sample['label']
        h, w = image.shape[:2]
        new_h, new_w = self.output_size
        if new_h > h or new_w > w:
            raise ValueError(
                'crop size {} is larger than image size {}'.format(
                    (new_h, new_w), (h, w)))
        # The upper bound of randint is exclusive; "+ 1" makes the last valid
        # offset reachable and allows a crop as large as the image itself.
        top = np.random.randint(0, h - new_h + 1)
        left = np.random.randint(0, w - new_w + 1)
        image = image[top: top + new_h, left: left + new_w]
        return {'image': image, 'label': label}


class ToTensor(object):
    """Convert the ndarray image in a sample to a float Tensor scaled by 1/255."""

    def __call__(self, sample):
        image, label = sample['image'], sample['label']
        # swap color axis because
        # numpy image: H x W x C
        # torch image: C x H x W
        image = image.transpose((2, 0, 1))
        return {'image': torch.from_numpy(image).float().div(255),
                'label': label}


class Normalize(object):
    """Normalize the tensor image of a sample with mean and standard deviation.

    Given mean ``(M1,...,Mn)`` and std ``(S1,...,Sn)`` for ``n`` channels,
    each channel is normalized as
    ``output[channel] = (input[channel] - mean[channel]) / std[channel]``.

    .. note::
        This transform acts out of place, i.e. it does not mutate the input
        tensor.

    Args:
        mean (sequence): Sequence of means for each channel.
        std (sequence): Sequence of standard deviations for each channel.
    """

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, sample):
        """
        Args:
            sample (dict): ``{'image': Tensor, 'label': ...}``; the image is
                indexed channel-first, as produced by ``ToTensor``.

        Returns:
            dict: The sample with a normalized copy of the image.
        """
        tensor, label = sample['image'], sample['label']
        mean = torch.as_tensor(self.mean, dtype=torch.float32,
                               device=tensor.device)
        std = torch.as_tensor(self.std, dtype=torch.float32,
                              device=tensor.device)
        # Broadcast the per-channel statistics over the two trailing axes.
        normalized = (tensor - mean[:, None, None]) / std[:, None, None]
        return {'image': normalized, 'label': label}


def main():
    """Build the data loaders, fine-tune the pretrained model and report accuracy."""

    def build_transform():
        # 331 presumably matches the input size expected by nasnetalarge
        # -- TODO confirm against the pretrainedmodels settings.
        return transforms.Compose([
            Rescale(331),
            ToTensor(),
            # Normalize(mean=[0.5, 0.5, 0.5],std=[0.5, 0.5, 0.5])
        ])

    myDataset_train = MyDataset(train, id_label_map,
                                transform=build_transform())
    myDataset_val = MyDataset(val, id_label_map,
                              transform=build_transform())

    dataloader_train = DataLoader(myDataset_train, batch_size=4,
                                  shuffle=True, num_workers=0)
    # Shuffling has no effect on the validation metrics, so keep the order.
    dataloader_val = DataLoader(myDataset_val, batch_size=4,
                                shuffle=False, num_workers=0)
    dataloaders = {'train': dataloader_train, 'val': dataloader_val}
    # Number of samples per phase; used to average loss and accuracy.
    dataset_sizes = {'train': len(myDataset_train),
                     'val': len(myDataset_val)}

    frame = inspect.currentframe()
    gpu_tracker = MemTracker(frame)
    # GPU VRAM tracking (optional):
    # gpu_tracker.track()

    model_name = 'nasnetalarge'  # could be fbresnet152 or inceptionresnetv2
    model = pretrainedmodels.__dict__[model_name](num_classes=1000,
                                                  pretrained='imagenet')
    model.eval()
    # Replace the 1000-class head with a 2-class head.  The input width is
    # read from the existing layer so that other backbones work as well.
    model.last_linear = nn.Linear(model.last_linear.in_features, 2)
    model = model.to(device)

    loss_func = nn.CrossEntropyLoss()
    # optimizer = optim.SGD(model.parameters(),lr=0.0001,momentum=0.9)
    optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.999),
                           eps=1e-08, weight_decay=0, amsgrad=False)
    # exp_lr_scheduler = lr_scheduler.StepLR(optimizer=optimizer,step_size=10,gamma=0.5)

    def train_model(model, loss_func, optimizer, scheduler, num_epochs=25):
        """Train ``model`` and return it loaded with its best validation weights.

        Args:
            model: The network to train (already moved to ``device``).
            loss_func: Loss called as ``loss_func(outputs, labels)``.
            optimizer: Optimizer over the model parameters.
            scheduler: Learning-rate scheduler stepped once per epoch, or
                ``None`` to keep the learning rate fixed.
            num_epochs (int): Number of epochs to run.

        Returns:
            The model with the weights of the epoch that achieved the highest
            validation accuracy.
        """
        since = time.time()

        best_model_wts = copy.deepcopy(model.state_dict())
        best_acc = 0.0

        for epoch in range(num_epochs):
            print('Epoch {}/{}'.format(epoch, num_epochs - 1))
            print('-' * 10)

            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                is_train = phase == 'train'
                if is_train:
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for x in dataloaders[phase]:
                    inputs = x['image'].to(device)
                    labels = x['label'].to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward; track history only in the training phase
                    with torch.set_grad_enabled(is_train):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = loss_func(outputs, labels)

                        # backward + optimize only if in training phase
                        if is_train:
                            loss.backward()
                            optimizer.step()

                    # statistics (loss is a batch mean, hence the re-weighting)
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels).item()

                # Step the scheduler after this epoch's optimizer updates.
                if is_train and scheduler is not None:
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects / dataset_sizes[phase]

                print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                    phase, epoch_loss, epoch_acc))

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())

            print()

        time_elapsed = time.time() - since
        print('Training complete in {:.0f}m {:.0f}s'.format(
            time_elapsed // 60, time_elapsed % 60))
        print('Best val Acc: {:.4f}'.format(best_acc))

        # load best model weights
        model.load_state_dict(best_model_wts)
        return model

    model_conv = train_model(model, loss_func, optimizer, None)
    # print('training finish !')
    # torch.save(model_conv.state_dict(), './model/model_4.pth')


if __name__ == '__main__':
    main()
最新回复(0)