Source code for delayedarray.apply_over_blocks

from typing import Callable, Optional, Tuple
import math

from .chunk_grid import chunk_grid
from .Grid import AbstractGrid
from .is_sparse import is_sparse
from .extract_dense_array import extract_dense_array
from .extract_sparse_array import extract_sparse_array
from .default_buffer_size import default_buffer_size

# Module metadata: author, copyright holder, and license identifier.
__author__ = "ltla"
__copyright__ = "ltla"
__license__ = "MIT"


def apply_over_blocks(
    x,
    fun: Callable,
    allow_sparse: bool = False,
    grid: Optional[AbstractGrid] = None,
    buffer_size: Optional[int] = None,
) -> list:
    """
    Walk through an array one block at a time, calling a user-supplied
    function on each block and gathering its return values in order.

    Args:
        x:
            An array-like object.

        fun:
            Function to call on every block. It receives two arguments:
            first, the start/end positions of the current block along each
            dimension; second, the contents of that block. Blocks are
            typically supplied as a :py:class:`~numpy.ndarray`.

        allow_sparse:
            Whether sparse subarrays may be extracted. When this is true
            and ``x`` is reported as sparse, the block contents are instead
            a :py:class:`~delayedarray.SparseNdarray.SparseNdarray`.

        grid:
            Grid used to partition ``x`` for iteration. Blocks are chosen
            to line up with the grid boundaries, e.g., so that chunked data
            can be extracted efficiently. If ``None``, the result of
            :py:func:`~delayedarray.chunk_grid.chunk_grid` on ``x`` is used.

        buffer_size:
            Size of the buffer, in bytes, that holds a single block during
            each iteration. Larger values generally trade memory for speed.
            If ``None``, the value returned by
            :py:func:`~delayedarray.default_buffer_size.default_buffer_size`
            is used.

    Returns:
        List holding the result of ``fun`` for each block, in iteration
        order.
    """
    if grid is None:
        grid = chunk_grid(x)

    # Sparse extraction is used only when the caller opts in *and* the
    # array itself is sparse; otherwise every block is extracted densely.
    extract = (
        extract_sparse_array
        if allow_sparse and is_sparse(x)
        else extract_dense_array
    )

    # Iterate over every dimension of the array.
    all_dims = tuple(range(len(x.shape)))

    if buffer_size is None:
        buffer_size = default_buffer_size()

    # Convert the byte budget into a number of elements of x's dtype.
    max_elements = buffer_size // x.dtype.itemsize

    # Each item yielded by the grid is a sequence of (start, end) pairs, one
    # per dimension; these are expanded into ranges for the extractor.
    return [
        fun(
            bounds,
            extract(x, tuple(range(start, end) for start, end in bounds)),
        )
        for bounds in grid.iterate(all_dims, buffer_elements=max_elements)
    ]