[AI] 3-8. 客製化 Training
內建的 fit() 只適用於監督式學習(supervised learning),然而不是所有的機器學習任務都適用,如生成式學習(generative learning)、自監督式學習(self-supervised learning)、強化式學習(reinforcement learning) 等。 監督式學習的訓練 在實作 Keras 訓練迴圈時有兩個重要細節: training 參數: 某些層(如 Dropout)在訓練和推論時行為不同 訓練時需設定 training=True 推論時設定 training=False 模型權重分類: Trainable weights: 可訓練的權重 Non-trainable weights: 不可訓練的權重(如 BatchNormalization 層的統計值) 取梯度時應使用 model.trainable_weights 以下是完整可執行範例: import tensorflow as tf from tensorflow import keras import numpy as np # 建立範例資料 x_train = np.random.random((1000, 28, 28)) y_train = np.random.randint(10, size=(1000,)) # 建立包含 Dropout 和 BatchNormalization 的模型 model = keras.Sequential([ keras.layers.Flatten(input_shape=(28, 28)), keras.layers.Dense(128), keras.layers.BatchNormalization(), keras.layers.Dropout(0.5), keras.layers.Dense(10) ]) # 損失函數與優化器 loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = keras.optimizers.Adam(learning_rate=1e-3) # 自定義訓練步驟 @tf.function def train_step(inputs, targets): with tf.GradientTape() as tape: # 訓練模式前向傳播 predictions = model(inputs, training=True) loss = loss_fn(targets, predictions) # 計算可訓練權重的梯度 gradients = tape.gradient(loss, model.trainable_weights) optimizer.apply_gradients(zip(gradients, model.trainable_weights)) return loss # 訓練迴圈 batch_size = 32 for epoch in range(5): print(f"\nEpoch {epoch+1}") for step in range(0, len(x_train), batch_size): x_batch = x_train[step:step + batch_size] y_batch = y_train[step:step + batch_size] loss = train_step(x_batch, y_batch) if step % 200 == 0: print(f"Step {step}: loss = {loss:.4f}") # 推論時使用 training=False test_predictions = model(x_train[:1], training=False) 評量指標 在訓練期間,我們可以用 Keras 的評量指標來查詢當前的指標值,我們會用到幾個函式: update_state(y_true, y_pred) result() reset_state() import tensorflow as tf from tensorflow import keras import numpy as np # 建立模型 model = keras.Sequential([ keras.layers.Dense(64, activation='relu'), keras.layers.Dense(10, activation='softmax') ]) # 初始化指標追蹤器 accuracy_tracker = keras.metrics.SparseCategoricalAccuracy() loss_tracker = keras.metrics.Mean() # 訓練步驟 @tf.function def train_step(inputs, targets): with tf.GradientTape() as tape: predictions = model(inputs, training=True) loss = tf.keras.losses.sparse_categorical_crossentropy(targets, predictions) # 更新梯度 gradients = tape.gradient(loss, model.trainable_weights) optimizer.apply_gradients(zip(gradients, model.trainable_weights)) # 更新指標 accuracy_tracker.update_state(targets, predictions) loss_tracker.update_state(loss) return loss # 訓練迴圈 x_train = np.random.random((1000, 32)) y_train = np.random.randint(10, size=(1000,)) optimizer = keras.optimizers.Adam() batch_size = 32 for epoch in range(3): # 重置每個 epoch 的指標 accuracy_tracker.reset_state() loss_tracker.reset_state() for step in range(0, len(x_train), batch_size): x_batch = x_train[step:step + batch_size] y_batch = y_train[step:step + batch_size] loss = train_step(x_batch, y_batch) if step % 200 == 0: print( f"Step {step}: ", f"Loss: {loss_tracker.result():.4f}, ", f"Accuracy: {accuracy_tracker.result():.4f}" ) 完整的訓練與評估迴圈 設計練訓函式 model = get_mnist_model() loss_fn = keras.losses.SparseCategoricalCrossentropy() optimizer = keras.optimizers.RMSprop() metrics = [keras.metrics.SparseCategoricalAccuracy()] loss_tracking_metric = keras.metrics.Mean() def train_step(inputs, targets): with tf.GradientTape() as tape: predictions = model(inputs, training=True) loss = loss_fn(targets, predictions) gradients = tape.gradient(loss, model.trainable_weights) optimizer.apply_gradients(zip(gradients, model.trainable_weights)) logs = {} for metric in metrics: metric.update_state(targets, predictions) logs[metric.name] = metric.result() loss_tracking_metric.update_state(loss) logs["loss"] = loss_tracking_metric.result() return logs 重置評量指標 def reset_metrics(): for metric in metrics: metric.reset_state() loss_tracking_metric.reset_state() 設計訓練迴圈 training_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)) training_dataset = training_dataset.batch(32) epochs = 3 for epoch in range(epochs): reset_metrics() for inputs_batch, targets_batch in training_dataset: logs = train_step(inputs_batch, targets_batch) print(f"Results at the end of epoch {epoch}") for key, value in logs.items(): print(f"...{key}: {value:.4f}") 設計評估迴圈 def test_step(inputs, targets): predictions = model(inputs, training=False) loss = loss_fn(targets, predictions) logs = {} for metric in metrics: metric.update_state(targets, predictions) logs["val_" + metric.name] = metric.result() loss_tracking_metric.update_state(loss) logs["val_loss"] = loss_tracking_metric.result() return logs val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels)) val_dataset = val_dataset.batch(32) reset_metrics() for inputs_batch, targets_batch in val_dataset: logs = test_step(inputs_batch, targets_batch) print("Evaluation results:") for key, value in logs.items(): print(f"...{key}: {value:.4f}") 利用 tf.function 來加速 只要在要編譯的函式前加上 @tf.function 裝飾器就可以將 TensorFlow 程式碼編譯成運算圖(computation graph)。 搭配 fit() 和自定義的訓練的迴圈 from tensorflow import keras from tensorflow.keras import layers import tensorflow as tf # 1. 將 optimizer 移到類別內部 class CustomModel(keras.Model): def __init__(self, inputs, outputs): super().__init__(inputs=inputs, outputs=outputs) self.loss_tracker = keras.metrics.Mean(name="loss") # 2. 移到類別內 self.optimizer = keras.optimizers.RMSprop() # 3. 加入 optimizer self.loss_fn = keras.losses.SparseCategoricalCrossentropy() def train_step(self, data): inputs, targets = data with tf.GradientTape() as tape: predictions = self(inputs, training=True) loss = self.loss_fn(targets, predictions) gradients = tape.gradient(loss, self.trainable_weights) self.optimizer.apply_gradients(zip(gradients, self.trainable_weights)) # 4. 使用 self.optimizer self.loss_tracker.update_state(loss) # 5. 使用 self.loss_tracker return {"loss": self.loss_tracker.result()} @property def metrics(self): return [self.loss_tracker] # 6. 使用 self.loss_trackerxqf # 建立模型 inputs = keras.Input(shape=(28 * 28,)) features = layers.Dense(512, activation="relu")(inputs) features = layers.Dropout(0.5)(features) outputs = layers.Dense(10, activation="softmax")(features) model = CustomModel(inputs=inputs, outputs=outputs) model.compile() # 7. 移除 optimizer 參數 model.fit(train_images, train_labels, epochs=3)