from __future__ import annotations
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Union
from warnings import warn
import biocutils as ut
from biocframe import BiocFrame
import numpy as np
from .partition import Partitioning
# Module-level authorship and licensing metadata.
__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"
[docs]
def is_pandas(x: Any) -> bool:
    """Report whether ``x`` looks like a :py:class:`~pandas.DataFrame`.

    The check is duck-typed: any object exposing a ``dtypes`` attribute is
    accepted, so ``pandas`` itself is never imported here.

    Args:
        x:
            Any object.

    Returns:
        True if ``x`` has a ``dtypes`` attribute, otherwise False.
    """
    return hasattr(x, "dtypes")
def _sanitize_frame(frame, num_rows: int):
    """Normalise an optional metadata frame.

    Args:
        frame:
            ``None``, a pandas-like object (see :py:func:`is_pandas`), or any
            other object, which is passed through untouched.

        num_rows:
            Number of rows given to the empty ``BiocFrame`` that replaces a
            ``None`` frame.

    Returns:
        An empty ``BiocFrame`` when ``frame`` is ``None``, the result of
        ``BiocFrame.from_pandas`` for pandas-like input, otherwise ``frame``.
    """
    if frame is None:
        frame = BiocFrame({}, number_of_rows=num_rows)

    # The check also runs on a freshly created frame, as before.
    if is_pandas(frame):
        frame = BiocFrame.from_pandas(frame)

    return frame
def _validate_data_and_partitions(unlist_data, partition):
if len(unlist_data) != partition.nobj():
raise ValueError(
f"Length of 'unlist_data' ({len(unlist_data)}) "
f"doesn't match 'partitioning' total length ({partition.nobj()})."
)
[docs]
class CompressedList(ut.BiocObject):
    """Base class for compressed list objects.

    `CompressedList` stores list elements concatenated in a single vector-like object
    with partitioning information that defines where each list element starts and ends.
    Element names are delegated to the partitioning, and per-element annotations
    are kept in a separate ``element_metadata`` frame.
    """
[docs]
def __init__(
    self,
    unlist_data: Any,
    partitioning: Partitioning,
    element_type: Any = None,
    element_metadata: Optional[BiocFrame] = None,
    metadata: Optional[Union[Dict[str, Any], ut.NamedList]] = None,
    _validate: bool = True,
):
    """Initialize a CompressedList.

    Args:
        unlist_data:
            Vector-like object containing concatenated elements.

        partitioning:
            Partitioning object defining element boundaries (exclusive).

        element_type:
            Class (or class name) describing the type of each element.

        element_metadata:
            Optional per-element metadata. ``None`` becomes an empty frame
            with one row per element; pandas-like input is converted.

        metadata:
            Optional general metadata, forwarded to the parent constructor.

        _validate:
            Internal use only. When False, the length check between
            ``unlist_data`` and ``partitioning`` is skipped.
    """
    super().__init__(metadata=metadata, _validate=_validate)

    self._unlist_data = unlist_data
    self._partitioning = partitioning
    self._element_type = element_type
    self._element_metadata = _sanitize_frame(element_metadata, len(partitioning))

    if not _validate:
        return

    _validate_data_and_partitions(self._unlist_data, self._partitioning)
