- 微软认知工具包(CNTK)教程
- 家
- 介绍
- 入门
- CPU和GPU
- CNTK-序列分类
- CNTK - 逻辑回归模型
- CNTK - 神经网络 (NN) 概念
- CNTK - 创建第一个神经网络
- CNTK - 训练神经网络
- CNTK - 内存中和大型数据集
- CNTK - 测量性能
- 神经网络分类
- 神经网络二元分类
- CNTK - 神经网络回归
- CNTK - 分类模型
- CNTK - 回归模型
- CNTK - 内存不足的数据集
- CNTK - 监控模型
- CNTK - 卷积神经网络
- CNTK - 循环神经网络
- Microsoft 认知工具包资源
- Microsoft 认知工具包 - 快速指南
- Microsoft 认知工具包 - 资源
- Microsoft 认知工具包 - 讨论
CNTK - 内存不足的数据集
本章将解释如何衡量内存不足数据集的性能。
在前面的章节中,我们讨论了验证神经网络性能的各种方法,但我们讨论的方法是处理适合内存的数据集的方法。
这里,问题出现了,内存不足的数据集怎么办,因为在生产场景中,我们需要大量的数据来训练NN。在本节中,我们将讨论如何在使用小批量源和手动小批量循环时衡量性能。
小批量源
在处理内存外数据集(即小批量源)时,我们需要与处理小数据集(即内存数据集)时使用的设置略有不同的损失设置以及指标。首先,我们将了解如何设置一种将数据提供给神经网络模型训练器的方法。
以下是实施步骤 -
步骤 1 - 首先,来自cntk。io 模块导入用于创建小批量源的组件,如下所示:
from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITY_REPEAT
步骤 2 - 接下来,创建一个名为create_datasource的新函数。该函数有两个参数,即 filename 和 limit,默认值为INFINITELY_REPEAT。
def create_datasource(filename, limit =INFINITELY_REPEAT)
步骤 3 - 现在,在函数中,通过使用StreamDef类为从具有三个功能的标签字段读取的标签创建流定义。我们还需要将is_sparse设置为False,如下所示:
labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)
步骤 4 - 接下来,创建从输入文件读取特征,创建StreamDef的另一个实例,如下所示。
feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)
步骤 5 - 现在,初始化CTFDeserializer实例类。指定我们需要反序列化的文件名和流,如下所示 -
deserializer = CTFDeserializer(filename, StreamDefs(labels= label_stream, features=features_stream)
步骤 6 - 接下来,我们需要使用反序列化器创建 minisourceBatch 的实例,如下 -
Minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit) return minibatch_source
步骤 7 - 最后,我们需要提供我们在前面部分中创建的培训和测试源。我们正在使用鸢尾花数据集。
training_source = create_datasource(‘Iris_train.ctf’) test_source = create_datasource(‘Iris_test.ctf’, limit=1)
创建MinibatchSource实例后,我们需要对其进行训练。我们可以使用与处理小型内存数据集时相同的训练逻辑。在这里,我们将使用MinibatchSource实例作为损失函数训练方法的输入,如下所示 -
以下是实施步骤 -
步骤 1 - 为了记录培训课程的输出,首先从cntk.logging模块导入ProgressPrinter,如下所示 -
from cntk.logging import ProgressPrinter
步骤 2 - 接下来,要设置培训课程,请从cntk.train模块导入培训师和training_session,如下所示 -
from cntk.train import Trainer, training_session
步骤 3 - 现在,我们需要定义一些常量集,例如minibatch_size、samples_per_epoch和num_epochs,如下所示 -
minbatch_size = 16 samples_per_epoch = 150 num_epochs = 30 max_samples = samples_per_epoch * num_epochs
步骤 4 - 接下来,为了知道如何在 CNTK 训练期间读取数据,我们需要定义网络的输入变量与小批量源中的流之间的映射。
input_map = { features: training_source.streams.features, labels: training_source.streams.labels }
步骤 5 - 接下来记录训练过程的输出,使用新的ProgressPrinter实例初始化Progress_printer变量。另外,初始化训练器并为其提供模型,如下所示 -
progress_writer = ProgressPrinter(0) trainer: training_source.streams.labels
步骤6 - 最后,要开始训练过程,我们需要调用training_session函数,如下所示 -
session = training_session(trainer, mb_source=training_source, mb_size=minibatch_size, model_inputs_to_streams=input_map, max_samples=max_samples, test_config=test_config) session.train()
训练模型后,我们可以使用TestConfig对象向此设置添加验证,并将其分配给train_session函数的test_config关键字参数。
以下是实施步骤 -
步骤 1 - 首先,我们需要从模块cntk.train导入TestConfig类,如下所示 -
from cntk.train import TestConfig
步骤 2 - 现在,我们需要使用test_source作为输入创建TestConfig的新实例 -
Test_config = TestConfig(test_source)
完整示例
from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITY_REPEAT def create_datasource(filename, limit =INFINITELY_REPEAT) labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False) feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False) deserializer = CTFDeserializer(filename, StreamDefs(labels=label_stream, features=features_stream) Minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit) return minibatch_source training_source = create_datasource(‘Iris_train.ctf’) test_source = create_datasource(‘Iris_test.ctf’, limit=1) from cntk.logging import ProgressPrinter from cntk.train import Trainer, training_session minbatch_size = 16 samples_per_epoch = 150 num_epochs = 30 max_samples = samples_per_epoch * num_epochs input_map = { features: training_source.streams.features, labels: training_source.streams.labels } progress_writer = ProgressPrinter(0) trainer: training_source.streams.labels session = training_session(trainer, mb_source=training_source, mb_size=minibatch_size, model_inputs_to_streams=input_map, max_samples=max_samples, test_config=test_config) session.train() from cntk.train import TestConfig Test_config = TestConfig(test_source)
输出
------------------------------------------------------------------- average since average since examples loss last metric last ------------------------------------------------------ Learning rate per minibatch: 0.1 1.57 1.57 0.214 0.214 16 1.38 1.28 0.264 0.289 48 [………] Finished Evaluation [1]: Minibatch[1-1]:metric = 69.65*30;
手动小批量循环
正如我们在上面看到的,通过使用 CNTK 中的常规 API 进行训练时的指标,可以很容易地测量训练期间和训练之后我们的 NN 模型的性能。但是,另一方面,使用手动小批量循环时事情不会那么容易。
在这里,我们使用下面给出的模型,该模型具有来自 Iris Flower 数据集的 4 个输入和 3 个输出,该模型也在前面的部分中创建 -
from cntk import default_options, input_variable from cntk.layers import Dense, Sequential from cntk.ops import log_softmax, relu, sigmoid from cntk.learners import sgd model = Sequential([ Dense(4, activation=sigmoid), Dense(3, activation=log_softmax) ]) features = input_variable(4) labels = input_variable(3) z = model(features)
接下来,模型的损失被定义为交叉熵损失函数和前面部分中使用的 F 度量指标的组合。我们将使用criteria_factory实用程序,将其创建为 CNTK 函数对象,如下所示 -
import cntk from cntk.losses import cross_entropy_with_softmax, fmeasure @cntk.Function def criterion_factory(outputs, targets): loss = cross_entropy_with_softmax(outputs, targets) metric = fmeasure(outputs, targets, beta=1) return loss, metric loss = criterion_factory(z, labels) learner = sgd(z.parameters, 0.1) label_mapping = { 'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2 }
现在,我们已经定义了损失函数,我们将了解如何在训练器中使用它来设置手动训练课程。
以下是实施步骤 -
步骤 1 - 首先,我们需要导入所需的包(如numpy和pandas)来加载和预处理数据。
import pandas as pd import numpy as np
步骤 2 - 接下来,为了在训练期间记录信息,请导入ProgressPrinter类,如下所示 -
from cntk.logging import ProgressPrinter
步骤 3 - 然后,我们需要从 cntk.train 模块导入训练器模块,如下 -
from cntk.train import Trainer
步骤 4 - 接下来,创建ProgressPrinter的新实例,如下所示 -
progress_writer = ProgressPrinter(0)
步骤 5 - 现在,我们需要使用损失、学习者和Progress_writer参数初始化训练器,如下所示 -
trainer = Trainer(z, loss, learner, progress_writer)
步骤 6 - 接下来,为了训练模型,我们将创建一个循环,该循环将迭代数据集三十次。这将是外部训练循环。
for _ in range(0,30):
步骤 7 - 现在,我们需要使用 pandas 从磁盘加载数据。然后,为了以小批量加载数据集,请将chunksize关键字参数设置为 16。
input_data = pd.read_csv('iris.csv', names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'], index_col=False, chunksize=16)
步骤 8 - 现在,创建一个内部训练 for 循环来迭代每个小批量。
for df_batch in input_data:
步骤 9 - 现在在这个循环中,使用iloc索引器读取前四列,作为训练的特征并将其转换为 float32 -
feature_values = df_batch.iloc[:,:4].values feature_values = feature_values.astype(np.float32)
步骤 10 - 现在,读取最后一列作为训练标签,如下 -
label_values = df_batch.iloc[:,-1]
步骤 11 - 接下来,我们将使用 one-hot 向量将标签字符串转换为其数字表示形式,如下所示 -
label_values = label_values.map(lambda x: label_mapping[x])
步骤 12 - 之后,获取标签的数字表示。接下来,将它们转换为 numpy 数组,这样使用它们会更容易,如下所示 -
label_values = label_values.values
步骤 13 - 现在,我们需要创建一个新的 numpy 数组,其行数与我们转换的标签值相同。
encoded_labels = np.zeros((label_values.shape[0], 3))
步骤 14 - 现在,为了创建 one-hot 编码标签,请根据数字标签值选择列。
encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.
步骤 15 - 最后,我们需要在训练器上调用train_minibatch方法,并为小批量提供处理后的特征和标签。
trainer.train_minibatch({features: feature_values, labels: encoded_labels})
完整示例
from cntk import default_options, input_variable from cntk.layers import Dense, Sequential from cntk.ops import log_softmax, relu, sigmoid from cntk.learners import sgd model = Sequential([ Dense(4, activation=sigmoid), Dense(3, activation=log_softmax) ]) features = input_variable(4) labels = input_variable(3) z = model(features) import cntk from cntk.losses import cross_entropy_with_softmax, fmeasure @cntk.Function def criterion_factory(outputs, targets): loss = cross_entropy_with_softmax(outputs, targets) metric = fmeasure(outputs, targets, beta=1) return loss, metric loss = criterion_factory(z, labels) learner = sgd(z.parameters, 0.1) label_mapping = { 'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2 } import pandas as pd import numpy as np from cntk.logging import ProgressPrinter from cntk.train import Trainer progress_writer = ProgressPrinter(0) trainer = Trainer(z, loss, learner, progress_writer) for _ in range(0,30): input_data = pd.read_csv('iris.csv', names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'], index_col=False, chunksize=16) for df_batch in input_data: feature_values = df_batch.iloc[:,:4].values feature_values = feature_values.astype(np.float32) label_values = df_batch.iloc[:,-1] label_values = label_values.map(lambda x: label_mapping[x]) label_values = label_values.values encoded_labels = np.zeros((label_values.shape[0], 3)) encoded_labels[np.arange(label_values.shape[0]), label_values] = 1. trainer.train_minibatch({features: feature_values, labels: encoded_labels})
输出
------------------------------------------------------------------- average since average since examples loss last metric last ------------------------------------------------------ Learning rate per minibatch: 0.1 1.45 1.45 -0.189 -0.189 16 1.24 1.13 -0.0382 0.0371 48 [………]
在上面的输出中,我们得到了训练期间损失和度量的输出。这是因为我们在函数对象中组合了度量和损失,并在训练器配置中使用了进度打印机。
现在,为了评估模型性能,我们需要执行与训练模型相同的任务,但这次,我们需要使用 Evaluator 实例来测试模型。它如下面的 Python 代码所示:
from cntk import Evaluator evaluator = Evaluator(loss.outputs[1], [progress_writer]) input_data = pd.read_csv('iris.csv', names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'], index_col=False, chunksize=16) for df_batch in input_data: feature_values = df_batch.iloc[:,:4].values feature_values = feature_values.astype(np.float32) label_values = df_batch.iloc[:,-1] label_values = label_values.map(lambda x: label_mapping[x]) label_values = label_values.values encoded_labels = np.zeros((label_values.shape[0], 3)) encoded_labels[np.arange(label_values.shape[0]), label_values] = 1. evaluator.test_minibatch({ features: feature_values, labels: encoded_labels}) evaluator.summarize_test_progress()
现在,我们将得到如下输出:
输出
Finished Evaluation [1]: Minibatch[1-11]:metric = 74.62*143;