pyarrow.compute.register_aggregate_function
pyarrow.compute.register_aggregate_function(func, function_name, function_doc, in_types, out_type, func_registry=None)
Register a user-defined non-decomposable aggregate function.
This API is EXPERIMENTAL.
A non-decomposable aggregation function is a function that executes aggregate operations on the whole of the data it is aggregating. In other words, a non-decomposable aggregate function cannot be split into consume/merge/finalize steps.
This is often used with ordered or segmented aggregation, where groups can be emitted before all of the input data has been accumulated.
Note that currently the size of any input column cannot exceed 2 GB for a single segment (all groups combined).
- Parameters:
- func
callable() A callable implementing the user-defined function. The first argument is the context argument of type UdfContext; after that, it must take as many arguments as there are entries in in_types. It must return a Scalar matching the out_type. To define a varargs function, pass a callable that takes *args. The types in in_types need to match the types of the inputs when the function gets called.
- function_name
str Name of the function. This name must be unique, i.e., there should only be one function registered with this name in the function registry.
- function_doc
dict A dictionary object with keys “summary” (str) and “description” (str).
- in_types
Dict[str,DataType] A dictionary mapping function argument names to their respective DataType. The argument names will be used to generate documentation for the function. The number of arguments specified here determines the function arity.
- out_type
DataType Output type of the function.
- func_registry
FunctionRegistry Optional function registry to use instead of the default global one.
Examples
>>> import numpy as np
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>>
>>> func_doc = {}
>>> func_doc["summary"] = "simple median udf"
>>> func_doc["description"] = "compute median"
>>>
>>> def compute_median(ctx, array):
...     return pa.scalar(np.median(array))
>>>
>>> func_name = "py_compute_median"
>>> in_types = {"array": pa.int64()}
>>> out_type = pa.float64()
>>> pc.register_aggregate_function(compute_median, func_name, func_doc,
...                                in_types, out_type)
>>>
>>> func = pc.get_function(func_name)
>>> func.name
'py_compute_median'
>>> answer = pc.call_function(func_name, [pa.array([20, 40])])
>>> answer
<pyarrow.DoubleScalar: 30.0>
>>> table = pa.table([pa.array([1, 1, 2, 2]), pa.array([10, 20, 30, 40])],
...                  names=['k', 'v'])
>>> result = table.group_by('k').aggregate([('v', 'py_compute_median')])
>>> result
pyarrow.Table
k: int64
v_py_compute_median: double
----
k: [[1,2]]
v_py_compute_median: [[15,35]]