#########################
######>> Copying <<######
#########################
[docs]
def __deepcopy__(self, memo=None, _nil=[]):
"""
Returns:
A deep copy of the current ``Partitioning``.
"""
from copy import deepcopy
_unlistdata_copy = deepcopy(self._unlist_data)
_part_copy = deepcopy(self._partitioning)
_elem_type_copy = deepcopy(self._element_type)
_elem_metadata_copy = deepcopy(self._element_metadata)
_metadata_copy = deepcopy(self._metadata)
current_class_const = type(self)
return current_class_const(
unlist_data=_unlistdata_copy,
partitioning=_part_copy,
element_type=_elem_type_copy,
element_metadata=_elem_metadata_copy,
metadata=_metadata_copy,
_validate=False,
)
[docs]
def __copy__(self):
"""
Returns:
A shallow copy of the current ``Partitioning``.
"""
current_class_const = type(self)
return current_class_const(
unlist_data=self._unlist_data,
partitioning=self._partitioning,
element_type=self._element_type,
element_metadata=self._element_metadata,
metadata=self._metadata,
_validate=False,
)
[docs]
def copy(self):
"""Alias for :py:meth:`~__copy__`."""
return self.__copy__()
######################################
######>> length and iterators <<######
######################################
[docs]
def __len__(self) -> int:
"""Return the number of list elements."""
return len(self._partitioning)
[docs]
def get_element_lengths(self) -> np.ndarray:
"""Get the lengths of each list element."""
return self._partitioning.get_element_lengths()
[docs]
def __iter__(self) -> Iterator[Any]:
"""Iterate over list elements."""
for i in range(len(self)):
yield self[i]
##########################
######>> Printing <<######
##########################
[docs]
def __repr__(self) -> str:
    """Build a single-line representation of the object.

    Returns:
        A string representation.
    """
    # Resolve a printable label for the element type.
    etype = self._element_type
    if isinstance(etype, str):
        etype_label = etype
    elif hasattr(etype, "__name__"):
        etype_label = etype.__name__
    else:
        etype_label = "__unknown_class__"

    pieces = [
        f"{type(self).__name__}(number_of_elements={len(self)}",
        "unlist_data=" + ut.print_truncated_list(self._unlist_data),
        "partitioning=" + self._partitioning.__repr__(),
        "element_type=" + etype_label,
        "element_metadata=" + self._element_metadata.__repr__(),
    ]

    # General metadata is only shown when it is non-empty.
    if len(self._metadata) > 0:
        pieces.append("metadata=" + ut.print_truncated_dict(self._metadata))

    return ", ".join(pieces) + ")"
def __str__(self) -> str:
    """Build a multi-line, human-readable summary.

    Returns:
        A pretty-printed string containing the contents of this object.
    """
    # Resolve a printable label for the element type.
    etype = self._element_type
    if isinstance(etype, str):
        etype_label = etype
    elif hasattr(etype, "__name__"):
        etype_label = etype.__name__
    else:
        etype_label = "__unknown_class__"

    def _as_is(y):
        return y

    lines = [
        f"class: {type(self).__name__}",
        f"number of elements: ({len(self)}) of type: {etype_label}",
        f"unlist_data: {ut.print_truncated_list(self._unlist_data)}",
        f"partitioning: {ut.print_truncated_list(self._partitioning)}",
        f"element_metadata({len(self._element_metadata)} rows): "
        + ut.print_truncated_list(
            list(self._element_metadata.get_column_names()),
            sep=" ",
            include_brackets=False,
            transform=_as_is,
        ),
        f"metadata({len(self._metadata)}): "
        + ut.print_truncated_list(
            list(self._metadata.keys()),
            sep=" ",
            include_brackets=False,
            transform=_as_is,
        ),
    ]

    # Every line, including the last one, is newline-terminated.
    return "\n".join(lines) + "\n"
#############################
######>> element_type <<#####
#############################
[docs]
def get_element_type(self) -> str:
"""Return the element_type."""
return self._element_type
@property
def element_type(self) -> str:
"""Alias for :py:attr:`~get_element_type`, provided for back-compatibility."""
return self.get_element_type()
###########################
######>> partitions <<#####
###########################
[docs]
def get_partitioning(self) -> Partitioning:
"""Return the paritioning info."""
return self._partitioning
@property
def paritioning(self) -> Partitioning:
"""Alias for :py:attr:`~get_paritioning`, provided for back-compatibility."""
return self.get_partitioning()
#######################
######>> names <<######
#######################
[docs]
def get_names(self) -> Optional[ut.Names]:
"""Get the names of list elements."""
return self._partitioning.get_names()
[docs]
def set_names(self, names: Sequence[str], in_place: bool = False) -> CompressedList:
"""Set the names of list elements.
names:
New names, same as the number of rows.
May be `None` to remove names.
in_place:
Whether to modify the ``CompressedList`` in place.
Returns:
A modified ``CompressedList`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
output = self._define_output(in_place)
output._partitioning = self._partitioning.set_names(names, in_place=False)
return output
@property
def names(self) -> Optional[ut.Names]:
"""Alias for :py:attr:`~get_names`."""
return self._partitioning.get_names()
@names.setter
def names(self, names: Sequence[str]):
"""Alias for :py:attr:`~set_names` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'names' is an in-place operation, use 'set_names' instead",
UserWarning,
)
self.set_names(names=names, in_place=True)
#############################
######>> unlist_data <<######
#############################
[docs]
def get_unlist_data(self) -> Any:
"""Get all elements."""
return self._unlist_data
[docs]
def set_unlist_data(self, unlist_data: Any, in_place: bool = False) -> CompressedList:
    """Set new list elements.

    Args:
        unlist_data:
            New vector-like object containing concatenated elements.

        in_place:
            Whether to modify the ``CompressedList`` in place.

    Returns:
        A modified ``CompressedList`` object, either as a copy of the original
        or as a reference to the (in-place-modified) original.

    Raises:
        ValueError:
            If the length of ``unlist_data`` does not match the partitioning.
    """
    target = self._define_output(in_place)

    # Reject data that the current partitioning cannot describe.
    _validate_data_and_partitions(unlist_data, self._partitioning)

    target._unlist_data = unlist_data
    return target
