Source code for delayedarray.to_scipy_sparse_matrix
import numpy
from functools import singledispatch
from typing import Any, Literal
from biocutils.package_utils import is_package_installed
from .SparseNdarray import SparseNdarray
from .to_sparse_array import to_sparse_array
if is_package_installed("scipy"):
import scipy.sparse
def _to_csc(x: Any) -> scipy.sparse.csc_matrix:
all_indptrs = numpy.zeros(x.shape[1] + 1, dtype=numpy.uint64)
if x.contents is not None:
all_indices = []
all_values = []
counter = 0
for i, y in enumerate(x.contents):
if y is not None:
counter += len(y[0])
all_indices.append(y[0])
all_values.append(y[1])
all_indptrs[i + 1] = counter
all_indices = numpy.concatenate(all_indices)
all_values = numpy.concatenate(all_values)
else:
all_indices = numpy.zeros(0, dtype=x.index_dtype)
all_values = numpy.zeros(0, dtype=x.dtype)
return scipy.sparse.csc_matrix((all_values, all_indices, all_indptrs), shape=x.shape)
def _to_csr(x: Any) -> scipy.sparse.csr_matrix:
all_indptrs = numpy.zeros(x.shape[0] + 1, dtype=numpy.uint64)
if x.contents is not None:
# First pass (in memory) to obtain the total sizes.
for i, y in enumerate(x.contents):
if y is not None:
for ix in y[0]:
all_indptrs[ix + 1] += 1
for i in range(1, len(all_indptrs)):
all_indptrs[i] += all_indptrs[i - 1]
all_indices = numpy.ndarray(all_indptrs[-1], dtype=x.index_dtype)
all_values = numpy.ndarray(all_indptrs[-1], dtype=x.dtype)
# Second pass to fill the allocations that we just made.
offsets = all_indptrs.copy()
for i, y in enumerate(x.contents):
if y is not None:
vals = y[1]
for j, ix in enumerate(y[0]):
o = offsets[ix]
all_indices[o] = i
all_values[o] = vals[j]
offsets[ix] += 1
else:
all_indices = numpy.zeros(0, dtype=x.index_dtype)
all_values = numpy.zeros(0, dtype=x.dtype)
return scipy.sparse.csr_matrix((all_values, all_indices, all_indptrs), shape=x.shape)
def _to_coo(x: Any) -> scipy.sparse.coo_matrix:
if x.contents is not None:
# First pass (in memory) to obtain the total sizes.
total_count = 0
for i, y in enumerate(x.contents):
if y is not None:
total_count += len(y[0])
all_rows = numpy.ndarray(total_count, dtype=x.index_dtype)
all_cols = numpy.ndarray(total_count, dtype=numpy.uint64)
all_values = numpy.ndarray(total_count, dtype=x.dtype)
# Second pass to fill the allocations that we just made.
counter = 0
for i, y in enumerate(x.contents):
if y is not None:
vals = y[1]
for j, ix in enumerate(y[0]):
all_rows[counter] = ix
all_cols[counter] = i
all_values[counter] = vals[j]
counter += 1
else:
all_indices = numpy.zeros(0, dtype=x.index_dtype)
all_values = numpy.zeros(0, dtype=x.dtype)
return scipy.sparse.coo_matrix((all_values, (all_rows, all_cols)), shape=x.shape)
[docs]
@singledispatch
def to_scipy_sparse_matrix(x: Any, format: Literal["coo", "csr", "csc"] = "csc") -> scipy.sparse.spmatrix:
"""
Convert a 2-dimensional array into a SciPy sparse matrix.
Args:
x:
Input matrix where :py:func:`~delayedarray.is_sparse.is_sparse`
returns True and :py:func:`~delayedarray.is_masked.is_masked`
returns False.
format:
Type of SciPy matrix to create - coordinate (coo), compressed
sparse row (csr) or compressed sparse column (csc).
Returns:
A SciPy sparse matrix with the contents of ``x``.
"""
# One might think that we could be more memory-efficient by doing block
# processing. However, there is no advantage from doing so as we eventually
# need to hold all the blocks in memory before concatenation. We'd only
# avoid this if we did two passes; one to collect the total size for
# allocation, and another to actually fill the vectors; not good, so we
# just forget about it and load it all into memory up-front.
return to_scipy_sparse_matrix_from_SparseNdarray(to_sparse_array(x), format=format)
[docs]
@to_scipy_sparse_matrix.register
def to_scipy_sparse_matrix_from_SparseNdarray(x: SparseNdarray, format: Literal["coo", "csr", "csc"] = "csc") -> scipy.sparse.spmatrix:
"""See :py:meth:`~to_scipy_sparse_matrix`."""
if format == "csc":
return _to_csc(x)
elif format == "csr":
return _to_csr(x)
else:
return _to_coo(x)