Python is a dynamic scripting language. Not only does it have a dynamic type system, where a variable can be assigned to one type first and changed later, but its object model is also dynamic. This allows us to modify its behavior at run time. A consequence of this is the possibility of monkey patching: the idea that we can modify the base layer of a program without modifying the higher-level code. Imagine you use the print() function to print something to the screen, and we could modify the definition of this function to print it to a file without modifying any line of your code.
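As a minimal sketch of this idea (the output.log file name and the wrapper function are illustrative, not part of this tutorial's code), replacing the built-in print() could look like this:

import builtins

_original_print = builtins.print  # keep a reference to the real print

def print_to_file(*args, **kwargs):
    # append everything to a log file instead of writing to the screen
    with open("output.log", "a") as f:
        _original_print(*args, file=f, **kwargs)

builtins.print = print_to_file     # monkey patch: callers need no change
print("hello")                     # this line now writes to output.log
builtins.print = _original_print   # restore the original behavior

Every call to print(), even in code we never edited, now goes through our wrapper. This is exactly the property we will exploit below with Keras and TensorFlow code.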
This is possible because Python is an interpreted language, so we can make changes while the program is running. We can exploit this property in Python to modify the interface of a class or a module. It is useful if we are dealing with legacy code, or code from other people, that we do not want to modify extensively but still want to make run with different versions of libraries or environments. In this tutorial, we are going to see how we can apply this technique to some Keras and TensorFlow code.
After finishing this tutorial, you will learn:
- What monkey patching is
- How to change an object or a module in Python at run time
Tutorial Overview
This tutorial is in three parts; they are:
- One model, two interfaces
- Extending an object with monkey patching
- Monkey patching to revive legacy code
One Model, Two Interfaces
TensorFlow is a huge library. It provides the high-level Keras API to describe deep learning models layer by layer. It also comes with a lot of functionality for training, such as different optimizers and data generators. Installing TensorFlow is overwhelming if we merely need to run a trained model. Therefore, TensorFlow provides us with a counterpart called TensorFlow Lite, which is much smaller in size and suitable for running on small devices such as mobile or embedded devices.
We want to show how the original TensorFlow Keras model and the TensorFlow Lite model are used differently. So let's make a model of moderate size, such as the LeNet-5 model. Below is how we load the MNIST dataset and train a classification model:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, AveragePooling2D, Dropout, Flatten
from tensorflow.keras.callbacks import EarlyStopping

# Load MNIST data
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Reshape data to shape of (n_sample, height, width, n_channel)
X_train = np.expand_dims(X_train, axis=3).astype('float32')
X_test = np.expand_dims(X_test, axis=3).astype('float32')

# LeNet5 model: ReLU can be used instead of tanh
model = Sequential([
    Conv2D(6, (5,5), input_shape=(28,28,1), padding="same", activation="tanh"),
    AveragePooling2D((2,2), strides=2),
    Conv2D(16, (5,5), activation="tanh"),
    AveragePooling2D((2,2), strides=2),
    Conv2D(120, (5,5), activation="tanh"),
    Flatten(),
    Dense(84, activation="tanh"),
    Dense(10, activation="softmax")
])

# Training
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["sparse_categorical_accuracy"])
earlystopping = EarlyStopping(monitor="val_loss", patience=4, restore_best_weights=True)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, callbacks=[earlystopping])
Running the above code will download the MNIST dataset using TensorFlow's dataset API and train the model. Afterward, we can save the model:
model.save("lenet5-mnist.h5")
Or we can evaluate the model with our test set:
print(np.argmax(model.predict(X_test), axis=1))
print(y_test)
Then we should see:
[7 2 1 ... 4 5 6]
[7 2 1 ... 4 5 6]
But if we intend to use it with TensorFlow Lite, we need to convert it to the TensorFlow Lite format as follows:
# tflite conversion with dynamic range optimization
import tensorflow as tf
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# Optional: Save the data for testing
import numpy as np
np.savez('mnist-test.npz', X=X_test, y=y_test)

# Save the model
with open('lenet5-mnist.tflite', 'wb') as f:
    f.write(tflite_model)
We can add more options to the converter, such as reducing the model to use 16-bit floating point. But in all cases, the output of the conversion is a binary string. Not only will the conversion reduce the model to a much smaller size (compared to the HDF5 file saved from Keras), but it will also allow us to use it with a lightweight library. There are libraries for Android and iOS mobile devices. If you are using embedded Linux, you may find the tflite-runtime module from the PyPI repository (or you may compile one from the TensorFlow source code). Below is how we can use tflite-runtime to run the converted model:
import numpy as np
import tflite_runtime.interpreter as tflite

loaded = np.load('mnist-test.npz')
X_test = loaded["X"]
y_test = loaded["y"]

interpreter = tflite.Interpreter(model_path="lenet5-mnist.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(input_details[0]['shape'])

rows = []
for n in range(len(X_test)):
    # this model has single input and single output
    interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1])
    interpreter.invoke()
    row = interpreter.get_tensor(output_details[0]['index'])
    rows.append(row)
rows = np.vstack(rows)

accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)
In fact, the larger TensorFlow library can also run the converted model with a very similar syntax:
import numpy as np
import tensorflow as tf

interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

rows = []
for n in range(len(X_test)):
    # this model has single input and single output
    interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1])
    interpreter.invoke()
    row = interpreter.get_tensor(output_details[0]['index'])
    rows.append(row)
rows = np.vstack(rows)

accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)
Note the different ways of using the models: in the Keras model, we have the predict() function that takes a batch as input and returns a result. In the TensorFlow Lite model, however, we have to inject one input tensor at a time to the "interpreter", invoke it, and then retrieve the result.
Putting everything together, the code below is how we build a Keras model, train it, convert it to the TensorFlow Lite format, and test with the converted model:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, AveragePooling2D, Dropout, Flatten
from tensorflow.keras.callbacks import EarlyStopping

# Load MNIST data
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Reshape data to shape of (n_sample, height, width, n_channel)
X_train = np.expand_dims(X_train, axis=3).astype('float32')
X_test = np.expand_dims(X_test, axis=3).astype('float32')

# LeNet5 model: ReLU can be used instead of tanh
model = Sequential([
    Conv2D(6, (5,5), input_shape=(28,28,1), padding="same", activation="tanh"),
    AveragePooling2D((2,2), strides=2),
    Conv2D(16, (5,5), activation="tanh"),
    AveragePooling2D((2,2), strides=2),
    Conv2D(120, (5,5), activation="tanh"),
    Flatten(),
    Dense(84, activation="tanh"),
    Dense(10, activation="softmax")
])

# Training
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["sparse_categorical_accuracy"])
earlystopping = EarlyStopping(monitor="val_loss", patience=4, restore_best_weights=True)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, callbacks=[earlystopping])

# Save model
model.save("lenet5-mnist.h5")

# Compare the prediction vs test data
print(np.argmax(model.predict(X_test), axis=1))
print(y_test)

# tflite conversion with dynamic range optimization
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# Optional: Save the data for testing
np.savez('mnist-test.npz', X=X_test, y=y_test)

# Save the tflite model
with open('lenet5-mnist.tflite', 'wb') as f:
    f.write(tflite_model)

# Load the tflite model and run test
interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

rows = []
for n in range(len(X_test)):
    # this model has single input and single output
    interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1])
    interpreter.invoke()
    row = interpreter.get_tensor(output_details[0]['index'])
    rows.append(row)
rows = np.vstack(rows)

accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)
Extending an Object with Monkey Patching

Can we use predict() on the TensorFlow Lite interpreter?

The interpreter object does not have such a function. But since we are using Python, we can add it using the monkey patching technique. To understand what we are doing, first we have to note that the interpreter object we defined in the previous code may contain many attributes and functions. When we call interpreter.predict() like a function, Python looks for a function of that name inside the object, then executes it. If no such name is found, Python raises an AttributeError exception:
...
interpreter.predict()
That gives:
Traceback (most recent call last):
  File "/Users/MLM/pred_error.py", line 13, in <module>
    interpreter.predict()
AttributeError: 'Interpreter' object has no attribute 'predict'
To make this work, we need to attach a function to the interpreter object with the name predict, and the function should behave like one when it is invoked. To make things simple, we notice that our model is a sequential one, with an array as input and returning an array of softmax results as output. So we can write a predict() function that behaves like the one from a Keras model, but uses the TensorFlow Lite interpreter:
...

# Monkey patching the tflite model
def predict(self, input_batch):
    batch_size = len(input_batch)
    output = []

    input_details = self.get_input_details()
    output_details = self.get_output_details()
    # Run each sample from the batch
    for sample in range(batch_size):
        self.set_tensor(input_details[0]["index"], input_batch[sample:sample+1])
        self.invoke()
        sample_output = self.get_tensor(output_details[0]["index"])
        output.append(sample_output)
    # vstack the output of each sample
    return np.vstack(output)

interpreter.predict = predict.__get__(interpreter)
The last line above assigns the function we created to the interpreter object, with the name predict. The __get__(interpreter) part is required to make the function we defined a member function of the interpreter object.
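To see what __get__ does in isolation, here is a minimal sketch with a toy class (the Greeter class and hello function are illustrative, not part of this tutorial's code); binding via the descriptor protocol is equivalent to using types.MethodType from the standard library:

import types

class Greeter:
    name = "world"

def hello(self):
    # 'self' is the instance the function gets bound to
    return "hello, " + self.name

g = Greeter()
g.hello = hello.__get__(g)            # bind as a method via the descriptor protocol
print(g.hello())                      # hello, world
g.hello = types.MethodType(hello, g)  # the equivalent standard library spelling
print(g.hello())                      # hello, world

Without the binding, assigning g.hello = hello and then calling g.hello() would not pass g as self automatically, because a function stored directly on an instance bypasses the descriptor protocol.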
With this, we can now run a batch:
...
out_proba = interpreter.predict(X_test)
out = np.argmax(out_proba, axis=1)
print(out)

accuracy = np.sum(out == y_test) / len(y_test)
print(accuracy)
This prints:

[7 2 1 ... 4 5 6]
0.9879
This is possible because Python has a dynamic object model. We can modify attributes or member functions of an object at run time. In fact, this should not surprise us. A Keras model needs to run model.compile() before we can run model.fit(). One effect of the compile function is to add a loss attribute to the model to hold the loss function. This is done at run time.
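We can probe this directly. In the TensorFlow 2.x versions this tutorial targets, the attribute only appears after compile() is called (this quick check is illustrative, not part of the original code, and the exact attribute set may vary by version):

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

m = Sequential([Dense(1, input_shape=(4,))])
print(hasattr(m, "loss"))                # False: nothing attached yet
m.compile(loss="mse", optimizer="adam")  # compile() runs at run time...
print(hasattr(m, "loss"))                # True: ...and adds the attribute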
After the predict() function is added to the interpreter object, we can pass around the interpreter object just like a trained Keras model for prediction. While they are different behind the scenes, they share the same interface so that other functions can use them without modifying any line of code.
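For example, a helper function written against the Keras-style interface now works with both objects unchanged (this evaluate() helper is illustrative, not from the original article, and assumes the trained model and the patched interpreter from the code above are both in scope):

...
def evaluate(model, X, y):
    # works with anything that exposes a Keras-style predict()
    proba = model.predict(X)
    return np.sum(np.argmax(proba, axis=1) == y) / len(y)

print(evaluate(model, X_test, y_test))        # the trained Keras model
print(evaluate(interpreter, X_test, y_test))  # the patched tflite interpreter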
Below is the complete code to load our saved TensorFlow Lite model, then monkey patch the predict() function into it to make it look like a Keras model:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist

# Load MNIST data and reshape
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = np.expand_dims(X_train, axis=3).astype('float32')
X_test = np.expand_dims(X_test, axis=3).astype('float32')

# Monkey patching the tflite model
def predict(self, input_batch):
    batch_size = len(input_batch)
    output = []

    input_details = self.get_input_details()
    output_details = self.get_output_details()
    # Run each sample from the batch
    for sample in range(batch_size):
        self.set_tensor(input_details[0]["index"], input_batch[sample:sample+1])
        self.invoke()
        sample_output = self.get_tensor(output_details[0]["index"])
        output.append(sample_output)
    # vstack the output of each sample
    return np.vstack(output)

# Load and monkey patch
interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite")
interpreter.predict = predict.__get__(interpreter)
interpreter.allocate_tensors()

# test output
out_proba = interpreter.predict(X_test)
out = np.argmax(out_proba, axis=1)
print(out)
accuracy = np.sum(out == y_test) / len(y_test)
print(accuracy)
Monkey Patching to Revive Legacy Code

We can give one more example of monkey patching in Python. Consider the following code:
# https://machinelearningmastery.com/dropout-regularization-deep-learning-models-keras/
# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.wrappers.scikit_learn import KerasClassifier
from keras.constraints import maxnorm
from keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# load dataset
dataframe = read_csv("sonar.csv", header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)

# dropout in hidden layers with weight constraint
def create_model():
    # create model
    model = Sequential()
    model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3)))
    model.add(Dropout(0.2))
    model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3)))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    # Compile model
    sgd = SGD(lr=0.1, momentum=0.9)
    model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
    return model

estimators = []
estimators.append(('standardize', StandardScaler()))
estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print("Hidden: %.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100))
This code was written a few years ago and assumes an older version of Keras with TensorFlow 1.x. The data file sonar.csv can be found in the other post. If we run this code with TensorFlow 2.5, we will see an ImportError on the line importing SGD. We need to make at least two changes in the code above to make it run:

- Functions and classes should be imported from tensorflow.keras instead of keras
- The constraint class maxnorm should be in camel case, MaxNorm

Below is the updated code, in which we modified only the import statements:
# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.constraints import MaxNorm as maxnorm
from tensorflow.keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# load dataset
dataframe = read_csv("sonar.csv", header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)

# dropout in hidden layers with weight constraint
def create_model():
    # create model
    model = Sequential()
    model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3)))
    model.add(Dropout(0.2))
    model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3)))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    # Compile model
    sgd = SGD(lr=0.1, momentum=0.9)
    model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
    return model

estimators = []
estimators.append(('standardize', StandardScaler()))
estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print("Hidden: %.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100))
If we have a much bigger project with a lot of scripts, it would be tedious to modify every single line of import. But Python's module system is just a dictionary at sys.modules. Therefore, we can monkey patch it to make the old code fit with the new library. The following is how we do it. This works for a TensorFlow 2.5 installation (the backward compatibility issue of this Keras code was fixed in TensorFlow 2.9; hence you do not need this patching in the latest version of the library):
# monkey patching
import sys
import tensorflow.keras
tensorflow.keras.constraints.maxnorm = tensorflow.keras.constraints.MaxNorm
for x in list(sys.modules.keys()):  # copy the keys: we add entries while looping
    if x.startswith("tensorflow.keras"):
        sys.modules[x[len("tensorflow."):]] = sys.modules[x]

# Old code below:

# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.wrappers.scikit_learn import KerasClassifier
from keras.constraints import maxnorm
from keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# load dataset
dataframe = read_csv("sonar.csv", header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)

# dropout in hidden layers with weight constraint
def create_model():
    # create model
    model = Sequential()
    model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3)))
    model.add(Dropout(0.2))
    model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3)))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    # Compile model
    sgd = SGD(lr=0.1, momentum=0.9)
    model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
    return model

estimators = []
estimators.append(('standardize', StandardScaler()))
estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print("Hidden: %.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100))
This is certainly not clean and tidy code, and it will be a problem for future maintenance. Therefore, monkey patching is unwelcome in production code. However, it is a quick technique that exploits the internals of the Python language to get something to work easily.