Skip to content
Rain Hu's Workspace
Go back

[AI] 3-7. Keras API

Rain Hu

基礎篇

1. Layer

分類

Keras 中的基礎 Layer 類別

# NOTE(review): `from tensorflow import tf` is invalid — the tensorflow
# package has no `tf` attribute. Import the package itself, plus the
# `keras` namespace that the snippets below use.
import tensorflow as tf
from tensorflow import keras

class SimpleDemo(keras.layers.Layer):
    """A minimal Dense-like layer: y = activation(inputs @ W + b).

    Args:
        units: dimensionality of the output space.
        activation: optional activation; accepts a callable or a string
            name (e.g. "relu"), resolved via keras.activations.get.
            None means linear (identity).
    """

    def __init__(self, units, activation=None):
        super().__init__()
        self.units = units
        # Resolve string names ("relu", "softmax", ...) to callables so the
        # layer also works when built with Keras-style string activations;
        # get(None) returns the linear (identity) activation, so passing a
        # callable or None behaves exactly as before.
        self.activation = keras.activations.get(activation)

    # NOTE(review): the original mixed tabs (build/call) with 4-space
    # indentation (__init__), which raises TabError in Python 3.
    def build(self, input_shape):
        # Weights are created lazily, once the input feature size is known.
        input_dim = input_shape[-1]
        self.W = self.add_weight(shape=(input_dim, self.units),
                                 initializer="random_normal")
        self.b = self.add_weight(shape=(self.units,),
                                 initializer="zeros")

    def call(self, inputs):
        # Forward pass: affine transform followed by the activation.
        y = tf.matmul(inputs, self.W) + self.b
        if self.activation is not None:
            y = self.activation(y)
        return y
# NOTE(review): the class defined above is SimpleDemo — `SimpleDense` is
# undefined and raises NameError. Also, SimpleDemo applies `activation`
# directly as a callable, so pass tf.nn functions rather than strings.
sample_layer = SimpleDemo(units=32, activation=tf.nn.relu)
model = keras.Sequential([
    SimpleDemo(32, activation=tf.nn.relu),
    SimpleDemo(64, activation=tf.nn.relu),
    SimpleDemo(32, activation=tf.nn.relu),
    SimpleDemo(10, activation=tf.nn.softmax)
])

2. Model

3. 編譯

model = keras.Sequential([keras.layers.Dense(1)])
# Three equivalent ways to compile. Fixed identifiers: the string loss is
# "mean_squared_error" (not "mean_square_error"), the module is
# keras.optimizers (plural), and the loss class is MeanSquaredError
# (CamelCase) — the originals raise ValueError/AttributeError.
model.compile(optimizer="rmsprop",
              loss="mean_squared_error",
              metrics=["accuracy"])
model.compile(optimizer=keras.optimizers.RMSprop(),
              loss=keras.losses.MeanSquaredError(),
              metrics=[keras.metrics.BinaryAccuracy()])
# Custom callables can be passed directly as loss/metrics.
model.compile(optimizer=keras.optimizers.RMSprop(learning_rate=1e-4),
              loss=my_custom_loss,
              metrics=[my_custom_metric_1, my_custom_metric_2])

4. 選擇損失函數

5. fit()

# Train for 5 epochs in mini-batches of 128 samples. fit() returns a
# History object whose .history dict holds the per-epoch metric values.
history = model.fit(x=inputs, y=targets, epochs=5, batch_size=128)

6. 驗證資料

model = keras.Sequential([keras.layers.Dense(1)])
# keras.optimizers (plural) — the original `keras.optimizer` raises AttributeError.
model.compile(optimizer=keras.optimizers.RMSprop(learning_rate=0.1),
              loss=keras.losses.MeanSquaredError(),
              metrics=[keras.metrics.BinaryAccuracy()])

# Shuffle inputs and targets with the same permutation so pairs stay aligned.
indices_permutation = np.random.permutation(len(inputs))
shuffled_inputs = inputs[indices_permutation]
shuffled_targets = targets[indices_permutation]

