注意
转到末尾 下载完整的示例代码。
FunctionTransformer 问题¶
包含 FunctionTransformer 的管道无法自动转换为 onnx,因为没有能够将自定义 Python 代码转换为 ONNX 的转换器。需要专门为此编写自定义转换器。
初始尝试¶
一个非常简单的管道以及将其转换为 ONNX 的第一次尝试。
import numpy as np
from numpy.testing import assert_allclose
from onnx.version_converter import convert_version
from pandas import DataFrame
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from skl2onnx import to_onnx
# For the custom converter
from skl2onnx import update_registered_converter
from skl2onnx.common.utils import check_input_and_output_numbers
from skl2onnx.algebra.onnx_ops import OnnxSlice, OnnxSub, OnnxDiv, OnnxMul, OnnxCastLike
from skl2onnx.helpers import add_onnx_graph
import onnxscript
from onnxscript import opset18 as op
# To check discrepancies
from onnx.reference import ReferenceEvaluator
from onnxruntime import InferenceSession
def calculate_growth(df):
    """Return a copy of *df* extended with the growth column ``c``.

    ``c`` is the relative growth of column ``a`` with respect to column
    ``b``, expressed in percent: ``100 * (a - b) / b``.

    The input frame is not modified: the new column is added with
    ``DataFrame.assign``, which returns a new object.  This keeps callers
    that reuse the same frame afterwards free of an unexpected extra column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with at least the numeric columns ``a`` and ``b``.

    Returns
    -------
    pandas.DataFrame
        The columns of *df* followed by the new column ``c``.
    """
    return df.assign(c=100 * (df["a"] - df["b"]) / df["b"])
# Preprocessing: columns "a" and "b" are sent through calculate_growth,
# every other column is passed through untouched, and feature names are
# kept without a transformer prefix.  set_output returns the estimator
# itself, so the call can be chained onto the constructor.
mapper = ColumnTransformer(
    transformers=[("c", FunctionTransformer(calculate_growth), ["a", "b"])],
    remainder="passthrough",
    verbose_feature_names_out=False,
).set_output(transform="pandas")

# Full model: preprocessing followed by a decision tree.
pipe = Pipeline(
    steps=[
        ("mapper", mapper),
        ("classifier", DecisionTreeClassifier()),
    ]
)
# Four training rows with the integer features "a", "b", "f",
# given column by column, plus a binary target.
data = DataFrame(
    {
        "a": [2, 50, 5, 100],
        "b": [1, 4, 2, 6],
        "f": [5, 10, 4, 20],
    }
)
y = np.asarray([0, 1, 0, 1], dtype=np.int64)
pipe.fit(data, y)
# First conversion attempt, using the first row to infer the input types.
# Any failure is reported instead of stopping the script.
try:
    to_onnx(pipe, data[:1], options=dict(zipmap=False))
except Exception as exc:
    print("It does not work:", exc)
It does not work: FunctionTransformer is not supported unless the transform function is None (= identity). You may raise an issue at https://github.com/onnx/sklearn-onnx/issues.
使用自定义 transformer¶
如果把 FunctionTransformer 改写为自定义 transformer(继承 BaseEstimator 和 TransformerMixin 的类),为它编写自定义转换器(converter)会更容易。
class GrowthCalculator(BaseEstimator, TransformerMixin):
    """Stateless transformer computing the growth of ``a`` relative to ``b``.

    The transformer has no parameters and learns nothing during ``fit``;
    ``transform`` returns a single column holding ``100 * (a - b) / b``.
    """

    def __init__(self):
        pass

    def calculate_growth(self, x, y):
        """Return ``100 * (x - y) / y``.

        Only arithmetic operators are used, so *x* and *y* may be scalars
        or any objects supporting element-wise arithmetic (pandas Series,
        numpy arrays).
        """
        return 100 * (x - y) / y

    def fit(self, X, y=None):
        """Do nothing and return ``self``."""
        return self

    def transform(self, X, y=None):
        """Return the growth of column ``a`` over column ``b``.

        Parameters
        ----------
        X : pandas.DataFrame
            Frame providing the columns ``a`` and ``b``.
        y : ignored

        Returns
        -------
        numpy.ndarray
            Array of shape ``(n_rows, 1)``.
        """
        # Whole-column arithmetic replaces a row-by-row ``apply``: it gives
        # the same values without calling a Python function for every row.
        growth = self.calculate_growth(X["a"], X["b"])
        return growth.to_numpy().reshape((-1, 1))