@property
def unlist_data(self) -> Any:
"""Alias for :py:attr:`~get_unlist_data`."""
return self.get_unlist_data()
@unlist_data.setter
def unlist_data(self, unlist_data: Any):
"""Alias for :py:attr:`~set_unlist_data` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'unlist_data' is an in-place operation, use 'set_unlist_data' instead",
UserWarning,
)
self.set_unlist_data(unlist_data, in_place=True)
###################################
######>> element metadata <<#######
###################################
@property
def element_metadata(self) -> BiocFrame:
"""Alias for :py:attr:`~get_element_metadata`."""
return self.get_element_metadata()
@element_metadata.setter
def element_metadata(self, element_metadata: BiocFrame):
"""Alias for :py:attr:`~set_element_metadata` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'element_metadata' is an in-place operation, use 'set_element_metadata' instead",
UserWarning,
)
self.set_element_metadata(element_metadata, in_place=True)
##########################
######>> accessors <<#####
##########################
[docs]
def __getitem__(self, key: Union[int, str, slice]) -> Any:
"""Get an element or slice of elements from the list.
Args:
key:
Integer index, string name, or slice.
Returns:
List element(s).
"""
# string keys (names)
if isinstance(key, str):
if key not in list(self.get_names()):
raise KeyError(f"No element named '{key}'.")
key = list(self.names).index(key)
# integer indices
if isinstance(key, int):
if key < 0:
key += len(self)
if key < 0 or key >= len(self):
raise IndexError(f"List index '{key}' out of range.")
start, end = self._partitioning.get_partition_range(key)
return self.extract_range(start, end)
# slices
elif isinstance(key, (range, slice)):
if isinstance(key, range):
key = slice(key.start, key.stop, key.step)
indices = range(*key.indices(len(self)))
result = []
for i in indices:
start, end = self._partitioning.get_partition_range(i)
result.append(self.extract_range(start, end))
current_class_const = type(self)
return current_class_const.from_list(
result, names=[self.names[i] for i in indices] if self.names is not None else None
)
else:
try:
return self.extract_subset(indices=key)
except Exception as e:
raise TypeError("'key' must be int, str, slice or list of indices.") from e
##################################
######>> abstract methods <<######
##################################
[docs]
@classmethod
def from_list(
    cls,
    lst: Any,
    names: Optional[Union[ut.Names, Sequence[str]]] = None,
    metadata: Optional[Union[Dict[str, Any], ut.NamedList]] = None,
) -> CompressedList:
    """Create a CompressedList from a regular list.

    Subclasses are expected to override this to handle type-specific
    conversion from list to unlist_data. This base implementation simply
    concatenates every sub-list into one plain Python list.

    Args:
        lst:
            List to convert.

        names:
            Optional names for list elements.

        metadata:
            Optional metadata.

    Returns:
        A new `CompressedList`.
    """
    # Concatenate all sub-lists, preserving order.
    flattened = [item for chunk in lst for item in chunk]

    # Element boundaries (and names) are derived from the nested input.
    boundaries = Partitioning.from_list(lst, names)

    return cls(flattened, boundaries, metadata=metadata)
