Source code for delayedarray.extract_sparse_array

from functools import singledispatch
import numpy
from bisect import bisect_left
from typing import Any, Tuple, Sequence, List, Union
from biocutils.package_utils import is_package_installed

from ._subset import _is_subset_noop, _is_subset_consecutive
from ._mask import _convert_to_unmasked_1darray, _convert_to_maybe_masked_1darray, _allocate_unmasked_ndarray, _allocate_maybe_masked_ndarray
from .SparseNdarray import SparseNdarray, _extract_sparse_array_from_SparseNdarray

__author__ = "ltla"
__copyright__ = "ltla"
__license__ = "MIT"


[docs] @singledispatch def extract_sparse_array(x: Any, subset: Tuple[Sequence[int], ...]) -> SparseNdarray: """Extract the contents of ``x`` (or a subset thereof) into a :py:class:`~delayedarray.SparseNdarray.SparseNdarray`. This should only be used for ``x`` where :py:meth:`~delayedarray.is_sparse.is_sparse` is True. Args: x: Any array-like object containing sparse data. subset: Tuple of length equal to the number of dimensions, each containing a sorted and unique sequence of integers specifying the elements of each dimension to extract. Returns: ``SparseNdarray`` for the requested subset. This may be a view so callers should create a copy if they intend to modify it. If :py:func:`~delayedarray.is_masked.is_masked` is True for ``x``, the ``SparseNdarray`` will contain NumPy ``MaskedArray``s internally. """ raise NotImplementedError("'extract_sparse_array(" + str(type(x)) + ")' has not yet been implemented")
[docs] @extract_sparse_array.register def extract_sparse_array_SparseNdarray(x: SparseNdarray, subset: Tuple[Sequence[int], ...]) -> SparseNdarray: """See :py:meth:`~delayedarray.extract_sparse_array.extract_sparse_array`.""" if _is_subset_noop(x.shape, subset): return x else: return _extract_sparse_array_from_SparseNdarray(x, subset)
if is_package_installed("scipy"): import scipy.sparse as sp def _set_empty_contents(contents: List) -> Union[List, None]: for x in contents: if x is not None: return contents return None
[docs] @extract_sparse_array.register def extract_sparse_array_csc_matrix(x: sp.csc_matrix, subset: Tuple[Sequence[int], ...]) -> SparseNdarray: """See :py:meth:`~delayedarray.extract_sparse_array.extract_sparse_array`.""" final_shape = [len(s) for s in subset] new_contents = None if final_shape[0] != 0 and final_shape[1] != 0: rowsub = subset[0] row_consecutive = _is_subset_consecutive(rowsub) first = rowsub[0] last = rowsub[-1] + 1 new_contents = [] for ci in subset[1]: start_pos = x.indptr[ci] end_pos = x.indptr[ci + 1] if first != 0: start_pos = bisect_left(x.indices, first, lo=start_pos, hi=end_pos) if row_consecutive: if last != x.shape[0]: end_pos = bisect_left(x.indices, last, lo=start_pos, hi=end_pos) if end_pos > start_pos: tmp = x.indices[start_pos:end_pos] if first: tmp = tmp - first # don't use -=, this might modify the view by reference. new_contents.append((tmp, x.data[start_pos:end_pos])) else: new_contents.append(None) else: new_val = [] new_idx = [] pos = 0 for p in range(start_pos, end_pos): ri = x.indices[p] while pos < len(rowsub) and ri > rowsub[pos]: pos += 1 if pos == len(rowsub): break if ri == rowsub[pos]: new_idx.append(pos) new_val.append(x.data[p]) pos += 1 if len(new_val): new_contents.append(( _convert_to_unmasked_1darray(new_idx, dtype=x.indices.dtype), _convert_to_maybe_masked_1darray(new_val, dtype=x.data.dtype, masked=numpy.ma.isMaskedArray(x.data)) )) else: new_contents.append(None) new_contents = _set_empty_contents(new_contents) return SparseNdarray((*final_shape,), new_contents, dtype=x.dtype, index_dtype=x.indices.dtype, is_masked=False, check=False)
[docs] @extract_sparse_array.register def extract_sparse_array_csr_matrix(x: sp.csr_matrix, subset: Tuple[Sequence[int], ...]) -> SparseNdarray: """See :py:meth:`~delayedarray.extract_sparse_array.extract_sparse_array`.""" final_shape = [len(s) for s in subset] new_contents = None if final_shape[0] != 0 and final_shape[1] != 0: colsub = subset[1] col_consecutive = _is_subset_consecutive(colsub) cfirst = colsub[0] clast = colsub[-1] + 1 new_contents = [] for i in range(len(colsub)): new_contents.append(([], [])) for rpos, ri in enumerate(subset[0]): start_pos = x.indptr[ri] end_pos = x.indptr[ri + 1] if cfirst != 0: start_pos = bisect_left(x.indices, cfirst, lo=start_pos, hi=end_pos) if col_consecutive: if clast != x.shape[1]: end_pos = bisect_left(x.indices, clast, lo=start_pos, hi=end_pos) for p in range(start_pos, end_pos): ci = x.indices[p] idx, val = new_contents[ci - cfirst] idx.append(rpos) val.append(x.data[p]) else: cpos = 0 for p in range(start_pos, end_pos): ci = x.indices[p] while cpos < len(colsub) and ci > colsub[cpos]: cpos += 1 if cpos == len(colsub): break if ci == colsub[cpos]: idx, val = new_contents[cpos] idx.append(rpos) val.append(x.data[p]) cpos += 1 for i in range(len(new_contents)): idx, val = new_contents[i] if len(idx): new_contents[i] = ( _convert_to_unmasked_1darray(idx, dtype=x.indices.dtype), _convert_to_maybe_masked_1darray(val, dtype=x.data.dtype, masked=numpy.ma.isMaskedArray(x.data)) ) else: new_contents[i] = None new_contents = _set_empty_contents(new_contents) return SparseNdarray((*final_shape,), new_contents, dtype=x.dtype, index_dtype=x.indices.dtype, is_masked=False, check=False)
[docs] @extract_sparse_array.register def extract_sparse_array_coo_matrix(x: sp.coo_matrix, subset: Tuple[Sequence[int], ...]) -> SparseNdarray: """See :py:meth:`~delayedarray.extract_sparse_array.extract_sparse_array`.""" final_shape = [len(s) for s in subset] new_contents = None if final_shape[0] != 0 and final_shape[1] != 0: in_row = {} for i, y in enumerate(subset[0]): in_row[y] = i in_col = {} new_contents = [] for i, y in enumerate(subset[1]): in_col[y] = i new_contents.append([]) for i, v in enumerate(x.data): r = x.row[i] c = x.col[i] if r in in_row and c in in_col: new_contents[in_col[c]].append((in_row[r], v)) for i, con in enumerate(new_contents): if len(con): con.sort() shape = (len(con),) idx = _allocate_unmasked_ndarray(shape, dtype=x.row.dtype) val = _allocate_maybe_masked_ndarray(shape, dtype=x.data.dtype, masked=numpy.ma.isMaskedArray(x.data)) for j, y in enumerate(con): idx[j] = y[0] val[j] = y[1] new_contents[i] = (idx, val) else: new_contents[i] = None new_contents = _set_empty_contents(new_contents) return SparseNdarray((*final_shape,), new_contents, dtype=x.dtype, index_dtype=x.row.dtype, is_masked=False, check=False)