- 內建的
fit()
只適用於監督式學習(supervised learning),然而不是所有的機器學習任務都適用,如生成式學習(generative learning)、自監督式學習(self-supervised learning)、強化式學習(reinforcement learning) 等。
監督式學習的訓練
- 在實作 Keras 訓練迴圈時有兩個重要細節:
training 參數: 某些層(如 Dropout)在訓練和推論時行為不同,訓練時需設定
training=True
推論時則設定 training=False
模型權重分類:
- Trainable weights: 可訓練的權重
- Non-trainable weights: 不可訓練的權重(如 BatchNormalization 層的統計值)
取梯度時應使用
model.trainable_weights
以下是完整可執行範例:
import tensorflow as tf
from tensorflow import keras
import numpy as np
# Synthetic stand-in data: 1000 random 28x28 inputs with integer labels in [0, 10).
x_train = np.random.random((1000, 28, 28))
y_train = np.random.randint(10, size=(1000,))

# Assemble the network layer by layer. It deliberately contains Dropout
# (different behavior in training vs. inference) and BatchNormalization
# (carries non-trainable statistics).
model = keras.Sequential()
model.add(keras.layers.Flatten(input_shape=(28, 28)))
model.add(keras.layers.Dense(128))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.Dense(10))

# The final Dense layer has no activation, so the loss is told to expect logits.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
# 自定義訓練步驟
@tf.function
def train_step(inputs, targets):
    """Run one optimization step on a single batch and return its loss.

    The forward pass is executed with ``training=True`` so that layers
    whose behavior depends on the mode (e.g. Dropout) run in training mode.
    """
    with tf.GradientTape() as tape:
        logits = model(inputs, training=True)
        batch_loss = loss_fn(targets, logits)
    # Only the trainable weights are differentiated and updated.
    trainable = model.trainable_weights
    grads = tape.gradient(batch_loss, trainable)
    optimizer.apply_gradients(zip(grads, trainable))
    return batch_loss
# Training loop
batch_size = 32
log_every = 10  # report the loss once every `log_every` batches
for epoch in range(5):
    print(f"\nEpoch {epoch+1}")
    # `step` counts batches, `start` is the offset of the batch's first sample.
    # Logging is keyed on the batch index: testing the sample offset (which
    # advances by batch_size) against a fixed interval would fire only when the
    # two happen to share a common multiple.
    for step, start in enumerate(range(0, len(x_train), batch_size)):
        x_batch = x_train[start:start + batch_size]
        y_batch = y_train[start:start + batch_size]
        loss = train_step(x_batch, y_batch)
        if step % log_every == 0:
            print(f"Step {step}: loss = {loss:.4f}")

# Inference runs with training=False
test_predictions = model(x_train[:1], training=False)
評量指標
- 在訓練期間,我們可以用 Keras 的評量指標來查詢當前的指標值,我們會用到幾個函式:
update_state(y_true, y_pred)
result()
reset_state()
import tensorflow as tf
from tensorflow import keras
import numpy as np
# Build the model: one hidden ReLU layer followed by a 10-way softmax output.
model = keras.Sequential()
model.add(keras.layers.Dense(64, activation='relu'))
model.add(keras.layers.Dense(10, activation='softmax'))

# Metric trackers: a running mean for the loss and a running accuracy.
loss_tracker = keras.metrics.Mean()
accuracy_tracker = keras.metrics.SparseCategoricalAccuracy()
# 訓練步驟
@tf.function
def train_step(inputs, targets):
    """Run one optimization step on a batch and update the metric trackers.

    Args:
        inputs: Batch of model inputs.
        targets: Integer class labels for the batch.

    Returns:
        The scalar mean loss of the batch.
    """
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        # The functional loss yields one value per sample.
        per_sample_loss = tf.keras.losses.sparse_categorical_crossentropy(
            targets, predictions
        )
        # Reduce to a scalar: differentiating the unreduced vector would give
        # the gradient of the *sum*, scaling the update with the batch size.
        loss = tf.reduce_mean(per_sample_loss)
    # Apply the gradient update
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    # Update the metrics. The loss tracker receives the per-sample values so
    # that every sample carries equal weight in the running mean, even when
    # the last batch is smaller than the others.
    accuracy_tracker.update_state(targets, predictions)
    loss_tracker.update_state(per_sample_loss)
    return loss
# Training loop
x_train = np.random.random((1000, 32))
y_train = np.random.randint(10, size=(1000,))
optimizer = keras.optimizers.Adam()
batch_size = 32
log_every = 10  # report the metrics once every `log_every` batches
for epoch in range(3):
    # Reset the metrics at the start of every epoch
    accuracy_tracker.reset_state()
    loss_tracker.reset_state()
    # `step` counts batches, `start` is the offset of the batch's first sample.
    # Logging is keyed on the batch index: testing the sample offset (which
    # advances by batch_size) against a fixed interval would fire only when the
    # two happen to share a common multiple.
    for step, start in enumerate(range(0, len(x_train), batch_size)):
        x_batch = x_train[start:start + batch_size]
        y_batch = y_train[start:start + batch_size]
        loss = train_step(x_batch, y_batch)
        if step % log_every == 0:
            print(
                f"Step {step}: ",
                f"Loss: {loss_tracker.result():.4f}, ",
                f"Accuracy: {accuracy_tracker.result():.4f}"
            )
