为自定义模型编写自己的转换器

您可能实现了自己的模型,并且显然没有此新模型的现有转换器。但这并不意味着包含它的管道的转换将无法工作。让我们看看如何做到这一点。

t-SNE 是一种有趣的转换,只能用于研究数据,因为一旦拟合就无法重现结果。这就是为什么类 TSNE 没有任何 transform 方法,只有 fit_transform。此示例提出了一种训练机器学习模型的方法,该模型近似于 t-SNE 转换器的输出。

新转换的实现

第一部分是关于实现的。代码非常通用,但基本上遵循以下过程来使用 X 和 y 拟合模型:

  • t-SNE,(X, y) \rightarrow X_2 \in \mathbb{R}^2

  • k 近邻,fit(X, X_2),产生函数 f(X) \rightarrow X_3

  • 最终归一化,简单缩放 X_3 \rightarrow X_4

并在测试集上进行预测

  • k 近邻,f(X') \rightarrow X'_3

  • 最终归一化,简单缩放 X'_3 \rightarrow X'_4

import inspect
import os
import numpy
import onnx
from onnx.tools.net_drawer import GetPydotGraph, GetOpNodeProducer
import onnxruntime as rt
from matplotlib import offsetbox
import matplotlib.pyplot as plt
import sklearn
from sklearn.model_selection import train_test_split
from sklearn import datasets
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.manifold import TSNE
from sklearn.metrics import mean_squared_error
from sklearn.neighbors import KNeighborsRegressor
from skl2onnx import update_registered_converter
import skl2onnx
from skl2onnx import convert_sklearn, get_model_alias
from skl2onnx.common._registration import get_shape_calculator
from skl2onnx.common.data_types import FloatTensorType


class PredictableTSNE(BaseEstimator, TransformerMixin):
    """Makes a non-predictable transformer (t-SNE by default) repeatable.

    Fits the transformer on the training data, then trains a regressor
    to approximate the transformer's output from the raw features, so
    the pair exposes a regular ``transform`` usable on unseen data.
    """

    def __init__(
        self,
        transformer=None,
        estimator=None,
        normalize=True,
        keep_tsne_outputs=False,
        **kwargs
    ):
        """
        :param transformer: `TSNE` by default
        :param estimator: `KNeighborsRegressor` by default
        :param normalize: normalizes the outputs, centers and normalizes
            the output of the *t-SNE* and applies that same
            normalization to the prediction of the estimator
        :param keep_tsne_outputs: if True, outputs of
            *TSNE* are stored in member *tsne_outputs_*
        :param kwargs: sent to :meth:`set_params <mlinsights.mlmodel.
            tsne_transformer.PredictableTSNE.set_params>`, see its
            documentation to understand how to specify parameters
        """
        TransformerMixin.__init__(self)
        BaseEstimator.__init__(self)
        if estimator is None:
            estimator = KNeighborsRegressor()
        if transformer is None:
            transformer = TSNE()
        self.estimator = estimator
        self.transformer = transformer
        self.keep_tsne_outputs = keep_tsne_outputs
        # Fail early: the wrapped objects must follow the scikit-learn API.
        if not hasattr(transformer, "fit_transform"):
            raise AttributeError(
                "Transformer {} does not have a 'fit_transform' "
                "method.".format(type(transformer))
            )
        if not hasattr(estimator, "predict"):
            raise AttributeError(
                "Estimator {} does not have a 'predict' method.".format(type(estimator))
            )
        self.normalize = normalize
        if kwargs:
            self.set_params(**kwargs)

    def fit(self, X, y, sample_weight=None):
        """
        Fits the transformer on *(X, y)*, then trains the estimator to
        reproduce the transformer's output from *X* alone.

        Parameters
        ----------
        X : numpy array or sparse matrix of shape [n_samples,n_features]
            Training data
        y : numpy array of shape [n_samples, n_targets]
            Target values. Will be cast to X's dtype if necessary
        sample_weight : numpy array of shape [n_samples]
            Individual weights for each sample

        Returns
        -------
        self : returns an instance of self.

        Attributes
        ----------
        transformer_: trained transformer
        estimator_: trained regressor
        tsne_outputs_: t-SNE outputs if *keep_tsne_outputs* is True
        mean_: average of the *t-SNE* output on each dimension
        inv_std_: inverse of the standard deviation of the *t-SNE*
            output on each dimension
        loss_: loss (*mean_squared_error*) between the predictions
            and the outputs of t-SNE
        """
        params = dict(y=y, sample_weight=sample_weight)

        self.transformer_ = clone(self.transformer)

        # Forward y / sample_weight only when the transformer's
        # fit_transform signature actually accepts them.
        sig = inspect.signature(self.transformer.fit_transform)
        pars = {}
        for p in ["sample_weight", "y"]:
            if p in sig.parameters and p in params:
                pars[p] = params[p]
        target = self.transformer_.fit_transform(X, **pars)

        # Train the estimator to predict the transformer's output;
        # pass sample_weight only if the estimator supports it.
        sig = inspect.signature(self.estimator.fit)
        if "sample_weight" in sig.parameters:
            self.estimator_ = clone(self.estimator).fit(
                X, target, sample_weight=sample_weight
            )
        else:
            self.estimator_ = clone(self.estimator).fit(X, target)
        # Normalization statistics of the t-SNE output, reused by
        # transform() so predictions live in the same scale.
        mean = target.mean(axis=0)
        var = target.std(axis=0)
        self.mean_ = mean
        self.inv_std_ = 1.0 / var
        exp = (target - mean) * self.inv_std_
        got = (self.estimator_.predict(X) - mean) * self.inv_std_
        self.loss_ = mean_squared_error(exp, got)
        if self.keep_tsne_outputs:
            # Stores the normalized outputs when *normalize* is True,
            # the raw t-SNE outputs otherwise.
            self.tsne_outputs_ = exp if self.normalize else target
        return self

    def transform(self, X):
        """
        Runs the predictions.

        Parameters
        ----------
        X : numpy array or sparse matrix of shape [n_samples,n_features]
            Training data

        Returns
        -------
        transformed *X*
        """
        pred = self.estimator_.predict(X)
        if self.normalize:
            pred -= self.mean_
            pred *= self.inv_std_
        return pred

    def get_params(self, deep=True):
        """
        Returns the parameters for all the embedded objects,
        prefixed with ``t_`` (transformer) or ``e_`` (estimator).
        """
        res = {}
        for k, v in self.transformer.get_params().items():
            res["t_" + k] = v
        for k, v in self.estimator.get_params().items():
            res["e_" + k] = v
        return res

    def set_params(self, **values):
        """
        Sets the parameters before training.
        Every parameter prefixed by ``'e_'`` is an estimator
        parameter, every parameter prefixed by
        ``t_`` is for a transformer parameter.
        """
        pt, pe, pn = {}, {}, {}
        for k, v in values.items():
            if k.startswith("e_"):
                pe[k[2:]] = v
            elif k.startswith("t_"):
                pt[k[2:]] = v
            elif k.startswith("n_"):
                # NOTE(review): 'n_' parameters are accepted but never
                # applied anywhere — kept for backward compatibility.
                pn[k[2:]] = v
            else:
                raise ValueError("Unexpected parameter name '{0}'.".format(k))
        self.transformer.set_params(**pt)
        self.estimator.set_params(**pe)
在 MNIST 上进行实验

让我们拟合 t-SNE…

# Load a 6-class subset of the scikit-learn digits dataset (8x8 images).
digits = datasets.load_digits(n_class=6)
Xd = digits.data  # flattened 64-feature vectors
yd = digits.target  # class labels 0..5
imgs = digits.images  # 8x8 images, used as thumbnails in the plots
n_samples, n_features = Xd.shape
n_samples, n_features  # notebook-style echo of the dataset dimensions

X_train, X_test, y_train, y_test, imgs_train, imgs_test = train_test_split(Xd, yd, imgs)

# 2D t-SNE with PCA initialization; fixed random_state for reproducibility.
tsne = TSNE(n_components=2, init="pca", random_state=0)


def plot_embedding(Xp, y, imgs, title=None, figsize=(12, 4)):
    """Draws a 2D embedding twice: left with class labels and image
    thumbnails, right as a plain scatter plot. Returns the axes array."""
    lo = numpy.min(Xp, 0)
    hi = numpy.max(Xp, 0)
    scaled = (Xp - lo) / (hi - lo)  # rescale into the unit square

    fig, axes = plt.subplots(1, 2, figsize=figsize)
    for idx in range(scaled.shape[0]):
        axes[0].text(
            scaled[idx, 0],
            scaled[idx, 1],
            str(y[idx]),
            color=plt.cm.Set1(y[idx] / 10.0),
            fontdict={"weight": "bold", "size": 9},
        )

    if hasattr(offsetbox, "AnnotationBbox"):
        # only print thumbnails with matplotlib > 1.0
        placed = numpy.array([[1.0, 1.0]])  # just something big
        for idx in range(scaled.shape[0]):
            gaps = numpy.sum((scaled[idx] - placed) ** 2, 1)
            if numpy.min(gaps) < 4e-3:
                # don't show points that are too close
                continue
            placed = numpy.r_[placed, [scaled[idx]]]
            thumb = offsetbox.AnnotationBbox(
                offsetbox.OffsetImage(imgs[idx], cmap=plt.cm.gray_r), scaled[idx]
            )
            axes[0].add_artist(thumb)
    axes[0].set_xticks([])
    axes[0].set_yticks([])
    axes[1].plot(Xp[:, 0], Xp[:, 1], ".")
    if title is not None:
        axes[0].set_title(title)
    return axes


# Fit plain t-SNE on the training set and display the 2D embedding.
X_train_tsne = tsne.fit_transform(X_train)
plot_embedding(X_train_tsne, y_train, imgs_train, "t-SNE embedding of the digits")
t-SNE embedding of the digits
array([<Axes: title={'center': 't-SNE embedding of the digits'}>,
       <Axes: >], dtype=object)

可重复的 t-SNE

只是为了检查它是否有效。

# Fit the predictable t-SNE (t-SNE approximated by a k-NN regressor).
ptsne_knn = PredictableTSNE()
ptsne_knn.fit(X_train, y_train)

# Unlike plain TSNE, transform() is now available and repeatable.
X_train_tsne2 = ptsne_knn.transform(X_train)
plot_embedding(
    X_train_tsne2,
    y_train,
    imgs_train,
    "Predictable t-SNE of the digits\n" "StandardScaler+KNeighborsRegressor",
)
Predictable t-SNE of the digits StandardScaler+KNeighborsRegressor
array([<Axes: title={'center': 'Predictable t-SNE of the digits\nStandardScaler+KNeighborsRegressor'}>,
       <Axes: >], dtype=object)

我们在测试集上进行检查。

# Apply the fitted model to unseen data — impossible with plain t-SNE.
X_test_tsne2 = ptsne_knn.transform(X_test)
plot_embedding(
    X_test_tsne2,
    y_test,
    imgs_test,
    "Predictable t-SNE of the digits\n" "StandardScaler+KNeighborsRegressor",
)
Predictable t-SNE of the digits StandardScaler+KNeighborsRegressor
array([<Axes: title={'center': 'Predictable t-SNE of the digits\nStandardScaler+KNeighborsRegressor'}>,
       <Axes: >], dtype=object)

ONNX - shape_calculator,转换器

现在开始专门用于 ONNX 的部分。ONNX 转换需要两个函数,一个根据输入计算输出的形状,另一个对模型进行实际转换。

def predictable_tsne_shape_calculator(operator):
    """Shape calculator for PredictableTSNE: declares the ONNX output
    as a float tensor of shape [n_observations, n_tsne_dimensions]."""
    inp = operator.inputs[0]  # input variable in the ONNX graph
    op = operator.raw_operator  # scikit-learn model (must be fitted)

    N = inp.type.shape[0]  # number of observations
    C = op.estimator_._y.shape[1]  # dimension of outputs

    # new output definition
    operator.outputs[0].type = FloatTensorType([N, C])

然后是转换器模型。我们重用现有的转换器。

def predictable_tsne_converter(scope, operator, container):
    """
    Converts a fitted PredictableTSNE into an ONNX subgraph: the trained
    k-NN regressor (converted by its existing converter) followed by an
    ``ai.onnx.ml.Scaler`` node applying the stored normalization.

    :param scope: name space, where to keep node names, get unused new names
    :param operator: operator to converter, same object as sent to
        *predictable_tsne_shape_calculator*
    :param container: contains the ONNX graph
    """
    # input = operator.inputs[0]      # input in ONNX graph
    output = operator.outputs[0]  # output in ONNX graph
    op = operator.raw_operator  # scikit-learn model (must be fitted)

    # First step is the k nearest-neighbours,
    # we reuse existing converter and declare it as local
    # operator.
    model = op.estimator_
    alias = get_model_alias(type(model))
    knn_op = scope.declare_local_operator(alias, model)
    knn_op.inputs = operator.inputs

    # We add an intermediate outputs.
    knn_output = scope.declare_local_variable("knn_output", FloatTensorType())
    knn_op.outputs.append(knn_output)

    # We adjust the output of the submodel.
    shape_calc = get_shape_calculator(alias)
    shape_calc(knn_op)

    # We add the normalizer which needs a unique node name.
    name = scope.get_unique_operator_name("Scaler")

    # The parameter follows the specifications of ONNX
    # https://github.com/onnx/onnx/blob/main/docs/Operators-ml.md#ai.onnx.ml.Scaler
    # Scaler computes (x - offset) * scale, matching transform().
    attrs = dict(
        name=name,
        scale=op.inv_std_.ravel().astype(numpy.float32),
        offset=op.mean_.ravel().astype(numpy.float32),
    )

    # Let's finally add the scaler which connects the output
    # of the k-nearest neighbours model to output of the whole model
    # declared in ONNX graph
    container.add_node(
        "Scaler",
        [knn_output.onnx_name],
        [output.full_name],
        op_domain="ai.onnx.ml",
        **attrs
    )

现在我们需要声明新的转换器。

# Register the new converter: binds the Python class to an alias,
# its shape calculator and its conversion function.
update_registered_converter(
    PredictableTSNE,
    "CustomPredictableTSNE",
    predictable_tsne_shape_calculator,
    predictable_tsne_converter,
)

转换为 ONNX

我们只需要像任何其他模型一样调用 convert_sklearn 进行转换。

# Convert the fitted pipeline like any other scikit-learn model;
# the input is a float tensor with a dynamic batch dimension.
model_onnx = convert_sklearn(
    ptsne_knn,
    "predictable_tsne",
    [("input", FloatTensorType([None, X_test.shape[1]]))],
    target_opset=12,
)

# And save.
with open("predictable_tsne.onnx", "wb") as f:
    f.write(model_onnx.SerializeToString())

现在我们比较预测结果。

# scikit-learn predictions on two test rows, to compare with ONNX below.
print("ptsne_knn.tranform\n", ptsne_knn.transform(X_test[:2]))
ptsne_knn.tranform
 [[-0.87237126  1.222756  ]
 [ 0.13582209  0.6264854 ]]

使用 onnxruntime 进行预测。

# Load the saved ONNX model with onnxruntime (CPU) and run inference
# on the first test row.
sess = rt.InferenceSession("predictable_tsne.onnx", providers=["CPUExecutionProvider"])

pred_onx = sess.run(None, {"input": X_test[:1].astype(numpy.float32)})
print("transform", pred_onx[0])
transform [[-0.8723713  1.222756 ]]

最近邻的转换器生成一个 ONNX 图,该图不允许一次进行多次预测。让我们为第二行调用 onnxruntime

# The converted graph handles one observation at a time, so the second
# row is predicted in a separate call.
pred_onx = sess.run(None, {"input": X_test[1:2].astype(numpy.float32)})
print("transform", pred_onx[0])
transform [[0.13582209 0.6264854 ]]

显示 ONNX 图

# Render the ONNX graph: export a DOT file, convert it to PNG with
# Graphviz, then display the image with matplotlib.
pydot_graph = GetPydotGraph(
    model_onnx.graph,
    name=model_onnx.graph.name,
    rankdir="TB",
    node_producer=GetOpNodeProducer(
        "docstring", color="yellow", fillcolor="yellow", style="filled"
    ),
)
pydot_graph.write_dot("pipeline_tsne.dot")

# Requires the Graphviz 'dot' executable on the PATH.
os.system("dot -O -Gdpi=300 -Tpng pipeline_tsne.dot")

image = plt.imread("pipeline_tsne.dot.png")
fig, ax = plt.subplots(figsize=(40, 20))
ax.imshow(image)
ax.axis("off")
plot custom model
(-0.5, 2643.5, 9099.5, -0.5)

此示例使用的版本

# Versions of the libraries used by this example.
print("numpy:", numpy.__version__)
print("scikit-learn:", sklearn.__version__)
print("onnx: ", onnx.__version__)
print("onnxruntime: ", rt.__version__)
print("skl2onnx: ", skl2onnx.__version__)
numpy: 1.26.4
scikit-learn: 1.6.dev0
onnx:  1.17.0
onnxruntime:  1.18.0+cu118
skl2onnx:  1.17.0

脚本的总运行时间:(0 分 10.217 秒)

由 Sphinx-Gallery 生成的库