專案程式碼:https://github.com/wuya11/easy-classification
cfg = { ### Global Set "model_name": "mobilenetv3", #shufflenetv2 adv-efficientnet-b2 se_resnext50_32x4d xception "class_number": 10, "random_seed":42, "cfg_verbose":True, "num_workers":4, ### Train Setting 'train_path':"./data/train", 'val_path':"./data/val", ### Test 'model_path':'output/mobilenetv3_e50_0.77000.pth',#test model 'eval_path':"./data/test",#test with label,get test acc 'test_path':"./data/test",#test without label, just show img result ### 更多參考專案中的config.py檔案 }
from config import cfg path=cfg["train_path"] #獲取config檔案中的train_path變數
train_data = getFileNames(self.cfg['train_path']) val_data = getFileNames(self.cfg['val_path']) def getFileNames(file_dir, tail_list=['.png','.jpg','.JPG','.PNG']): L=[] for root, dirs, files in os.walk(file_dir): for file in files: if os.path.splitext(file)[1] in tail_list: L.append(os.path.join(root, file)) return L
# 隨機處理訓練集 train_data.sort(key = lambda x:os.path.basename(x)) train_data = np.array(train_data) random.shuffle(train_data) # 調整訓練時的資料量 if self.cfg['try_to_train_items'] > 0: train_data = train_data[:self.cfg['try_to_train_items']] val_data = val_data[:self.cfg['try_to_train_items']]
class TrainDataAug: def __init__(self, img_size): self.h = img_size[0] self.w = img_size[1] def __call__(self, img): # opencv img, BGR img = cv2.resize(img, (self.h,self.w)) img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB) img = Image.fromarray(img) return img
my_normalize = getNormorlize(cfg['model_name']) data_aug_train = TrainDataAug(cfg['img_size']) transforms.Compose([ # 調整影象大小 data_aug_train, # 影象轉換為張量 transforms.ToTensor(), # 歸一化處理 my_normalize, ])
def getNormorlize(model_name): if model_name in ['mobilenetv2','mobilenetv3']: my_normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) elif model_name == 'xception': my_normalize = transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]) elif "adv-eff" in model_name: my_normalize = transforms.Lambda(lambda img: img * 2.0 - 1.0) elif "resnex" in model_name or 'eff' in model_name or 'RegNet' in model_name: my_normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) #my_normalize = transforms.Normalize([0.4783, 0.4559, 0.4570], [0.2566, 0.2544, 0.2522]) elif "EN-B" in model_name: my_normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) else: print("[Info] Not set normalize type! Use defalut imagenet normalization.") my_normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) return my_normalize
class TensorDatasetTestClassify(Dataset): def __init__(self, train_jpg, transform=None): self.train_jpg = train_jpg if transform is not None: self.transform = transform else: self.transform = None def __getitem__(self, index): img = cv2.imread(self.train_jpg[index]) if self.transform is not None: img = self.transform(img) return img, self.train_jpg[index] def __len__(self): return len(self.train_jpg)
my_dataloader = TensorDatasetTestClassify train_data = getFileNames(self.cfg['train_path']) train_loader = torch.utils.data.DataLoader( my_dataloader(train_data, transforms.Compose([ data_aug_train, transforms.ToTensor(), my_normalize, ])), batch_size=cfg['batch_size'], shuffle=True, num_workers=cfg['num_workers'], pin_memory=True)
self.pretrain_model = MobileNetV3() # 預訓練模型權重路徑 if self.cfg['pretrained']: state_dict = torch.load(self.cfg['pretrained']) # 模型與預訓練不一致時,邏輯處理 state_dict = {k.replace('pretrain_', ''):v for k, v in state_dict.items()} state_dict = {k.replace('model.', ''): v for k, v in state_dict.items()} # 跳過不一致的地方 self.pretrain_model.load_state_dict(state_dict,False)
def forward(self, x): x = self.features(x) x = x.mean(3).mean(2) #張量維度換 4轉2 last_channel=1280 # mobilenetv3 large最終輸出為1280 # 構建一個全連線層 self.classifier = nn.Sequential( nn.Dropout(p=dropout), # refer to paper section 6 nn.Linear(last_channel, 10), #數位0-9共10個分類 x = self.classifier(x) return x
class ModelRunner(): def __init__(self, cfg, model): # 定義載入組態檔 self.cfg = cfg # 定義裝置資訊 if self.cfg['GPU_ID'] != '' : self.device = torch.device("cuda") else: self.device = torch.device("cpu") self.model = model.to(self.device) # gpu加速,cpu模式無效 self.scaler = torch.cuda.amp.GradScaler() # loss 定義損失函數 self.loss_func = getLossFunc(self.device, cfg) # 定義優化器 self.optimizer = getOptimizer(self.cfg['optimizer'], self.model, self.cfg['learning_rate'], self.cfg['weight_decay']) # 定義調整學習率的策略 self.scheduler = getSchedu(self.cfg['scheduler'], self.optimizer)
class CrossEntropyLoss(nn.Module): def __init__(self, label_smooth=0, weight=None): super().__init__() self.weight = weight self.label_smooth = label_smooth self.epsilon = 1e-7 def forward(self, x, y, sample_weights=0, sample_weight_img_names=None): one_hot_label = F.one_hot(y, x.shape[1]) if self.label_smooth: one_hot_label = labelSmooth(one_hot_label, self.label_smooth) #y_pred = F.log_softmax(x, dim=1) # equal below two lines y_softmax = F.softmax(x, 1) #print(y_softmax) y_softmax = torch.clamp(y_softmax, self.epsilon, 1.0-self.epsilon)# avoid nan y_softmaxlog = torch.log(y_softmax) # original CE loss loss = -one_hot_label * y_softmaxlog loss = torch.mean(torch.sum(loss, -1)) return loss
def getOptimizer(optims, model, learning_rate, weight_decay): if optims=='Adam': optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay) elif optims=='AdamW': optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay) elif optims=='SGD': optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=weight_decay) elif optims=='AdaBelief': optimizer = AdaBelief(model.parameters(), lr=learning_rate, eps=1e-12, betas=(0.9,0.999)) elif optims=='Ranger': optimizer = Ranger(model.parameters(), lr=learning_rate, weight_decay=weight_decay) else: raise Exception("Unkown getSchedu: ", optims) return optimizer
def getSchedu(schedu, optimizer): if 'default' in schedu: factor = float(schedu.strip().split('-')[1]) patience = int(schedu.strip().split('-')[2]) scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=factor, patience=patience,min_lr=0.000001) elif 'step' in schedu: step_size = int(schedu.strip().split('-')[1]) gamma = int(schedu.strip().split('-')[2]) scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma, last_epoch=-1) elif 'SGDR' in schedu: T_0 = int(schedu.strip().split('-')[1]) T_mult = int(schedu.strip().split('-')[2]) scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=T_0, T_mult=T_mult) elif 'multi' in schedu: milestones = [int(x) for x in schedu.strip().split('-')[1].split(',')] gamma = float(schedu.strip().split('-')[2]) scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones, gamma=gamma, last_epoch=-1) else: raise Exception("Unkown getSchedu: ", schedu) return scheduler
def train(self, train_loader, val_loader): # step 1:定義訓練開始時一些全域性的變數,如是否過早停止表示,執行時間等 self.onTrainStart() # step 2: 外層大輪詢次數,每次輪詢 全量 train_loader,val_loader for epoch in range(self.cfg['epochs']): # step 3: 非必須,過濾處理部分次數,做凍結訓練處理 self.freezeBeforeLinear(epoch, self.cfg['freeze_nonlinear_epoch']) # step 4: 訓練集資料處理 self.onTrainStep(train_loader, epoch) # step 5: 驗證集資料處理,最好訓練模型權重儲存,過早結束邏輯處理 self.onValidation(val_loader, epoch) # step 6: 滿足過早結束條件時,退出迴圈,結束訓練 if self.earlystop: break # step 7:訓練過程結束,釋放資源 self.onTrainEnd()
# freeze_epochs :設定凍結的標識,小於該值時凍結 # epoch: 輪詢次數值,從0開始 def freezeBeforeLinear(self, epoch, freeze_epochs=2): if epoch < freeze_epochs: for child in list(self.model.children())[:-1]: for param in child.parameters(): param.requires_grad = False # 等於標識值後,解凍 elif epoch == freeze_epochs: for child in list(self.model.children())[:-1]: for param in child.parameters(): param.requires_grad = True
# 定義模型為訓練 self.model.train() # 輪詢處理批次資料 for batch_idx, (data, target, img_names) in enumerate(train_loader): one_batch_time_start = time.time() # 來源於dataset物件,item中定義的物件 target = target.to(self.device) # 張量複製到硬體資源上 data = data.to(self.device) # gpu模式下,加快訓練,混合精度 with torch.cuda.amp.autocast(): # 模型訓練輸出張量,參考模型定義的forward返回方法 output = self.model(data).double() # 計算損失函數,可自定義或呼叫PyTorch常用的 loss = self.loss_func(output, target, self.cfg['sample_weights'], sample_weight_img_names=img_names) # 一個batchSize 求和 total_loss += loss.item() # 把梯度置零 self.optimizer.zero_grad() # loss.backward() #計算梯度 # self.optimizer.step() #更新引數 # 基於GPU scaler 加速 self.scaler.scale(loss).backward() self.scaler.step(self.optimizer) self.scaler.update() ### 返回 batchSize個最大張量值對應的陣列下標值 pred = output.max(1, keepdim=True)[1] # 訓練影象對應的 分類標籤 if len(target.shape) > 1: target = target.max(1, keepdim=True)[1] # 統計一組資料batchSize 中訓練出來的分類值與 實際影象分類標籤一樣的資料條數 correct += pred.eq(target.view_as(pred)).sum().item() # 統計總訓練資料條數 count += len(data) # 計算準確率 train_acc = correct / count train_loss = total_loss / count
# 定義模型為驗證 self.model.eval() # 重點,驗證流程定義不求導 with torch.no_grad(): pres = [] labels = [] # 基於批次迭代驗證資料 for (data, target, img_names) in val_loader: data, target = data.to(self.device), target.to(self.device) # GPU下加速處理 with torch.cuda.amp.autocast(): # 模型輸出張量,參考模型定義的forward返回方法 output = self.model(data).double() # 定義交叉損失函數 self.val_loss += self.loss_func(output, target).item() # sum up batch loss pred_score = nn.Softmax(dim=1)(output) # print(pred_score.shape) pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability if self.cfg['use_distill']: target = target.max(1, keepdim=True)[1] # 真實值與驗證值一致求和數量 self.correct += pred.eq(target.view_as(pred)).sum().item() batch_pred_score = pred_score.data.cpu().numpy().tolist() batch_label_score = target.data.cpu().numpy().tolist() pres.extend(batch_pred_score) labels.extend(batch_label_score) # print('\n',output[0],img_names[0]) pres = np.array(pres) labels = np.array(labels) # print(pres.shape, labels.shape) self.val_loss /= len(val_loader.dataset) # 計算準確率 self.val_acc = self.correct / len(val_loader.dataset) # 當次值記錄為最優,後續應用和歷史最優值做比較 self.best_score = self.val_acc
def checkpoint(self, epoch): # 當前值小於歷史記錄的值時 if self.val_acc <= self.early_stop_value: if self.best_score <= self.early_stop_value: if self.cfg['save_best_only']: pass else: save_name = '%s_e%d_%.5f.pth' % (self.cfg['model_name'], epoch + 1, self.best_score) self.last_save_path = os.path.join(self.cfg['save_dir'], save_name) self.modelSave(self.last_save_path) else: # 儲存最優權重資訊 if self.cfg['save_one_only']: if self.last_save_path is not None and os.path.exists(self.last_save_path): os.remove(self.last_save_path) save_name = '%s_e%d_%.5f.pth' % (self.cfg['model_name'], epoch + 1, self.best_score) self.last_save_path = os.path.join(self.cfg['save_dir'], save_name) torch.save(self.model.state_dict(), save_name)
def earlyStop(self, epoch): ### earlystop 設定下降次數,如當前值小於歷史值出現7次,就提前終止 if self.val_acc > self.early_stop_value: self.early_stop_value = self.val_acc if self.best_score > self.early_stop_value: self.early_stop_value = self.best_score self.early_stop_dist = 0 self.early_stop_dist += 1 if self.early_stop_dist > self.cfg['early_stop_patient']: self.best_epoch = epoch - self.cfg['early_stop_patient'] + 1 print("[INFO] Early Stop with patient %d , best is Epoch - %d :%f" % ( self.cfg['early_stop_patient'], self.best_epoch, self.early_stop_value)) self.earlystop = True if epoch + 1 == self.cfg['epochs']: self.best_epoch = epoch - self.early_stop_dist + 2 print("[INFO] Finish trainging , best is Epoch - %d :%f" % (self.best_epoch, self.early_stop_value)) self.earlystop = True
def onTrainEnd(self): # 刪除模型範例 del self.model # 垃圾回收 gc.collect() # 清空gpu上面的快取 torch.cuda.empty_cache()
# 載入訓練的模型權重 runner.modelLoad(cfg['model_path']) # 評估跑資料 runner.evaluate(train_loader) # 評估函數 def evaluate(self, data_loader): self.model.eval() correct = 0 # 驗證不求導 with torch.no_grad(): pres = [] labels = [] for (data, target, img_names) in data_loader: data, target = data.to(self.device), target.to(self.device) with torch.cuda.amp.autocast(): output = self.model(data).double() pred_score = nn.Softmax(dim=1)(output) pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability if self.cfg['use_distill']: target = target.max(1, keepdim=True)[1] correct += pred.eq(target.view_as(pred)).sum().item() batch_pred_score = pred_score.data.cpu().numpy().tolist() batch_label_score = target.data.cpu().numpy().tolist() pres.extend(batch_pred_score) labels.extend(batch_label_score) pres = np.array(pres) labels = np.array(labels) # acc評分 acc = correct / len(data_loader.dataset) print('[Info] acc: {:.3f}% \n'.format(100. * acc)) # f1評分 if 'F1' in self.cfg['metrics']: precision, recall, f1_score = getF1(pres, labels) print(' precision: {:.5f}, recall: {:.5f}, f1_score: {:.5f}\n'.format( precision, recall, f1_score))
# 載入權重 runner.modelLoad(cfg['model_path']) # 開始預測 res_dict = runner.predict(test_loader) # 預測函數 def predict(self, data_loader): self.model.eval() correct = 0 res_dict = {} with torch.no_grad(): pres = [] labels = [] for (data, img_names) in data_loader: data = data.to(self.device) output = self.model(data).double() pred_score = nn.Softmax(dim=1)(output) pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability batch_pred_score = pred_score.data.cpu().numpy().tolist() for i in range(len(batch_pred_score)): res_dict[os.path.basename(img_names[i])] = pred[i].item() # 儲存影象與預測結果 res_df = pd.DataFrame.from_dict(res_dict, orient='index', columns=['label']) res_df = res_df.reset_index().rename(columns={'index':'image_id'}) res_df.to_csv(os.path.join(cfg['save_dir'], 'pre.csv'), index=False,header=True)
微信讚賞
支付寶讚賞