完整的訓練與評估迴圈
設計訓練函式
# Model to train; get_mnist_model() is defined outside this snippet.
model = get_mnist_model()
# NOTE(review): the loss uses its default configuration — confirm that the
# model's outputs are in the form it expects (probabilities vs. logits).
loss_fn = keras.losses.SparseCategoricalCrossentropy()
optimizer = keras.optimizers.RMSprop()
# Metrics that train_step/test_step update on every batch.
metrics = [keras.metrics.SparseCategoricalAccuracy()]
# Running mean of the per-batch loss values.
loss_tracking_metric = keras.metrics.Mean()
def train_step(inputs, targets):
    """Train on one batch and return the current metric values.

    Returns a dict mapping each metric's name to its current result, plus a
    ``"loss"`` entry holding the running mean of the loss.
    """
    with tf.GradientTape() as tape:
        batch_predictions = model(inputs, training=True)
        batch_loss = loss_fn(targets, batch_predictions)
    weights = model.trainable_weights
    grads = tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Fold this batch into every metric, then snapshot the results.
    for metric in metrics:
        metric.update_state(targets, batch_predictions)
    logs = {metric.name: metric.result() for metric in metrics}

    loss_tracking_metric.update_state(batch_loss)
    logs["loss"] = loss_tracking_metric.result()
    return logs
重置評量指標
def reset_metrics():
    """Clear the accumulated state of every metric and of the loss tracker."""
    for tracker in (*metrics, loss_tracking_metric):
        tracker.reset_state()
設計訓練迴圈
# Slice the training arrays into a dataset and serve it in batches of 32.
training_dataset = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels)
).batch(32)
epochs = 3
for epoch in range(epochs):
    # Every epoch starts from clean metric state.
    reset_metrics()
    for inputs_batch, targets_batch in training_dataset:
        logs = train_step(inputs_batch, targets_batch)
    # `logs` now holds the values returned by the last step of the epoch.
    print(f"Results at the end of epoch {epoch}")
    for key, value in logs.items():
        print(f"...{key}: {value:.4f}")
設計評估迴圈
def test_step(inputs, targets):
    """Evaluate one batch and return the current ``val_``-prefixed metric values.

    The model is called with ``training=False`` and no weights are updated.
    """
    val_predictions = model(inputs, training=False)
    val_loss = loss_fn(targets, val_predictions)

    # Fold this batch into every metric, then snapshot the results.
    for metric in metrics:
        metric.update_state(targets, val_predictions)
    logs = {"val_" + metric.name: metric.result() for metric in metrics}

    loss_tracking_metric.update_state(val_loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs
# Slice the validation arrays into a dataset and serve it in batches of 32.
val_dataset = tf.data.Dataset.from_tensor_slices(
    (val_images, val_labels)
).batch(32)
# The metric objects are shared with training, so clear them first.
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
# `logs` now holds the values returned by the last evaluation step.
print("Evaluation results:")
for key, value in logs.items():
    print(f"...{key}: {value:.4f}")
利用 tf.function 來加速
- 只要在要編譯的函式前加上
@tf.function
裝飾器就可以將 TensorFlow 程式碼編譯成運算圖(computation graph)。
搭配 fit() 和自定義的訓練迴圈
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
# 1. The optimizer is moved inside the class
class CustomModel(keras.Model):
    """Functional Keras model that supplies its own ``train_step`` for ``fit()``."""

    def __init__(self, inputs, outputs):
        """Build the functional model and create its loss, optimizer and tracker."""
        super().__init__(inputs=inputs, outputs=outputs)
        # 2. The loss tracker lives on the instance
        self.loss_tracker = keras.metrics.Mean(name="loss")
        # 3. The optimizer is created here.
        # NOTE(review): compile() may assign its own optimizer to this
        # attribute — confirm which instance train_step ends up using.
        self.optimizer = keras.optimizers.RMSprop()
        self.loss_fn = keras.losses.SparseCategoricalCrossentropy()

    def train_step(self, data):
        """Train on one ``(inputs, targets)`` batch.

        Returns a dict with a single ``"loss"`` entry holding the running
        mean tracked by ``self.loss_tracker``.
        """
        x, y = data
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            loss = self.loss_fn(y, y_pred)
        weights = self.trainable_weights
        grads = tape.gradient(loss, weights)
        # 4. Use self.optimizer
        self.optimizer.apply_gradients(zip(grads, weights))
        # 5. Use self.loss_tracker
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    @property
    def metrics(self):
        """Metric objects exposed by this model: only the loss tracker."""
        # 6. Use self.loss_tracker
        return [self.loss_tracker]
# Build the model: flattened 28x28 input -> Dense(512, relu) -> Dropout -> 10-way softmax
inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs=inputs, outputs=outputs)
# NOTE(review): compile() is called without arguments; check in the Keras
# docs whether its default optimizer replaces the one CustomModel.__init__ set.
model.compile() # 7. The optimizer argument is removed
# train_images / train_labels are defined outside this snippet.
model.fit(train_images, train_labels, epochs=3)