# Same preprocessing, built from two transformers applied to ["a", "b"]:
#   "ab": a FunctionTransformer without a function, which leaves both
#         columns "a" and "b" as they are;
#   "c" : GrowthCalculator, which contributes the derived growth column.
# Remaining columns are passed through.
mapper = ColumnTransformer(
    transformers=[
        ("ab", FunctionTransformer(), ["a", "b"]),
        ("c", GrowthCalculator(), ["a", "b"]),
    ],
    remainder="passthrough",
    verbose_feature_names_out=False,
)
# Pipeline.fit returns the fitted pipeline, so build and fit in one go.
pipe_tr = Pipeline(
    steps=[
        ("mapper", mapper),
        ("classifier", DecisionTreeClassifier()),
    ]
).fit(data, y)
这两个管道返回相同的输出。
# Both pipelines must give the same class probabilities on the training data.
assert_allclose(pipe.predict_proba(data), pipe_tr.predict_proba(data))
让我们检查两个管道的预处理步骤是否产生了相同的特征(数量和取值都一致)。
# Compare the output of the first step (the column mapper) of each pipeline.
assert_allclose(pipe.steps[0][-1].transform(data), pipe_tr.steps[0][-1].transform(data))
但转换仍然失败,并显示不同的错误消息。
# Second conversion attempt, this time with the custom transformer in the
# pipeline.  Any failure is reported instead of stopping the script.
try:
    to_onnx(pipe_tr, data[:1], options=dict(zipmap=False))
except Exception as exc:
    print("It does not work:", exc)
It does not work: Unable to find a shape calculator for type '<class '__main__.GrowthCalculator'>'.
It usually means the pipeline being converted contains a
transformer or a predictor with no corresponding converter
implemented in sklearn-onnx. If the converted is implemented
in another library, you need to register
the converted so that it can be used by sklearn-onnx (function
update_registered_converter). If the model is not yet covered
by sklearn-onnx, you may raise an issue to
https://github.com/onnx/sklearn-onnx/issues
to get the converter implemented or even contribute to the
project. If the model is a custom model, a new converter must
be implemented. Examples can be found in the gallery.
自定义转换器¶
我们需要在 ONNX 中实现 calculate_growth 方法。第一个函数返回预期的类型和形状。
def growth_shape_calculator(operator):
    """Set the type and shape of the converted operator's single output.

    The operator must have exactly one input and one output.  The output
    is given the same type class as the input and the shape ``[N, 1]``,
    where ``N`` is the first dimension of the input.
    """
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    source = operator.inputs[0]
    # Reuse the class of the input type so that the transformer works on
    # any numerical type.
    tensor_type = type(source.type)
    # The first dimension is usually dynamic (batch dimension).
    batch_dim = source.get_first_dimension()
    operator.outputs[0].type = tensor_type([batch_dim, 1])
def growth_converter(scope, operator, container):
    """Add the ONNX nodes computing ``100 * (X[:, 0] - X[:, 1]) / X[:, 1]``.

    The fitted estimator is not retrieved: the computation does not depend
    on anything learned during training.  All nodes are created for the
    opset given by ``container.target_opset`` and the final node writes
    into ``operator.outputs[0]``.
    """
    opset = container.target_opset
    X = operator.inputs[0]

    # Index constants shared by both slices, and the scaling factor.
    zero, one, two = (np.array([v], dtype=np.int64) for v in (0, 1, 2))
    hundred = np.array([100], dtype=np.float32)

    # Slice(data, starts, ends, axes): first and second column along axis 1.
    first_col = OnnxSlice(X, zero, one, one, op_version=opset)
    second_col = OnnxSlice(X, one, two, one, op_version=opset)

    # CastLike gives the constant 100 the same element type as X.
    scale = OnnxCastLike(hundred, X, op_version=opset)
    difference = OnnxSub(first_col, second_col, op_version=opset)
    ratio = OnnxDiv(difference, second_col, op_version=opset)
    result = OnnxMul(
        scale,
        ratio,
        op_version=opset,
        output_names=operator.outputs[0],
    )
    result.add_to(scope, container)
