Making your own Streaming Reduction Function
The ireduce_ufunc() generator function
You can assemble your own streaming reduction function from a binary NumPy ufunc using the following generator function:
 npstreams.ireduce_ufunc(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)
Streaming reduction generator function from a binary NumPy ufunc. Generator version of reduce_ufunc.
ufunc must be a NumPy binary ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use of numpy.greater, for example.
Note that performance is much better for the default axis = -1. In that case, reduction operations can occur in place, which also allows operating in constant memory.
 Parameters
arrays (iterable) – Arrays to be reduced.
ufunc (numpy.ufunc) – Binary universal function.
axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger than the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all NumPy ufuncs support axis = None, e.g. numpy.subtract.
dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.
ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g. numpy.add.identity is 0), then NaNs are replaced with this identity. An error is raised if ufunc has no identity (e.g. numpy.maximum.identity is None).
kwargs – Keyword arguments are passed to ufunc. Note that some valid ufunc keyword arguments (e.g. keepdims) are not valid for all streaming functions. Also, contrary to NumPy v. 1.10+, casting = 'unsafe' is the default in npstreams.
 Yields
reduced (ndarray or scalar)
 Raises
TypeError – if ufunc is not a NumPy ufunc.
ValueError – if ignore_nan is True but ufunc has no identity.
ValueError – if ufunc is not a binary ufunc.
ValueError – if ufunc does not have the same input type as output type.
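To make the remark about in-place, constant-memory reduction more concrete, here is a minimal sketch of how a streaming reduction over a binary ufunc could be organised with plain NumPy. It is an illustration only and not the npstreams implementation: the name sketch_ireduce is hypothetical, and it assumes all arrays in the stream share one shape and dtype and are combined element-wise with a running accumulator.

```python
import numpy as np

def sketch_ireduce(arrays, ufunc):
    """Hypothetical helper: yield the running reduction of a stream of arrays."""
    stream = iter(arrays)
    first = next(stream, None)
    if first is None:
        return  # empty stream: nothing to yield
    # Copy the first array so the caller's data is never modified.
    accumulator = np.array(first, copy=True)
    yield accumulator.copy()
    for arr in stream:
        # Combine in place: a single accumulator is kept, whatever the stream length.
        ufunc(accumulator, arr, out=accumulator)
        yield accumulator.copy()

# Three 2x2 arrays filled with 1.0, 2.0 and 3.0, produced lazily.
stream = (np.full((2, 2), fill_value=float(i)) for i in range(1, 4))
running_sums = list(sketch_ireduce(stream, np.add))
```

Each yielded array is a copy so that earlier results stay valid after the accumulator changes; a real implementation may make a different trade-off.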
The non-generator version is also available:
 npstreams.reduce_ufunc(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)
Reduce a stream using a binary NumPy ufunc. Function version of ireduce_ufunc.
ufunc must be a NumPy binary ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use of numpy.greater, for example.
Note that performance is much better for the default axis = -1. In that case, reduction operations can occur in place, which also allows operating in constant memory.
 Parameters
arrays (iterable) – Arrays to be reduced.
ufunc (numpy.ufunc) – Binary universal function.
axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger than the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all NumPy ufuncs support axis = None, e.g. numpy.subtract.
dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.
ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g. numpy.add.identity is 0), then NaNs are replaced with this identity. An error is raised if ufunc has no identity (e.g. numpy.maximum.identity is None).
kwargs – Keyword arguments are passed to ufunc. Note that some valid ufunc keyword arguments (e.g. keepdims) are not valid for all streaming functions. Also, contrary to NumPy v. 1.10+, casting = 'unsafe' is the default in npstreams.
 Returns
reduced
 Return type
ndarray or scalar
 Raises
TypeError – if ufunc is not a NumPy ufunc.
ValueError – if ignore_nan is True but ufunc has no identity.
ValueError – if ufunc is not a binary ufunc.
ValueError – if ufunc does not have the same input type as output type.
Note that while all NumPy ufuncs have a reduce() method, not all of them are useful.
This is why ireduce_ufunc() and reduce_ufunc() will only work with binary ufuncs, most of which are listed below. For performance reasons, we further restrict the use of ireduce_ufunc() and reduce_ufunc() to ufuncs that have the same input types as output types. Therefore, for example, numpy.greater() cannot be made to work with ireduce_ufunc() and reduce_ufunc().
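The two restrictions above can be probed with plain NumPy. The sketch below is only an illustration of the idea, not the validation npstreams performs internally: looks_reducible is a hypothetical name, and it tests a single dtype (float by default) rather than every type signature of the ufunc.

```python
import numpy as np

def looks_reducible(ufunc, dtype=float):
    """Hypothetical check: binary ufunc whose output dtype matches its input dtype."""
    if not isinstance(ufunc, np.ufunc):
        raise TypeError("expected a NumPy ufunc")
    # Binary means two inputs; we also require a single output.
    if ufunc.nin != 2 or ufunc.nout != 1:
        return False
    sample = np.ones(2, dtype=dtype)
    return ufunc(sample, sample).dtype == sample.dtype

add_ok = looks_reducible(np.add)            # True: float in, float out
greater_ok = looks_reducible(np.greater)    # False: float in, bool out
negative_ok = looks_reducible(np.negative)  # False: takes one argument only
```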
NaNs handling
NumPy ufuncs can have an identity value, that is, a value such that ufunc(x1, identity) is always x1. For such ufuncs, ireduce_ufunc() and reduce_ufunc() can replace NaNs in the stream with the ufunc’s identity value, if ignore_nan = True.
Note that not all ufuncs have an identity value; for example, how would you define the identity value of numpy.maximum? There is no answer.
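The identity value is exposed by NumPy as the identity attribute of each ufunc. The following sketch shows how replacing NaNs with the identity leaves a reduction unaffected by them. The helper nan_to_identity is hypothetical and written for this illustration; it does not claim to mirror how npstreams performs the replacement.

```python
import numpy as np

def nan_to_identity(arr, ufunc):
    """Hypothetical helper: replace NaNs with the ufunc's identity value."""
    if ufunc.identity is None:
        raise ValueError(f"{ufunc.__name__} has no identity value")
    arr = np.asarray(arr, dtype=float)
    return np.where(np.isnan(arr), ufunc.identity, arr)

data = np.array([1.0, np.nan, 2.0])

# numpy.add.identity is 0, so the NaN contributes nothing to the sum.
total = np.add.reduce(nan_to_identity(data, np.add))

# numpy.multiply.identity is 1, so the NaN contributes nothing to the product.
product = np.multiply.reduce(nan_to_identity(data, np.multiply))
```

Calling nan_to_identity(data, np.maximum) raises ValueError in this sketch, since numpy.maximum.identity is None.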
NumPy Binary Ufuncs
ireduce_ufunc() is tested to work on the following binary ufuncs, which are available in NumPy.
Arithmetic

add(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

subtract(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

multiply(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

true_divide(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

logaddexp(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

logaddexp2(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

floor_divide(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

power(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

remainder(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

fmod(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 
Trigonometric functions

arctan2(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

hypot(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 
Bit-twiddling functions

bitwise_and(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

bitwise_or(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

bitwise_xor(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

left_shift(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

right_shift(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 
Comparison functions

maximum(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

fmax(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

minimum(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

fmin(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 
Floating functions

copysign(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

nextafter(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 

ldexp(x1, x2, /, out=None, *, where=True, casting='same_kind', order='K', dtype=None, subok=True[, signature, extobj]) 
Example: Streaming Maximum
Let’s create a streaming maximum function for a stream. First, we have to choose how to handle NaNs; since numpy.maximum does not have an identity value, we must find another way. We can proceed as follows:
If we want to propagate NaNs, we should use numpy.maximum().
If we want to ignore NaNs, we should use numpy.fmax().
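The difference between the two choices can be seen on a pair of small arrays. This short illustration uses only NumPy; the array values are made up for the example.

```python
import numpy as np

a = np.array([1.0, np.nan, 3.0])
b = np.array([2.0, 5.0, np.nan])

# numpy.maximum propagates NaNs: the result is NaN wherever either input is NaN.
propagated = np.maximum(a, b)   # [2.0, nan, nan]

# numpy.fmax ignores NaNs: the result is NaN only where both inputs are NaN.
ignored = np.fmax(a, b)         # [2.0, 5.0, 3.0]
```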
Both of those functions are binary ufuncs, so we can use ireduce_ufunc(). Note that any function based on ireduce_ufunc() or reduce_ufunc() will automatically work on streams of numbers thanks to the array_stream() decorator.
Putting it all together:
from npstreams import ireduce_ufunc
from numpy import maximum, fmax

def imax(arrays, axis = -1, ignore_nan = False, **kwargs):
    """
    Streaming cumulative maximum along an axis.

    Parameters
    ----------
    arrays : iterable
        Stream of arrays to be compared.
    axis : int or None, optional
        Axis along which to compute the maximum. If None,
        arrays are flattened before reduction.
    ignore_nan : bool, optional
        If True, NaNs are ignored. Default is False.

    Yields
    ------
    online_max : ndarray
    """
    # fmax ignores NaNs, maximum propagates them
    ufunc = fmax if ignore_nan else maximum
    yield from ireduce_ufunc(arrays, ufunc, axis = axis, **kwargs)
This will provide us with a streaming function, meaning that we can look at the progress as it is being computed. We can also create a function that returns the max of the stream like numpy.ndarray.max() using the reduce_ufunc() function:
from npstreams import reduce_ufunc
from numpy import maximum, fmax

def smax(arrays, axis = -1, ignore_nan = False, **kwargs): # s for stream
    """
    Maximum of a stream along an axis.

    Parameters
    ----------
    arrays : iterable
        Stream of arrays to be compared.
    axis : int or None, optional
        Axis along which to compute the maximum. If None,
        arrays are flattened before reduction.
    ignore_nan : bool, optional
        If True, NaNs are ignored. Default is False.

    Returns
    -------
    max : ndarray
    """
    # reduce_ufunc needs the ufunc explicitly, chosen as in imax
    ufunc = fmax if ignore_nan else maximum
    return reduce_ufunc(arrays, ufunc, axis = axis, **kwargs)
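A simple way to sanity-check functions such as imax and smax on a small stream is to compare them against an eager NumPy computation. The sketch below builds such a reference with NumPy alone. It assumes the default axis behaves as the axis parameter description states (arrays stacked along a new axis, then reduced along it); the stream values are made up for the example.

```python
import numpy as np

stream = [np.array([1.0, 5.0]), np.array([4.0, 2.0]), np.array([3.0, 6.0])]

# Stack the stream along a new last axis: shape (2, 3), one column per array.
stacked = np.stack(stream, axis=-1)

# Reference for the final maximum over the whole stream.
expected_final = np.maximum.reduce(stacked, axis=-1)

# Reference for the running maximum: column k holds the maximum of the first k + 1 arrays.
expected_running = np.maximum.accumulate(stacked, axis=-1)
```

Under that assumption, the value returned by smax(stream) would be compared with expected_final, and the successive values yielded by imax(stream) with the columns of expected_running.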