# Hold out 30% of the data as the validation set.
num_validation_samples = int(0.3 * len(inputs))
val_inputs = shuffled_inputs[:num_validation_samples]
val_targets = shuffled_targets[:num_validation_samples]
# The training split needs a trailing colon (a slice) — without it the
# original indexed a single sample. Also use `training_targets`
# consistently; the original assigned `training_target` but passed
# `training_targets` to fit().
training_inputs = shuffled_inputs[num_validation_samples:]
training_targets = shuffled_targets[num_validation_samples:]
model.fit(
    training_inputs,
    training_targets,
    epochs=5,        # the original was missing this comma (SyntaxError)
    batch_size=16,
    validation_data=(val_inputs, val_targets)  # was `val_targets0` (typo)
)
loss_and_metrics = model.evaluate(val_inputs, val_targets, batch_size=128)

7. 推論(Inference)

進階篇

建構 Keras 模型的不同方法

1. 序列式模型 (Sequential Model)

from tensorflow import keras
from tensorflow.keras import layers

# Two equivalent ways to build a Sequential model: pass the layer list to
# the constructor, or add() layers one at a time.
model = keras.Sequential([
    layers.Dense(64, activation="relu"),
    layers.Dense(10, activation="softmax")
])
model = keras.Sequential()
model.add(layers.Dense(64, activation="relu"))
model.add(layers.Dense(10, activation="softmax"))
# The keyword is input_shape, not input_size (the original raised a
# TypeError). (None, 3) means: unspecified batch size, 3 features.
model.build(input_shape=(None, 3))
model.weights    # weights exist only after build()
model.summary()

summary

# Naming the model and its layers makes the summary() output readable.
model = keras.Sequential(
    [
        layers.Dense(64, activation="relu", name="hidden_layer"),
        layers.Dense(10, activation="softmax", name="output_layer"),
    ],
    name="my_sequential_model",
)
model.build(input_shape=(None, 3))
model.summary()

naming

# Declaring an Input up front builds the model immediately, so summary()
# works without an explicit build() call.
model = keras.Sequential([
    keras.Input(shape=(3,)),
    layers.Dense(64, activation="relu"),
])
model.summary()

preclaim

2. 函數式 API (Functional API)

# keras.Input is not an actual layer — it is a symbolic tensor object that
# marks the model's entry point. (Fixed typo in the name:
# "funtinoal_api" -> "functional_api".)
inputs = keras.Input(shape=(3,), name="functional_api")
features = layers.Dense(64, activation="relu")(inputs)
outputs = layers.Dense(10, activation="softmax")(features)
model = keras.Model(inputs=inputs, outputs=outputs)
model.summary()

functional_api

# Symbolic tensors expose a static shape (batch dim is None) and a dtype.
for tensor in (inputs, features):
    print(tensor.shape)
    print(tensor.dtype)

> (None, 3)
> float32
> (None, 64)
> float32
news_size = 10000       # vocabulary size of the title/content vectors
tag_size = 20           # number of distinct tags
department_size = 5     # number of departments to classify into

# Three named inputs to a single model.
title = keras.Input(shape=(news_size,), name="title")
content = keras.Input(shape=(news_size,), name="content")
tags = keras.Input(shape=(tag_size,), name="tags")

# Merge every input, then mix them through one hidden Dense layer.
features = layers.Dense(64, activation="relu")(
    layers.Concatenate()([title, content, tags])
)

# Two output heads: a scalar priority score and a department distribution.
priority = layers.Dense(1, activation="sigmoid", name="priority")(features)
department = layers.Dense(department_size, activation="softmax",
                          name="department")(features)

model = keras.Model(inputs=[title, content, tags],
                    outputs=[priority, department])
flowchart TD
    subgraph Inputs
        T[title<br/>shape: 10000] --> C[Concatenate]
        Co[content<br/>shape: 10000] --> C
        Ta[tags<br/>shape: 20] --> C
    end

    subgraph Hidden
        C --> D[Dense Layer<br/>units: 64<br/>activation: relu]
    end

    subgraph Outputs
        D --> P[Priority Output<br/>Dense Layer<br/>units: 1<br/>activation: sigmoid]
        D --> Dep[Department Output<br/>Dense Layer<br/>units: 5<br/>activation: softmax]
    end

    style T fill:#f9f,stroke:#333,stroke-width:2px
    style Co fill:#f9f,stroke:#333,stroke-width:2px
    style Ta fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#bbf,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px
    style P fill:#bfb,stroke:#333,stroke-width:2px
    style Dep fill:#bfb,stroke:#333,stroke-width:2px
