from typing import Tuple, Sequence, Optional, List, Generator, Dict
import bisect
import abc
from ._subset import _is_single_subset_noop
[docs]
class AbstractGrid(abc.ABC):
"""
Abstract base class for array grids. Each grid subdivides an array to
determine how it should be iterated over; this is useful for ensuring that
iteration respects the physical layout of an array.
Subclasses should define the ``shape``, ``boundaries`` and ``cost``
properties, as well as the ``subset``, ``transpose`` and ``iterate``
methods; see the :py:class:`~SimpleGrid` and :py:class:`~CompositeGrid`
subclasses for examples.
"""
@property
@abc.abstractmethod
def shape(self) -> Tuple[int, ...]:
pass
@property
@abc.abstractmethod
def cost(self) -> int:
pass
@property
@abc.abstractmethod
def boundaries(self) -> Tuple[Sequence[int], ...]:
pass
[docs]
@abc.abstractmethod
def transpose(self, perm: Tuple[int, ...]) -> "AbstractGrid":
pass
[docs]
@abc.abstractmethod
def subset(self, subset: Tuple[Sequence[int], ...]) -> "AbstractGrid":
pass
[docs]
@abc.abstractmethod
def iterate(self, dimensions: Tuple[int, ...], buffer_elements: int = 1e6) -> Generator[Tuple, None, None]:
pass
############################################################
############################################################
[docs]
class SimpleGrid(AbstractGrid):
"""
A simple grid to subdivide an array, involving arbitrary boundaries on each
dimension. Each grid element is defined by boundaries on each dimension.
"""
[docs]
def __init__(self, boundaries: Tuple[Sequence[int], ...], cost_factor: float, internals: Optional[Dict] = None):
"""
Args:
boundaries:
Tuple of length equal to the number of dimensions. Each entry
should be a strictly increasing sequence of integers specifying
the position of the grid boundaries; the last element should be
equal to the extent of the dimension for the array. A tuple
entry may also be an empty list for a zero-extent dimension.
cost_factor:
Positive number representing the cost of iteration over each
element of the grid's array. The actual cost is defined by the
product of the cost factor by the array size. This is used to
choose between iteration schemes; as a reference, extraction
from an in-memory NumPy array has a cost factor of 1.
internals:
Internal use only.
"""
self._boundaries = boundaries
if internals is not None and "shape" in internals:
shape = internals["shape"]
else:
new_shape = []
for bounds in boundaries:
if len(bounds):
new_shape.append(bounds[-1])
else:
new_shape.append(0)
shape = (*new_shape,)
self._shape = shape
if internals is not None and "maxgap" in internals:
maxgap = internals["maxgap"]
else:
maxgap = (*(SimpleGrid._define_maxgap(b) for b in boundaries),)
self._maxgap = maxgap
if internals is not None and "cost" in internals:
cost = internals["cost"]
else:
cost = cost_factor
for s in shape:
cost *= s
self._cost = cost
self._cost_factor = cost_factor
@staticmethod
def _define_maxgap(bounds):
last = 0
curmax = 0
for d in bounds:
gap = d - last
if gap > curmax:
curmax = gap
last = d
return curmax
@property
def shape(self) -> Tuple[int, ...]:
"""
Returns:
Shape of the grid, equivalent to the array's shape.
"""
return self._shape
@property
def boundaries(self) -> Tuple[Sequence[int], ...]:
"""
Returns:
Boundaries on each dimension of the grid.
"""
return self._boundaries
@property
def cost(self) -> float:
"""
Returns:
Cost of iteration over the underlying array.
"""
return self._cost
[docs]
def transpose(self, perm: Tuple[int, ...]) -> "SimpleGrid":
"""
Transpose a grid to reflect the same operation on the associated array.
Args:
perm:
Tuple of length equal to the dimensionality of the array,
containing the permutation of dimensions.
Returns:
A new ``SimpleGrid`` object.
"""
return SimpleGrid(
(*(self._boundaries[p] for p in perm),),
self._cost_factor,
internals = { # Save ourselves the trouble of recomputing these.
"shape": (*(self._shape[p] for p in perm),),
"maxgap": (*(self._maxgap[p] for p in perm),),
"cost": self._cost,
}
)
[docs]
def subset(self, subset: Tuple[Sequence[int], ...]) -> "SimpleGrid":
"""
Subset a grid to reflect the same operation on the associated array.
For any given dimension, consecutive elements in the subset are only
placed in the same grid interval in the subsetted grid if they belong
to the same grid interval in the original grid.
Args:
subset:
Tuple of length equal to the number of grid dimensions. Each
entry should be a (possibly unsorted) sequence of integers,
specifying the subset to apply to each dimension of the grid.
Returns:
A new ``SimpleGrid`` object.
"""
if len(subset) != len(self._shape):
raise ValueError("'shape' and 'subset' should have the same length")
new_boundaries = []
new_shape = []
new_maxgap = []
for i, bounds in enumerate(self._boundaries):
cursub = subset[i]
if _is_single_subset_noop(self._shape[i], cursub):
new_boundaries.append(bounds)
new_shape.append(self._shape[i])
new_maxgap.append(self._maxgap[i])
else:
my_boundaries = []
counter = 0
if len(bounds):
last_chunk = -1
for y in cursub:
cur_chunk = bisect.bisect_right(bounds, y)
if cur_chunk != last_chunk:
if counter > 0:
my_boundaries.append(counter)
last_chunk = cur_chunk
counter += 1
my_boundaries.append(counter)
new_boundaries.append(my_boundaries)
new_shape.append(counter)
new_maxgap.append(SimpleGrid._define_maxgap(my_boundaries))
return SimpleGrid(
(*new_boundaries,),
self._cost_factor,
internals = {
"shape": (*new_shape,),
"maxgap": (*new_maxgap,),
}
)
def _recursive_iterate(self, dimension: int, used: List[bool], starts: List[int], ends: List[int], buffer_elements: int, prescale: List[int]):
bounds = self._boundaries[dimension]
full_end = self._shape[dimension]
if used[dimension]:
# 'prescale' holds the worst-case minimum block sizes of the
# remaining dimensions (i.e., in the subsequent recursion levels).
# These are the worst-case because we assume that all remaining
# dimensions are using their maximum gap for a single grid interval
# to form a block, hence it's a conservative estimate of the space
# we have left to play with on the current dimenion. Note that this
# should only differ from buffer_elements when dimension > 0.
conservative_buffer_elements = max(1, buffer_elements // prescale[dimension])
start = 0
pos = 0
nb = len(bounds)
while True:
if pos == nb:
# Wrapping up the last block, if the grid-breaking code
# has not already iterated to the end of the dimension.
if start != full_end:
starts[dimension] = start
ends[dimension] = full_end
if dimension == 0:
yield (*zip(starts, ends),)
else:
yield from self._recursive_iterate(dimension - 1, used, starts, ends, buffer_elements // (full_end - start), prescale)
break
# Check if we can keep going to make a larger block.
current_end = bounds[pos]
if current_end - start <= conservative_buffer_elements:
pos += 1
continue
if pos:
previous_end = bounds[pos - 1]
else:
previous_end = 0
# Break grid intervals to force compliance with the buffer element limit.
if previous_end == start:
while start < current_end:
starts[dimension] = start
breaking_end = min(current_end, start + conservative_buffer_elements)
ends[dimension] = breaking_end
if dimension == 0:
yield (*zip(starts, ends),)
else:
# Next level of recursion uses buffer_elements, not its
# conservative counterpart, as the next level actually has
# knowledge of the boundaries for that dimension.
yield from self._recursive_iterate(dimension - 1, used, starts, ends, buffer_elements // (breaking_end - start), prescale)
start = breaking_end
pos += 1
continue
# Falling back to the last boundary that fit in the buffer limit.
starts[dimension] = start
ends[dimension] = previous_end
if dimension == 0:
yield (*zip(starts, ends),)
else:
yield from self._recursive_iterate(dimension - 1, used, starts, ends, buffer_elements // (previous_end - start), prescale)
start = previous_end
else:
# If this dimension is not used, we return its entire extent.
starts[dimension] = 0
ends[dimension] = full_end
if dimension == 0:
yield (*zip(starts, ends),)
else:
yield from self._recursive_iterate(dimension - 1, used, starts, ends, buffer_elements // full_end, prescale)
[docs]
def iterate(self, dimensions: Tuple[int, ...], buffer_elements: int = 1e6) -> Generator[Tuple, None, None]:
"""
Iterate over an array grid. This assembles blocks of contiguous grid
intervals to reduce the number of iterations (and associated overhead)
at the cost of increased memory usage during data extraction.
Args:
dimensions:
Dimensions over which to perform the iteration. Any dimensions
not listed here are extracted in their entirety, i.e., each
block consists of the full extent of unlisted dimensions.
buffer_elements:
Total number of elements in each block. Larger values increase
the block size and reduce the number of iterations, at the cost
of increased memory usage at each iteration.
Returns:
A generator that returns a tuple of length equal to the number of
dimensions. Each element contains the start and end of the block
on its corresponding dimension.
"""
if 0 in self._shape:
return
ndim = len(self._shape)
used = [False] * ndim
for d in dimensions:
used[d] = True
# See comments above about 'prescale'. For each dimension, this
# contains the worst-case minimum block size from the remaining
# dimensions. For used dimensions, this is computed from the max gap;
# for unused dimensions, the full extent is applied.
prescale = [1]
for i in range(ndim - 1):
last = prescale[-1]
if used[i]:
last *= self._maxgap[i]
else:
last *= self._shape[i]
prescale.append(last)
starts = [0] * ndim
ends = [0] * ndim
yield from self._recursive_iterate(ndim - 1, used, starts, ends, buffer_elements, prescale)
############################################################
############################################################
[docs]
class CompositeGrid(AbstractGrid):
"""
A grid to subdivide an array, constructed by combining component grids
along a specified dimension. This aims to mirror the same combining
operation for the arrays associated with the component grids.
"""
[docs]
def __init__(self, components: Tuple[AbstractGrid, ...], along: int, internals: Optional[Dict] = None):
"""
Args:
components:
Component grids to be combined to form the composite grid.
Each grid should have the same dimension extents, except for
the ``along`` dimension.
along:
Dimension over which to combine entries of ``components``.
internals:
Internal use only.
"""
self._components = components
self._along = along
if internals is not None and "shape" in internals:
shape = internals["shape"]
else:
first = components[0]
new_shape = list(first.shape)
for i in range(1, len(components)):
current = components[i]
for j, d in enumerate(current.shape):
if j == along:
new_shape[j] += d
elif new_shape[j] != d:
raise ValueError("entries of 'components' should have the same shape on all dimensions except 'along'")
shape = (*new_shape,)
self._shape = shape
if internals is not None and "boundaries" in internals:
boundaries = internals["boundaries"]
else:
boundaries = None
self._boundaries = boundaries
if internals is not None and "cost" in internals:
cost = internals["cost"]
else:
cost = 0
for comp in self._components:
cost += comp.cost
self._cost = cost
@property
def shape(self) -> Tuple[int, ...]:
"""
Returns:
Shape of the grid, equivalent to the array's shape.
"""
return self._shape
@property
def boundaries(self) -> Tuple[Sequence[int], ...]:
"""
Returns:
Boundaries on each dimension of the grid. For the ``along``
dimension, this is a concatenation of the boundaries for the
component grids. For all other dimensions, the boundaries are
set to those of the most costly component grid.
"""
if self._boundaries is None: # Lazy evaluation
chosen, maxcost = self._maxcost()
new_boundaries = list(self._components[chosen].boundaries)
replacement = []
offset = 0
for i, comp in enumerate(self._components):
if i == chosen:
addition = new_boundaries[self._along]
else:
addition = comp.boundaries[self._along]
for a in addition:
replacement.append(a + offset)
offset += comp.shape[self._along]
new_boundaries[self._along] = replacement
self._boundaries = (*new_boundaries,)
return self._boundaries
@property
def cost(self) -> float:
"""
Returns:
Cost of iteration over the underlying array. This is defined
as the sum of the costs of the component arrays.
"""
return self._cost
def _maxcost(self) -> Tuple[int, float]:
chosen = 0
maxcost = 0
for i, comp in enumerate(self._components):
if isinstance(comp, CompositeGrid):
tmp, curcost = comp._maxcost()
else:
curcost = comp.cost
if curcost > maxcost:
maxcost = curcost
chosen = i
return chosen, maxcost
[docs]
def transpose(self, perm: Tuple[int, ...]) -> "CompositeGrid":
"""
Transpose a grid to reflect the same operation on the associated array.
Args:
perm:
Tuple of length equal to the dimensionality of the array,
containing the permutation of dimensions.
Returns:
A new ``CompositeGrid`` object.
"""
new_components = [grid.transpose(perm) for grid in self._components]
new_along = 0
for i, p in enumerate(perm):
if p == self._along:
new_along = i
internals = { "cost": self._cost }
if self._boundaries is not None:
internals["boundaries"] = (*(self._boundaries[p] for p in perm),)
return CompositeGrid(
new_components,
new_along,
internals = internals,
)
[docs]
def subset(self, subset: Tuple[Sequence[int], ...]) -> "CompositeGrid":
"""
Subset a grid to reflect the same operation on the associated array.
This splits up the subset sequence for the ``along`` dimension and
distributes it to each of the component grids.
Args:
subset:
Tuple of length equal to the number of grid dimensions. Each
entry should be a (possibly unsorted) sequence of integers,
specifying the subset to apply to each dimension of the grid.
Returns:
A new ``CompositeGrid`` object.
"""
if len(subset) != len(self._shape):
raise ValueError("'shape' and 'subset' should have the same length")
if _is_single_subset_noop(self._shape[self._along], subset[self._along]):
new_components = []
new_subset = list(subset)
for grid in self._components:
new_subset[self._along] = range(grid.shape[self._along])
new_components.append(grid.subset((*new_subset,)))
return CompositeGrid(new_components, self._along)
component_limits = []
counter = 0
for y in self._components:
counter += y.shape[self._along]
component_limits.append(counter)
last_choice = -1
new_components = []
sofar = []
raw_subset = list(subset)
for s in subset[self._along]:
choice = bisect.bisect_left(component_limits, s)
if choice != last_choice:
if len(sofar):
raw_subset[self._along] = sofar
new_components.append(self._components[last_choice].subset((*raw_subset,)))
sofar = []
last_choice = choice
if choice:
sofar.append(s - component_limits[choice - 1])
else:
sofar.append(s)
if len(sofar):
raw_subset[self._along] = sofar
new_components.append(self._components[last_choice].subset((*raw_subset,)))
new_shape = []
for i, s in enumerate(subset):
if s is None:
new_shape.append(self._shape[i])
else:
new_shape.append(len(s))
return CompositeGrid(
new_components,
self._along,
internals = {
"shape": (*new_shape,),
}
)
[docs]
def iterate(self, dimensions: Tuple[int, ...], buffer_elements: int = 1e6) -> Generator[Tuple, None, None]:
"""
Iterate over an array grid. This assembles blocks of contiguous grid
intervals to reduce the number of iterations (and associated overhead)
at the cost of increased memory usage during data extraction. For any
iteration over the ``along`` dimension (i.e., ``along`` is in
``dimensions``), this function dispatches to the component grids;
otherwise the iteration is performed based on :py:meth:`~boundaries`.
Args:
dimensions:
Dimensions over which to perform the iteration. Any dimensions
not listed here are extracted in their entirety, i.e., each
block consists of the full extent of unlisted dimensions.
buffer_elements:
Total number of elements in each block. Larger values increase
the block size and reduce the number of iterations, at the cost
of increased memory usage at each iteration.
Returns:
A generator that returns a tuple of length equal to the number of
dimensions. Each element contains the start and end of the block
on its corresponding dimension.
"""
if 0 in self._shape:
return
if self._along in dimensions:
first = True
offset = 0
for grid in self._components:
for block in grid.iterate(dimensions=dimensions, buffer_elements=buffer_elements):
if offset == 0:
yield block
else:
copy = list(block)
comp = copy[self._along]
copy[self._along] = (comp[0] + offset, comp[1] + offset)
yield (*copy,)
offset += grid.shape[self._along]
return
temp = SimpleGrid(self.boundaries, cost_factor=1)
yield from temp.iterate(dimensions=dimensions, buffer_elements=buffer_elements)