Making your own Streaming Reduction Function

The ireduce_ufunc() generator function

You can assemble your own streaming reduction function from a binary NumPy ufunc using the following generator function:

npstreams.ireduce_ufunc(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)

Streaming reduction generator function from a binary NumPy ufunc. Generator version of reduce_ufunc.

ufunc must be a NumPy binary Ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use of numpy.greater, for example.

Note that performance is much better for the default axis = -1. In that case, reduction operations can occur in place, which also allows the reduction to run in constant memory.
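To make the in-place, constant-memory idea above concrete, here is a minimal sketch written with NumPy only. The helper running_reduce is hypothetical and is not how npstreams is implemented; it only shows how a binary ufunc can write each step into a single accumulator through its out argument.

```python
import numpy as np

def running_reduce(arrays, ufunc):
    """Yield the running reduction of a stream, reusing one accumulator.

    Hypothetical helper for illustration only; not part of npstreams.
    """
    arrays = iter(arrays)
    first = next(arrays, None)
    if first is None:
        return  # empty stream: nothing to yield
    # Copy the first array so that the caller's data is never modified.
    accumulator = np.array(first, copy=True)
    yield accumulator.copy()
    for array in arrays:
        # out=accumulator writes the result into the existing buffer,
        # so the reduction step itself allocates no new array.
        ufunc(accumulator, array, out=accumulator)
        # A copy is yielded so that earlier results are not overwritten later.
        yield accumulator.copy()

stream = [np.full((2, 2), fill_value=float(i)) for i in range(1, 4)]
*_, total = running_reduce(stream, np.add)
```

Only the accumulator persists between steps, so the working memory does not grow with the length of the stream. The sketch assumes that every array in the stream has the same shape and dtype.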

Parameters
  • arrays (iterable) – Arrays to be reduced.

  • ufunc (numpy.ufunc) – Binary universal function.

  • axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger than the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all NumPy ufuncs support axis = None, e.g. numpy.subtract.

  • dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.

  • ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g. numpy.add.identity is 0), then NaNs are replaced with this identity. An error is raised if ufunc has no identity (e.g. numpy.maximum.identity is None).

  • kwargs – Keyword arguments are passed to ufunc. Note that some valid ufunc keyword arguments (e.g. keepdims) are not valid for all streaming functions. Also, contrary to NumPy v. 1.10+, casting = 'unsafe' is the default in npstreams.

Yields

reduced (ndarray or scalar)

Raises
  • TypeError – If ufunc is not a NumPy ufunc.

  • ValueError – If ignore_nan is True but ufunc has no identity.

  • ValueError – If ufunc is not a binary ufunc.

  • ValueError – If ufunc does not have the same input type as output type.
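The axis semantics described in the parameter list can be pictured with plain NumPy. The sketch below does not call npstreams; it builds the stacked array explicitly, which a streaming function avoids, purely to show what "stacked along a new axis" and "flattened" mean for the final result.

```python
import numpy as np

# A small stream of three 2x3 arrays.
stream = [np.arange(6, dtype=float).reshape(2, 3) + i for i in range(3)]

# Default axis: stack along a new trailing axis, then reduce that axis.
stacked = np.stack(stream, axis=-1)              # shape (2, 3, 3)
new_axis_sum = np.add.reduce(stacked, axis=-1)   # shape (2, 3)

# axis=None: every element of every array is reduced to a single scalar.
flat_sum = np.add.reduce(np.concatenate([a.ravel() for a in stream]))

# Not every ufunc accepts axis=None on multi-dimensional input.
try:
    np.subtract.reduce(stacked, axis=None)
    subtract_supports_none = True
except ValueError:
    subtract_supports_none = False
```

The last step shows the NumPy limitation that the axis note refers to: numpy.subtract is not reorderable, so it cannot be reduced over several axes at once.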

The non-generator version is also available:

npstreams.reduce_ufunc(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)

Reduce a stream using a binary NumPy ufunc. Function version of ireduce_ufunc.

ufunc must be a NumPy binary Ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use of numpy.greater, for example.

Note that performance is much better for the default axis = -1. In that case, reduction operations can occur in place, which also allows the reduction to run in constant memory.

Parameters
  • arrays (iterable) – Arrays to be reduced.

  • ufunc (numpy.ufunc) – Binary universal function.

  • axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger than the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all NumPy ufuncs support axis = None, e.g. numpy.subtract.

  • dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.

  • ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g. numpy.add.identity is 0), then NaNs are replaced with this identity. An error is raised if ufunc has no identity (e.g. numpy.maximum.identity is None).

  • kwargs – Keyword arguments are passed to ufunc. Note that some valid ufunc keyword arguments (e.g. keepdims) are not valid for all streaming functions. Also, contrary to NumPy v. 1.10+, casting = 'unsafe' is the default in npstreams.

Returns

reduced

Return type

ndarray or scalar

Raises
  • TypeError – If ufunc is not a NumPy ufunc.

  • ValueError – If ignore_nan is True but ufunc has no identity.

  • ValueError – If ufunc is not a binary ufunc.

  • ValueError – If ufunc does not have the same input type as output type.
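The note on casting in the keyword arguments is easier to appreciate with a small NumPy-only example. The sketch below does not use npstreams; it only contrasts NumPy's own default rule ('same_kind') with the 'unsafe' rule when a floating-point result is written into an integer output array.

```python
import numpy as np

a = np.array([1.5, 2.5])
b = np.array([1.0, 1.0])
out = np.zeros(2, dtype=np.int64)

# NumPy's default casting rule refuses to write floats into an integer array.
try:
    np.add(a, b, out=out)
    same_kind_ok = True
except TypeError:
    same_kind_ok = False

# With casting='unsafe' the write is allowed and fractional parts are dropped.
np.add(a, b, out=out, casting="unsafe")
```

After the second call, out holds the truncated sums. Under the 'unsafe' rule such conversions happen silently, which is worth keeping in mind when combining the dtype argument with a stream of a different type.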

Note that while all NumPy ufuncs have a reduce() method, not all of them are useful. This is why ireduce_ufunc() and reduce_ufunc() will only work with binary ufuncs, most of which are listed below. For performance reasons, we further restrict the use of ireduce_ufunc() and reduce_ufunc() to ufuncs that have the same input types as output types. Therefore, for example, numpy.greater() cannot be made to work with ireduce_ufunc() and reduce_ufunc().
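The two restrictions above can be inspected directly on a ufunc. The snippet below is a small illustration using documented NumPy attributes, not the validation code used by npstreams: nin gives the number of inputs, and comparing dtypes shows why numpy.greater is excluded.

```python
import numpy as np

a = np.array([1.0, 3.0])
b = np.array([2.0, 2.0])

# nin is the number of inputs: 2 for binary ufuncs, 1 for unary ones.
add_inputs = np.add.nin
negative_inputs = np.negative.nin

# numpy.add returns the same dtype it was given ...
add_keeps_dtype = np.add(a, b).dtype == a.dtype
# ... whereas numpy.greater returns booleans for floating-point inputs.
greater_keeps_dtype = np.greater(a, b).dtype == a.dtype
greater_result_dtype = np.greater(a, b).dtype
```

Because numpy.greater changes the type of its output, its result cannot be fed back in as an accumulator of the original type.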

NaNs handling

NumPy ufuncs can have an identity value, that is, a value such that ufunc(x1, identity) is always x1. For such ufuncs, ireduce_ufunc() and reduce_ufunc() can replace NaNs in the stream with the ufunc’s identity value, if ignore_nan = True. Note that not all ufuncs have an identity value; for example, how would you define the identity value of numpy.maximum? There is no answer.
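As a rough illustration of the replacement described above, the following NumPy-only sketch swaps NaNs for a ufunc's identity before reducing. The helper replace_nan_with_identity is hypothetical and is not part of npstreams; it assumes floating-point input.

```python
import numpy as np

def replace_nan_with_identity(array, ufunc):
    """Return a copy of array with NaNs replaced by ufunc.identity.

    Hypothetical helper for illustration only; not part of npstreams.
    """
    if ufunc.identity is None:
        raise ValueError(f"{ufunc.__name__} has no identity value")
    return np.where(np.isnan(array), ufunc.identity, array)

data = np.array([1.0, np.nan, 3.0])

# The identity of add is 0, so the NaN contributes nothing to the sum.
nan_free_sum = np.add.reduce(replace_nan_with_identity(data, np.add))
# The identity of multiply is 1, so the NaN contributes nothing to the product.
nan_free_product = np.multiply.reduce(replace_nan_with_identity(data, np.multiply))

# numpy.maximum has no identity, so the same trick is not available.
maximum_has_identity = np.maximum.identity is not None
```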

NumPy Binary Ufuncs

ireduce_ufunc() is tested to work on the following binary ufuncs, which are available in NumPy.

Arithmetics

numpy.add

add(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.subtract

subtract(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.multiply

multiply(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.divide

true_divide(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.logaddexp

logaddexp(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.logaddexp2

logaddexp2(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.true_divide

true_divide(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.floor_divide

floor_divide(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.power

power(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.remainder

remainder(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.mod

remainder(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.fmod

fmod(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Trigonometric functions

numpy.arctan2

arctan2(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.hypot

hypot(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Bit-twiddling functions

numpy.bitwise_and

bitwise_and(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.bitwise_or

bitwise_or(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.bitwise_xor

bitwise_xor(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.left_shift

left_shift(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.right_shift

right_shift(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Comparison functions

numpy.maximum

maximum(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.fmax

fmax(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.minimum

minimum(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.fmin

fmin(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Floating functions

numpy.copysign

copysign(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.nextafter

nextafter(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

numpy.ldexp

ldexp(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Example: Streaming Maximum

Let’s create a streaming maximum function for a stream. First, we have to choose how to handle NaNs; since numpy.maximum does not have an identity value, we must find another way. We can proceed as follows:

  • If we want to propagate NaNs, we should use numpy.maximum()

  • If we want to ignore NaNs, we should use numpy.fmax()
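The difference between the two options listed above shows up on a pair of small arrays. This example uses only NumPy:

```python
import numpy as np

a = np.array([1.0, np.nan, 3.0])
b = np.array([2.0, 2.0, np.nan])

# numpy.maximum propagates NaNs: any comparison involving a NaN yields NaN.
propagated = np.maximum(a, b)

# numpy.fmax ignores NaNs whenever the other operand is a number.
ignored = np.fmax(a, b)
```

Here propagated contains NaN in its last two positions, while ignored contains only finite values.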

Both of those functions are binary ufuncs, so we can use ireduce_ufunc(). Note that any function based on ireduce_ufunc() or reduce_ufunc() will automatically work on streams of numbers thanks to the array_stream() decorator.

Putting it all together:

from npstreams import ireduce_ufunc
from numpy import maximum, fmax

def imax(arrays, axis = -1, ignore_nan = False, **kwargs):
    """
    Streaming cumulative maximum along an axis.

    Parameters
    ----------
    arrays : iterable
        Stream of arrays to be compared.
    axis : int or None, optional
        Axis along which to compute the maximum. If None,
        arrays are flattened before reduction.
    ignore_nan : bool, optional
        If True, NaNs are ignored. Default is False.

    Yields
    ------
    online_max : ndarray
    """
    ufunc = fmax if ignore_nan else maximum
    yield from ireduce_ufunc(arrays, ufunc, axis = axis, **kwargs)

This will provide us with a streaming function, meaning that we can look at the progress as it is being computed. We can also create a function that returns the max of the stream like numpy.ndarray.max() using the reduce_ufunc() function:

from npstreams import reduce_ufunc
from numpy import maximum, fmax

def smax(arrays, axis = -1, ignore_nan = False, **kwargs):  # s for stream
    """
    Maximum of a stream along an axis.

    Parameters
    ----------
    arrays : iterable
        Stream of arrays to be compared.
    axis : int or None, optional
        Axis along which to compute the maximum. If None,
        arrays are flattened before reduction.
    ignore_nan : bool, optional
        If True, NaNs are ignored. Default is False.

    Returns
    -------
    max : ndarray
    """
    # Choose the ufunc as in imax: fmax ignores NaNs, maximum propagates them.
    ufunc = fmax if ignore_nan else maximum
    return reduce_ufunc(arrays, ufunc, axis = axis, **kwargs)