Training and Prediction with a ResNet50 Cat/Dog Classifier

2023-04-12 18:01:37

Compared with the ResNet18 version I wrote earlier, the ResNet50 code below is written in a more engineering-friendly way. It also works for other classification tasks: to train on a different dataset you only need to change the path to the image data.

My code file structure: split_data.py, tools.py, mytrain.py, mypredict.py and classify_cfg.py.

1. Data processing

  First, the data has already been sorted by class: the source directory save_dir contains one subfolder per class (here 貓 and 狗), and each subfolder holds that class's images.

  Now split the dataset into training and validation sets.

  split_data.py

import os
import random
import shutil


def move_file(target_path, save_train_path, save_val_path, scale=0.1):

    file_list = os.listdir(target_path)
    random.shuffle(file_list)

    number = int(len(file_list) * scale)
    train_list = file_list[number:]
    val_list = file_list[:number]

    for file in train_list:
        target_file_path = os.path.join(target_path, file)
        save_file_path = os.path.join(save_train_path, file)
        shutil.copyfile(target_file_path, save_file_path)
    for file in val_list:
        target_file_path = os.path.join(target_path, file)
        save_file_path = os.path.join(save_val_path, file)
        shutil.copyfile(target_file_path, save_file_path)


def split_classify_data(base_path, save_path, scale=0.1):
    folder_list = os.listdir(base_path)
    for folder in folder_list:
        target_path = os.path.join(base_path, folder)
        save_train_path = os.path.join(save_path, 'train', folder)
        save_val_path = os.path.join(save_path, 'val', folder)
        if not os.path.exists(save_train_path):
            os.makedirs(save_train_path)
        if not os.path.exists(save_val_path):
            os.makedirs(save_val_path)
        move_file(target_path, save_train_path, save_val_path, scale)
        print(folder, 'finish!')


if __name__ == '__main__':
    base_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\save_dir'
    save_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat'
    # fraction of each class set aside for validation
    scale = 0.1
    split_classify_data(base_path, save_path, scale)

  After running the code above you get the following folder structure:

  dog_cat/
      train/
          貓/
          狗/
      val/
          貓/
          狗/

  One folder of training data and one folder of validation data.

2. Loading the dataset

  This file contains the dataset-loading function and a learning-rate update function. The dataset loading is generic.

  tools.py

import os
import time

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.autograd.variable import Variable
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ExponentialLR, LambdaLR
from torchvision.models import ResNet50_Weights
from tqdm import tqdm
from classify_cfg import *

mean = MEAN
std = STD


def get_dataset(base_dir='', input_size=160):
    dateset = dict()
    transform_train = transforms.Compose([
        # resize so the shorter side is input_size
        transforms.Resize(input_size),
        transforms.RandomRotation(15),
        # crop to an input_size x input_size image (the source images vary in size, so this unifies them)
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
    ])

    transform_val = transforms.Compose([
        transforms.Resize(input_size),
        # no random augmentation for validation, only deterministic resizing and cropping
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
    ])
    base_dir_train = os.path.join(base_dir, 'train')
    train_dataset = datasets.ImageFolder(root=base_dir_train, transform=transform_train)
    # print("train_dataset=" + repr(train_dataset[1][0].size()))
    # print("train_dataset.class_to_idx=" + repr(train_dataset.class_to_idx))
    # print(train_dataset.classes)
    classes = train_dataset.classes
    # classes = train_dataset.class_to_idx
    classes_num = len(train_dataset.classes)

    base_dir_val = os.path.join(base_dir, 'val')
    val_dataset = datasets.ImageFolder(root=base_dir_val, transform=transform_val)

    dateset['train'] = train_dataset
    dateset['val'] = val_dataset

    return dateset, classes, classes_num


def update_lr(epoch, epochs):
    """
    假設開始的學習率lr是0.001,訓練次數epochs是100
    當epoch<33時是lr * 1
    當33<=epoch<=66 時是lr * 0.5
    當66<epoch時是lr * 0.1
    """
    if epoch == 0 or epochs // 3 > epoch:
        return 1
    elif (epochs // 3 * 2 >= epoch) and (epochs // 3 <= epoch):
        return 0.5
    else:
        return 0.1
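
  One detail worth noting: LambdaLR multiplies the initial learning rate by the factor returned by update_lr for the current epoch (it does not compound the factors). Below is a minimal sketch (the dummy parameter and the epoch count are my own, purely for illustration) showing the factor stepping through 1, 0.5 and 0.1 over the three thirds of training:

import torch
import torch.optim as optim
from torch.optim.lr_scheduler import LambdaLR
from tools import update_lr

epochs = 30
# a single dummy parameter is enough to build an optimizer
param = torch.zeros(1, requires_grad=True)
optimizer = optim.SGD([param], lr=1e-3)
scheduler = LambdaLR(optimizer, lr_lambda=lambda e: update_lr(e, epochs))

for epoch in range(epochs):
    # lr is 1e-3 for epochs 0-9, 5e-4 for epochs 10-20, 1e-4 afterwards
    print(epoch, optimizer.param_groups[0]['lr'])
    # ... one training epoch would run here ...
    scheduler.step()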

 

3. Training the model

  Once the dataset is loaded, choose the model, the optimizer and so on, then start training.

  mytrain.py

import os
import time

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torch.autograd.variable import Variable
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ExponentialLR, LambdaLR
from torchvision.models import ResNet50_Weights
# from tqdm import tqdm
from classify_cfg import *
from tools import get_dataset, update_lr


def train(model, dateset, epochs, batch_size, device, optimizer, scheduler, criterion, save_path):
    train_loader = DataLoader(dateset.get('train'), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(dateset.get('val'), batch_size=batch_size, shuffle=True)

    # write training curves to a tensorboard log
    write = SummaryWriter(save_path)
    # also record the training process in a txt file
    f = open(os.path.join(save_path, 'log.txt'), 'w', encoding='utf-8')

    best_acc = 0
    for epoch in range(epochs):
        train_correct = 0.0
        model.train()
        sum_loss = 0.0
        accuracy = -1
        total_num = len(train_loader.dataset)
        # print(total_num, len(train_loader))
        # loop = tqdm(enumerate(train_loader), total=len(train_loader))
        batch_count = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            start_time = time.time()
            data, target = Variable(data).to(device), Variable(target).to(device)
            output = model(data)
            loss = criterion(output, target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            print_loss = loss.data.item()
            sum_loss += print_loss
            train_predict = torch.max(output.data, 1)[1]
            if torch.cuda.is_available():
                train_correct += (train_predict.cuda() == target.cuda()).sum()
            else:
                train_correct += (train_predict == target).sum()
            accuracy = (train_correct / total_num) * 100
            # loop.set_description(f'Epoch [{epoch+1}/{epochs}]')
            # loop.set_postfix(loss=loss.item(), acc='{:.3f}'.format(accuracy))
            batch_count += len(data)
            end_time = time.time()
            s = f'Epoch:[{epoch+1}/{epochs}] Batch:[{batch_count}/{total_num}] train_acc: {"{:.2f}".format(accuracy)} ' \
                f'train_loss: {"{:.3f}".format(loss.item())} time: {int((end_time-start_time)*1000)} ms'
            # print(f'Epoch:[{epoch+1}/{epochs}]', f'Batch:[{batch_count}/{total_num}]',
            #       'train_acc:', '{:.2f}'.format(accuracy), 'train_loss:', '{:.3f}'.format(loss.item()),
            #       'time:', f'{int((end_time-start_time)*1000)} ms')
            print(s)
            f.write(s+'\n')

        write.add_scalar('train_acc', accuracy, epoch)
        write.add_scalar('train_loss', loss.item(), epoch)
        # print(optimizer.param_groups[0]['lr'])
        scheduler.step()
        if best_acc < accuracy:
            best_acc = accuracy
            torch.save(model, os.path.join(save_path, 'best.pt'))

        if epoch+1 == epochs:
            torch.save(model, os.path.join(save_path, 'last.pt'))

        # evaluate on the validation set
        # if (epoch+1) % 5 == 0 or epoch+1 == epochs:
        model.eval()
        test_loss = 0.0
        correct = 0.0
        total_num = len(val_loader.dataset)
        # print(total_num, len(val_loader))
        with torch.no_grad():
            for data, target in val_loader:
                data, target = Variable(data).to(device), Variable(target).to(device)
                output = model(data)
                loss = criterion(output, target)
                _, pred = torch.max(output.data, 1)
                if torch.cuda.is_available():
                    correct += torch.sum(pred.cuda() == target.cuda())
                else:
                    correct += torch.sum(pred == target)
                print_loss = loss.data.item()
                test_loss += print_loss
            acc = correct / total_num * 100
            avg_loss = test_loss / len(val_loader)
        s = f"val acc: {'{:.2f}'.format(acc)} val loss: {'{:.3f}'.format(avg_loss)}"
        # print('val acc: ', '{:.2f}'.format(acc), 'val loss: ', '{:.3f}'.format(avg_loss))
        print(s)
        f.write(s+'\n')
        write.add_scalar('val_acc', acc, epoch)
        write.add_scalar('val_loss', avg_loss, epoch)
        # loop.set_postfix(val_loss='{:.3f}'.format(avg_loss), val_acc='{:.3f}'.format(acc))

    f.close()


if __name__ == '__main__':
    device = DEVICE
    epochs = EPOCHS
    batch_size = BATCH_SIZE
    input_size = INPUT_SIZE
    lr = LR
    # --------------------------- training -------------------------------------
    # path to the image data
    base_dir = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat'
    # path where the results are saved
    save_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat_save'
    dateset, classes, classes_num = get_dataset(base_dir, input_size=input_size)
    # model = torchvision.models.resnet50(pretrained=True)
    model = torchvision.models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, classes_num)
    model.to(DEVICE)
    # loss function: cross-entropy
    criterion = nn.CrossEntropyLoss()
    # choose the optimizer
    optimizer = optim.SGD(model.parameters(), lr=lr)
    # learning-rate schedule
    # scheduler = ExponentialLR(optimizer, gamma=0.9)
    scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: update_lr(epoch, epochs))
    # start training
    train(model, dateset, epochs, batch_size, device, optimizer, scheduler, criterion, save_path)
    # save the class labels
    with open(os.path.join(save_path, 'labels.txt'), 'w', encoding='utf-8') as f:
        f.write(f'{classes_num} {classes}')

  After training finishes, the save path contains the following files: the best model (best.pt), the last model (last.pt), the label list (labels.txt), the training log (log.txt) and the tensorboard records.

  Run tensorboard --logdir=. in that directory, then open the address it prints in a browser to see plots of the training process.
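
  mytrain.py writes labels.txt as the class count followed by the Python list of class names produced by ImageFolder. A small sketch for reading it back (the helper name load_labels is my own), so the prediction script does not need a hard-coded class list:

import ast

def load_labels(label_path):
    # labels.txt looks like: "<class count> <python list of class names>"
    with open(label_path, 'r', encoding='utf-8') as f:
        content = f.read().strip()
    num_str, list_str = content.split(' ', 1)
    classes = ast.literal_eval(list_str)
    assert int(num_str) == len(classes)
    return classes

# e.g. classes = load_labels(r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat_save\labels.txt')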

4. Predicting on an image

  In practice a user uploads an image from a web page or a mobile app for prediction, so the image is handled here as binary data (see the serving sketch after the script).

  mypredict.py

  

import cv2
import numpy as np
import torch

from classify_cfg import *



def img_process(img_bytes, img_size, device):
    # decode the raw bytes into an image
    img_array = np.asarray(bytearray(img_bytes), dtype='uint8')
    im0 = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    # OpenCV decodes to BGR; convert to RGB to match the training transforms
    im0 = cv2.cvtColor(im0, cv2.COLOR_BGR2RGB)
    image = cv2.resize(im0, (img_size, img_size))
    # normalize with the same mean/std used during training
    image = np.float32(image) / 255.0
    image[:, :, ] -= np.float32(MEAN)
    image[:, :, ] /= np.float32(STD)
    # HWC -> CHW and add a batch dimension
    image = image.transpose((2, 0, 1))
    im = torch.from_numpy(image).unsqueeze(0)
    im = im.to(device)
    return im


def predict(model_path, img, device):
    model = torch.load(model_path)
    model.to(device)
    model.eval()
    # inference only, no gradients needed
    with torch.no_grad():
        predicts = model(img)
    # print(predicts)
    _, preds = torch.max(predicts, 1)
    pred = torch.squeeze(preds)
    # print(pred)
    return pred


if __name__ == '__main__':
    device = DEVICE
    classes = ['', '']  # fill in the class names from labels.txt, keeping the training order
    # prediction
    model_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat_save\best.pt'
    img_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\save_dir\狗\000000.jpg'
    with open(img_path, 'rb') as f:
        img_bytes = f.read()
    img = img_process(img_bytes, 160, device)
    # print(img.shape)
    # print(img)
    pred = predict(model_path, img, device)
    print(classes[int(pred)])
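
  Since img_process already takes raw bytes, wiring it to a simple HTTP upload endpoint is straightforward. Below is a minimal sketch assuming Flask is installed; the route, the form field name, the port and the hard-coded class list are my own choices for illustration, not part of the original project. For simplicity it reuses predict as-is, which reloads the model on every request; in a real service you would load the model once at startup.

from flask import Flask, request, jsonify

from classify_cfg import DEVICE
from mypredict import img_process, predict

app = Flask(__name__)
MODEL_PATH = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat_save\best.pt'
CLASSES = ['', '']  # fill in from labels.txt, in the same order as training


@app.route('/classify', methods=['POST'])
def classify():
    # the uploaded file arrives as binary data, exactly what img_process expects
    img_bytes = request.files['image'].read()
    img = img_process(img_bytes, 160, DEVICE)
    pred = predict(MODEL_PATH, img, DEVICE)
    return jsonify({'class': CLASSES[int(pred)]})


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)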

 

Finally, my configuration file classify_cfg.py

import torch

BATCH_SIZE = 2  # number of samples per batch
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # train on cuda if available, otherwise cpu
EPOCHS = 30  # number of training epochs
LR = 1e-3       # learning rate
INPUT_SIZE = 160    # input image size
MEAN = [0.485, 0.456, 0.406]    # per-channel mean
STD = [0.229, 0.224, 0.225]     # per-channel standard deviation