# Register the shape calculator and the converter written above for
# GrowthCalculator, under the alias "AliasGrowthCalculator".
update_registered_converter(
GrowthCalculator,
"AliasGrowthCalculator",
growth_shape_calculator,
growth_converter,
)
# Convert the fitted pipeline for opset 18, using the first row of the
# data to describe the inputs and with the "zipmap" option switched off.
onx = to_onnx(pipe_tr, data[:1], target_opset=18, options={"zipmap": False})
让我们检查是否存在差异¶
首先是预期值(管道 pipe_tr 给出的预测标签和预测概率):
(array([0, 1, 0, 1]), array([[1., 0.],
[0., 1.],
[1., 0.],
[0., 1.]]))
然后使用 onnx.reference.ReferenceEvaluator 进行检查。
# Reference values from the scikit-learn pipeline: (labels, probabilities).
# They are computed here because the comparisons below read `expected`,
# which is otherwise never assigned in this script.
expected = (pipe_tr.predict(data), pipe_tr.predict_proba(data))

# One input per column, each reshaped into a column vector of shape (N, 1).
feeds = {
    name: data[name].values.reshape((-1, 1))
    for name in ("a", "b", "f")
}
# verbose=10 to show intermediate results
ref = ReferenceEvaluator(onx, verbose=0)
got = ref.run(None, feeds)
assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])
然后使用用于部署的运行时,例如 onnxruntime。
# Run the serialized model with onnxruntime on the CPU provider and
# compare both outputs with the expected values.
ref = InferenceSession(onx.SerializeToString(), providers=["CPUExecutionProvider"])
got = ref.run(None, feeds)
assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])
使用 onnxscript 的自定义转换器¶
onnxscript 提供了比 onnx 包实现更简洁的 API。让我们看看如何使用它来编写转换器。
@onnxscript.script()
def calculate_onnxscript_verbose(X):
    """Compute ``100 * (X[:, 0] - X[:, 1]) / X[:, 1]`` with explicit operators.

    Every step is an explicit call to an operator of the opset imported
    as ``op``.
    """
    # Slice(data, starts, ends, axes): first and second column along axis 1.
    x0 = op.Slice(X, [0], [1], [1])
    x1 = op.Slice(X, [1], [2], [1])
    growth = op.Div(op.Sub(x0, x1), x1)
    return op.Mul(growth, 100)
此版本严格按照 ONNX 运算符的定义编写。如果改用常规的 Python 运算符,代码可以更简洁。并非所有 Python 运算符都能转换为 ONNX;遇到无法转换的情况时,会抛出错误。
@onnxscript.script()
def calculate_onnxscript(X):
    """Compute ``100 * (X[:, 0] - X[:, 1]) / X[:, 1]`` with Python operators."""
    # onnxscript must be told which opset to use.  Calling Identity from a
    # specific opset sets it (the script fails otherwise).
    xi = op.Identity(X)
    x0 = xi[:, :1]
    x1 = xi[:, 1:]
    ratio = (x0 - x1) / x1
    return ratio * 100
我们还可以检查它是否等效于 Python 实现。
# Evaluate the Python implementation on a copy, so that `data` (reused by
# the conversions that follow) cannot gain an extra column "c" here.
f_expected = calculate_growth(data.copy())["c"].values
# The onnxscript function is called directly on a float32 array holding
# columns "a" and "b".
f_got = calculate_onnxscript(data[["a", "b"]].values.astype(np.float32))
assert_allclose(f_expected.ravel(), f_got.ravel(), atol=1e-6)
让我们在转换器中使用它。
def growth_converter_onnxscript(scope, operator, container):
    """Convert GrowthCalculator by inserting the onnxscript implementation.

    Computes ``100 * (X[:, 0] - X[:, 1]) / X[:, 1]``.  The fitted estimator
    is not retrieved: nothing learned during training is needed.
    """
    target_opset = container.target_opset
    # calculate_onnxscript is written with opset 18; convert its model to
    # the opset requested when the conversion started before adding it.
    model = convert_version(calculate_onnxscript.to_model_proto(), target_opset)
    add_onnx_graph(scope, operator, container, model)
# Register again for GrowthCalculator under the same alias, this time with
# the onnxscript-based converter; the shape calculator is unchanged.
update_registered_converter(
GrowthCalculator,
"AliasGrowthCalculator",
growth_shape_calculator,
growth_converter_onnxscript,
)
让我们检查它是否有效。
# Convert the pipeline again with the same arguments as before.
onx = to_onnx(pipe_tr, data[:1], target_opset=18, options={"zipmap": False})
然后再次检查是否存在差异。
# Run the new model with the reference evaluator on the same feeds and
# compare both outputs with the expected values.
ref = ReferenceEvaluator(onx, verbose=0)
got = ref.run(None, feeds)
assert_allclose(expected[0], got[0])
assert_allclose(expected[1], got[1])
最后。
# Reached only if every check above passed.
print("done.")
done.
脚本的总运行时间:(0 分钟 0.222 秒)