Compared with the ResNet18 I wrote earlier, the ResNet50 code below is organized in a more engineering-friendly way. It also works for other classification tasks: to train on a different dataset, you only need to change the path to the image data.
My code file structure
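In outline, the project consists of the five files walked through below:
classify_cfg.py: global configuration (batch size, device, epochs, learning rate, input size, normalization constants)
split_data.py: splits the raw images into training and validation sets
tools.py: dataset loading and the learning-rate update function
mytrain.py: training script
mypredict.py: prediction on a single image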
1. Data processing
First, the data has already been sorted into one folder per class.
The folder structure looks like this:
Now split the data into a training set and a validation set.
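A sketch of what this looks like for the dog/cat data used later (the class folder names are taken from the paths in the scripts below; the file names are only placeholders):

save_dir/
    狗/
        000000.jpg
        ...
    貓/
        000000.jpg
        ...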
split_data.py
import os
import random
import shutil


def move_file(target_path, save_train_path, save_val_path, scale=0.1):
    file_list = os.listdir(target_path)
    random.shuffle(file_list)
    number = int(len(file_list) * scale)
    train_list = file_list[number:]
    val_list = file_list[:number]
    for file in train_list:
        target_file_path = os.path.join(target_path, file)
        save_file_path = os.path.join(save_train_path, file)
        shutil.copyfile(target_file_path, save_file_path)
    for file in val_list:
        target_file_path = os.path.join(target_path, file)
        save_file_path = os.path.join(save_val_path, file)
        shutil.copyfile(target_file_path, save_file_path)


def split_classify_data(base_path, save_path, scale=0.1):
    folder_list = os.listdir(base_path)
    for folder in folder_list:
        target_path = os.path.join(base_path, folder)
        save_train_path = os.path.join(save_path, 'train', folder)
        save_val_path = os.path.join(save_path, 'val', folder)
        if not os.path.exists(save_train_path):
            os.makedirs(save_train_path)
        if not os.path.exists(save_val_path):
            os.makedirs(save_val_path)
        move_file(target_path, save_train_path, save_val_path, scale)
        print(folder, 'finish!')


if __name__ == '__main__':
    base_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\save_dir'
    save_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat'
    # fraction of each class that goes to the validation set
    scale = 0.1
    split_classify_data(base_path, save_path, scale)
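Note that move_file, despite its name, copies files with shutil.copyfile rather than moving them, so the original save_dir stays intact. With scale=0.1, a random 10% of each class folder goes to val and the remaining 90% to train.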
After running the code above, you get the following folder structure: one training set and one validation set.
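Roughly, again for the dog/cat example:

dog_cat/
    train/
        狗/
        貓/
    val/
        狗/
        貓/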
2. Importing the dataset
This file contains a dataset-loading function and a learning-rate update function. The data loading part is generic.
tools.py
import os
import time

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.autograd.variable import Variable
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ExponentialLR, LambdaLR
from torchvision.models import ResNet50_Weights
from tqdm import tqdm

from classify_cfg import *

mean = MEAN
std = STD


def get_dataset(base_dir='', input_size=160):
    dateset = dict()
    transform_train = transforms.Compose([
        # resize so the image resolution becomes input_size
        transforms.Resize(input_size),
        transforms.RandomRotation(15),
        # the source images have inconsistent sizes, so crop them all to [input_size x input_size x 3]
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
    ])
    transform_val = transforms.Compose([
        transforms.Resize(input_size),
        transforms.RandomRotation(15),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
    ])
    base_dir_train = os.path.join(base_dir, 'train')
    train_dataset = datasets.ImageFolder(root=base_dir_train, transform=transform_train)
    # print("train_dataset=" + repr(train_dataset[1][0].size()))
    # print("train_dataset.class_to_idx=" + repr(train_dataset.class_to_idx))
    # print(train_dataset.classes)
    classes = train_dataset.classes
    # classes = train_dataset.class_to_idx
    classes_num = len(train_dataset.classes)
    base_dir_val = os.path.join(base_dir, 'val')
    val_dataset = datasets.ImageFolder(root=base_dir_val, transform=transform_val)
    dateset['train'] = train_dataset
    dateset['val'] = val_dataset
    return dateset, classes, classes_num


def update_lr(epoch, epochs):
    """
    Suppose the initial learning rate lr is 0.001 and the number of epochs is 100:
    when epoch < 33, use lr * 1
    when 33 <= epoch <= 66, use lr * 0.5
    when epoch > 66, use lr * 0.1
    """
    if epoch == 0 or epochs // 3 > epoch:
        return 1
    elif (epochs // 3 * 2 >= epoch) and (epochs // 3 <= epoch):
        return 0.5
    else:
        return 0.1
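As a quick illustration of the schedule, here is a standalone sketch (not one of the project files) that wraps update_lr in LambdaLR with the settings from classify_cfg.py (lr=0.001, epochs=30) and prints the learning rate each epoch; the Linear layer is just a stand-in so an optimizer can be built.

import torch
import torch.optim as optim
from torch.optim.lr_scheduler import LambdaLR

from tools import update_lr

model = torch.nn.Linear(10, 2)  # stand-in model, only needed to create an optimizer
optimizer = optim.SGD(model.parameters(), lr=1e-3)
epochs = 30
scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: update_lr(epoch, epochs))

for epoch in range(epochs):
    # (the training step for this epoch would go here)
    print(f'epoch {epoch}: lr = {optimizer.param_groups[0]["lr"]}')
    scheduler.step()

# epochs 0-9  -> 0.001   (factor 1)
# epochs 10-20 -> 0.0005 (factor 0.5)
# epochs 21-29 -> 0.0001 (factor 0.1)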
3. Training the model
Once the dataset is loaded, choose the model, the optimizer and so on, then start training.
mytrain.py
import os
import time

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torch.autograd.variable import Variable
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ExponentialLR, LambdaLR
from torchvision.models import ResNet50_Weights
# from tqdm import tqdm

from classify_cfg import *
from tools import get_dataset, update_lr


def train(model, dateset, epochs, batch_size, device, optimizer, scheduler, criterion, save_path):
    train_loader = DataLoader(dateset.get('train'), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(dateset.get('val'), batch_size=batch_size, shuffle=True)
    # write metrics to a tensorboard file
    write = SummaryWriter(save_path)
    # also write the training log to a txt file
    f = open(os.path.join(save_path, 'log.txt'), 'w', encoding='utf-8')
    best_acc = 0
    for epoch in range(epochs):
        train_correct = 0.0
        model.train()
        sum_loss = 0.0
        accuracy = -1
        total_num = len(train_loader.dataset)
        # print(total_num, len(train_loader))
        # loop = tqdm(enumerate(train_loader), total=len(train_loader))
        batch_count = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            start_time = time.time()
            data, target = Variable(data).to(device), Variable(target).to(device)
            output = model(data)
            loss = criterion(output, target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print_loss = loss.data.item()
            sum_loss += print_loss
            train_predict = torch.max(output.data, 1)[1]
            if torch.cuda.is_available():
                train_correct += (train_predict.cuda() == target.cuda()).sum()
            else:
                train_correct += (train_predict == target).sum()
            accuracy = (train_correct / total_num) * 100
            # loop.set_description(f'Epoch [{epoch+1}/{epochs}]')
            # loop.set_postfix(loss=loss.item(), acc='{:.3f}'.format(accuracy))
            batch_count += len(data)
            end_time = time.time()
            s = f'Epoch:[{epoch+1}/{epochs}] Batch:[{batch_count}/{total_num}] train_acc: {"{:.2f}".format(accuracy)} ' \
                f'train_loss: {"{:.3f}".format(loss.item())} time: {int((end_time-start_time)*1000)} ms'
            # print(f'Epoch:[{epoch+1}/{epochs}]', f'Batch:[{batch_count}/{total_num}]',
            #       'train_acc:', '{:.2f}'.format(accuracy), 'train_loss:', '{:.3f}'.format(loss.item()),
            #       'time:', f'{int((end_time-start_time)*1000)} ms')
            print(s)
            f.write(s + '\n')
        write.add_scalar('train_acc', accuracy, epoch)
        write.add_scalar('train_loss', loss.item(), epoch)
        # print(optimizer.param_groups[0]['lr'])
        scheduler.step()
        if best_acc < accuracy:
            best_acc = accuracy
            torch.save(model, os.path.join(save_path, 'best.pt'))
        if epoch + 1 == epochs:
            torch.save(model, os.path.join(save_path, 'last.pt'))
        # evaluate on the validation set
        # if (epoch+1) % 5 == 0 or epoch+1 == epochs:
        model.eval()
        test_loss = 0.0
        correct = 0.0
        total_num = len(val_loader.dataset)
        # print(total_num, len(val_loader))
        with torch.no_grad():
            for data, target in val_loader:
                data, target = Variable(data).to(device), Variable(target).to(device)
                output = model(data)
                loss = criterion(output, target)
                _, pred = torch.max(output.data, 1)
                if torch.cuda.is_available():
                    correct += torch.sum(pred.cuda() == target.cuda())
                else:
                    correct += torch.sum(pred == target)
                print_loss = loss.data.item()
                test_loss += print_loss
            acc = correct / total_num * 100
            avg_loss = test_loss / len(val_loader)
            s = f"val acc: {'{:.2f}'.format(acc)} val loss: {'{:.3f}'.format(avg_loss)}"
            # print('val acc: ', '{:.2f}'.format(acc), 'val loss: ', '{:.3f}'.format(avg_loss))
            print(s)
            f.write(s + '\n')
            write.add_scalar('val_acc', acc, epoch)
            write.add_scalar('val_loss', avg_loss, epoch)
            # loop.set_postfix(val_loss='{:.3f}'.format(avg_loss), val_acc='{:.3f}'.format(acc))
    f.close()


if __name__ == '__main__':
    device = DEVICE
    epochs = EPOCHS
    batch_size = BATCH_SIZE
    input_size = INPUT_SIZE
    lr = LR
    # --------------------------- training -------------------------------------
    # path to the image data
    base_dir = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat'
    # path where results are saved
    save_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat_save'
    dateset, classes, classes_num = get_dataset(base_dir, input_size=input_size)
    # model = torchvision.models.resnet50(pretrained=True)
    model = torchvision.models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, classes_num)
    model.to(DEVICE)
    # loss function: cross-entropy
    criteon = nn.CrossEntropyLoss()
    # choose the optimizer
    optimizer = optim.SGD(model.parameters(), lr=lr)
    # learning-rate schedule
    # scheduler = ExponentialLR(optimizer, gamma=0.9)
    scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: update_lr(epoch, epochs))
    # start training
    train(model, dateset, epochs, batch_size, device, optimizer, scheduler, criteon, save_path)
    # save the labels
    with open(os.path.join(save_path, 'labels.txt'), 'w', encoding='utf-8') as f:
        f.write(f'{classes_num} {classes}')
After training finishes, the save path contains the following files:
the best model (best.pt), the model from the last epoch (last.pt), the label list (labels.txt), the training log (log.txt) and the TensorBoard record.
Run tensorboard --logdir=. in that directory,
then open the address it prints in a browser to see plots of the training process.
4. Predicting on images
Since, from the user's point of view, an image is usually uploaded from a web page or a phone for prediction, the prediction code takes binary image data as input.
mypredict.py
import cv2
import numpy as np
import torch

from classify_cfg import *

mean = MEAN
std = STD


def img_process(img_betys, img_size, device):
    # decode the raw bytes into a BGR image
    img_arry = np.asarray(bytearray(img_betys), dtype='uint8')
    # im0 = cv2.imread(img_betys)
    im0 = cv2.imdecode(img_arry, cv2.IMREAD_COLOR)
    image = cv2.resize(im0, (img_size, img_size))
    image = np.float32(image) / 255.0
    image[:, :, ] -= np.float32(mean)
    image[:, :, ] /= np.float32(std)
    image = image.transpose((2, 0, 1))
    im = torch.from_numpy(image).unsqueeze(0)
    im = im.to(device)
    return im


def predict(model_path, img, device):
    model = torch.load(model_path)
    model.to(device)
    model.eval()
    predicts = model(img)
    # print(predicts)
    _, preds = torch.max(predicts, 1)
    pred = torch.squeeze(preds)
    # print(pred)
    return pred


if __name__ == '__main__':
    device = DEVICE
    classes = ['狗', '貓']  # dog, cat
    # prediction
    model_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat_save\best.pt'
    img_path = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\save_dir\狗\000000.jpg'
    with open(img_path, 'rb') as f:
        img_betys = f.read()
    img = img_process(img_betys, 160, device)
    # print(img.shape)
    # print(img)
    pred = predict(model_path, img, device)
    print(classes[int(pred)])
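To tie this back to the upload scenario mentioned above, here is a minimal serving sketch. Flask is not part of this project and is only an assumption for illustration; the /classify route, the 'file' form field and the port are likewise arbitrary choices, and it assumes mean/std are defined at module level in mypredict.py as in the version shown here.

# Hypothetical upload endpoint; Flask and the 'file' field name are assumptions,
# not part of the original project.
from flask import Flask, request

from classify_cfg import DEVICE
from mypredict import img_process, predict

app = Flask(__name__)
MODEL_PATH = r'C:\Users\Administrator.DESKTOP-161KJQD\Desktop\dog_cat_save\best.pt'
CLASSES = ['狗', '貓']  # dog, cat


@app.route('/classify', methods=['POST'])
def classify():
    # the uploaded file is read as raw bytes, exactly what img_process expects
    img_bytes = request.files['file'].read()
    img = img_process(img_bytes, 160, DEVICE)
    # note: predict() reloads the model on every call, mirroring mypredict.py
    pred = predict(MODEL_PATH, img, DEVICE)
    return {'label': CLASSES[int(pred)]}


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)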
Finally, my configuration file classify_cfg.py:
import torch

BATCH_SIZE = 2    # number of samples per batch
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # train on cuda if available, otherwise cpu
EPOCHS = 30       # number of training epochs
LR = 1e-3         # learning rate
INPUT_SIZE = 160  # input image size
MEAN = [0.485, 0.456, 0.406]  # per-channel mean (standard ImageNet statistics)
STD = [0.229, 0.224, 0.225]   # per-channel standard deviation (standard ImageNet statistics)