• 內建的 fit() 只適用於監督式學習(supervised learning),然而並非所有的機器學習任務都適用,例如生成式學習(generative learning)、自監督式學習(self-supervised learning)、強化式學習(reinforcement learning)等。

監督式學習的訓練

  • 在實作 Keras 訓練迴圈時有兩個重要細節:
    1. training 參數: 某些層(如 Dropout)在訓練和推論時行為不同,訓練時需設定 training=True,推論時則設定 training=False。

    2. 模型權重分類:

      • Trainable weights: 可訓練的權重
      • Non-trainable weights: 不可訓練的權重(如 BatchNormalization 層的統計值)。取梯度時應使用 model.trainable_weights。

以下是完整可執行範例:

import tensorflow as tf
from tensorflow import keras
import numpy as np

# Synthetic example data: 1000 random 28x28 "images" with integer labels in [0, 10)
x_train = np.random.random((1000, 28, 28))
y_train = np.random.randint(10, size=(1000,))

# Model deliberately includes Dropout and BatchNormalization — layers whose
# behavior differs between training (training=True) and inference (training=False)
model = keras.Sequential([
    keras.layers.Flatten(input_shape=(28, 28)),
    keras.layers.Dense(128),
    keras.layers.BatchNormalization(),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(10)
])

# from_logits=True because the final Dense layer has no softmax activation
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)

