FunctionTransformer 问题

包含 FunctionTransformer 的管道无法自动转换为 onnx,因为没有能够将自定义 Python 代码转换为 ONNX 的转换器。需要专门为此编写自定义转换器。

初始尝试

一个非常简单的管道以及将其转换为 ONNX 的第一次尝试。

import numpy as np
from numpy.testing import assert_allclose
from onnx.version_converter import convert_version
from pandas import DataFrame
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from skl2onnx import to_onnx

# For the custom converter
from skl2onnx import update_registered_converter
from skl2onnx.common.utils import check_input_and_output_numbers
from skl2onnx.algebra.onnx_ops import OnnxSlice, OnnxSub, OnnxDiv, OnnxMul, OnnxCastLike
from skl2onnx.helpers import add_onnx_graph
import onnxscript
from onnxscript import opset18 as op

# To check discrepancies
from onnx.reference import ReferenceEvaluator
from onnxruntime import InferenceSession


def calculate_growth(df):
    df["c"] = 100 * (df["a"] - df["b"]) / df["b"]
    return df


mapper = ColumnTransformer(
    transformers=[
        ("c", FunctionTransformer(calculate_growth), ["a", "b"]),
    ],
    remainder="passthrough",
    verbose_feature_names_out=False,
)
mapper.set_output(transform="pandas")

pipe = Pipeline([("mapper", mapper), ("classifier", DecisionTreeClassifier())])

data = DataFrame(
    [
        dict(a=2, b=1, f=5),
        dict(a=50, b=4, f=10),
        dict(a=5, b=2, f=4),
        dict(a=100, b=6, f=20),
    ]
)
y = np.array([0, 1, 0, 1], dtype=np.int64)
pipe.fit(data, y)

try:
    to_onnx(pipe, data[:1], options={"zipmap": False})
except Exception as e:
    print("It does not work:", e)
It does not work: FunctionTransformer is not supported unless the transform function is None (= identity). You may raise an issue at https://github.com/onnx/sklearn-onnx/issues.

自定义转换器的使用

如果 FunctionTransformer 实现为自定义转换器,则编写自定义转换器更容易。

class GrowthCalculator(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def calculate_growth(self, x, y):
        return 100 * (x - y) / y

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        x = X.apply(lambda x: self.calculate_growth(x.a, x.b), axis=1)
        return x.values.reshape((-1, 1))


mapper = ColumnTransformer(
    transformers=[
        ("ab", FunctionTransformer(), ["a", "b"]),  # We keep the first column.
        ("c", GrowthCalculator(), ["a", "b"]),  # We add a new one.
    ],
    remainder="passthrough",
    verbose_feature_names_out=False,
)

pipe_tr = Pipeline([("mapper", mapper), ("classifier", DecisionTreeClassifier())])
pipe_tr.fit(data, y)
Pipeline(steps=[('mapper',
                 ColumnTransformer(remainder='passthrough',
                                   transformers=[('ab', FunctionTransformer(),
                                                  ['a', 'b']),
                                                 ('c', GrowthCalculator(),
                                                  ['a', 'b'])],
                                   verbose_feature_names_out=False)),
                ('classifier', DecisionTreeClassifier())])
在 Jupyter 环境中,请重新运行此单元格以显示 HTML 表示形式或信任笔记本。
在 GitHub 上,HTML 表示形式无法呈现,请尝试使用 nbviewer.org 加载此页面。


这两个管道返回相同的输出。

assert_allclose(pipe.predict_proba(data), pipe_tr.predict_proba(data))

让我们检查它是否产生了相同数量的特征。

assert_allclose(pipe.steps[0][-1].transform(data), pipe_tr.steps[0][-1].transform(data))

但转换仍然失败,并显示不同的错误消息。

try:
    to_onnx(pipe_tr, data[:1], options={"zipmap": False})
except Exception as e:
    print("It does not work:", e)
It does not work: Unable to find a shape calculator for type '<class '__main__.GrowthCalculator'>'.
It usually means the pipeline being converted contains a
transformer or a predictor with no corresponding converter
implemented in sklearn-onnx. If the converted is implemented
in another library, you need to register
the converted so that it can be used by sklearn-onnx (function
update_registered_converter). If the model is not yet covered
by sklearn-onnx, you may raise an issue to
https://github.com/onnx/sklearn-onnx/issues
to get the converter implemented or even contribute to the
project. If the model is a custom model, a new converter must
be implemented. Examples can be found in the gallery.

自定义转换器

我们需要在 ONNX 中实现 calculate_growth 方法。第一个函数返回预期的类型和形状。

def growth_shape_calculator(operator):
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    # Gets the input type, the transformer works on any numerical type.
    input_type = operator.inputs[0].type.__class__
    # The first dimension is usually dynamic (batch dimension).
    input_dim = operator.inputs[0].get_first_dimension()
    operator.outputs[0].type = input_type([input_dim, 1])


def growth_converter(scope, operator, container):
    # No need to retrieve the fitted estimator, it is not trained.
    # op = operator.raw_operator
    opv = container.target_opset
    X = operator.inputs[0]

    # 100 * (x-y)/y  --> 100 * (X[0] - X[1]) / X[1]

    zero = np.array([0], dtype=np.int64)
    one = np.array([1], dtype=np.int64)
    two = np.array([2], dtype=np.int64)
    hundred = np.array([100], dtype=np.float32)

    # Slice(data, starts, ends, axes)
    x0 = OnnxSlice(X, zero, one, one, op_version=opv)
    x1 = OnnxSlice(X, one, two, one, op_version=opv)
    z = OnnxMul(
        OnnxCastLike(hundred, X, op_version=opv),
        OnnxDiv(OnnxSub(x0, x1, op_version=opv), x1, op_version=opv),
        op_version=opv,
        output_names=operator.outputs[0],
    )
    z.add_to(scope, container)


update_registered_converter(
    GrowthCalculator,
    "AliasGrowthCalculator",
    growth_shape_calculator,
    growth_converter,
)


onx = to_onnx(pipe_tr, data[:1], target_opset=18, options={"zipmap": False})

让我们检查是否存在差异

首先是预期值

expected = (pipe_tr.predict(data), pipe_tr.predict_proba(data))
print(expected)
(array([0, 1, 0, 1]), array([[1., 0.],
       [0., 1.],
       [1., 0.],
       [0., 1.]]))

然后让我们使用 onnx.reference.ReferenceEvaluator 进行检查。

feeds = {
    "a": data["a"].values.reshape((-1, 1)),
    "b": data["b"].values.reshape((-1, 1)),
    "f": data["f"].values.reshape((-1, 1)),
}

# verbose=10 to show intermediate results
ref = ReferenceEvaluator(onx, verbose=0)
got = ref.run(None, feeds)

assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])