# Import from tensorflow.keras for consistency with the rest of the file;
# bare `keras` may resolve to a different, standalone installation.
from tensorflow.keras.utils import plot_model
plot_model(model, "news_classifier.png")

plot_model

model.layers  # list of the model's Layer objects, in graph order

> [<InputLayer name=title, built=True>,
   <InputLayer name=content, built=True>,
   <InputLayer name=tags, built=True>,
   <Concatenate name=concatenate, built=True>,
   <Dense name=dense_14, built=True>,
   <Dense name=priority, built=True>,
   <Dense name=department, built=True>]
# Reuse the existing graph: tap the hidden Dense layer's symbolic output
# (index 4 in model.layers, per the listing above) and attach a new head.
features = model.layers[4].output
potential = layers.Dense(3, activation="softmax", name="potential")(features)
# The new model shares the original inputs/outputs and adds `potential`.
new_model = keras.Model([title, content, tags],
                        [priority, department, potential])
plot_model(new_model, "new_news_classifier.png", show_layer_names=True)

add_output

3. 繼承 Model 類別 (Subclassing the model class)

class NewsModel(keras.Model):
    """Subclassed multi-input / multi-output news model.

    call() expects a dict with keys "title", "content" and "tags" and
    returns a (priority, department) tuple.
    """

    def __init__(self, n):
        # Initialize the parent class before creating any sub-layers.
        super().__init__()
        self.concat_layer = layers.Concatenate()
        self.mixing_layer = layers.Dense(64, activation='relu')
        self.priority_scorer = layers.Dense(1, activation='sigmoid')
        self.department_scorer = layers.Dense(n, activation='softmax')

    def call(self, inputs):
        # Forward pass: concatenate the named inputs, mix, then score
        # both heads from the shared features.
        parts = [inputs["title"], inputs["content"], inputs["tags"]]
        features = self.mixing_layer(self.concat_layer(parts))
        return self.priority_scorer(features), self.department_scorer(features)

model = NewsModel(n=5)

# Every call below feeds the same dict of named inputs, so build it once.
news_inputs = {
    "title": title_data,
    "content": content_data,
    "tags": tags_data,
}

priority, department = model(news_inputs)
# One loss and one metric list per output, in the order call() returns them.
model.compile(optimizer='rmsprop',
              loss=['mean_squared_error', 'categorical_crossentropy'],
              metrics=[['mean_absolute_error'], ['accuracy']])

model.fit(news_inputs, [priority_data, department_data], epochs=1)

model.evaluate(news_inputs, [priority_data, department_data])

priority_preds, department_preds = model.predict(news_inputs)

Mixing

設計評量指標

方均根誤差(root mean squared error, RMSE)

import tensorflow as tf

class RootMeanSquaredError(tf.keras.metrics.Metric):
    """Streaming RMSE for sparse integer labels vs. predicted distributions.

    Accumulates the sum of squared errors and the number of samples seen,
    and reports sqrt(sum_sq_err / num_samples) in result().
    """

    def __init__(self, name="rmse", **kwargs):
        super().__init__(name=name, **kwargs)
        # Metric state tracked across batches (reset between epochs).
        self.mse_sum = self.add_weight(name="mse_sum", initializer="zeros")
        self.total_samples = self.add_weight(name="total_samples", initializer="zeros", dtype="int32")

    def update_state(self, y_true, y_pred, sample_weight=None):
        # y_true holds sparse integer labels; one-hot them to y_pred's width.
        y_true = tf.one_hot(y_true, depth=tf.shape(y_pred)[1])
        # NOTE(review): despite the name, `mse` is the batch *sum* of squared
        # errors; the denominator in result() is the sample count, not the
        # element count.
        mse = tf.reduce_sum(tf.square(y_true - y_pred))
        self.mse_sum.assign_add(mse)
        num_samples = tf.shape(y_pred)[0]
        self.total_samples.assign_add(num_samples)

    def result(self):
        # Cast the int32 counter so the division stays in float32.
        rmse = tf.sqrt(self.mse_sum / tf.cast(self.total_samples, tf.float32))
        return rmse

    def reset_state(self):
        # Called between epochs so state does not leak across them.
        self.mse_sum.assign(0.)
        self.total_samples.assign(0)