###########################
######>> coercions <<######
###########################
[docs]
def to_list(self) -> List[List[Any]]:
"""Convert to a regular Python list.
Returns:
A regular Python list with all elements.
"""
result = []
for i in range(len(self)):
_subset = list(self[i])
if len(_subset) == 0:
_subset = [None]
result.append(_subset)
return result
[docs]
def as_list(self) -> List[List[Any]]:
"""Alias to :py:meth:`~to_list`"""
return self.to_list()
[docs]
def unlist(self, use_names: bool = False) -> Any:
"""Get the underlying unlisted data.
Args:
use_names:
Whether to include names in the result if applicable.
Returns:
The unlisted data.
"""
return (
self._unlist_data
if use_names is False
else self._unlist_data.set_names(self.get_partitioning().get_names(), in_place=False)
)
[docs]
def relist(self, unlist_data: Any) -> CompressedList:
    """Create a new `CompressedList` with the same partitioning but different data.

    Args:
        unlist_data:
            New unlisted data.

    Returns:
        A new CompressedList of the same class, built from copies of the
        current partitioning, element metadata and metadata.

    Raises:
        ValueError:
            If the length of ``unlist_data`` does not match the partitioning.
    """
    _validate_data_and_partitions(unlist_data, self._partitioning)

    builder = type(self)
    partitioning_copy = self._partitioning.copy()
    element_metadata_copy = self._element_metadata.copy()
    metadata_copy = self._metadata.copy()

    return builder(
        unlist_data,
        partitioning_copy,
        element_type=self._element_type,
        element_metadata=element_metadata_copy,
        metadata=metadata_copy,
    )
[docs]
def lapply(self, func: Callable) -> CompressedList:
"""Apply a function to each element.
Args:
func:
Function to apply to each element.
Returns:
A new CompressedList with the results.
"""
result = [func(elem) for elem in self]
current_class_const = type(self)
return current_class_const.from_list(result, self.names, self._metadata)
[docs]
@classmethod
def empty(cls, n: int):
    """Create a `CompressedList` of ``n`` elements, each of length zero.

    Args:
        n:
            Number of elements.

    Returns:
        An object of the same type as the caller, holding no data.
    """
    # Every partition ends at offset 0, i.e. all n elements are empty.
    zero_ends = [0] * n

    # Build through ``cls`` so subclasses get an instance of their own type.
    return cls(unlist_data=[], partitioning=Partitioning(ends=zero_ends))
#######################
######>> extend <<#####
#######################
[docs]
def extend(self, other: CompressedList, in_place: bool = False) -> CompressedList:
    """Append the elements of another list to this one.

    Args:
        other:
            Some CompressedList object.

        in_place:
            Whether to perform the modification in place.

    Returns:
        A ``CompressedList`` where items in ``other`` are added to the end. If
        ``in_place = False``, this is a new object, otherwise a reference
        to the current object is returned.
    """
    output = self._define_output(in_place)

    output._unlist_data = ut.combine_sequences(output._unlist_data, other._unlist_data)
    output._partitioning = ut.combine_sequences(output._partitioning, other._partitioning)

    # Keep one metadata row per element; otherwise the frame would still
    # describe only the elements present before the extension.
    # NOTE(review): assumes both frames support biocutils' relaxed row
    # combination (as BiocFrame does) - confirm for custom frame types.
    output._element_metadata = ut.relaxed_combine_rows(
        output._element_metadata, other._element_metadata
    )

    return output
@ut.combine_sequences.register(CompressedList)
def _register_combine_patitioning(*x: CompressedList) -> CompressedList:
    """Combine several ``CompressedList`` objects end to end.

    Registered as the ``combine_sequences`` handler for ``CompressedList``.

    Args:
        *x:
            One or more ``CompressedList`` objects.

    Returns:
        A copy of the first object, extended with each remaining one in order.

    Raises:
        ValueError:
            If no objects are supplied.
    """
    if len(x) == 0:
        raise ValueError("Cannot combine an empty object")

    first, *remaining = x

    # Work on a copy so the first input is left untouched.
    combined = first.copy()
    for item in remaining:
        combined.extend(item, in_place=True)

    return combined