Making your own Streaming Reduction Function

The ireduce_ufunc() generator function

You can assemble your own streaming reduction function from a binary NumPy ufunc using the following generator function:

npstreams.ireduce_ufunc(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)

Streaming reduction generator function from a binary NumPy ufunc. Generator version of reduce_ufunc.

ufunc must be a NumPy binary Ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use of numpy.greater, for example.

Note that performance is much better for the default axis = -1. In that case, reduction operations can occur in place, which also allows the reduction to run in constant memory.

Parameters:
  • arrays (iterable) – Arrays to be reduced.
  • ufunc (numpy.ufunc) – Binary universal function.
  • axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger than the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all NumPy ufuncs support axis = None, e.g. numpy.subtract.
  • dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.
  • ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g. numpy.add.identity is 0), then NaNs are replaced with this identity. An error is raised if ufunc has no identity (e.g. numpy.maximum.identity is None).
  • kwargs – Keyword arguments are passed to ufunc. Note that some valid ufunc keyword arguments (e.g. keepdims) are not valid for all streaming functions. Also, contrary to NumPy v. 1.10+, casting = 'unsafe' is the default in npstreams.
Yields:

reduced (ndarray or scalar)

Raises:
  • TypeError : if ufunc is not a NumPy ufunc.
  • ValueError : if ignore_nan is True but ufunc has no identity.
  • ValueError : if ufunc is not a binary ufunc.
  • ValueError : if ufunc does not have the same input type as output type.
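To make the mechanics concrete, here is a minimal pure-NumPy sketch of what a streaming reduction along a new axis does. The name running_reduce is hypothetical, and this is not the npstreams implementation; it only illustrates how a binary ufunc can update a single accumulator in place, so that memory use does not have to grow with the length of the stream.

```python
import numpy as np

def running_reduce(arrays, ufunc):
    """Hypothetical sketch: yield the running reduction of a stream of same-shaped arrays."""
    arrays = iter(arrays)
    try:
        first = next(arrays)
    except StopIteration:
        return  # empty stream: nothing to yield
    # Copy the first array so the caller's data is never modified
    accumulator = np.array(first, copy=True)
    yield accumulator.copy()
    for arr in arrays:
        # In-place update: only one accumulator is kept between iterations
        ufunc(accumulator, arr, out=accumulator)
        # Yield a copy so consumers cannot mutate the accumulator
        yield accumulator.copy()

# A stream of three 2x2 arrays filled with 1, 2 and 3
stream = (np.full((2, 2), fill_value=i, dtype=float) for i in range(1, 4))
results = list(running_reduce(stream, np.add))
print(results[-1])  # running sum after the whole stream has been consumed
```

Because the input is consumed one array at a time, the stream can be a generator that loads each array lazily.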

The non-generator version is also available:

npstreams.reduce_ufunc(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)

Reduce a stream using a binary NumPy ufunc. Function version of ireduce_ufunc.

ufunc must be a NumPy binary Ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use of numpy.greater, for example.

Note that performance is much better for the default axis = -1. In that case, reduction operations can occur in place, which also allows the reduction to run in constant memory.

Parameters:
  • arrays (iterable) – Arrays to be reduced.
  • ufunc (numpy.ufunc) – Binary universal function.
  • axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger than the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all NumPy ufuncs support axis = None, e.g. numpy.subtract.
  • dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.
  • ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g. numpy.add.identity is 0), then NaNs are replaced with this identity. An error is raised if ufunc has no identity (e.g. numpy.maximum.identity is None).
  • kwargs – Keyword arguments are passed to ufunc. Note that some valid ufunc keyword arguments (e.g. keepdims) are not valid for all streaming functions. Also, contrary to NumPy v. 1.10+, casting = 'unsafe' is the default in npstreams.
Returns:

reduced

Return type:

ndarray or scalar

Raises:
  • TypeError : if ufunc is not a NumPy ufunc.
  • ValueError : if ignore_nan is True but ufunc has no identity.
  • ValueError : if ufunc is not a binary ufunc.
  • ValueError : if ufunc does not have the same input type as output type.

Note that while all NumPy ufuncs have a reduce() method, not all of them are useful. This is why ireduce_ufunc() and reduce_ufunc() will only work with binary ufuncs, most of which are listed below. For performance reasons, we further restrict the use of ireduce_ufunc() and reduce_ufunc() to ufuncs that have the same input types as output types. Therefore, for example, numpy.greater() cannot be made to work with ireduce_ufunc() and reduce_ufunc().
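The two restrictions above can be inspected with standard ufunc attributes (nin, nout and types). The helper names below are hypothetical, and the checks are only an illustration of the idea; they are not necessarily the tests npstreams performs internally.

```python
import numpy as np

def is_binary_ufunc(ufunc):
    """True if ufunc is a NumPy ufunc taking two inputs and returning one output."""
    return isinstance(ufunc, np.ufunc) and ufunc.nin == 2 and ufunc.nout == 1

def preserves_type(ufunc, char="d"):
    """True if ufunc has a loop mapping two inputs of type `char` to an output of type `char`.

    `char` is a NumPy type character; "d" stands for float64.
    """
    return f"{char}{char}->{char}" in ufunc.types

for candidate in (np.add, np.maximum, np.greater, np.negative):
    print(candidate.__name__, is_binary_ufunc(candidate), preserves_type(candidate))
```

For float64 inputs, numpy.greater returns booleans, so it fails the second check even though it is a binary ufunc.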

NaN handling

NumPy ufuncs can have an identity value, that is, a value such that ufunc(x1, identity) is always x1. For such ufuncs, ireduce_ufunc() and reduce_ufunc() can replace NaNs in the stream with the ufunc’s identity value, if ignore_nan = True. Note that not all ufuncs have an identity value; for example, how would you define the identity value of numpy.maximum? There is no answer.
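As a rough illustration of this replacement, the sketch below substitutes the identity of a ufunc for every NaN in a single array. The helper nan_to_identity is hypothetical and works on one array at a time; how npstreams applies the replacement across a stream is not shown here.

```python
import numpy as np

def nan_to_identity(arr, ufunc):
    """Hypothetical helper: replace NaNs in arr with the identity value of ufunc."""
    if ufunc.identity is None:
        raise ValueError(f"{ufunc.__name__} has no identity value")
    arr = np.asarray(arr, dtype=float)
    # Wherever arr is NaN, use the identity; elsewhere keep the original value
    return np.where(np.isnan(arr), ufunc.identity, arr)

data = np.array([1.0, np.nan, 3.0])
print(nan_to_identity(data, np.add))       # NaN replaced by 0
print(nan_to_identity(data, np.multiply))  # NaN replaced by 1
```

Since the identity leaves the other operand unchanged, the replaced entries do not affect the reduced value.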

NumPy Binary Ufuncs

ireduce_ufunc() is tested to work on the following binary ufuncs, which are available in NumPy.

Arithmetics

numpy.add add(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.subtract subtract(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.multiply multiply(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.divide true_divide(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.logaddexp logaddexp(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.logaddexp2 logaddexp2(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.true_divide true_divide(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.floor_divide floor_divide(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.power power(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.remainder remainder(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.mod remainder(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.fmod fmod(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Trigonometric functions

numpy.arctan2 arctan2(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.hypot hypot(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Bit-twiddling functions

numpy.bitwise_and bitwise_and(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.bitwise_or bitwise_or(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.bitwise_xor bitwise_xor(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.left_shift left_shift(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.right_shift right_shift(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Comparison functions

numpy.maximum maximum(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.fmax fmax(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.minimum minimum(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.fmin fmin(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Floating functions

numpy.copysign copysign(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.nextafter nextafter(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])
numpy.ldexp ldexp(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj])

Example: Streaming Maximum

Let’s create a streaming maximum function. First, we have to choose how to handle NaNs; since numpy.maximum does not have an identity value, the ignore_nan option of ireduce_ufunc() cannot be used, and we must find another way. We can proceed as follows:

  • If we want to propagate NaNs, we should use numpy.maximum()
  • If we want to ignore NaNs, we should use numpy.fmax()

Both of those functions are binary ufuncs, so we can use ireduce_ufunc(). Note that any function based on ireduce_ufunc() or reduce_ufunc() will automatically work on streams of numbers thanks to the array_stream() decorator.
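The difference between the two ufuncs is easy to see on a pair of small arrays. This snippet uses only NumPy and does not involve npstreams:

```python
import numpy as np

a = np.array([1.0, np.nan, 5.0])
b = np.array([2.0, 3.0, np.nan])

# numpy.maximum propagates NaNs: any comparison involving a NaN gives NaN
propagated = np.maximum(a, b)

# numpy.fmax ignores NaNs whenever the other operand is a number
ignored = np.fmax(a, b)

print(propagated)
print(ignored)
```

Note that numpy.fmax still returns NaN when both operands are NaN, since there is no number to fall back on.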

Putting it all together:

from npstreams import ireduce_ufunc
from numpy import maximum, fmax

def imax(arrays, axis = -1, ignore_nan = False, **kwargs):
    """
    Streaming cumulative maximum along an axis.

    Parameters
    ----------
    arrays : iterable
        Stream of arrays to be compared.
    axis : int or None, optional
        Axis along which to compute the maximum. If None,
        arrays are flattened before reduction.
    ignore_nan : bool, optional
        If True, NaNs are ignored. Default is False.

    Yields
    ------
    online_max : ndarray
    """
    ufunc = fmax if ignore_nan else maximum
    yield from ireduce_ufunc(arrays, ufunc, axis = axis, **kwargs)

This will provide us with a streaming function, meaning that we can look at the progress as it is being computed. We can also create a function that returns the max of the stream like numpy.ndarray.max() using the reduce_ufunc() function:

from npstreams import reduce_ufunc
from numpy import maximum, fmax

def smax(arrays, axis = -1, ignore_nan = False, **kwargs):  # s for stream
    """
    Maximum of a stream along an axis.

    Parameters
    ----------
    arrays : iterable
        Stream of arrays to be compared.
    axis : int or None, optional
        Axis along which to compute the maximum. If None,
        arrays are flattened before reduction.
    ignore_nan : bool, optional
        If True, NaNs are ignored. Default is False.

    Returns
    -------
    max : ndarray
    """
    # Same choice as in imax(): fmax ignores NaNs, maximum propagates them
    ufunc = fmax if ignore_nan else maximum
    return reduce_ufunc(arrays, ufunc, axis = axis, **kwargs)
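When a stream is small enough to fit in memory, a reference result can be computed with plain NumPy by stacking the arrays along a new last axis, as described for the default axis = -1. The sketch below assumes (without verifying it against npstreams) that imax would yield the successive slices running[..., i] and that smax would return final; it can serve as a starting point for testing both functions.

```python
import numpy as np

arrays = [np.array([1.0, 5.0]), np.array([4.0, 2.0]), np.array([3.0, 6.0])]

# Stack the stream along a new last axis: shape (2, 3)
stacked = np.stack(arrays, axis=-1)

# Running maximum after each array of the stream has been seen
running = np.maximum.accumulate(stacked, axis=-1)

# Maximum over the whole stream
final = np.maximum.reduce(stacked, axis=-1)

print(running)
print(final)
```

The last slice of the running maximum equals the full reduction, which mirrors the relationship between the generator and non-generator versions.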