Control Flow

Streaming array pipelines

Before reducing your stream of arrays (e.g. averaging them together), you may want to transform them. This can be done with the ipipe() function:

npstreams.ipipe(*args, **kwargs)

Pipe arrays through a sequence of functions. For example:

ipipe(f, g, h, stream) is equivalent to:

for arr in stream:
    yield f(g(h(arr)))
  • *funcs (callable) – Callables that support NumPy arrays as their first argument. These should NOT be generator functions.

  • arrays (iterable) – Stream of arrays to be passed.

  • processes (int or None, optional, keyword-only) – Number of processes to use. If None, the maximal number of processes is used. Default is one process.

  • ntotal (int or None, optional, keyword-only) – If the length of arrays is known, but passing arrays as a list would take too much memory, the total number of arrays ntotal can be specified. This allows pmap to chunk better when processes > 1.


Yields: piped (ndarray)
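To make the equivalence above concrete, here is a minimal, single-process sketch of ipipe's semantics in pure Python (ipipe_sketch is a hypothetical stand-in; the real ipipe() also supports multiprocessing):

```python
import numpy as np

def ipipe_sketch(*args):
    """Hypothetical single-process sketch of ipipe's semantics."""
    *funcs, arrays = args
    for arr in arrays:
        # Functions are applied right-to-left: f(g(h(arr)))
        for func in reversed(funcs):
            arr = func(arr)
        yield arr

arrays = [np.array([1.0, -2.0]), np.array([3.0, 4.0])]
# Equivalent to np.abs(np.negative(arr)) for each arr in the stream
result = list(ipipe_sketch(np.abs, np.negative, arrays))
```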

Imagine we have the following pipeline, in which we want to process images from some iterable arrays as follows:

  • Remove negative pixel intensity values;

  • Adjust the gamma value of images (from Scikit-image’s exposure module);

  • Average the result together.

The following lines will do the trick:

from functools import partial
from npstreams import ipipe, iaverage, last
from skimage.exposure import adjust_gamma

def remove_negative(arr):
    arr[arr < 0] = 0  # clip negative intensities in place
    return arr

pipeline = ipipe(adjust_gamma, remove_negative, arrays)
avgs = last(iaverage(pipeline))
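To see what this pipeline computes without depending on npstreams or scikit-image, here is a hedged, self-contained sketch: gamma_adjust is a hypothetical stand-in for adjust_gamma (a plain power-law transform), and the running-average loop mimics what iaverage accumulates and last extracts:

```python
import numpy as np

def remove_negative(arr):
    arr = arr.copy()        # avoid mutating the caller's array in this sketch
    arr[arr < 0] = 0
    return arr

def gamma_adjust(arr, gamma=2.0):
    # Plain power-law transform; stand-in for skimage.exposure.adjust_gamma
    return arr ** gamma

arrays = [np.array([[-1.0, 2.0]]), np.array([[3.0, 4.0]])]

# Single-process equivalent of the pipeline: gamma_adjust(remove_negative(arr))
processed = (gamma_adjust(remove_negative(arr)) for arr in arrays)

# Running average over the stream, as iaverage would accumulate it
total, count = 0, 0
for arr in processed:
    total = total + arr
    count += 1
avg = total / count
```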

If the pipeline is computationally intensive, we can also pipe arrays in parallel using the keyword-only argument processes:

pipeline = ipipe(adjust_gamma, remove_negative, arrays, processes=4)  # 4 cores will be used
avgs = last(iaverage(pipeline))

Since ipipe() uses pmap() under the hood, we can also use all available cores by passing processes=None.