Custom Training Functions in TensorFlow

2022-07-30 12:00:47

This post records a template for custom training functions in the TensorFlow framework and briefly discusses the advantages and disadvantages of using them.

To begin with, the training-function template recorded here is based on the answers at https://stackoverflow.com/questions/59438904/applying-callbacks-in-a-custom-training-loop-in-tensorflow-2-0 and on Section 12.3.9 of the book Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow. Corrections are welcome if there are any errors or omissions.

Why and When You Need a Custom Training Function

Unless you really need the extra flexibility, you should prefer the fit() method rather than implementing your own training loop, especially when working in a team.

If you are still wondering why you would need a custom training function, then you do not need one yet. Usually it is only when building models with unusual structures that we find model.fit() cannot fully meet our needs. The first thing to try then is to read the relevant parts of the TensorFlow source code and check whether there are parameters or methods you were not aware of; only after that should you consider a custom training function. Without question, a custom training function makes the code longer, harder to maintain, and harder to understand.

That said, a custom training function offers flexibility that fit() cannot match. For example, in a custom function you can implement a training loop that uses several different optimizers, or run validation loops over multiple datasets.
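
As a rough illustration of the multi-optimizer case, here is a minimal sketch of a training step that applies two optimizers to different parts of a model. It is not part of the template below; the helper name two_optimizer_step, the layer split point, and the optimizer choices are assumptions made purely for this example.

import tensorflow as tf
from tensorflow import keras

def two_optimizer_step(model, x, y, loss_fn, opt_a, opt_b, split):
    # hypothetical split: the first `split` layers are trained with opt_a
    # (e.g. keras.optimizers.SGD), the remaining layers with opt_b (e.g. keras.optimizers.Adam)
    vars_a = [v for layer in model.layers[:split] for v in layer.trainable_variables]
    vars_b = [v for layer in model.layers[split:] for v in layer.trainable_variables]
    with tf.GradientTape() as tape:
        outputs = model(x, training=True)
        loss = tf.reduce_mean(loss_fn(y, outputs))
    # tape.gradient accepts a nested structure of sources and mirrors it in the result
    grads_a, grads_b = tape.gradient(loss, [vars_a, vars_b])
    opt_a.apply_gradients(zip(grads_a, vars_a))
    opt_b.apply_gradients(zip(grads_b, vars_b))
    return loss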

Custom Training Function Template

The goal of the template is to let us build a custom training function quickly by reusing blocks of code and filling in the key parts, so that we can focus on the structure of the training function itself rather than on minor details (such as handling a training set of unknown length), while still supporting some of the features that fit() provides (such as the use of Callback classes).

import tensorflow as tf
from tensorflow import keras


def train(model: keras.Model, train_batchs, epochs=1, initial_epoch=0,
          callbacks=None, steps_per_epoch=None, val_batchs=None):
    callbacks = tf.keras.callbacks.CallbackList(
        callbacks, add_history=True, model=model)

    logs_dict = {}
    
    # init optimizer, loss function and metrics
    optimizer = keras.optimizers.Nadam(learning_rate=0.0005)
    loss_fn = keras.losses.mean_squared_error  # per-sample loss, reduced to a scalar in the step functions
    
    train_loss_tracker = keras.metrics.Mean(name="train_loss")
    val_loss_tracker = keras.metrics.Mean(name="val_loss")
    # train_acc_metric = tf.keras.metrics.BinaryAccuracy(name="train_acc")
    # val_acc_metric = tf.keras.metrics.BinaryAccuracy(name="val_acc")
    
    def count():  # infinite counter, used when steps_per_epoch is unknown
        x = 0
        while True:
            yield x
            x += 1
    
    def print_status_bar(iteration, total, metrics=None):
        metrics = " - ".join(["{}:{:.4f}".format(m.name, m.result()) for m in (metrics or [])])
        end = "" if iteration < total else "\n"
        print("\r{}/{} - ".format(iteration, total) + metrics, end=end)
   	
    def train_step(x, y, loss_tracker: keras.metrics.Metric):
        with tf.GradientTape() as tape:
            outputs = model(x, training=True)
            main_loss = tf.reduce_mean(loss_fn(y, outputs))
            # add any regularization losses registered on the model
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        loss_tracker.update_state(loss)
        return {loss_tracker.name: loss_tracker.result()}
    
    def val_step(x, y, loss_tracker: keras.metrics.Metric):
        # call the model directly instead of model.predict(); predict() builds a
        # full prediction loop per batch and returns NumPy arrays, which is slow here
        outputs = model(x, training=False)
        main_loss = tf.reduce_mean(loss_fn(y, outputs))

        loss = tf.add_n([main_loss] + model.losses)
        loss_tracker.update_state(loss)
        return {loss_tracker.name: loss_tracker.result()}
    
    # init train_batchs
    train_iter = iter(train_batchs)
    
    callbacks.on_train_begin(logs=logs_dict)
    for i_epoch in range(initial_epoch, epochs):
        callbacks.on_epoch_begin(i_epoch, logs=logs_dict)
    
        # init steps
        infinite_flag = False
        if steps_per_epoch is None:
            infinite_flag = True
            step_iter = count()
        else:
            step_iter = range(steps_per_epoch)

        # train loop
        for i_step in step_iter:
            callbacks.on_batch_begin(i_step, logs=logs_dict)
            callbacks.on_train_batch_begin(i_step, logs=logs_dict)

            try:
                X_batch, y_batch = next(train_iter)
            except StopIteration:
                # dataset exhausted: rebuild the iterator; if the epoch length is
                # unknown, reaching the end of the dataset ends the epoch
                train_iter = iter(train_batchs)
                if infinite_flag is True:
                    break
                else:
                    X_batch, y_batch = next(train_iter)
            
            train_logs_dict = train_step(x=X_batch,y=y_batch,loss_tracker=train_loss_tracker)
            logs_dict.update(train_logs_dict)

            # the total is unknown during the first pass, so show "inf" instead
            print_status_bar(i_step, steps_per_epoch or float("inf"), [train_loss_tracker])
            
            callbacks.on_train_batch_end(i_step, logs=logs_dict)
            callbacks.on_batch_end(i_step, logs=logs_dict)

        # the first pass over a dataset of unknown length tells us the real number
        # of steps per epoch; record it so later epochs can use range() directly
        if steps_per_epoch is None:
            print()
            steps_per_epoch = i_step
            
        if val_batchs is not None:
            # validation loop
            for i_step,(X_batch,y_batch) in enumerate(iter(val_batchs)):
                callbacks.on_batch_begin(i_step, logs=logs_dict)
                callbacks.on_test_batch_begin(i_step, logs=logs_dict)

                val_logs_dict = val_step(x=X_batch,y=y_batch,loss_tracker=val_loss_tracker)
                logs_dict.update(val_logs_dict)

                callbacks.on_test_batch_end(i_step, logs=logs_dict)
                callbacks.on_batch_end(i_step, logs=logs_dict)
            

        print_status_bar(steps_per_epoch, steps_per_epoch, [train_loss_tracker, val_loss_tracker])
        callbacks.on_epoch_end(i_epoch, logs=logs_dict)

        for metric in [train_loss_tracker, val_loss_tracker]:
            metric.reset_states()

    callbacks.on_train_end(logs=logs_dict)
    
    # Fetch the history object we normally get from keras.fit
    history_object = None
    for cb in callbacks:
        if isinstance(cb, tf.keras.callbacks.History):
            history_object = cb
    return history_object
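
To show how the template is meant to be called, here is a minimal usage sketch. The toy model, the random data, and the CSV file name are assumptions made for this example; train(...) is the function defined above.

import numpy as np

# hypothetical data and model, just enough to exercise the template
X = np.random.rand(1000, 8).astype("float32")
y = np.random.rand(1000, 1).astype("float32")

model = keras.Sequential([
    keras.layers.Dense(32, activation="relu", input_shape=(8,)),
    keras.layers.Dense(1),
])

train_batchs = tf.data.Dataset.from_tensor_slices((X[:800], y[:800])).batch(32)
val_batchs = tf.data.Dataset.from_tensor_slices((X[800:], y[800:])).batch(32)

history = train(
    model,
    train_batchs,
    epochs=5,
    callbacks=[keras.callbacks.CSVLogger("training_log.csv")],  # assumed file name
    val_batchs=val_batchs,
)
print(history.history)  # {'train_loss': [...], 'val_loss': [...]}

For speed, train_step and val_step can usually be wrapped with tf.function once the loop is debugged; the template keeps them in eager mode so it is easier to step through and modify.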