然后使用用于部署的运行时,例如 onnxruntime。

ref = InferenceSession(onx.SerializeToString(), providers=["CPUExecutionProvider"])
got = ref.run(None, feeds)

assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])

使用 onnxscript 的自定义转换器

onnxscript 提供了比 onnx 包实现更简洁的 API。让我们看看如何使用它来编写转换器。

@onnxscript.script()
def calculate_onnxscript_verbose(X):
    # onnxscript must define an opset. We use an identity node
    # from a specific opset to set it (otherwise it fails).
    x0 = op.Slice(X, [0], [1], [1])
    x1 = op.Slice(X, [1], [2], [1])
    return op.Mul(op.Div(op.Sub(x0, x1), x1), 100)

此版本使用 ONNX 运算符的严格定义。如果使用常规 Python 运算符,代码可以更简单。它们可能不会转换为 ONNX,但在这种情况下会引发错误消息。

@onnxscript.script()
def calculate_onnxscript(X):
    # onnxscript must define an opset. We use an identity node
    # from a specific opset to set it (otherwise it fails).
    xi = op.Identity(X)
    x0 = xi[:, :1]
    x1 = xi[:, 1:]
    return (x0 - x1) / x1 * 100

我们还可以检查它是否等效于 Python 实现。

f_expected = calculate_growth(data)["c"].values
f_got = calculate_onnxscript(data[["a", "b"]].values.astype(np.float32))
assert_allclose(f_expected.ravel(), f_got.ravel(), atol=1e-6)

让我们在转换器中使用它。

def growth_converter_onnxscript(scope, operator, container):
    # No need to retrieve the fitted estimator, it is not trained.
    # op = operator.raw_operator
    opv = container.target_opset

    # 100 * (x-y)/y  --> 100 * (X[0] - X[1]) / X[1]
    proto = calculate_onnxscript.to_model_proto()
    # The function is written with opset 18, it needs to be converted
    # to the opset required by the user when the conversion starts.
    proto_version = convert_version(proto, opv)
    add_onnx_graph(scope, operator, container, proto_version)


update_registered_converter(
    GrowthCalculator,
    "AliasGrowthCalculator",
    growth_shape_calculator,
    growth_converter_onnxscript,
)

让我们检查它是否有效。

onx = to_onnx(pipe_tr, data[:1], target_opset=18, options={"zipmap": False})

以及再次出现的差异。

ref = ReferenceEvaluator(onx, verbose=0)
got = ref.run(None, feeds)
assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])

最后。

print("done.")
done.

脚本的总运行时间:(0 分钟 0.222 秒)

由 Sphinx-Gallery 生成的图库