Control Flow

Streaming array pipelines

Before reducing your stream of arrays (e.g. averaging them together), you may want to transform them. This can be done with the ipipe() function:

npstreams.ipipe(*args, **kwargs)

Pipe arrays through a sequence of functions. For example:

ipipe(f, g, h, stream) is equivalent to the following, so the function listed closest to the stream is applied first:

for arr in stream:
    yield f(g(h(arr)))

Parameters

  • *funcs (callable) – Callables that accept NumPy arrays as their first argument. These should NOT be generator functions.

  • arrays (iterable) – Stream of arrays to be passed.

  • processes (int or None, optional, keyword-only) – Number of processes to use. If None, the maximal number of processes is used. The default is 1.

  • ntotal (int or None, optional, keyword-only) – If the length of arrays is known, but passing arrays as a list would take too much memory, the total number of arrays ntotal can be specified. This allows pmap to chunk the work better when processes > 1.

Yields

piped (ndarray)
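
The equivalence stated above can be sketched in plain Python. This is only an illustration of the order in which the functions are applied, not the library's implementation: pipe_serial, add_one and double are hypothetical names, it runs in a single process, and it uses integers instead of arrays to keep the output easy to read.

```python
def pipe_serial(*args):
    """Hypothetical serial stand-in for ipipe; the last argument is the stream."""
    *funcs, stream = args
    for item in stream:
        # Apply the functions right to left, so the last-listed function runs first.
        for func in reversed(funcs):
            item = func(item)
        yield item


def add_one(x):
    return x + 1


def double(x):
    return 2 * x


# Each item x becomes add_one(double(x)).
piped = list(pipe_serial(add_one, double, [1, 2, 3]))
print(piped)  # [3, 5, 7]
```

Swapping the two functions would give double(add_one(x)) instead, which is why the order of the arguments matters when building a pipeline.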

Imagine we have the following pipeline, in which we want to process images in some iterable arrays as follows:

  • Remove negative pixel intensity values;

  • Adjust the gamma value of images (from Scikit-image’s exposure module);

  • Average the result together.

The following lines will do the trick:

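from npstreams import ipipe, iaverage, last
from skimage.exposure import adjust_gamma

def remove_negative(arr):
    # Set negative intensities to zero; note that this modifies arr in place
    arr[arr < 0] = 0
    return arr

# arrays is the iterable of images described above.
# Functions are applied right to left: remove_negative first, then adjust_gamma.
pipeline = ipipe(adjust_gamma, remove_negative, arrays)
avgs = last(iaverage(pipeline))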
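
Judging from the equivalence given for ipipe(), each function in the pipeline is called with the array as its only argument, so any extra parameter has to be bound beforehand, for example with functools.partial. The sketch below shows the idea on a single array, without npstreams. Note the assumptions: gamma_correct is a hypothetical stand-in for adjust_gamma that simply raises the array to the power gamma, and remove_negative is written here as a non-mutating variant based on numpy.clip.

```python
from functools import partial

import numpy as np


def remove_negative(arr):
    """Return a copy of arr with negative values set to zero; the input is untouched."""
    return np.clip(arr, 0, None)


def gamma_correct(arr, gamma=1.0):
    """Hypothetical stand-in for a gamma adjustment: arr ** gamma."""
    return np.power(arr, gamma)


# Bind gamma so that the resulting callable takes a single array argument.
darken = partial(gamma_correct, gamma=2.0)

image = np.array([[-1.0, 0.5], [0.25, 1.0]])

# This is what a pipeline of (darken, remove_negative) would do to each array.
result = darken(remove_negative(image))
```

With the real function, the same binding would read partial(adjust_gamma, gamma=2.0), and the resulting callable could then be passed to ipipe() in place of adjust_gamma.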

If the pipeline is computationally intensive, we can also pipe arrays in parallel using the keyword-only argument processes:

pipeline = ipipe(adjust_gamma, remove_negative, arrays, processes=4)  # 4 processes will be used
avgs = last(iaverage(pipeline))

Since ipipe() uses pmap() under the hood, we can also use all available cores by passing processes=None.