Source code for delayedarray.SparseNdarray

from bisect import bisect_left
from typing import Callable, List, Optional, Sequence, Tuple, Union
from collections import namedtuple
import numpy
import copy

from ._isometric import translate_ufunc_to_op_simple, translate_ufunc_to_op_with_args, ISOMETRIC_OP_WITH_ARGS, _choose_operator, _infer_along_with_args
from ._subset import _spawn_indices, _getitem_subset_preserves_dimensions, _getitem_subset_discards_dimensions, _repr_subset
from ._mask import (
    _allocate_unmasked_ndarray, 
    _allocate_maybe_masked_ndarray, 
    _convert_to_unmasked_1darray, 
    _convert_to_maybe_masked_1darray,
    _concatenate_unmasked_ndarrays,
    _concatenate_maybe_masked_ndarrays
)
from ._statistics import array_mean, array_var, array_sum, _create_offset_multipliers, array_all, array_any

__author__ = "ltla"
__copyright__ = "ltla"
__license__ = "MIT"


[docs] class SparseNdarray: """ The ``SparseNdarray``, as its name suggests, is a sparse n-dimensional array. It is inspired by the ``SVT_Array`` class from the `DelayedArray R/Bioconductor package <https://bioconductor.org/packages/DelayedArray>`_. Internally, the ``SparseNdarray`` is represented as a nested list where each nesting level corresponds to a dimension in reverse order, i.e., the outer-most list corresponds to elements of the last dimension in ``shape``. The list at each level has length equal to the extent of its dimension, where each entry contains another list representing the contents of the corresponding element of that dimension. This recursion continues until the second dimension (i.e., the penultimate nesting level), where each entry instead contains ``(index, value)`` tuples. In effect, this is a tree where the non-leaf nodes are lists and the leaf nodes are tuples. Each ``(index, value)`` tuple represents a sparse vector for the corresponding element of the first dimension of the ``SparseNdarray``. ``index`` should be a :py:class:`~numpy.ndarray` of integers where entries are strictly increasing and less than the extent of the first dimension. All ``index`` objects in the same ``SparseNdarray`` should have the same ``dtype`` (defined by the ``index_dtype`` property). ``value`` may be any numeric/boolean :py:class:`~numpy.ndarray` but the ``dtype`` should be consistent across all ``value`` objects in the ``SparseNdarray``. If the array contains masked values, all ``value`` objects should be a ``MaskedArray``, otherwise they should be regular NumPy arrays. Any entry of any (nested) list may also be None, indicating that the corresponding element of the dimension contains no non-zero values. In fact, the entire tree may be None, indicating that there are no non-zero values in the entire array. For 1-dimensional arrays, the contents should be a single ``(index, value)`` tuple containing the sparse contents. This may also be None if there are no non-zero values in the array. """
[docs] def __init__( self, shape: Tuple[int, ...], contents, dtype: Optional[numpy.dtype] = None, index_dtype: Optional[numpy.dtype] = None, is_masked: Optional[bool] = None, check: bool = True, ): """ Args: shape: Tuple specifying the dimensions of the array. contents: For ``n``-dimensional arrays where ``n`` > 1, a nested list representing a tree where each leaf node is a tuple containing a sparse vector (or None). For 1-dimensional arrays, a tuple containing a sparse vector. Alternatively None, if the array is empty. dtype: NumPy type of the array values. If None, this is inferred from ``contents``. index_dtype: NumPy type of the array indices. If None, this is inferred from ``contents``. is_masked: Whether ``contents`` contains masked values. If None, this is inferred from ``contents``. check: Whether to check the consistency of the ``contents`` during construction. This can be set to False for speed. """ self._shape = shape self._contents = contents ndim = len(shape) if dtype is None or index_dtype is None or is_masked is None: if contents is not None: if ndim > 1: info = _peek(contents, dim=ndim-1) if info is not None: index_dtype0 = info[0] dtype0 = info[1] is_masked0 = info[2] else: dtype0 = None index_dtype0 = None is_masked0 = False else: index_dtype0 = contents[0].dtype dtype0 = contents[1].dtype is_masked0 = numpy.ma.isMaskedArray(contents[1]) if dtype is None: dtype = dtype0 if index_dtype is None: index_dtype = index_dtype0 if is_masked is None: is_masked = is_masked0 if dtype is None: raise ValueError("cannot infer 'dtype' from 'contents'") if index_dtype is None: raise ValueError("cannot infer 'index_dtype' from 'contents'") if is_masked is None: is_masked = False self._dtype = dtype self._index_dtype = index_dtype self._is_masked = is_masked if check is True and contents is not None: payload = _CheckPayload(dtype=self._dtype, index_dtype=self._index_dtype, is_masked=self._is_masked, max_index=self._shape[0]) if len(shape) > 1: _recursive_check(self._contents, self._shape, payload=payload, dim=ndim-1) else: _check_sparse_tuple(self._contents[0], self._contents[1], payload=payload)
@property def shape(self) -> Tuple[int, ...]: """ Returns: Tuple of integers specifying the extent of each dimension. """ return self._shape @property def dtype(self) -> numpy.dtype: """ Returns: NumPy type of the values. """ return self._dtype @property def index_dtype(self) -> numpy.dtype: """ Returns: NumPy type of the indices. """ return self._index_dtype @property def is_masked(self) -> bool: """ Returns: Whether the values are masked. """ return self._is_masked @property def contents(self): """Contents of the array. This is intended to be read-only and should only be modified if you really know what you're doing. Returns: A nested list, for a n-dimensional array where n > 1. A tuple containing a sparse vector (i.e., indices and values), for a 1-dimensional array. Alternatively None, if the array contains no non-zero elements. """ return self._contents
[docs] def __repr__(self) -> str: """ Pretty-print this ``SparseNdarray``. This uses :py:func:`~numpy.array2string` and responds to all of its options. Returns: String containing a prettified display of the array contents. """ preamble = "<" + " x ".join([str(x) for x in self._shape]) + ">" preamble += " " + type(self).__name__ + " object of type '" + self._dtype.name + "'" indices = _repr_subset(self._shape) bits_and_pieces = _extract_dense_array_from_SparseNdarray(self, indices) converted = numpy.array2string(bits_and_pieces, separator=", ", threshold=0) return preamble + "\n" + converted
# For NumPy:
[docs] def __array__(self, dtype: Optional[numpy.dtype] = None, copy: bool = True) -> numpy.ndarray: """Convert a ``SparseNdarray`` to a NumPy array. Args: dtype: The desired NumPy type of the output array. If None, the type of the seed is used. copy: Currently ignored. The output is never a reference to the underlying seed, even if the seed is another NumPy array. Returns: Dense array of the same type as :py:attr:`~dtype` and shape as :py:attr:`~shape`. """ indices = _spawn_indices(self._shape) return _extract_dense_array_from_SparseNdarray(self, indices, dtype=dtype)
# Assorted dunder methods.
[docs] def __add__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Add something to the right-hand-side of a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the addition. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="add", right=True)
[docs] def __radd__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Add something to the left-hand-side of a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the addition. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="add", right=False)
[docs] def __sub__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Subtract something from the right-hand-side of a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the subtraction. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="subtract", right=True)
[docs] def __rsub__(self, other): """Subtract a ``SparseNdarray`` from something else. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the subtraction. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="subtract", right=False)
[docs] def __mul__(self, other): """Multiply a ``SparseNdarray`` with something on the right hand side. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the multiplication. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="multiply", right=True)
[docs] def __rmul__(self, other): """Multiply a ``SparseNdarray`` with something on the left hand side. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the multiplication. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="multiply", right=False)
[docs] def __truediv__(self, other): """Divide a ``SparseNdarray`` by something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the division. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="divide", right=True)
[docs] def __rtruediv__(self, other): """Divide something by a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the division. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="divide", right=False)
[docs] def __mod__(self, other): """Take the remainder after dividing a ``SparseNdarray`` by something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the modulo. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="remainder", right=True)
[docs] def __rmod__(self, other): """Take the remainder after dividing something by a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the modulo. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray( self, other, operation="remainder", right=False )
[docs] def __floordiv__(self, other): """Divide a ``SparseNdarray`` by something and take the floor. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the floor division. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray( self, other, operation="floor_divide", right=True )
[docs] def __rfloordiv__(self, other): """Divide something by a ``SparseNdarray`` and take the floor. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the floor division. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray( self, other, operation="floor_divide", right=False )
[docs] def __pow__(self, other): """Raise a ``SparseNdarray`` to the power of something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the power operation. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="power", right=True)
[docs] def __rpow__(self, other): """Raise something to the power of the contents of a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the power operation. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="power", right=False)
[docs] def __eq__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check for equality between a ``SparseNdarray`` and something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="equal", right=True)
[docs] def __req__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check for equality between something and a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="equal", right=False)
[docs] def __ne__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check for non-equality between a ``SparseNdarray`` and something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="not_equal", right=True)
[docs] def __rne__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check for non-equality between something and a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="not_equal", right=False)
[docs] def __ge__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check whether a ``SparseNdarray`` is greater than or equal to something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="greater_equal", right=True)
[docs] def __rge__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check whether something is greater than or equal to a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray( self, other, operation="greater_equal", right=False )
[docs] def __le__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check whether a ``SparseNdarray`` is less than or equal to something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray( self, other, operation="less_equal", right=True )
[docs] def __rle__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check whether something is greater than or equal to a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray( self, other, operation="less_equal", right=False )
[docs] def __gt__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check whether a ``SparseNdarray`` is greater than something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="greater", right=True)
[docs] def __rgt__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check whether something is greater than a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="greater", right=False)
[docs] def __lt__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check whether a ``SparseNdarray`` is less than something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="less", right=True)
[docs] def __rlt__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Check whether something is less than a ``SparseNdarray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or any seed object of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="less", right=False)
# Simple methods.
[docs] def __neg__(self): """Negate the contents of a ``SparseNdarray``. Returns: A ``SparseNdarray`` containing the delayed negation. """ return _transform_sparse_array_from_SparseNdarray(self, lambda l, i, v : (i, -v), self._dtype)
[docs] def __abs__(self): """Take the absolute value of the contents of a ``SparseNdarray``. Returns: A ``SparseNdarray`` containing the delayed absolute value operation. """ return _transform_sparse_array_from_SparseNdarray(self, lambda l, i, v : (i, abs(v)), self._dtype)
[docs] def __or__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Element-wise OR with something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or a ``DelayedArray`` of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="logical_or", right=True)
[docs] def __ror__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Element-wise OR with the right-hand-side of a ``DelayedArray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or a ``DelayedArray`` of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="logical_or", right=False)
[docs] def __and__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Element-wise AND with something. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or a ``DelayedArray`` of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="logical_and", right=True)
[docs] def __rand__(self, other) -> Union["SparseNdarray", numpy.ndarray]: """Element-wise AND with the right-hand-side of a ``DelayedArray``. Args: other: A numeric scalar; or a NumPy array with dimensions as described in :py:class:`~delayedarray.UnaryIsometricOpWithArgs.UnaryIsometricOpWithArgs`; or a ``DelayedArray`` of the same dimensions as :py:attr:`~shape`. Returns: Array containing the result of the check. This may or may not be sparse depending on ``other``. """ return _operate_with_args_on_SparseNdarray(self, other, operation="logical_and", right=False)
# Subsetting.
[docs] def __getitem__(self, subset: Tuple[Union[slice, Sequence], ...]) -> Union["SparseNdarray", numpy.ndarray]: """Take a subset of this ``SparseNdarray``. This follows the same logic as NumPy slicing and will generate a :py:class:`~delayedarray.Subset.Subset` object when the subset operation preserves the dimensionality of the seed, i.e., ``args`` is defined using the :py:meth:`~numpy.ix_` function. Args: args: A :py:class:`tuple` of length equal to the dimensionality of this ``SparseNdarray``. Any NumPy slicing is supported but only subsets that preserve dimensionality will generate a delayed subset operation. Raises: ValueError: If ``args`` contain more dimensions than the shape of the array. Returns: If the dimensionality is preserved by ``args``, a ``SparseNdarray`` containing a delayed subset operation is returned. Otherwise, a :py:class:`~numpy.ndarray` is returned containing the realized subset. """ cleaned = _getitem_subset_preserves_dimensions(self.shape, subset) if cleaned is not None: # No need to sanitize here, as the extractors can take unsorted subsets. return _extract_sparse_array_from_SparseNdarray(self, cleaned) return _getitem_subset_discards_dimensions(self, subset, _extract_dense_array_from_SparseNdarray)
# NumPy methods.
[docs] def __array_ufunc__(self, ufunc, method, *inputs, **kwargs) -> "SparseNdarray": """Interface with NumPy array methods. This is used to implement mathematical operations like NumPy's :py:meth:`~numpy.log`, or to override operations between NumPy class instances and ``SparseNdarray`` objects where the former is on the left hand side. Check out NumPy's ``__array_ufunc__`` `documentation <https://numpy.org/doc/stable/reference/arrays.classes.html#numpy.class.__array_ufunc__>`_ for more details. Returns: A ``SparseNdarray`` instance containing the requested delayed operation. """ if ufunc.__name__ in translate_ufunc_to_op_with_args or ufunc.__name__ == "true_divide": # This is required to support situations where the NumPy array is on # the LHS, such that the numpy.ndarray method gets called first. op = ufunc.__name__ if ufunc.__name__ == "true_divide": op = "divide" first_is_da = isinstance(inputs[0], SparseNdarray) da = inputs[1 - int(first_is_da)] v = inputs[int(first_is_da)] return _operate_with_args_on_SparseNdarray(self, v, op, right=False) elif ufunc.__name__ in translate_ufunc_to_op_simple: dummy = ufunc(numpy.zeros(1, dtype=self._dtype)) if dummy[0] == 0: return _transform_sparse_array_from_SparseNdarray(self, lambda l, i, v : (i, ufunc(v)), dummy.dtype) else: return ufunc(self.__array__()) elif ufunc.__name__ == "absolute": return self.__abs__() raise NotImplementedError(f"'{ufunc.__name__}' is not implemented!")
[docs] def __array_function__(self, func, types, args, kwargs) -> "SparseNdarray": """Interface to NumPy's high-level array functions. This is used to implement array operations like NumPy's :py:meth:`~numpy.concatenate`, Check out NumPy's ``__array_function__`` `documentation <https://numpy.org/doc/stable/reference/arrays.classes.html#numpy.class.__array_function__>`_ for more details. Returns: A ``SparseNdarray`` instance containing the requested operation. """ if func == numpy.concatenate: if "axis" in kwargs: axis = kwargs["axis"] else: axis = 0 return _concatenate_SparseNdarrays(args[0], along=axis) if func == numpy.transpose: if "axes" in kwargs: axes = kwargs["axes"] else: axes = list(range(len(self._shape) - 1, -1, -1)) return _transpose_SparseNdarray(self, axes) if func == numpy.round: return _transform_sparse_array_from_SparseNdarray(self, lambda l, i, v : (i, func(v, **kwargs)), self._dtype) if func == numpy.mean: return self.mean(**kwargs) if func == numpy.sum: return self.sum(**kwargs) if func == numpy.var: return self.var(**kwargs) if func == numpy.any: return self.any(**kwargs) if func == numpy.all: return self.all(**kwargs) raise NotImplementedError(f"'{func.__name__}' is not implemented!")
[docs] def astype(self, dtype: numpy.dtype, **kwargs) -> "SparseNdarray": """See :py:meth:`~numpy.ndarray.astype` for details. All keyword arguments are currently ignored. """ return _transform_sparse_array_from_SparseNdarray(self, lambda l, i, v : (i, v.astype(dtype)), dtype)
@property def T(self) -> "SparseNdarray": """ Returns: A ``SparseNdarray`` containing the transposed contents. """ axes = list(range(len(self._shape) - 1, -1, -1)) return _transpose_SparseNdarray(self, axes)
[docs] def sum(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None) -> numpy.ndarray: """ Take the sum of values across the ``SparseNdarray``, possibly over a given axis or set of axes. Args: axis: A single integer specifying the axis over which to calculate the sum. Alternatively, a tuple (multiple axes) or None (no axes), see :py:func:`~numpy.sum` for details. dtype: NumPy type for the output array. If None, this is automatically chosen based on the type of the ``SparseNdarray``, see :py:func:`~numpy.sum` for details. Returns: A NumPy array containing the summed values. If ``axis = None``, this will be a NumPy scalar instead. """ return array_sum( self, axis=axis, dtype=dtype, reduce_over_x=_reduce_SparseNdarray, masked=self._is_masked, )
[docs] def mean(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None) -> numpy.ndarray: """ Take the mean of values across the ``SparseNdarray``, possibly over a given axis or set of axes. Args: axis: A single integer specifying the axis over which to calculate the mean. Alternatively, a tuple (multiple axes) or None (no axes), see :py:func:`~numpy.mean` for details. dtype: NumPy type for the output array. If None, this is automatically chosen based on the type of the ``SparseNdarray``, see :py:func:`~numpy.mean` for details. Returns: A NumPy array containing the mean values. If ``axis = None``, this will be a NumPy scalar instead. """ return array_mean( self, axis=axis, dtype=dtype, reduce_over_x=_reduce_SparseNdarray, masked=self._is_masked, )
[docs] def var(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None, ddof: int = 0) -> numpy.ndarray: """ Take the variances of values across the ``SparseNdarray``, possibly over a given axis or set of axes. Args: axis: A single integer specifying the axis over which to calculate the variance. Alternatively, a tuple (multiple axes) or None (no axes), see :py:func:`~numpy.var` for details. dtype: NumPy type for the output array. If None, this is automatically chosen based on the type of the ``SparseNdarray``, see :py:func:`~numpy.var` for details. ddof: Delta in the degrees of freedom to subtract from the denominator. Typically set to 1 to obtain the sample variance. Returns: A NumPy array containing the variances. If ``axis = None``, this will be a NumPy scalar instead. """ return array_var( self, axis=axis, dtype=dtype, ddof=ddof, reduce_over_x=_reduce_SparseNdarray, masked=self._is_masked, )
[docs] def any(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None) -> numpy.ndarray: """Test whether any array element along a given axis evaluates to True. Compute this test across the ``SparseNdarray``, possibly over a given axis or set of axes. If the seed has a ``any()`` method, that method is called directly with the supplied arguments. Args: axis: A single integer specifying the axis over which to test for any. Alternatively, a tuple (multiple axes) or None (no axes), see :py:func:`~numpy.any` for details. dtype: NumPy type for the output array. If None, this is automatically chosen based on the type of the ``SparseNdarray``, see :py:func:`~numpy.any` for details. Returns: A NumPy array containing the variances. If ``axis = None``, this will be a NumPy scalar instead. """ return array_any( self, axis=axis, dtype=dtype, reduce_over_x=_reduce_SparseNdarray, masked=self._is_masked, )
[docs] def all(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None) -> numpy.ndarray: """Test whether all array elements along a given axis evaluate to True. Compute this test across the ``SparseNdarray``, possibly over a given axis or set of axes. If the seed has a ``all()`` method, that method is called directly with the supplied arguments. Args: axis: A single integer specifying the axis over which to test for any. Alternatively, a tuple (multiple axes) or None (no axes), see :py:func:`~numpy.any` for details. dtype: NumPy type for the output array. If None, this is automatically chosen based on the type of the ``SparseNdarray``, see :py:func:`~numpy.any` for details. Returns: A NumPy array containing the variances. If ``axis = None``, this will be a NumPy scalar instead. """ return array_all( self, axis=axis, dtype=dtype, reduce_over_x=_reduce_SparseNdarray, masked=self._is_masked, )
# Other stuff
[docs] def __copy__(self) -> "SparseNdarray": """ Returns: A deep copy of this object. """ return SparseNdarray( shape=self._shape, contents=copy.deepcopy(self._contents), index_dtype=self._index_dtype, dtype=self._dtype, is_masked=self._is_masked, check=False )
[docs] def copy(self) -> "SparseNdarray": """ Returns: A deep copy of this object. """ return self.__copy__()
######################################################### ######################################################### def _peek(contents: list, dim: int): if dim == 1: for x in contents: if x is not None: return x[0].dtype, x[1].dtype, numpy.ma.isMaskedArray(x[1]) else: for x in contents: if x is not None: out = _peek(x, dim - 1) if out is not None: return out return None _CheckPayload = namedtuple("_CheckPayload", [ "max_index", "dtype", "index_dtype", "is_masked" ]) def _check_sparse_tuple(indices: numpy.ndarray, values: numpy.ndarray, payload: _CheckPayload): if len(indices) != len(values): raise ValueError("length of index and value vectors should be the same") if values.dtype != payload.dtype: raise ValueError("inconsistent data types for different value vectors") if indices.dtype != payload.index_dtype: raise ValueError("inconsistent data types for different index vectors") for i, ix in enumerate(indices): if ix < 0 or ix >= payload.max_index: raise ValueError("index vectors out of range for the last dimension") for i in range(1, len(indices)): if indices[i] <= indices[i - 1]: raise ValueError("index vectors should be sorted in strictly increasing order") if payload.is_masked != numpy.ma.isMaskedArray(values): raise ValueError("inconsistent masking status for different value vectors") def _recursive_check(contents: list, shape: Tuple[int, ...], payload: _CheckPayload, dim: int): if len(contents) != shape[dim]: raise ValueError("length of 'contents' or its components should match the extent of the corresponding dimension") if dim == 1: for x in contents: if x is not None: _check_sparse_tuple(x[0], x[1], payload) else: for x in contents: if x is not None: _recursive_check(x, shape, payload, dim=dim-1) ######################################################### ######################################################### _SubsetSummary = namedtuple("_SubsetSummary", [ "subset", "increasing", "consecutive", "search_first", "search_last", "first_index", "past_last_index", "random_map" ]) def _characterize_indices(subset: Sequence[int], dim_extent: int): if len(subset) == 0: return _SubsetSummary( subset = subset, increasing = False, consecutive = False, first_index = None, past_last_index = None, search_first = None, search_last = None, random_map = None, ) for i in range(1, len(subset)): if subset[i] <= subset[i-1]: random_map = {} for i, x in enumerate(subset): if x in random_map: if isinstance(random_map[x], list): random_map[x].append(i) else: random_map[x] = [random_map[x], i] else: random_map[x] = i first = min(subset) return _SubsetSummary( subset = subset, increasing = False, consecutive = False, first_index = first, past_last_index = None, search_first = (first > 0), search_last = None, random_map = random_map ) consecutive = True for i in range(1, len(subset)): if subset[i] != subset[i - 1] + 1: consecutive = False break first = subset[0] last = subset[-1] + 1 return _SubsetSummary( subset = subset, increasing = True, consecutive = consecutive, first_index = first, past_last_index = last, search_first = (first > 0), search_last = (last < dim_extent), random_map = None, ) def _extract_sparse_vector_internal(indices: numpy.ndarray, values: numpy.ndarray, subset_summary: _SubsetSummary, f: Callable): subset = subset_summary.subset start_pos = 0 if subset_summary.search_first: start_pos = bisect_left(indices, subset_summary.first_index) end_pos = len(indices) if subset_summary.increasing: pos = 0 x = start_pos for i in subset: while x < end_pos and i > indices[x]: x += 1 if x == end_pos: break if i == indices[x]: f(pos, i, values[x]) pos += 1 else: for x in range(start_pos, end_pos): ix = indices[x] if ix in subset_summary.random_map: targets = subset_summary.random_map[ix] if isinstance(targets, int): f(targets, ix, values[x]) else: for t in targets: f(t, ix, values[x]) def _extract_sparse_vector_to_dense(indices: numpy.ndarray, values: numpy.ndarray, subset_summary: _SubsetSummary, output: numpy.ndarray): if len(subset_summary.subset) == 0: pass elif subset_summary.consecutive: start_pos = 0 first = subset_summary.first_index if subset_summary.search_first: start_pos = bisect_left(indices, first) end_pos = len(indices) if subset_summary.search_last: end_pos = bisect_left(indices, subset_summary.past_last_index, lo=start_pos, hi=end_pos) for x in range(start_pos, end_pos): output[indices[x] - first] = values[x] else: def f(p, i, v): output[p] = v _extract_sparse_vector_internal(indices, values, subset_summary, f) def _recursive_extract_dense_array(contents: numpy.ndarray, subset: Tuple[Sequence[int], ...], subset_summary: _SubsetSummary, output: numpy.ndarray, dim: int): curdex = subset[dim] if dim == 1: pos = 0 for i in curdex: x = contents[i] if x is not None: _extract_sparse_vector_to_dense(x[0], x[1], subset_summary=subset_summary, output=output[pos]) pos += 1 else: pos = 0 for i in curdex: x = contents[i] if x is not None: _recursive_extract_dense_array( contents=x, subset=subset, subset_summary=subset_summary, output=output[pos], dim=dim - 1, ) pos += 1 def _extract_dense_array_from_SparseNdarray(x: SparseNdarray, subset: Tuple[Sequence[int], ...], dtype: Optional[numpy.dtype] = None) -> numpy.ndarray: idims = [len(y) for y in subset] subset_summary = _characterize_indices(subset[0], x._shape[0]) # We reverse the dimensions so that we use F-contiguous storage. This also # makes it slightly easier to do the recursion as we can just index by # the first dimension to obtain a subarray at each recursive step. if dtype is None: dtype = x._dtype output = numpy.zeros((*reversed(idims),), dtype=dtype) if x._is_masked: output = numpy.ma.MaskedArray(output, mask=False) if x._contents is not None: ndim = len(x._shape) if ndim > 1: _recursive_extract_dense_array(x._contents, subset, subset_summary=subset_summary, output=output, dim=ndim-1) else: _extract_sparse_vector_to_dense(x._contents[0], x._contents[1], subset_summary=subset_summary, output=output) return output.T def _extract_sparse_vector_to_sparse(indices: numpy.ndarray, values: numpy.ndarray, subset_summary: _SubsetSummary): if len(subset_summary.subset) == 0: pass elif subset_summary.consecutive: start_pos = 0 first = subset_summary.first_index if subset_summary.search_first: start_pos = bisect_left(indices, first) end_pos = len(indices) if subset_summary.search_last: end_pos = bisect_left(indices, subset_summary.past_last_index, lo=start_pos, hi=end_pos) if start_pos == 0 and end_pos == len(indices): new_indices = indices new_values = values else: new_indices = indices[start_pos:end_pos] new_values = values[start_pos:end_pos] if first: new_indices = new_indices - first return new_indices, new_values elif subset_summary.increasing: new_indices = [] new_values = [] def f(p, i, v): new_indices.append(p) new_values.append(v) _extract_sparse_vector_internal(indices, values, subset_summary, f) if len(new_indices) == 0: return None return ( _convert_to_unmasked_1darray(new_indices, dtype=indices.dtype), _convert_to_maybe_masked_1darray(new_values, dtype=values.dtype, masked=numpy.ma.isMaskedArray(values)) ) else: new_pairs = [] def f(p, i, v): new_pairs.append((p, v)) _extract_sparse_vector_internal(indices, values, subset_summary, f) new_pairs.sort() shape = (len(new_pairs),) new_indices = _allocate_unmasked_ndarray(shape, dtype=indices.dtype) new_values = _allocate_maybe_masked_ndarray(shape, dtype=values.dtype, masked=numpy.ma.isMaskedArray(values)) for i, pair in enumerate(new_pairs): new_indices[i] = pair[0] new_values[i] = pair[1] return new_indices, new_values def _recursive_extract_sparse_array(contents: list, shape: Tuple[int, ...], subset: Tuple[Sequence[int], ...], subset_summary: _SubsetSummary, dim: int): curdex = subset[dim] new_contents = [] if dim == 1: for i in curdex: x = contents[i] if x is not None: y = _extract_sparse_vector_to_sparse(x[0], x[1], subset_summary) new_contents.append(y) else: new_contents.append(None) else: for i in curdex: if contents[i] is not None: y = _recursive_extract_sparse_array( contents[i], shape, subset=subset, subset_summary=subset_summary, dim=dim - 1, ) new_contents.append(y) else: new_contents.append(None) for x in new_contents: if x is not None: return new_contents return None def _extract_sparse_array_from_SparseNdarray(x: SparseNdarray, subset: Tuple[Sequence[int], ...]) -> SparseNdarray: idims = [len(y) for y in subset] subset_summary = _characterize_indices(subset[0], x._shape[0]) new_contents = None if x._contents is not None: ndim = len(x.shape) if ndim > 1: new_contents = _recursive_extract_sparse_array(x._contents, x._shape, subset=subset, subset_summary=subset_summary, dim=ndim-1) else: new_contents = _extract_sparse_vector_to_sparse(x._contents[0], x._contents[1], subset_summary) return SparseNdarray(shape=(*idims,), contents=new_contents, index_dtype=x.index_dtype, dtype=x.dtype, is_masked=x.is_masked, check=False) ######################################################### ######################################################### _TransformPayload = namedtuple("_TransformPayload", [ "fun", "output_dtype" ]) def _transform_sparse_vector(location: Sequence[int], indices: numpy.ndarray, values: numpy.ndarray, payload: _TransformPayload): idx, val = payload.fun(location, indices, values) return (idx.astype(indices.dtype, copy=False), val.astype(payload.output_dtype, copy=False)) # a bit of safety with respect to types. def _recursive_transform_sparse_array(contents: list, shape: Tuple[int, ...], payload: _TransformPayload, dim: int, location: Sequence[int] = []): new_contents = [] location.append(0) if dim == 1: for i in range(shape[dim]): location[-1] = i x = contents[i] if x is not None: new_contents.append(_transform_sparse_vector(location, x[0], x[1], payload)) else: new_contents.append(None) else: for i in range(shape[dim]): if contents[i] is not None: location[-1] = i y = _recursive_transform_sparse_array( contents=contents[i], shape=shape, payload=payload, dim=dim-1, location=location, ) new_contents.append(y) else: new_contents.append(None) location.pop() for x in new_contents: if x is not None: return new_contents return None def _transform_sparse_array_from_SparseNdarray(x: SparseNdarray, f: Callable, output_dtype: numpy.dtype) -> SparseNdarray: new_contents = None if x._contents is not None: payload = _TransformPayload(fun=f, output_dtype=output_dtype) ndim = len(x._shape) if ndim > 1: new_contents = _recursive_transform_sparse_array(contents=x._contents, shape=x._shape, payload=payload, dim=ndim-1) else: new_contents = _transform_sparse_vector((), indices=x._contents[0], values=x._contents[1], payload=payload) return SparseNdarray(shape=x._shape, contents=new_contents, index_dtype=x.index_dtype, dtype=output_dtype, is_masked=x.is_masked, check=False) ######################################################### ######################################################### _BinaryOpPayload = namedtuple("_BinaryOpPayload", [ "fun", "dtype1", "dtype2", "output_dtype", "output_index_dtype" ]) def _binary_operate_sparse_vector(vector1: Tuple[numpy.ndarray, numpy.ndarray], vector2: Tuple[numpy.ndarray, numpy.ndarray], payload: _BinaryOpPayload): if vector1 is None and vector2 is None: return None elif vector1 is not None and vector2 is None: indices1, values1 = vector1 mock = numpy.zeros((1,), dtype=payload.dtype2) # get vector of length 1 for correct type coercion. return indices1.astype(payload.output_index_dtype, copy=False), payload.fun(values1, mock).astype(payload.output_dtype, copy=False) elif vector1 is None and vector2 is not None: indices2, values2 = vector2 mock = numpy.zeros((1,), dtype=payload.dtype1) return indices2.astype(payload.output_index_dtype, copy=False), payload.fun(mock, values2).astype(payload.output_dtype, copy=False) else: indices1, values1 = vector1 indices2, values2 = vector2 f = payload.fun i1 = 0 len1 = len(indices1) z1 = values1.dtype.type(0) i2 = 0 len2 = len(indices2) z2 = values2.dtype.type(0) outval = [] outidx = [] while i1 < len1 and i2 < len2: ix1 = indices1[i1] ix2 = indices2[i2] if ix1 > ix2: outval.append(f(z1, values2[i2])) outidx.append(ix2) i2 += 1 elif ix1 < ix2: outval.append(f(values1[i1], z2)) outidx.append(ix1) i1 += 1 else: outval.append(f(values1[i1], values2[i2])) outidx.append(ix1) i1 += 1 i2 += 1 # Only one of the following should be called. while i2 < len2: outval.append(f(z1, values2[i2])) outidx.append(indices2[i2]) i2 += 1 while i1 < len1: outval.append(f(values1[i1], z2)) outidx.append(indices1[i1]) i1 += 1 return ( _convert_to_unmasked_1darray(outidx, dtype=payload.output_index_dtype), _convert_to_maybe_masked_1darray(outval, dtype=payload.output_dtype, masked=numpy.ma.isMaskedArray(values1) or numpy.ma.isMaskedArray(values2)) ) def _recursive_binary_operation_on_SparseNdarray(contents1: list, contents2: list, payload: _BinaryOpPayload, dim: int): if contents1 is None and contents2 is None: return None new_contents = [] if contents1 is not None and contents2 is None: if dim == 1: for con1 in contents1: new_contents.append(_binary_operate_sparse_vector(con1, None, payload)) else: for con1 in contents1: new_contents.append(_recursive_binary_operation_on_SparseNdarray(con1, None, payload, dim=dim - 1)) elif contents1 is None and contents2 is not None: if dim == 1: for con2 in contents2: new_contents.append(_binary_operate_sparse_vector(None, con2, payload)) else: for con2 in contents2: new_contents.append(_recursive_binary_operation_on_SparseNdarray(None, con2, payload, dim=dim - 1)) else: if dim == 1: for i, con1 in enumerate(contents1): new_contents.append(_binary_operate_sparse_vector(con1, contents2[i], payload)) else: for i, con1 in enumerate(contents1): new_contents.append(_recursive_binary_operation_on_SparseNdarray(con1, contents2[i], payload, dim=dim - 1)) for x in new_contents: if x is not None: return new_contents return None def _binary_operation_on_SparseNdarray(x: SparseNdarray, y: SparseNdarray, operation: ISOMETRIC_OP_WITH_ARGS): op = _choose_operator(operation) dummy1 = numpy.zeros(1, dtype=x._dtype) dummy2 = numpy.zeros(1, dtype=y._dtype) dummy = op(dummy1, dummy2) if dummy[0] != 0: return op(x.__array__(), y.__array__()) if x._contents is None and y._contents is None: new_contents = None else: payload = _BinaryOpPayload(fun=op, dtype1=x._dtype, dtype2=y._dtype, output_index_dtype=x.index_dtype, output_dtype=dummy.dtype) ndim = len(x._shape) if ndim > 1: new_contents = _recursive_binary_operation_on_SparseNdarray(x._contents, y._contents, payload=payload, dim = ndim - 1) else: new_contents = _binary_operate_sparse_vector(x._contents, y._contents, payload=payload) return SparseNdarray(shape=x._shape, contents=new_contents, index_dtype=x.index_dtype, dtype=dummy.dtype, is_masked=x.is_masked, check=False) ######################################################### ######################################################### def _operate_with_args_on_SparseNdarray(x: SparseNdarray, other, operation: ISOMETRIC_OP_WITH_ARGS, right: bool) -> SparseNdarray: if isinstance(other, SparseNdarray): return _binary_operation_on_SparseNdarray(x, other, operation) along = _infer_along_with_args(x._shape, other) num_other = 1 op = _choose_operator(operation) dummy = numpy.zeros(num_other, dtype=x._dtype) if right: dummy = op(dummy, other) else: dummy = op(other, dummy) if num_other and not (dummy == 0).all(): # densifying. if right: return op(x.__array__(), other) else: return op(other, x.__array__()) if isinstance(other, numpy.ndarray): num_other = numpy.prod(other.shape) other = other.reshape((num_other,)) # flattening if along is None: if right: def f2(location, indices, values): return indices, op(values, other) else: def f2(location, indices, values): return indices, op(other, values) elif along > 0: if right: def f2(location, indices, values): p = location[-along] # remember, location is (i) reversed and (ii) missing the final dimension, so '-along' works. return indices, op(values, other[p:p+1]) # get vector of length 1 for correct type coercion. else: def f2(location, indices, values): p = location[-along] return indices, op(other[p:p+1], values) else: if right: def f2(location, indices, values): return indices, op(values, other[indices]) else: def f2(location, indices, values): return indices, op(other[indices], values) return _transform_sparse_array_from_SparseNdarray(x, f2, dummy.dtype) ######################################################### ######################################################### _TransposeFillPayload = namedtuple("_TransposeFillPayload", [ "perm", "new_shape", "new_contents" ]) def _transpose_SparseNdarray_contents_internal(location: Sequence[int], indices: numpy.ndarray, values: numpy.ndarray, payload: _TransposeFillPayload): destination = [] final = None for i, p in enumerate(payload.perm): if p == 0: final = i destination.append(None) else: destination.append(location[-p]) # remember, location is (i) reversed and (ii) missing the final dimension, so '-p' works. ndim = len(payload.new_shape) for i, ix in enumerate(indices): destination[final] = ix target = payload.new_contents for j in range(ndim - 1, 1, -1): # walking backwards from the later dimensions to fill 'new_contents' correctly. d = destination[j] if target[d] is None: replacement = [None] * payload.new_shape[j - 1] target[d] = replacement target = target[d] d = destination[1] if target[d] is None: target[d] = ([], []) outi, outv = target[d] outi.append(destination[0]) outv.append(values[i]) def _recursive_transpose_SparseNdarray_fill(contents: list, payload: _TransposeFillPayload, dim: int, location: Sequence[int] = []): location.append(0) if dim == 1: for i, con in enumerate(contents): if con is not None: location[-1] = i _transpose_SparseNdarray_contents_internal(location, con[0], con[1], payload) else: for i, con in enumerate(contents): if con is not None: location[-1] = i _recursive_transpose_SparseNdarray_fill(con, payload, location=location, dim=dim - 1) location.pop() _TransposeReallocPayload = namedtuple("_TransposeReallocatePayload", [ "output_dtype", "output_index_dtype", "output_is_masked" ]) def _recursive_transpose_SparseNdarray_reallocate(contents: list, payload: _TransposeReallocPayload, dim: int): if dim == 1: for i, con in enumerate(contents): if con is not None: contents[i] = ( _convert_to_unmasked_1darray(con[0], dtype=payload.output_index_dtype), _convert_to_maybe_masked_1darray(con[1], dtype=payload.output_dtype, masked=payload.output_is_masked) ) else: for i, con in enumerate(contents): if con is not None: _recursive_transpose_SparseNdarray_reallocate(con, payload, dim=dim - 1) def _transpose_SparseNdarray(x: SparseNdarray, perm): ndim = len(x._shape) if ndim == 1: return x new_shape = [] for p in perm: new_shape.append(x._shape[p]) new_contents = None if x._contents is not None: new_contents = [None] * new_shape[-1] _recursive_transpose_SparseNdarray_fill( x._contents, _TransposeFillPayload(perm=perm, new_shape=new_shape, new_contents=new_contents), dim=ndim - 1, ) _recursive_transpose_SparseNdarray_reallocate( new_contents, _TransposeReallocPayload(output_dtype=x._dtype, output_index_dtype=x._index_dtype, output_is_masked=x._is_masked), dim=ndim - 1, ) return SparseNdarray(shape=(*new_shape,), contents=new_contents, index_dtype=x._index_dtype, dtype=x._dtype, is_masked=x.is_masked, check=False) ######################################################### ######################################################### _ConcatenatePayload = namedtuple("_ConcatenatePayload", [ "shapes", "offset", "output_dtype", "output_index_dtype", "output_is_masked"]) def _concatenate_sparse_vectors(idx: List[numpy.ndarray], val: List[numpy.ndarray], payload: _ConcatenatePayload): newidx = _concatenate_unmasked_ndarrays(idx, axis=0).astype(payload.output_index_dtype, copy=False) newval = _concatenate_maybe_masked_ndarrays(val, axis=0, masked=payload.output_is_masked).astype(payload.output_dtype, copy=False) return (newidx, newval) def _coerce_concatenated_SparseNdarray_types(contents: list, payload: _ConcatenatePayload, dim: int): if dim == 1: for i, con in enumerate(contents): if con is not None: idx2 = con[0].astype(payload.output_index_dtype, copy=False) val2 = con[1].astype(payload.output_dtype, copy=False) contents[i] = (idx2, val2) else: for i, con in enumerate(contents): if con is not None: _coerce_concatenated_SparseNdarray_types(con, payload, dim=dim - 1) def _recursive_concatenate_SparseNdarrays(contents: list, final_shape: Tuple[int, ...], along: int, payload: _ConcatenatePayload, dim: int): if along == dim: all_none = True for x in contents: if x is not None: all_none = False new_contents = None if not all_none: new_contents = [] for i, x in enumerate(contents): if x is not None: new_contents += x else: new_contents += [None] * payload.shapes[i][along] _coerce_concatenated_SparseNdarray_types(new_contents, payload=payload, dim=dim) return new_contents elif dim == 1: new_contents = [] for i in range(final_shape[dim]): outidx = [] outval = [] for j, c in enumerate(contents): if c is not None and c[i] is not None: outidx.append(c[i][0] + payload.offset[j]) outval.append(c[i][1]) if len(outval): new_contents.append(_concatenate_sparse_vectors(outidx, outval, payload)) else: new_contents.append(None) return new_contents else: new_contents = [] collected = [None] * len(contents) for i in range(final_shape[dim]): for j, c in enumerate(contents): if c is not None: collected[j] = c[i] new_contents.append(_recursive_concatenate_SparseNdarrays(collected, final_shape, along, payload, dim=dim-1)) return new_contents def _concatenate_SparseNdarrays(xs: List[SparseNdarray], along: int): all_contents = [] all_shapes = [] for x in xs: all_contents.append(x._contents) all_shapes.append(x._shape) combined = 0 ref_shape = all_shapes[0] for shape in all_shapes: if len(shape) != len(ref_shape): raise ValueError("inconsistent dimensionalities for combining SparseNdarrays") for d, s in enumerate(shape): if d == along: combined += s elif s != ref_shape[d]: raise ValueError("inconsistent shapes for combining SparseNdarrays along axis " + str(along)) new_shape = list(ref_shape) new_shape[along] = combined dummy_collected = [] dummy_collected_index = [] for x in xs: dummy_collected.append(numpy.zeros(1, dtype=x._dtype)) dummy_collected_index.append(numpy.zeros(1, dtype=x._index_dtype)) output_dtype = _concatenate_unmasked_ndarrays(dummy_collected, axis=0).dtype output_index_dtype = _concatenate_unmasked_ndarrays(dummy_collected_index, axis=0).dtype output_is_masked = any(y._is_masked for y in xs) all_none = True for con in all_contents: if con is not None: all_none = False new_contents = None if not all_none: offset = None ndim = len(new_shape) # Upgrading the index type to ensure we can hold the output, if we're # concatenating on the first dimension (and thus possibly increasing # the indices in the leaves of the subsequent SparseNdarrays). if along == 0: last = 0 offset = [] for i, shape in enumerate(all_shapes): offset.append(last) last += shape[along] for candidate in [output_index_dtype, numpy.uint32, numpy.uint64]: if last < numpy.iinfo(candidate).max: output_index_dtype = candidate break payload = _ConcatenatePayload( shapes=all_shapes, offset=offset, output_dtype=output_dtype, output_index_dtype=output_index_dtype, output_is_masked=output_is_masked ) if ndim > 1: new_contents = _recursive_concatenate_SparseNdarrays(all_contents, new_shape, along=along, payload=payload, dim=ndim-1) else: outidx = [] outval = [] for j, c in enumerate(all_contents): if c is not None: outidx.append(c[0] + offset[j]) outval.append(c[1]) new_contents = _concatenate_sparse_vectors(outidx, outval, payload) return SparseNdarray(shape=(*new_shape,), contents=new_contents, dtype=output_dtype, index_dtype=output_index_dtype, is_masked=output_is_masked, check=False) ######################################################### ######################################################### _ReductionPayload = namedtuple("_ReductionPayload", [ "multipliers", "operation" ]) def _reduce_sparse_vector(idx: numpy.ndarray, val: numpy.ndarray, payload: _ReductionPayload, offset: int = 0): for j, ix in enumerate(idx): extra = payload.multipliers[0] * ix payload.operation(offset + extra, val[j]) return def _recursive_reduce_SparseNdarray(contents, payload: _ReductionPayload, dim: int, offset: int = 0): mult = payload.multipliers[dim] if dim == 1: for i, con in enumerate(contents): if con is not None: _reduce_sparse_vector(con[0], con[1], payload, offset = offset + mult * i) else: for i, con in enumerate(contents): if con is not None: _recursive_reduce_SparseNdarray(con, payload, dim = dim - 1, offset = offset + mult * i) return def _reduce_SparseNdarray(x: SparseNdarray, axes: List[int], operation: Callable): if x.contents is not None: multipliers = _create_offset_multipliers(x.shape, axes) payload = _ReductionPayload(operation=operation, multipliers=multipliers) ndim = len(x.shape) if ndim == 1: _reduce_sparse_vector(x.contents[0], x.contents[1], payload) else: _recursive_reduce_SparseNdarray(x.contents, payload, dim=ndim - 1) return