from functools import singledispatch
from itertools import chain
from typing import Any
from warnings import warn
from biocutils import is_list_of_type
from biocutils.package_utils import is_package_installed
from numpy import concatenate, ndarray
from .utils import (
_convert_1d_sparse_to_dense,
_is_1d_dense_arrays,
_is_1d_sparse_arrays,
)
__author__ = "jkanche"
__copyright__ = "jkanche"
__license__ = "MIT"
[docs]
@singledispatch
def combine_seqs(*x: Any):
"""Combine vector-like objects (1-dimensional arrays).
If all elements are :py:class:`~numpy.ndarray`,
we combine them using numpy's :py:func:`~numpy.concatenate`.
If all elements are either :py:class:`~scipy.sparse.spmatrix` or
:py:class:`~scipy.sparse.sparray`, these objects are combined
using scipy's :py:class:`~scipy.sparse.hstack`.
If all elements are :py:class:`~pandas.Series` objects, they are combined using
:py:func:`~pandas.concat`.
For all other scenario's, all elements are coerced to a :py:class:`~list` and
combined.
Args:
x (Any): Vector-like objects to combine.
All elements of ``x`` are expected to be the same class or
atleast compatible with each other.
Raises:
TypeError: If any object in the list cannot be coerced to a list.
Returns:
A combined object, typically the same type as the first element in ``x``.
A :py:class:`~numpy.ndarray`, if the elements are a mix of dense and sparse objects.
A :py:class:`~list`, if one of the objects is a :py:class:`~list`.
"""
raise NotImplementedError("`combine_seqs` method is not implemented for objects.")
def _generic_combine_seqs_dense_sparse(x):
elems = []
for elem in x:
if not isinstance(elem, ndarray):
elem = _convert_1d_sparse_to_dense(elem)
elems.append(elem)
if _is_1d_dense_arrays(elems) is not True:
raise ValueError(
"Not all elements are 1-dimensional arrays, use `combine_rows` instead."
)
return concatenate(elems)
def _generic_coerce_list(x):
elems = []
for elem in x:
if isinstance(elem, ndarray):
elems.append(list(elem))
elif hasattr(elem, "shape"): # probably a sparse
elems.append(list(_convert_1d_sparse_to_dense(elem)))
elif isinstance(elem, (list, tuple)): # do nothing
elems.append(elem)
else: # not sure what else
elems.append(elem)
return combine_seqs(*elems)
@combine_seqs.register(list)
def _combine_seqs_lists(*x: list):
return list(chain(*x))
@combine_seqs.register(ndarray)
def _combine_seqs_dense_arrays(*x: ndarray):
if is_list_of_type(x, ndarray):
if _is_1d_dense_arrays(x) is not True:
raise ValueError(
"Not all elements are 1-dimensional arrays, use `combine_rows` instead."
)
return concatenate(x)
warn("Not all elements are numpy ndarrays.")
if all([hasattr(y, "shape") for y in x]) is True:
# assuming it's a mix of numpy and scipy arrays
return _generic_combine_seqs_dense_sparse(x)
# coerce everything to a list and combine_seqs
return _generic_coerce_list(x)
if is_package_installed("scipy") is True:
import scipy.sparse as sp
def _combine_seqs_sparse_matrices(*x):
if is_list_of_type(x, sp.spmatrix):
sp_conc = sp.hstack(x)
if _is_1d_sparse_arrays(x) is not True:
raise ValueError(
"Not all elements are 1-dimensional matrices, use `combine_rows` instead."
)
first = x[0]
if isinstance(first, sp.csr_matrix):
return sp_conc.tocsr()
elif isinstance(first, sp.csc_matrix):
return sp_conc.tocsc()
elif isinstance(first, sp.bsr_matrix):
return sp_conc.tobsr()
elif isinstance(first, sp.coo_matrix):
return sp_conc.tocoo()
elif isinstance(first, sp.dia_matrix):
return sp_conc.todia()
elif isinstance(first, sp.lil_matrix):
return sp_conc.tolil()
else:
return sp_conc
warn("Not all elements are scipy sparse matrices.")
if is_list_of_type(x, (ndarray, sp.spmatrix)):
return _generic_combine_seqs_dense_sparse(x)
return _generic_coerce_list(x)
try:
def _combine_seqs_sparse_arrays(*x):
if is_list_of_type(x, sp.sparray):
sp_conc = sp.hstack(x)
if _is_1d_sparse_arrays(x) is not True:
raise ValueError(
"Not all elements are 1-dimensional arrays, use `combine_rows` instead."
)
first = x[0]
if isinstance(first, sp.csr_array):
return sp_conc.tocsr()
elif isinstance(first, sp.csc_array):
return sp_conc.tocsc()
elif isinstance(first, sp.bsr_array):
return sp_conc.tobsr()
elif isinstance(first, sp.coo_array):
return sp_conc.tocoo()
elif isinstance(first, sp.dia_array):
return sp_conc.todia()
elif isinstance(first, sp.lil_array):
return sp_conc.tolil()
else:
return sp_conc
warn("Not all elements are scipy sparse arrays.")
if is_list_of_type(x, (ndarray, sp.sparray, sp.spmatrix)):
return _generic_combine_seqs_dense_sparse(x)
return _generic_coerce_list(x)
combine_seqs.register(sp.sparray, _combine_seqs_sparse_arrays)
except Exception:
pass
try:
combine_seqs.register(sp.spmatrix, _combine_seqs_sparse_matrices)
except Exception:
pass
if is_package_installed("pandas") is True:
from pandas import Series, concat
@combine_seqs.register(Series)
def _combine_seqs_pandas_series(*x):
if is_list_of_type(x, Series):
return concat(x)
# not everything is a Series
if any([isinstance(y, list) for y in x]) is True:
elems = []
for elem in x:
if isinstance(elem, list):
elems.append(Series(elem))
else:
elems.append(elem)
return concat(elems)
raise TypeError("All elements must be Pandas `Series` objects.")