pyarrow.compute.register_aggregate_function#

pyarrow.compute.register_aggregate_function(func, function_name, function_doc, in_types, out_type, func_registry=None)#

Register a user-defined non-decomposable aggregate function.

This API is EXPERIMENTAL.

A non-decomposable aggregation function is a function that executes aggregate operations on the whole data that it is aggregating. In other words, non-decomposable aggregate function cannot be split into consume/merge/finalize steps.

This is often used with ordered or segmented aggregation where groups can be emit before accumulating all of the input data.

Note that currently the size of any input column cannot exceed 2 GB for a single segment (all groups combined).

Parameters:
funccallable()

A callable implementing the user-defined function. The first argument is the context argument of type UdfContext. Then, it must take arguments equal to the number of in_types defined. It must return a Scalar matching the out_type. To define a varargs function, pass a callable that takes *args. The in_type needs to match in type of inputs when the function gets called.

function_namestr

Name of the function. This name must be unique, i.e., there should only be one function registered with this name in the function registry.

function_docdict

A dictionary object with keys “summary” (str), and “description” (str).

in_typesDict[str, DataType]

A dictionary mapping function argument names to their respective DataType. The argument names will be used to generate documentation for the function. The number of arguments specified here determines the function arity.

out_typeDataType

Output type of the function.

func_registryFunctionRegistry

Optional function registry to use instead of the default global one.

Examples

>>> import numpy as np
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>>
>>> func_doc = {}
>>> func_doc["summary"] = "simple median udf"
>>> func_doc["description"] = "compute median"
>>>
>>> def compute_median(ctx, array):
...     return pa.scalar(np.median(array))
>>>
>>> func_name = "py_compute_median"
>>> in_types = {"array": pa.int64()}
>>> out_type = pa.float64()
>>> pc.register_aggregate_function(compute_median, func_name, func_doc,
...                   in_types, out_type)
>>>
>>> func = pc.get_function(func_name)
>>> func.name
'py_compute_median'
>>> answer = pc.call_function(func_name, [pa.array([20, 40])])
>>> answer
<pyarrow.DoubleScalar: 30.0>
>>> table = pa.table([pa.array([1, 1, 2, 2]), pa.array([10, 20, 30, 40])], names=['k', 'v'])
>>> result = table.group_by('k').aggregate([('v', 'py_compute_median')])
>>> result
pyarrow.Table
k: int64
v_py_compute_median: double
----
k: [[1,2]]
v_py_compute_median: [[15,35]]