# Custom training step, compiled to a TensorFlow graph for speed.
@tf.function
def train_step(inputs, targets):
    """Run one forward/backward pass and apply a single optimizer update."""
    # Record the forward pass so gradients can be derived from it.
    with tf.GradientTape() as tape:
        # training=True enables Dropout and lets BatchNormalization update its stats.
        logits = model(inputs, training=True)
        batch_loss = loss_fn(targets, logits)

    # Differentiate w.r.t. trainable weights only — BN moving statistics are
    # non-trainable and must not receive gradient updates.
    grads = tape.gradient(batch_loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return batch_loss

# Mini-batch training loop (no shuffling between epochs in this toy example)
batch_size = 32
for epoch in range(5):
    print(f"\nEpoch {epoch+1}")

    # `step` advances by batch_size, so it is an index into x_train, not a batch counter
    for step in range(0, len(x_train), batch_size):
        x_batch = x_train[step:step + batch_size]
        y_batch = y_train[step:step + batch_size]
        loss = train_step(x_batch, y_batch)

        # Only fires when step is a multiple of 200 (i.e. every 25th batch at size 32)
        if step % 200 == 0:
            print(f"Step {step}: loss = {loss:.4f}")

# Inference: training=False disables Dropout and makes BN use its moving statistics
test_predictions = model(x_train[:1], training=False)

評量指標

  • 在訓練期間,我們可以用 Keras 的評量指標來查詢當前的指標值,我們會用到幾個函式:
    • update_state(y_true, y_pred)
    • result()
    • reset_state()
import tensorflow as tf
from tensorflow import keras
import numpy as np

# Small classifier used to demonstrate streaming metric tracking
model = keras.Sequential([
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(10, activation='softmax')
])

# Metric objects accumulate state across update_state() calls;
# result() reads the running value and reset_state() clears it
accuracy_tracker = keras.metrics.SparseCategoricalAccuracy()
loss_tracker = keras.metrics.Mean()

# One optimization step that also feeds the streaming metric trackers.
@tf.function
def train_step(inputs, targets):
    """Apply one gradient update and fold the batch into the running metrics."""
    with tf.GradientTape() as tape:
        probs = model(inputs, training=True)
        # Per-sample loss vector; the Mean() tracker below averages it.
        loss = tf.keras.losses.sparse_categorical_crossentropy(targets, probs)

    # Backpropagate and update the trainable weights.
    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    # Accumulate this batch into the epoch-level accuracy and loss metrics.
    accuracy_tracker.update_state(targets, probs)
    loss_tracker.update_state(loss)

    return loss

# Training loop. Note: `optimizer` is created here, after train_step is
# defined but before its first call, so the name resolves fine at call time.
x_train = np.random.random((1000, 32))
y_train = np.random.randint(10, size=(1000,))
optimizer = keras.optimizers.Adam()
batch_size = 32

for epoch in range(3):
    # Reset metric state at the start of each epoch so results are per-epoch
    accuracy_tracker.reset_state()
    loss_tracker.reset_state()

    for step in range(0, len(x_train), batch_size):
        x_batch = x_train[step:step + batch_size]
        y_batch = y_train[step:step + batch_size]
        loss = train_step(x_batch, y_batch)

        # Query the running metric values via result() while training
        if step % 200 == 0:
            print(
                f"Step {step}: ",
                f"Loss: {loss_tracker.result():.4f}, ",
                f"Accuracy: {accuracy_tracker.result():.4f}"
            )

完整的訓練與評估迴圈

設計訓練函式

# get_mnist_model() is defined elsewhere in the notes — TODO confirm its signature
model = get_mnist_model()

# NOTE(review): no from_logits=True here, so the model presumably ends in a
# softmax layer — verify against get_mnist_model()
loss_fn = keras.losses.SparseCategoricalCrossentropy()
optimizer = keras.optimizers.RMSprop()
metrics = [keras.metrics.SparseCategoricalAccuracy()]

# Separate Mean metric that averages per-batch losses for logging
loss_tracking_metric = keras.metrics.Mean()

def train_step(inputs, targets):
    """Run one gradient-descent step.

    Returns a dict mapping each metric's name (plus "loss") to its
    current running value, suitable for end-of-epoch reporting.
    """
    with tf.GradientTape() as tape:
        outputs = model(inputs, training=True)
        step_loss = loss_fn(targets, outputs)

    grads = tape.gradient(step_loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    # Update every tracked metric and collect its running value.
    logs = {}
    for m in metrics:
        m.update_state(targets, outputs)
        logs[m.name] = m.result()

    # Loss is tracked separately through a Mean metric.
    loss_tracking_metric.update_state(step_loss)
    logs["loss"] = loss_tracking_metric.result()
    return logs

重置評量指標

def reset_metrics():
    """Clear the accumulated state of every tracked metric, loss tracker included."""
    for tracker in [*metrics, loss_tracking_metric]:
        tracker.reset_state()

設計訓練迴圈

# Build a batched tf.data pipeline over the (images, labels) training set.
# NOTE(review): train_images/train_labels must be defined earlier in the notes.
training_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
training_dataset = training_dataset.batch(32)
epochs = 3
for epoch in range(epochs):
    reset_metrics()
    for inputs_batch, targets_batch in training_dataset:
        logs = train_step(inputs_batch, targets_batch)
    # `logs` holds the running metric values after the last batch of the epoch
    print(f"Results at the end of epoch {epoch}")
    for key, value in logs.items():
        print(f"...{key}: {value:.4f}")

設計評估迴圈

def test_step(inputs, targets):
    """Evaluate one batch without any gradient update.

    Returns running metric values keyed with a "val_" prefix so they are
    distinguishable from training metrics.
    """
    outputs = model(inputs, training=False)
    batch_loss = loss_fn(targets, outputs)

    logs = {}
    for m in metrics:
        m.update_state(targets, outputs)
        logs["val_" + m.name] = m.result()
    loss_tracking_metric.update_state(batch_loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs

# Evaluation loop reuses the same metric objects, so reset them first.
# NOTE(review): val_images/val_labels must be defined earlier in the notes.
val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
print("Evaluation results:")
for key, value in logs.items():
    print(f"...{key}: {value:.4f}")

利用 tf.function 來加速

  • 只要在要編譯的函式前加上 @tf.function 裝飾器就可以將 TensorFlow 程式碼編譯成運算圖(computation graph)。

搭配 fit() 和自定義的訓練迴圈

from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf

# The model owns its optimizer, loss, and loss-tracking metric, so compile()
# needs no arguments and fit() drives the custom train_step below.
class CustomModel(keras.Model):
    """Functional-style model with a hand-written training step.

    fit() still handles batching, callbacks, and progress display; only the
    per-batch update logic is customized via train_step().
    """

    def __init__(self, inputs, outputs):
        super().__init__(inputs=inputs, outputs=outputs)
        # Training machinery lives on the instance instead of coming from compile().
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.optimizer = keras.optimizers.RMSprop()
        self.loss_fn = keras.losses.SparseCategoricalCrossentropy()

    def train_step(self, data):
        """Apply one gradient update; returns the running mean loss for display."""
        batch_inputs, batch_targets = data
        with tf.GradientTape() as tape:
            preds = self(batch_inputs, training=True)
            step_loss = self.loss_fn(batch_targets, preds)

        grads = tape.gradient(step_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # Fold this batch into the running mean that fit() reports.
        self.loss_tracker.update_state(step_loss)
        return {"loss": self.loss_tracker.result()}

    @property
    def metrics(self):
        # Listing the tracker here lets Keras reset it at the start of each epoch.
        return [self.loss_tracker]

# Build the functional graph and wrap it in CustomModel
inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs=inputs, outputs=outputs)

model.compile()  # no optimizer argument: the model created its own in __init__
# NOTE(review): train_images/train_labels must be defined earlier in the notes
model.fit(train_images, train_labels, epochs=3)