model = get_mnist_model()
# Custom metric instances can be mixed with built-in metric names.
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy", RootMeanSquaredError()])
model.fit(train_images, train_labels,
          epochs=3,
          validation_data=(val_images, val_labels))
test_metrics = model.evaluate(test_images, test_labels)

使用 callbacks 模組

EarlyStopping + ModelCheckpoint

callbacks = [
    keras.callbacks.EarlyStopping(
        monitor="val_accuracy",   # watch the model's validation accuracy
        patience=2,   # stop after two epochs without improvement
    ),
    keras.callbacks.ModelCheckpoint(       # save the model after each epoch
        filepath="checkpoint_path.keras",  # save path and file name
        monitor="val_loss",
        save_best_only=True,               # only save when val_loss improves
    )
]

model = get_mnist_model()
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])   # produces the monitored "val_accuracy"
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=callbacks,       # attach the callbacks during training
          validation_data=(val_images, val_labels))  # required: val_* values come from here

設計 custom callback

from matplotlib import pyplot as plt

class LossHistory(keras.callbacks.Callback):
    """Saves a plot of the per-batch training loss at the end of each epoch."""

    def on_train_begin(self, logs):
        # Fresh loss buffer for the whole training run.
        self.per_batch_losses = []

    def on_batch_end(self, batch, logs):
        # Record this batch's loss (None if absent from logs).
        self.per_batch_losses.append(logs.get('loss'))

    def on_epoch_end(self, epoch, logs):
        # Render the epoch's batch losses to a PNG, then reset the buffer.
        plt.clf()
        plt.plot(range(len(self.per_batch_losses)),
                 self.per_batch_losses,
                 label='Training loss for each batch')
        plt.xlabel(f'Batch (epoch {epoch})')
        plt.ylabel("Loss")
        plt.legend()
        plt.savefig(f"plot_at_epoch_{epoch}")
        self.per_batch_losses = []

model = get_mnist_model()
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
# Attach the custom callback; one loss plot is written per epoch.
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=[LossHistory()],
          validation_data=(val_images, val_labels))

使用 TensorBoard

model = get_mnist_model()
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
tensorboard = keras.callbacks.TensorBoard(
    log_dir="/tensor_board_log"  # where the event/log files are written
)
model.fit(train_images, train_labels,
          epochs=10,
          validation_data=(val_images, val_labels),
          callbacks=[tensorboard])
%load_ext tensorboard
%tensorboard --logdir /tensor_board_log

tensorboard

實作演練

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.datasets import mnist

def get_mnist_model():
    """Build an uncompiled MLP classifier for flattened 28x28 MNIST images."""
    inputs = keras.Input(shape=(28 * 28,))
    # 512-unit ReLU layer with 50% dropout, then a 10-way softmax head.
    hidden = layers.Dropout(0.5)(layers.Dense(512, activation="relu")(inputs))
    outputs = layers.Dense(10, activation="softmax")(hidden)
    return keras.Model(inputs=inputs, outputs=outputs)

# Load MNIST, flatten each 28x28 image to a 784-vector, scale to [0, 1].
(images, labels), (test_images, test_labels) = mnist.load_data()
images = images.reshape((60000, 28 * 28)).astype("float32") / 255
test_images = test_images.reshape((10000, 28 * 28)).astype("float32") / 255
# First 10000 samples become the validation split; the rest train the model.
train_images, val_images = images[10000:], images[:10000]
train_labels, val_labels = labels[10000:], labels[:10000]

model = get_mnist_model()
# Sparse loss: labels are integer class ids, not one-hot vectors.
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels, epochs=3,
          validation_data=(val_images, val_labels))
test_metrics = model.evaluate(test_images, test_labels)  # [loss, accuracy]
predictions = model.predict(test_images)
print(test_metrics)

Share this post on:

Previous
[AI] 3-8. 客製化 Training
Next
[AI] 3-6. 實作線性分類器