from typing import Dict, List, Literal, Optional, Sequence, Tuple, Union
from warnings import warn
import biocutils as ut
import numpy as np
from biocframe import BiocFrame
from iranges import IRanges
from .SeqInfo import SeqInfo, merge_SeqInfo
from .utils import (
create_np_vector,
group_by_indices,
sanitize_strand_vector,
slide_intervals,
split_intervals,
)
__author__ = "jkanche"
__copyright__ = "jkanche"
__license__ = "MIT"
_granges_delim = "__"
def _guess_num_ranges(seqnames, ranges):
if len(seqnames) != len(ranges):
raise ValueError("Number of 'seqnames' and 'ranges' do not match!")
return len(seqnames)
def _validate_seqnames(seqnames, seqinfo, num_ranges):
if seqnames is None:
raise ValueError("'seqnames' cannot be None!")
if len(seqnames) != num_ranges:
raise ValueError(
"Length of 'seqnames' does not match the number of ranges.",
f"Need to be {num_ranges}, provided {len(seqnames)}.",
)
if np.isnan(seqnames).any():
raise ValueError("'seqnames' cannot contain None values.")
if not isinstance(seqinfo, SeqInfo):
raise TypeError("'seqinfo' is not an instance of `SeqInfo` class.")
_l = len(seqinfo)
if (seqnames > _l).any():
raise ValueError("'seqnames' contains sequence name not represented in 'seqinfo'.")
def _validate_ranges(ranges, num_ranges):
if ranges is None:
raise ValueError("'ranges' cannot be None.")
if not isinstance(ranges, IRanges):
raise TypeError("'ranges' is not an instance of `IRanges`.")
if len(ranges) != num_ranges:
raise ValueError(
"Length of 'ranges' does not match the number of seqnames.",
f"Need to be {num_ranges}, provided {len(ranges)}.",
)
def _validate_optional_attrs(strand, mcols, names, num_ranges):
if strand is not None:
if len(strand) != num_ranges:
raise ValueError("Length of 'strand' does not match the number of ranges.")
if np.isnan(strand).any():
raise ValueError("'strand' cannot contain None values.")
if mcols is not None:
if not isinstance(mcols, BiocFrame):
raise TypeError("'mcols' is not a `BiocFrame` object.")
if mcols.shape[0] != num_ranges:
raise ValueError("Length of 'mcols' does not match the number of ranges.")
if names is not None:
if len(names) != num_ranges:
raise ValueError("Length of 'names' does not match the number of ranges.")
if any(x is None for x in names):
raise ValueError("'names' cannot contain None values.")
[docs]
class GenomicRangesIter:
"""An iterator to a :py:class:`~GenomicRanges` object."""
[docs]
def __init__(self, obj: "GenomicRanges") -> None:
"""Initialize the iterator.
Args:
obj:
Source object to iterate.
"""
self._gr = obj
self._current_index = 0
[docs]
def __iter__(self):
return self
[docs]
def __next__(self):
if self._current_index < len(self._gr):
iter_row_index = self._gr.names[self._current_index] if self._gr.names is not None else None
iter_slice = self._gr[self._current_index]
self._current_index += 1
return (iter_row_index, iter_slice)
raise StopIteration
[docs]
class GenomicRanges:
"""``GenomicRanges`` provides a container class to represent and operate over genomic regions and annotations.
Note: The documentation for some of the methods are derived from the
`GenomicRanges R/Bioconductor package <https://github.com/Bioconductor/GenomicRanges>`_.
"""
[docs]
def __init__(
self,
seqnames: Sequence[str],
ranges: IRanges,
strand: Optional[Union[Sequence[str], Sequence[int], np.ndarray]] = None,
names: Optional[Sequence[str]] = None,
mcols: Optional[BiocFrame] = None,
seqinfo: Optional[SeqInfo] = None,
metadata: Optional[dict] = None,
validate: bool = True,
):
"""Initialize a ``GenomicRanges`` object.
Args:
seqnames:
List of sequence or chromosome names.
ranges:
Genomic positions and widths of each position. Must have the same length as ``seqnames``.
strand:
Strand information for each genomic range. This should be 0 (any strand),
1 (forward strand) or -1 (reverse strand). If None, all genomic ranges
are assumed to be 0.
May be provided as a list of strings representing the strand;
"+" for forward strand, "-" for reverse strand, or "*" for any strand and will be mapped
accordingly to 1, -1 or 0.
names:
Names for each genomic range. Defaults to None, which means the ranges are unnamed.
mcols:
A ~py:class:`~biocframe.BiocFrame.BiocFrame` with the number of rows same as
number of genomic ranges, containing per-range annotation. Defaults to None, in which case an empty
BiocFrame object is created.
seqinfo:
Sequence information. Defaults to None, in which case a
:py:class:`~genomicranges.SeqInfo.SeqInfo` object is created with the unique set of
chromosome names from ``seqnames``.
metadata:
Additional metadata. Defaults to None, and is assigned to an empty dictionary.
validate:
Internal use only.
"""
if seqinfo is None:
seqinfo = SeqInfo(seqnames=sorted(list(set(seqnames))))
self._seqinfo = seqinfo
self._reverse_seqindex = None
self._seqnames = self._sanitize_seqnames(seqnames, self._seqinfo)
self._ranges = ranges
if strand is None:
strand = np.repeat(0, len(self._seqnames))
self._strand = np.asarray(strand, dtype=np.int8)
else:
self._strand = sanitize_strand_vector(strand)
if names is not None and not isinstance(names, ut.Names):
names = ut.Names(names)
self._names = names
if mcols is None:
mcols = BiocFrame({}, number_of_rows=len(self._seqnames))
self._mcols = mcols
self._metadata = metadata if metadata is not None else {}
if validate is True:
_num_ranges = _guess_num_ranges(self._seqnames, self._ranges)
_validate_ranges(self._ranges, _num_ranges)
_validate_seqnames(self._seqnames, self._seqinfo, _num_ranges)
_validate_optional_attrs(self._strand, self._mcols, self._names, _num_ranges)
def _build_reverse_seqindex(self, seqinfo: SeqInfo):
self._reverse_seqindex = ut.reverse_index.build_reverse_index(seqinfo.get_seqnames())
def _remove_reverse_seqindex(self):
del self._reverse_seqindex
def _sanitize_seqnames(self, seqnames, seqinfo):
if self._reverse_seqindex is None:
self._build_reverse_seqindex(seqinfo)
if not isinstance(seqnames, np.ndarray):
seqnames = np.asarray([self._reverse_seqindex[x] for x in seqnames])
if len(seqnames) == 0:
seqnames = seqnames.astype(np.uint8)
else:
num_uniq = np.max(seqnames)
_types = [np.uint8, np.uint16, np.uint32, np.uint64]
for _dtype in _types:
if num_uniq < np.iinfo(_dtype).max:
seqnames = seqnames.astype(_dtype)
break
return seqnames
def _define_output(self, in_place: bool = False) -> "GenomicRanges":
if in_place is True:
return self
else:
return self.__copy__()
#########################
######>> Copying <<######
#########################
[docs]
def __deepcopy__(self, memo=None, _nil=[]):
"""
Returns:
A deep copy of the current ``GenomicRanges``.
"""
from copy import deepcopy
_ranges_copy = deepcopy(self._ranges)
_seqnames_copy = deepcopy(self._seqnames)
_strand_copy = deepcopy(self._strand)
_names_copy = deepcopy(self._names)
_mcols_copy = deepcopy(self._mcols)
_seqinfo_copy = deepcopy(self._seqinfo)
_metadata_copy = deepcopy(self.metadata)
current_class_const = type(self)
return current_class_const(
seqnames=_seqnames_copy,
ranges=_ranges_copy,
strand=_strand_copy,
names=_names_copy,
mcols=_mcols_copy,
seqinfo=_seqinfo_copy,
metadata=_metadata_copy,
)
[docs]
def __copy__(self):
"""
Returns:
A shallow copy of the current ``GenomicRanges``.
"""
current_class_const = type(self)
return current_class_const(
seqnames=self._seqnames,
ranges=self._ranges,
strand=self._strand,
names=self._names,
mcols=self._mcols,
seqinfo=self._seqinfo,
metadata=self._metadata,
)
[docs]
def copy(self):
"""Alias for :py:meth:`~__copy__`."""
return self.__copy__()
######################################
######>> length and iterators <<######
######################################
[docs]
def __len__(self) -> int:
"""
Returns:
Number of rows.
"""
return len(self._ranges)
[docs]
def __iter__(self) -> GenomicRangesIter:
"""Iterator over rows."""
return GenomicRangesIter(self)
##########################
######>> Printing <<######
##########################
[docs]
def __repr__(self) -> str:
"""
Returns:
A string representation of this ``GenomicRanges``.
"""
output = "GenomicRanges(number_of_ranges=" + str(len(self))
output += ", seqnames=" + ut.print_truncated_list(self._seqnames)
output += ", ranges=" + repr(self._ranges)
if self._strand is not None:
output += ", strand=" + ut.print_truncated_list(self._strand)
if self._names is not None:
output += ", names=" + ut.print_truncated_list(self._names)
if self._mcols is not None:
output += ", mcols=" + repr(self._mcols)
if self._seqinfo is not None:
output += ", seqinfo" + repr(self._seqinfo)
if len(self._metadata) > 0:
output += ", metadata=" + ut.print_truncated_dict(self._metadata)
output += ")"
return output
def __str__(self) -> str:
"""
Returns:
A pretty-printed string containing the contents of this ``GenomicRanges``.
"""
output = f"GenomicRanges with {len(self)} range{'s' if len(self) != 1 else ''}"
output += f" and {len(self._mcols.get_column_names())} metadata column{'s' if len(self._mcols.get_column_names()) != 1 else ''}\n"
nr = len(self)
added_table = False
if nr:
if nr <= 10:
indices = range(nr)
insert_ellipsis = False
else:
indices = [0, 1, 2, nr - 3, nr - 2, nr - 1]
insert_ellipsis = True
raw_floating = ut.create_floating_names(self._names, indices)
if insert_ellipsis:
raw_floating = raw_floating[:3] + [""] + raw_floating[3:]
floating = ["", ""] + raw_floating
columns = []
header = ["seqnames", "<str>"]
_seqnames = []
for x in self._seqnames[indices]:
_seqnames.append(self._seqinfo.get_seqnames()[x])
showed = _seqnames
if insert_ellipsis:
showed = showed[:3] + ["..."] + showed[3:]
columns.append(header + showed)
header = ["ranges", "<IRanges>"]
showed = [f"{x._start[0]} - {x.end[0]}" for _, x in self._ranges[indices]]
if insert_ellipsis:
showed = showed[:3] + ["..."] + showed[3:]
columns.append(header + showed)
header = ["strand", f"<{ut.print_type(self._strand)}>"]
_strand = []
for x in self._strand[indices]:
if x == 1:
_strand.append("+")
elif x == -1:
_strand.append("-")
else:
_strand.append("*")
showed = _strand
if insert_ellipsis:
showed = showed[:3] + ["..."] + showed[3:]
columns.append(header + showed)
if self._mcols.shape[1] > 0:
spacer = ["|"] * (len(indices) + insert_ellipsis)
columns.append(["", ""] + spacer)
for col in self._mcols.get_column_names():
data = self._mcols.column(col)
showed = ut.show_as_cell(data, indices)
header = [col, "<" + ut.print_type(data) + ">"]
showed = ut.truncate_strings(showed, width=max(40, len(header[0]), len(header[1])))
if insert_ellipsis:
showed = showed[:3] + ["..."] + showed[3:]
columns.append(header + showed)
output += ut.print_wrapped_table(columns, floating_names=floating)
added_table = True
footer = []
if self._seqinfo is not None and len(self._seqinfo):
footer.append(
"seqinfo("
+ str(len(self._seqinfo))
+ " sequences): "
+ ut.print_truncated_list(
self._seqinfo.get_seqnames(),
sep=" ",
include_brackets=False,
transform=lambda y: y,
)
)
if len(self._metadata):
footer.append(
"metadata("
+ str(len(self.metadata))
+ "): "
+ ut.print_truncated_list(
list(self.metadata.keys()),
sep=" ",
include_brackets=False,
transform=lambda y: y,
)
)
if len(footer):
if added_table:
output += "\n------\n"
output += "\n".join(footer)
return output
##########################
######>> seqnames <<######
##########################
[docs]
def get_seqnames(
self, as_type: Literal["factor", "list"] = "list"
) -> Union[Tuple[np.ndarray, List[str]], List[str]]:
"""Access sequence names.
Args:
as_type:
Access seqnames as factor tuple, in which case, levels and codes
are returned.
If ``list``, then codes are mapped to levels and returned.
Returns:
List of sequence names.
"""
if as_type == "factor":
return self._seqnames, self._seqinfo.get_seqnames()
elif as_type == "list":
return [self._seqinfo.get_seqnames()[x] for x in self._seqnames]
else:
raise ValueError("Argument 'as_type' must be 'factor' or 'list'.")
[docs]
def set_seqnames(self, seqnames: Union[Sequence[str], np.ndarray], in_place: bool = False) -> "GenomicRanges":
"""Set new sequence names.
Args:
seqnames:
List of sequence or chromosome names. Optionally can be a numpy array with indices mapped
to :py:attr:``~seqinfo``.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
_validate_seqnames(seqnames, len(self))
if not isinstance(seqnames, np.ndarray):
seqnames = np.asarray([self._seqinfo.get_seqnames().index(x) for x in list(seqnames)])
output = self._define_output(in_place)
output._seqnames = seqnames
return output
@property
def seqnames(self) -> Union[Union[np.ndarray, List[str]], np.ndarray]:
"""Alias for :py:meth:`~get_seqnames`."""
return self.get_seqnames()
@seqnames.setter
def seqnames(self, seqnames: Union[Sequence[str], np.ndarray]):
"""Alias for :py:meth:`~set_seqnames` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'seqnames' is an in-place operation, use 'set_seqnames' instead",
UserWarning,
)
self.set_row_names(seqnames, in_place=True)
########################
######>> ranges <<######
########################
[docs]
def get_ranges(self) -> IRanges:
"""
Returns:
An ``IRanges`` object containing the positions.
"""
return self._ranges
[docs]
def set_ranges(self, ranges: IRanges, in_place: bool = False) -> "GenomicRanges":
"""Set new ranges.
Args:
ranges:
IRanges containing the genomic positions and widths.
Must have the same length as ``seqnames``.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
_validate_ranges(ranges, len(self))
output = self._define_output(in_place)
output._ranges = ranges
return output
@property
def ranges(self) -> IRanges:
"""Alias for :py:meth:`~get_ranges`."""
return self.get_ranges()
@ranges.setter
def ranges(self, ranges: IRanges):
"""Alias for :py:meth:`~set_ranges` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'ranges' is an in-place operation, use 'set_ranges' instead",
UserWarning,
)
self.set_ranges(ranges, in_place=True)
########################
######>> strand <<######
########################
[docs]
def get_strand(
self, as_type: Literal["numpy", "factor", "list"] = "numpy"
) -> Union[Tuple[np.ndarray, dict], List[str]]:
"""Access strand information.
Args:
as_type:
Access seqnames as factor codes, in which case, a numpy
vector is retuned.
If ``factor``, a tuple width levels as a dictionary and
indices to ``seqinfo.get_seqnames()`` is returned.
If ``list``, then codes are mapped to levels and returned.
Returns:
A numpy vector representing strand, 0
for any strand, -1 for reverse strand
and 1 for forward strand.
A tuple of codes and levels.
A list of "+", "-", or "*" for each range.
"""
if as_type == "numpy":
return self._strand
elif as_type == "factor":
return self._strand, {"-1": "-", "0": "*", "1": "+"}
elif as_type == "list":
_strand = []
for x in self._strand:
if x == 1:
_strand.append("+")
elif x == -1:
_strand.append("-")
else:
_strand.append("*")
return _strand
else:
raise ValueError("Argument 'as_type' must be 'factor' or 'list'.")
[docs]
def set_strand(
self,
strand: Optional[Union[Sequence[str], Sequence[int], np.ndarray]],
in_place: bool = False,
) -> "GenomicRanges":
"""Set new strand information.
Args:
strand:
Strand information for each genomic range. This should be 0 (any strand),
1 (forward strand) or -1 (reverse strand). If None, all genomic ranges
are assumed to be 0.
May be provided as a list of strings representing the strand;
"+" for forward strand, "-" for reverse strand, or "*" for any strand and will be mapped
accordingly to 1, -1 or 0.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
if strand is None:
strand = np.repeat(0, len(self))
strand = sanitize_strand_vector(strand)
_validate_optional_attrs(strand, None, None, len(self))
output = self._define_output(in_place)
output._strand = strand
return output
@property
def strand(self) -> np.ndarray:
"""Alias for :py:meth:`~get_strand`."""
return self.get_strand()
@strand.setter
def strand(
self,
strand: Optional[Union[Sequence[str], Sequence[int], np.ndarray]],
):
"""Alias for :py:meth:`~set_strand` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'strand' is an in-place operation, use 'set_strand' instead",
UserWarning,
)
self.set_strand(strand, in_place=True)
########################
######>> names <<#######
########################
[docs]
def get_names(self) -> ut.Names:
"""
Returns:
A list of names for each genomic range.
"""
return self._names
[docs]
def set_names(
self,
names: Optional[Sequence[str]],
in_place: bool = False,
) -> "GenomicRanges":
"""Set new names.
Args:
names:
Names for each genomic range.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
if names is not None:
_validate_optional_attrs(None, None, names, len(self))
if not isinstance(names, ut.Names):
names = ut.Names(names)
output = self._define_output(in_place)
output._names = names
return output
@property
def names(self) -> ut.Names:
"""Alias for :py:meth:`~get_names`."""
return self.get_names()
@names.setter
def names(
self,
names: Optional[Sequence[str]],
):
"""Alias for :py:meth:`~set_names` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'names' is an in-place operation, use 'set_names' instead",
UserWarning,
)
self.set_names(names, in_place=True)
########################
######>> mcols <<#######
########################
[docs]
def get_mcols(self) -> BiocFrame:
"""
Returns:
A ~py:class:`~biocframe.BiocFrame.BiocFrame` containing per-range annotations.
"""
return self._mcols
[docs]
def set_mcols(
self,
mcols: Optional[BiocFrame],
in_place: bool = False,
) -> "GenomicRanges":
"""Set new range metadata.
Args:
mcols:
A ~py:class:`~biocframe.BiocFrame.BiocFrame` with length same as the number
of ranges, containing per-range annotations.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
if mcols is None:
mcols = BiocFrame({}, number_of_rows=len(self))
_validate_optional_attrs(None, mcols, None, len(self))
output = self._define_output(in_place)
output._mcols = mcols
return output
@property
def mcols(self) -> BiocFrame:
"""Alias for :py:meth:`~get_mcols`."""
return self.get_mcols()
@mcols.setter
def mcols(
self,
mcols: Optional[BiocFrame],
):
"""Alias for :py:meth:`~set_mcols` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'mcols' is an in-place operation, use 'set_mcols' instead",
UserWarning,
)
self.set_mcols(mcols, in_place=True)
##########################
######>> seqinfo <<#######
##########################
[docs]
def get_seqinfo(self) -> SeqInfo:
"""
Returns:
A ~py:class:`~genomicranges.SeqInfo.SeqInfo` containing sequence information.
"""
return self._seqinfo
[docs]
def set_seqinfo(
self,
seqinfo: Optional[SeqInfo],
in_place: bool = False,
) -> "GenomicRanges":
"""Set new sequence information.
Args:
seqinfo:
A ~py:class:`~genomicranges.SeqInfo.SeqInfo` object contaning information
about sequences in :py:attr:`~seqnames`.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
seqnames = self.get_seqnames(as_type="list")
if seqinfo is None:
seqinfo = SeqInfo(seqnames=seqnames)
self._reverse_seqindex = None
new_seqnames = self._sanitize_seqnames(seqnames, seqinfo)
_validate_seqnames(new_seqnames, seqinfo, len(self))
output = self._define_output(in_place)
output._seqinfo = seqinfo
output._seqnames = new_seqnames
return output
@property
def seqinfo(self) -> np.ndarray:
"""Alias for :py:meth:`~get_seqinfo`."""
return self.get_seqinfo()
@seqinfo.setter
def seqinfo(
self,
seqinfo: Optional[SeqInfo],
):
"""Alias for :py:meth:`~set_seqinfo` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'seqinfo' is an in-place operation, use 'set_seqinfo' instead",
UserWarning,
)
self.set_seqinfo(seqinfo, in_place=True)
###########################
######>> metadata <<#######
###########################
@property
def metadata(self) -> dict:
"""Alias for :py:attr:`~get_metadata`."""
return self.get_metadata()
@metadata.setter
def metadata(self, metadata: dict):
"""Alias for :py:attr:`~set_metadata` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'metadata' is an in-place operation, use 'set_metadata' instead",
UserWarning,
)
self.set_metadata(metadata, in_place=True)
################################
######>> Single getters <<######
################################
[docs]
def get_start(self) -> np.ndarray:
"""Get all start positions.
Returns:
NumPy array of 32-bit signed integers containing the start
positions for all ranges.
"""
return self._ranges.get_start()
@property
def start(self) -> np.ndarray:
"""Alias for :py:attr:`~get_start`."""
return self.get_start()
[docs]
def get_end(self) -> np.ndarray:
"""Get all end positions.
Returns:
NumPy array of 32-bit signed integers containing the end
positions for all ranges.
"""
return self._ranges.get_end()
@property
def end(self) -> np.ndarray:
"""Alias for :py:attr:`~get_end`."""
return self.get_end()
[docs]
def get_width(self) -> np.ndarray:
"""Get width of each genomic range.
Returns:
NumPy array of 32-bit signed integers containing the width
for all ranges.
"""
return self._ranges.get_width()
@property
def width(self) -> np.ndarray:
"""Alias for :py:attr:`~get_width`."""
return self.get_width()
#########################
######>> Slicers <<######
#########################
[docs]
def get_subset(self, subset: Union[str, int, bool, Sequence]) -> "GenomicRanges":
"""Subset ``GenomicRanges``, based on their indices or names.
Args:
subset:
Indices to be extracted. This may be an integer, boolean, string,
or any sequence thereof, as supported by
:py:meth:`~biocutils.normalize_subscript.normalize_subscript`.
Scalars are treated as length-1 sequences.
Strings may only be used if :py:attr:``~names`` are available (see
:py:meth:`~get_names`). The first occurrence of each string
in the names is used for extraction.
Returns:
A new ``GenomicRanges`` object with the ranges of interest.
"""
if len(self) == 0:
return GenomicRanges.empty()
idx, _ = ut.normalize_subscript(subset, len(self), self._names)
current_class_const = type(self)
return current_class_const(
seqnames=self._seqnames[idx],
ranges=self._ranges[idx],
strand=self._strand[idx],
names=self._names[idx] if self._names is not None else None,
mcols=self._mcols[idx, :],
seqinfo=self._seqinfo,
metadata=self._metadata,
)
[docs]
def __getitem__(self, subset: Union[str, int, bool, Sequence]) -> "GenomicRanges":
"""Alias to :py:attr:`~get_subset`."""
return self.get_subset(subset)
[docs]
def set_subset(
self,
args: Union[Sequence, int, str, bool, slice, range],
value: "GenomicRanges",
in_place: bool = False,
) -> "GenomicRanges":
"""Add or update positions (in-place operation).
Args:
args:
Integer indices, a boolean filter, or (if the current object is
named) names specifying the ranges to be replaced, see
:py:meth:`~biocutils.normalize_subscript.normalize_subscript`.
value:
An ``GenomicRanges`` object of length equal to the number of ranges
to be replaced, as specified by ``subset``.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
idx, _ = ut.normalize_subscript(args, len(self), self._names)
output = self._define_output(in_place)
output._seqnames[idx] = value._seqnames
output._ranges[idx] = value._ranges
output._strand[idx] = value._strand
if value._names is not None:
if output._names is None:
output._names = [""] * len(output)
for i, j in enumerate(idx):
output._names[j] = value._names[i]
elif output._names is not None:
for i, j in enumerate(idx):
output._names[j] = ""
if value._mcols is not None:
output._mcols[idx, :] = value._mcols
return output
[docs]
def __setitem__(
self,
args: Union[Sequence, int, str, bool, slice, range],
value: "GenomicRanges",
) -> "GenomicRanges":
"""Alias to :py:attr:`~set_subset`.
This operation modifies object in-place.
"""
warn(
"Modifying a subset of the object is an in-place operation, use 'set_subset' instead",
UserWarning,
)
return self.set_subset(args, value, in_place=True)
################################
######>> pandas interop <<######
################################
[docs]
def to_pandas(self) -> "pandas.DataFrame":
"""Convert this ``GenomicRanges`` object into a :py:class:`~pandas.DataFrame`.
Returns:
A :py:class:`~pandas.DataFrame` object.
"""
import pandas as pd
_rdf = self._ranges.to_pandas()
_rdf["seqnames"] = self.get_seqnames()
_rdf["strand"] = self.get_strand(as_type="list")
if self._names is not None:
_rdf.index = self._names
if self._mcols is not None:
if self._mcols.shape[1] > 0:
_rdf = pd.concat([_rdf, self._mcols.to_pandas()], axis=1)
return _rdf
[docs]
@classmethod
def from_pandas(cls, input: "pandas.DataFrame") -> "GenomicRanges":
"""Create a ``GenomicRanges`` from a :py:class:`~pandas.DataFrame` object.
Args:
input:
Input data. must contain columns 'seqnames', 'starts' and 'widths' or "ends".
Returns:
A ``GenomicRanges`` object.
"""
from pandas import DataFrame
if not isinstance(input, DataFrame):
raise TypeError("`input` is not a pandas `DataFrame` object.")
if "starts" not in input.columns:
raise ValueError("'input' must contain column 'starts'.")
start = input["starts"].tolist()
if "widths" not in input.columns and "ends" not in input.columns:
raise ValueError("'input' must contain either 'widths' or 'ends' columns.")
drops = []
if "widths" in input.columns:
drops.append("widths")
width = input["widths"].tolist()
else:
drops.append("ends")
width = input["ends"] - input["starts"]
if "seqnames" not in input.columns:
raise ValueError("'input' must contain column 'seqnames'.")
seqnames = input["seqnames"].tolist()
ranges = IRanges(start, width)
strand = None
if "strand" in input.columns:
strand = input["strand"].tolist()
drops.append("strand")
# mcols
drops.extend(["starts", "seqnames"])
mcols_df = input.drop(columns=drops)
mcols = None
if (not mcols_df.empty) or len(mcols_df.columns) > 0:
mcols = BiocFrame.from_pandas(mcols_df)
names = None
if input.index is not None:
names = [str(i) for i in input.index.to_list()]
return cls(ranges=ranges, seqnames=seqnames, strand=strand, names=names, mcols=mcols)
################################
######>> polars interop <<######
################################
[docs]
def to_polars(self) -> "polars.DataFrame":
"""Convert this ``GenomicRanges`` object into a :py:class:`~polars.DataFrame`.
Returns:
A :py:class:`~polars.DataFrame` object.
"""
import polars as pl
_rdf = self._ranges.to_polars()
_rdf = _rdf.with_columns(seqnames=self.get_seqnames(), strand=self.get_strand(as_type="list"))
if self._names is not None:
_rdf = _rdf.with_columns(rownames=self._names)
if self._mcols is not None:
if self._mcols.shape[1] > 0:
_rdf = pl.concat([_rdf, self._mcols.to_polars()], how="horizontal")
return _rdf
[docs]
@classmethod
def from_polars(cls, input: "polars.DataFrame") -> "GenomicRanges":
"""Create a ``GenomicRanges`` from a :py:class:`~polars.DataFrame` object.
Args:
input:
Input polars DataFrame.
must contain columns 'seqnames', 'starts' and 'widths' or "ends".
Returns:
A ``GenomicRanges`` object.
"""
from polars import DataFrame
if not isinstance(input, DataFrame):
raise TypeError("`input` is not a polars `DataFrame` object.")
if "starts" not in input.columns:
raise ValueError("'input' must contain column 'starts'.")
start = input["starts"].to_list()
if "widths" not in input.columns and "ends" not in input.columns:
raise ValueError("'input' must contain either 'widths' or 'ends' columns.")
drops = []
if "widths" in input.columns:
drops.append("widths")
width = input["widths"].to_list()
else:
drops.append("ends")
width = input["ends"] - input["starts"]
if "seqnames" not in input.columns:
raise ValueError("'input' must contain column 'seqnames'.")
seqnames = input["seqnames"].to_list()
ranges = IRanges(start, width)
strand = None
if "strand" in input.columns:
strand = input["strand"].to_list()
drops.append("strand")
# mcols
drops.extend(["starts", "seqnames"])
mcols_df = input.drop(drops)
mcols = None
if (not mcols_df.is_empty()) or len(mcols_df.columns) > 0:
mcols = BiocFrame.from_polars(mcols_df)
names = None
return cls(ranges=ranges, seqnames=seqnames, strand=strand, names=names, mcols=mcols)
#####################################
######>> intra-range methods <<######
#####################################
[docs]
def flank(
self,
width: int,
start: bool = True,
both: bool = False,
ignore_strand: bool = False,
in_place: bool = False,
) -> "GenomicRanges":
"""Compute flanking ranges for each range. The logic for this comes from the `R/GenomicRanges` & `IRanges`
packages.
If ``start`` is ``True`` for a given range, the flanking occurs at the `start`,
otherwise the `end`.
The `widths` of the flanks are given by the ``width`` parameter.
``width`` can be negative, in which case the flanking region is
reversed so that it represents a prefix or suffix of the range.
Usage:
`gr.flank(3, True)`, where "x" indicates a range in ``gr`` and "-" indicates the
resulting flanking region:
---xxxxxxx
If ``start`` were ``False``, the range in ``gr`` becomes
xxxxxxx---
For negative width, i.e. `gr.flank(x, -3, FALSE)`, where "*" indicates the overlap
between "x" and the result:
xxxx***
If ``both`` is ``True``, then, for all ranges in "x", the flanking regions are
extended into (or out of, if ``width`` is negative) the range, so that the result
straddles the given endpoint and has twice the width given by width.
This is illustrated below for `gr.flank(3, both=TRUE)`:
---***xxxx
Args:
width:
Width to flank by. May be negative.
start:
Whether to only flank starts. Defaults to True.
both:
Whether to flank both starts and ends. Defaults to False.
ignore_strand:
Whether to ignore strands. Defaults to False.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the flanked regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
all_starts = self.start
all_ends = self.end
all_strands = self.strand
# figure out which position to pin, start or end?
start_flags = np.repeat(start, len(all_strands))
if not ignore_strand:
start_flags = [start != (all_strands[i] == -1) for i in range(len(all_strands))]
new_starts = []
new_widths = []
# if both is true, then depending on start_flag, we extend it out
# I couldn't understand the scenario's with witdh <=0,
# so refer to the R implementation here
for idx in range(len(start_flags)):
sf = start_flags[idx]
tstart = 0
if both is True:
tstart = all_starts[idx] - abs(width) if sf else all_ends[idx] - abs(width)
else:
if width >= 0:
tstart = all_starts[idx] - abs(width) if sf else all_ends[idx]
else:
tstart = all_starts[idx] if sf else all_ends[idx] + width
new_starts.append(tstart)
new_widths.append((width * (2 if both else 1)))
output = self._define_output(in_place)
output._ranges = IRanges(new_starts, new_widths)
return output
[docs]
def resize(
self,
width: Union[int, List[int], np.ndarray],
fix: Literal["start", "end", "center"] = "start",
ignore_strand: bool = False,
in_place: bool = False,
) -> "GenomicRanges":
"""Resize ranges to the specified ``width`` where either the ``start``, ``end``, or ``center`` is used as an
anchor.
Args:
width:
Width to resize, cannot be negative!
fix:
Fix positions by "start", "end", or "center".
Defaults to "start".
ignore_strand:
Whether to ignore strands. Defaults to False.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Raises:
ValueError:
If parameter ``fix`` is neither `start`, `end`, nor `center`.
If ``width`` is negative.
Returns:
A modified ``GenomicRanges`` object with the resized regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
_REV_FIX = {"start": "end", "end": "start", "center": "center"}
if ignore_strand is False:
fix = [fix] * len(self)
for i in range(len(self.strand)):
if self._strand[i] == -1:
fix[i] = _REV_FIX[fix[i]]
output = self._define_output(in_place)
output._ranges = self._ranges.resize(width=width, fix=fix)
return output
[docs]
def shift(self, shift: Union[int, List[int], np.ndarray] = 0, in_place: bool = False) -> "GenomicRanges":
"""Shift all intervals.
Args:
shift:
Shift interval. If shift is 0, the current
object is returned. Defaults to 0.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the shifted regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
output = self._define_output(in_place)
if shift == 0:
return self
output._ranges = self._ranges.shift(shift=shift)
return output
[docs]
def restrict(
self,
start: Optional[Union[int, List[int], np.ndarray]] = None,
end: Optional[Union[int, List[int], np.ndarray]] = None,
keep_all_ranges: bool = False,
in_place: bool = False,
) -> "GenomicRanges":
"""Restrict ranges to a given start and end positions.
Args:
start:
Start position. Defaults to None.
end:
End position. Defaults to None.
keep_all_ranges:
Whether to keep intervals that do not overlap with start and end.
Defaults to False.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the restricted regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
restricted_ir = self._ranges.restrict(start=start, end=end, keep_all_ranges=True)
output = self._define_output(in_place)
output._ranges = restricted_ir
if keep_all_ranges is True:
restricted_ir._width = np.clip(restricted_ir.width, 0, None)
else:
_flt_idx = list(np.where(restricted_ir.width > -1)[0])
output = output[_flt_idx]
return output
[docs]
def trim(self, in_place: bool = False) -> "GenomicRanges":
"""Trim sequences outside of bounds for non-circular chromosomes.
Args:
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the trimmed regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
if self.seqinfo is None:
raise ValueError("Cannot trim ranges. `seqinfo` is not available.")
seqlengths = self.seqinfo.seqlengths
is_circular = self.seqinfo.is_circular
if seqlengths is None:
raise ValueError("Cannot trim ranges. `seqlengths` is not available.")
if is_circular is None:
warn("considering all sequences as non-circular...")
all_chrs = self._seqnames
all_ends = self.end
new_ends = []
for i in range(len(self)):
_t_chr = all_chrs[i]
_end = all_ends[i]
if is_circular is not None and is_circular[_t_chr] is False and _end > seqlengths[_t_chr]:
_end = seqlengths[_t_chr] + 1
new_ends.append(_end)
output = self._define_output(in_place)
output._ranges.width = np.asarray(new_ends) - output._ranges.start
return output
[docs]
def narrow(
self,
start: Optional[Union[int, List[int], np.ndarray]] = None,
width: Optional[Union[int, List[int], np.ndarray]] = None,
end: Optional[Union[int, List[int], np.ndarray]] = None,
in_place: bool = False,
) -> "GenomicRanges":
"""Narrow genomic positions by provided ``start``, ``width`` and ``end`` parameters.
Important: these parameters are relative shift in positions for each range.
Args:
start:
Relative start position. Defaults to None.
width:
Relative end position. Defaults to None.
end:
Relative width of the interval. Defaults to None.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the trimmed regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
if start is not None and end is not None and width is not None:
raise ValueError("Only provide two of the three parameters - `start`, " "`end` and `width` but not all!")
if width is not None:
if start is None and end is None:
raise ValueError("If width is provided, either start or end must be provided.")
narrow_ir = self._ranges.narrow(start=start, end=end, width=width)
output = self._define_output(in_place)
output._ranges = narrow_ir
return output
def _group_indices_by_chrm(self, ignore_strand: bool = False) -> dict:
__strand = self._strand.copy()
if ignore_strand:
__strand = np.zeros(len(self), dtype=np.int8)
# else:
# __strand[__strand == 0] = 1
_seqnames = [self._seqinfo._seqnames[i] for i in self._seqnames]
grp_keys = np.char.add(np.char.add(_seqnames, f"{_granges_delim}"), __strand.astype(str))
unique_grps, inverse_indices = np.unique(grp_keys, return_inverse=True)
chrm_grps = {str(grp): np.where(inverse_indices == i)[0].tolist() for i, grp in enumerate(unique_grps)}
return chrm_grps
[docs]
def reduce(
self,
with_reverse_map: bool = False,
drop_empty_ranges: bool = False,
min_gap_width: int = 1,
ignore_strand: bool = False,
) -> "GenomicRanges":
"""Reduce orders the ranges, then merges overlapping or adjacent ranges.
Args:
with_reverse_map:
Whether to return map of indices back to
original object. Defaults to False.
drop_empty_ranges:
Whether to drop empty ranges. Defaults to False.
min_gap_width:
Ranges separated by a gap of
at least ``min_gap_width`` positions are not merged. Defaults to 1.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` object with reduced intervals.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
_new_mcols = self._ranges._mcols.set_column("reduceindices", range(len(self)))
_new_ranges = self._ranges.set_mcols(_new_mcols)
_new_self = self.set_ranges(_new_ranges)
all_grp_ranges = []
rev_map = []
groups = []
for seq in _new_self._seqinfo._seqnames:
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{seq}{_granges_delim}{strd}"
if _key in chrm_grps:
_grp_subset = _new_self[chrm_grps[_key]]
_oindices = _grp_subset._ranges._mcols.get_column("reduceindices")
res_ir = _grp_subset._ranges.reduce(
with_reverse_map=True,
drop_empty_ranges=drop_empty_ranges,
min_gap_width=min_gap_width,
)
groups.extend([_key] * len(res_ir))
all_grp_ranges.append(res_ir)
_rev_map = []
for j in res_ir._mcols.get_column("revmap"):
_rev_map.append([_oindices[x] for x in j])
rev_map.extend(_rev_map)
all_merged_ranges = ut.combine_sequences(*all_grp_ranges)
all_merged_ranges._mcols.remove_column("revmap", in_place=True)
splits = [x.split(_granges_delim) for x in groups]
new_seqnames = [x[0] for x in splits]
new_strand = np.asarray([int(x[1]) for x in splits])
output = GenomicRanges(seqnames=new_seqnames, strand=new_strand, ranges=all_merged_ranges)
if with_reverse_map is True:
output._mcols.set_column("revmap", rev_map, in_place=True)
# self._ranges._mcols.remove_column("reduceindices", in_place=True)
return output
[docs]
def range(self, with_reverse_map: bool = False, ignore_strand: bool = False) -> "GenomicRanges":
"""Calculate range bounds for each distinct (seqname, strand) pair.
Args:
with_reverse_map:
Whether to return map of indices back to
original object. Defaults to False.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` object with the range bounds.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
all_grp_ranges = []
rev_map = []
groups = []
for seq in self._seqinfo.get_seqnames():
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{seq}{_granges_delim}{strd}"
if _key in chrm_grps:
_grp_subset = self[chrm_grps[_key]]
res_ir = _grp_subset._ranges.range()
groups.extend([_key] * len(res_ir))
all_grp_ranges.append(res_ir)
rev_map.extend(chrm_grps[_key] * len(res_ir))
all_merged_ranges = ut.combine_sequences(*all_grp_ranges)
splits = [x.split(_granges_delim) for x in groups]
new_seqnames = [x[0] for x in splits]
new_strand = np.asarray([int(x[1]) for x in splits])
output = GenomicRanges(seqnames=new_seqnames, strand=new_strand, ranges=all_merged_ranges)
if with_reverse_map is True:
output._mcols.set_column("revmap", rev_map, in_place=True)
return output
[docs]
def gaps(
self,
start: int = 1,
end: Optional[Union[int, Dict[str, int]]] = None,
ignore_strand: bool = False,
) -> "GenomicRanges":
"""Identify complemented ranges for each distinct (seqname, strand) pair.
Args:
start:
Restrict chromosome start position. Defaults to 1.
end:
Restrict end position for each chromosome.
Defaults to None. If None, extracts sequence information from
:py:attr:`~seqinfo` object if available.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` with complement ranges.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
all_grp_ranges = []
groups = []
for i, chrm in enumerate(self._seqinfo.get_seqnames()):
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{chrm}{_granges_delim}{strd}"
_end = None
if isinstance(end, dict):
_end = end[chrm]
elif isinstance(end, int):
_end = end
gaps = None
if _key in chrm_grps:
_grp_subset = self[chrm_grps[_key]]
gaps = _grp_subset._ranges.gaps(
start=start,
end=_end, # - 1 if _end is not None else _end
)
else:
if _end is None:
_end = self._seqinfo.seqlengths[i]
if _end is not None:
gaps = IRanges(start=[start], width=[_end - start + 1])
if gaps is not None:
all_grp_ranges.append(gaps)
groups.extend([_key] * len(gaps))
all_merged_ranges = ut.combine_sequences(*all_grp_ranges)
splits = [x.split(_granges_delim) for x in groups]
new_seqnames = [x[0] for x in splits]
new_strand = np.asarray([int(x[1]) for x in splits])
output = GenomicRanges(seqnames=new_seqnames, strand=new_strand, ranges=all_merged_ranges)
return output
[docs]
def disjoin(self, with_reverse_map: bool = False, ignore_strand: bool = False) -> "GenomicRanges":
"""Calculate disjoint genomic positions for each distinct (seqname, strand) pair.
Args:
with_reverse_map:
Whether to return a map of indices back to the original object.
Defaults to False.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` containing disjoint ranges.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
all_grp_ranges = []
rev_map = []
groups = []
for seq in self._seqinfo.get_seqnames():
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{seq}{_granges_delim}{strd}"
if _key in chrm_grps:
_grp_subset = self[chrm_grps[_key]]
res_ir = _grp_subset._ranges.disjoin(with_reverse_map=True)
groups.append(_key)
all_grp_ranges.append(res_ir)
_rev_map = []
for j in res_ir._mcols.get_column("revmap"):
_rev_map.append([chrm_grps[_key][x] for x in j])
rev_map.append(_rev_map[0])
all_merged_ranges = ut.combine_sequences(*all_grp_ranges)
splits = [x.split(_granges_delim) for x in groups]
new_seqnames = [x[0] for x in splits]
new_strand = np.asarray([int(x[1]) for x in splits])
output = GenomicRanges(seqnames=new_seqnames, strand=new_strand, ranges=all_merged_ranges)
if with_reverse_map is True:
output._mcols.set_column("revmap", rev_map, in_place=True)
return output
[docs]
def coverage(self, shift: int = 0, width: Optional[int] = None, weight: int = 1) -> Dict[str, np.ndarray]:
"""Calculate coverage for each chromosome, For each position, counts the number of ranges that cover it.
Args:
shift:
Shift all genomic positions. Defaults to 0.
width:
Restrict the width of all
chromosomes. Defaults to None.
weight:
Weight to use. Defaults to 1.
Returns:
A dictionary with chromosome names as keys and the
coverage vector as value.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=True)
shift_arr = None
if shift > 0:
shift_arr = np.zeros(shift)
result = {}
for chrm, group in chrm_grps.items():
_grp_subset = self[group]
all_intvals = [(x[0], x[1]) for x in zip(_grp_subset._ranges._start, _grp_subset._ranges.end)]
cov, _ = create_np_vector(intervals=all_intvals, with_reverse_map=False)
if shift > 0:
cov = ut.combine_sequences(shift_arr, cov)
if weight > 0:
cov = cov * weight
if width is not None:
cov = cov[:width]
result[chrm.split(_granges_delim)[0]] = cov
return result
################################
######>> set operations <<######
################################
[docs]
def union(self, other: "GenomicRanges") -> "GenomicRanges":
"""Find union of genomic intervals with `other`.
Args:
other:
The other ``GenomicRanges`` object.
Raises:
TypeError:
If ``other`` is not a ``GenomicRanges``.
Returns:
A new ``GenomicRanges`` object with all ranges.
"""
if not isinstance(other, GenomicRanges):
raise TypeError("'other' is not a `GenomicRanges` object.")
all_combined = _combine_GenomicRanges(self, other)
output = all_combined.reduce(min_gap_width=0, drop_empty_ranges=True)
return output
[docs]
def setdiff(self, other: "GenomicRanges") -> "GenomicRanges":
"""Find set difference of genomic intervals with `other`.
Args:
other:
The other ``GenomicRanges`` object.
Raises:
TypeError:
If ``other`` is not of type ``GenomicRanges``.
Returns:
A new ``GenomicRanges`` object with the diff ranges.
"""
if not isinstance(other, GenomicRanges):
raise TypeError("'other' is not a `GenomicRanges` object.")
all_combined = _fast_combine_GenomicRanges(self, other)
range_bounds = all_combined.range(ignore_strand=True)
rb_ends = {}
for _, val in range_bounds:
rb_ends[val.get_seqnames()[0]] = val.end[0]
x_gaps = self.gaps(end=rb_ends)
x_gaps_u = x_gaps.union(other)
diff = x_gaps_u.gaps(end=rb_ends)
return diff
[docs]
def intersect(self, other: "GenomicRanges") -> "GenomicRanges":
"""Find intersecting genomic intervals with `other`.
Args:
other:
The other ``GenomicRanges`` object.
Raises:
TypeError:
If ``other`` is not a ``GenomicRanges``.
Returns:
A new ``GenomicRanges`` object with intersecting ranges.
"""
if not isinstance(other, GenomicRanges):
raise TypeError("'other' is not a `GenomicRanges` object.")
all_combined = _fast_combine_GenomicRanges(self, other)
range_bounds = all_combined.range(ignore_strand=True)
rb_ends = {}
for _, val in range_bounds:
rb_ends[val.get_seqnames()[0]] = val.end[0]
_gaps = other.gaps(end=rb_ends)
# _inter = self.setdiff(_gaps)
x_gaps = self.gaps(end=rb_ends)
x_gaps_u = x_gaps.union(_gaps)
diff = x_gaps_u.gaps(end=rb_ends)
return diff
[docs]
def intersect_ncls(self, other: "GenomicRanges") -> "GenomicRanges":
"""Find intersecting genomic intervals with `other` (uses NCLS index).
Args:
other:
The other ``GenomicRanges`` object.
Raises:
TypeError:
If ``other`` is not a ``GenomicRanges``.
Returns:
A new ``GenomicRanges`` object with intersecting ranges.
"""
if not isinstance(other, GenomicRanges):
raise TypeError("'other' is not a `GenomicRanges` object.")
if not ut.package_utils.is_package_installed("ncls"):
raise ImportError("package: 'ncls' is not installed.")
from ncls import NCLS
self_end = self.end
other_end = other.end
other_ncls = NCLS(other.start, other_end, np.arange(len(other)))
_self_indexes, _other_indexes = other_ncls.all_overlaps_both(self.start, self_end, np.arange(len(self)))
other_chrms = np.array([other._seqinfo._seqnames[other._seqnames[i]] for i in _other_indexes])
self_chrms = np.array([self._seqinfo._seqnames[self._seqnames[i]] for i in _self_indexes])
other_strands = other._strand[_other_indexes]
self_strands = self._strand[_self_indexes]
filtered_indexes = np.logical_and(other_chrms == self_chrms, other_strands == self_strands)
self_starts = self.start[_self_indexes][filtered_indexes]
other_starts = other.start[_other_indexes][filtered_indexes]
new_starts = np.where(self_starts > other_starts, self_starts, other_starts)
self_ends = self_end[_self_indexes][filtered_indexes]
other_ends = other_end[_other_indexes][filtered_indexes]
new_ends = np.where(self_ends < other_ends, self_ends, other_ends)
# filtered_keys = self_grp_keys[filtered_indexes]
# splits = [x.split(_granges_delim) for x in filtered_keys]
new_seqnames = self_chrms[filtered_indexes].tolist()
new_strands = self_strands[filtered_indexes]
output = GenomicRanges(
seqnames=new_seqnames,
ranges=IRanges(new_starts, new_ends - new_starts),
strand=np.array(new_strands).astype(np.int8),
)
return output
###################################
######>> search operations <<######
###################################
[docs]
def find_overlaps(
self,
query: "GenomicRanges",
query_type: Literal["any", "start", "end", "within"] = "any",
select: Literal["all", "first", "last", "arbitrary"] = "all",
max_gap: int = -1,
min_overlap: int = 1,
ignore_strand: bool = False,
) -> List[List[int]]:
"""Find overlaps between subject (self) and a ``query`` ``GenomicRanges`` object.
Args:
query:
Query `GenomicRanges`.
query_type:
Overlap query type, must be one of
- "any": Any overlap is good
- "start": Overlap at the beginning of the intervals
- "end": Must overlap at the end of the intervals
- "within": Fully contain the query interval
Defaults to "any".
select:
Determine what hit to choose when
there are multiple hits for an interval in ``subject``.
max_gap:
Maximum gap allowed in the overlap.
Defaults to -1 (no gap allowed).
min_overlap:
Minimum overlap with query. Defaults to 1.
ignore_strand:
Whether to ignore strands. Defaults to False.
Raises:
TypeError: If ``query`` is not of type `GenomicRanges`.
Returns:
A list with the same length as ``query``,
containing hits to overlapping indices.
"""
OVERLAP_QUERY_TYPES = ["any", "start", "end", "within"]
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
if query_type not in OVERLAP_QUERY_TYPES:
raise ValueError(f"'{query_type}' must be one of {', '.join(OVERLAP_QUERY_TYPES)}.")
rev_map = [[] for _ in range(len(query))]
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.find_overlaps(
query=_query_subset._ranges,
query_type=query_type,
select=select,
max_gap=max_gap,
min_overlap=min_overlap,
delete_index=False,
)
for j, val in enumerate(res_idx):
_rev_map = [_subset[x] for x in val]
rev_map[indices[j]] = _rev_map
return rev_map
[docs]
def count_overlaps(
self,
query: "GenomicRanges",
query_type: Literal["any", "start", "end", "within"] = "any",
max_gap: int = -1,
min_overlap: int = 1,
ignore_strand: bool = False,
) -> List[int]:
"""Count overlaps between subject (self) and a ``query`` ``GenomicRanges`` object.
Args:
query:
Query `GenomicRanges`.
query_type:
Overlap query type, must be one of
- "any": Any overlap is good
- "start": Overlap at the beginning of the intervals
- "end": Must overlap at the end of the intervals
- "within": Fully contain the query interval
Defaults to "any".
max_gap:
Maximum gap allowed in the overlap.
Defaults to -1 (no gap allowed).
min_overlap:
Minimum overlap with query. Defaults to 1.
ignore_strand:
Whether to ignore strands. Defaults to False.
Raises:
TypeError: If ``query`` is not of type `GenomicRanges`.
Returns:
A list with the same length as ``query``,
containing number of overlapping indices.
"""
OVERLAP_QUERY_TYPES = ["any", "start", "end", "within"]
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
if query_type not in OVERLAP_QUERY_TYPES:
raise ValueError(f"'{query_type}' must be one of {', '.join(OVERLAP_QUERY_TYPES)}.")
rev_map = [0 for _ in range(len(query))]
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.find_overlaps(
query=_query_subset._ranges,
query_type=query_type,
select="all",
max_gap=max_gap,
min_overlap=min_overlap,
delete_index=False,
)
for j, val in enumerate(res_idx):
_rev_map = [_subset[x] for x in val]
rev_map[indices[j]] = len(_rev_map)
return rev_map
[docs]
def subset_by_overlaps(
self,
query: "GenomicRanges",
query_type: Literal["any", "start", "end", "within"] = "any",
max_gap: int = -1,
min_overlap: int = 1,
ignore_strand: bool = False,
) -> "GenomicRanges":
"""Subset ``subject`` (self) with overlaps in ``query`` `GenomicRanges` object.
Args:
query:
Query `GenomicRanges`.
query_type:
Overlap query type, must be one of
- "any": Any overlap is good
- "start": Overlap at the beginning of the intervals
- "end": Must overlap at the end of the intervals
- "within": Fully contain the query interval
Defaults to "any".
max_gap:
Maximum gap allowed in the overlap.
Defaults to -1 (no gap allowed).
min_overlap:
Minimum overlap with query. Defaults to 1.
ignore_strand:
Whether to ignore strands. Defaults to False.
Raises:
TypeError:
If ``query`` is not of type `GenomicRanges`.
Returns:
A ``GenomicRanges`` object containing overlapping ranges.
"""
OVERLAP_QUERY_TYPES = ["any", "start", "end", "within"]
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
if query_type not in OVERLAP_QUERY_TYPES:
raise ValueError(f"'{query_type}' must be one of {', '.join(OVERLAP_QUERY_TYPES)}.")
rev_map = []
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.find_overlaps(
query=_query_subset._ranges,
query_type=query_type,
select="all",
max_gap=max_gap,
min_overlap=min_overlap,
delete_index=False,
)
for _, val in enumerate(res_idx):
_rev_map = [_subset[x] for x in val]
rev_map.extend(_rev_map)
return self[list(set(rev_map))]
#################################
######>> nearest methods <<######
#################################
[docs]
def nearest(
self,
query: "GenomicRanges",
select: Literal["all", "arbitrary"] = "all",
ignore_strand: bool = False,
) -> List[List[int]]:
"""Search nearest positions both upstream and downstream that overlap with each range in ``query``.
Args:
query:
Query ``GenomicRanges`` to find nearest positions.
select:
Determine what hit to choose when there are
multiple hits for an interval in ``query``.
ignore_strand:
Whether to ignore strand. Defaults to False.
Returns:
A list with the same length as ``query``,
containing hits to nearest indices.
"""
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
rev_map = [[] for _ in range(len(query))]
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.nearest(query=_query_subset._ranges, select=select, delete_index=False)
for j, val in enumerate(res_idx):
_rev_map = [_subset[x] for x in val]
rev_map[indices[j]] = _rev_map
return rev_map
[docs]
def precede(
self,
query: "GenomicRanges",
select: Literal["all", "arbitrary"] = "all",
ignore_strand: bool = False,
) -> List[List[int]]:
"""Search nearest positions only downstream that overlap with each range in ``query``.
Args:
query:
Query ``GenomicRanges`` to find nearest positions.
select:
Determine what hit to choose when there are
multiple hits for an interval in ``query``.
ignore_strand:
Whether to ignore strand. Defaults to False.
Returns:
A List with the same length as ``query``,
containing hits to nearest indices.
"""
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
rev_map = [[] for _ in range(len(query))]
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.precede(query=_query_subset._ranges, select=select, delete_index=False)
for j, val in enumerate(res_idx):
_rev_map = [_subset[x] for x in val]
rev_map[indices[j]] = _rev_map
return rev_map
[docs]
def follow(
self,
query: "GenomicRanges",
select: Literal["all", "arbitrary"] = "all",
ignore_strand: bool = False,
) -> List[List[int]]:
"""Search nearest positions only upstream that overlap with each range in ``query``.
Args:
query:
Query ``GenomicRanges`` to find nearest positions.
select:
Determine what hit to choose when there are
multiple hits for an interval in ``query``.
ignore_strand:
Whether to ignore strand. Defaults to False.
Returns:
A List with the same length as ``query``,
containing hits to nearest indices.
"""
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
rev_map = [[] for _ in range(len(query))]
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.follow(query=_query_subset._ranges, select=select, delete_index=False)
for j, val in enumerate(res_idx):
_rev_map = [_subset[x] for x in val]
rev_map[indices[j]] = _rev_map
return rev_map
[docs]
def distance(self, query: Union["GenomicRanges", IRanges]) -> np.ndarray:
"""Compute the pair-wise distance with intervals in query.
Args:
query:
Query `GenomicRanges` or `IRanges`.
Returns:
Numpy vector containing distances for each interval in query.
"""
if not isinstance(query, (IRanges, GenomicRanges)):
raise TypeError("'query' is not a `GenomicRanges` or `IRanges` object.")
if len(self) != len(query):
raise ValueError("'query' does not contain the same number of intervals.")
_qranges = query
if isinstance(query, GenomicRanges):
_qranges = query.get_ranges()
return self._ranges.distance(_qranges)
[docs]
def match(self, query: "GenomicRanges") -> List[List[int]]:
"""Element wise comparison to find exact match ranges.
Args:
query:
Query ``GenomicRanges`` to search for matches.
Raises:
TypeError:
If ``query`` is not of type ``GenomicRanges``.
Returns:
A List with the same length as ``query``,
containing hits to matching indices.
"""
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
ignore_strand = False
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
rev_map = []
groups = []
for i in range(len(query)):
try:
_seqname = query.get_seqnames()[i]
except Exception as _:
warn(f"'{query.get_seqnames()[i]}' is not present in subject.")
_strand = query._strand[i]
if ignore_strand is True:
_strand = 0
_key = f"{_seqname}{_granges_delim}{_strand}"
if _key in subject_chrm_grps:
_grp_subset = self[subject_chrm_grps[_key]]
res_idx = _grp_subset._ranges.find_overlaps(
query=query._ranges[i],
query_type="any",
select="all",
delete_index=False,
)
groups.append(i)
_rev_map = []
for j in res_idx:
for x in j:
_mrange = self[subject_chrm_grps[_key][x]]._ranges
if (
_mrange.start[0] == query._ranges[i].start[0]
and _mrange.width[0] == query._ranges[i].width[0]
):
_rev_map.append(subject_chrm_grps[_key][x])
rev_map.append(_rev_map)
return rev_map
def _get_ranges_as_list(self) -> List[Tuple[int, int, int]]:
"""Internal method to get ranges as a list of tuples.
Returns:
List of tuples containing the start, end and the index.
"""
ranges = []
for i in range(len(self)):
ranges.append(
(
self._seqnames[i],
self._strand[i],
self._ranges._start[i],
self._ranges.end[i],
i,
)
)
return ranges
[docs]
def order(self, decreasing: bool = False) -> np.ndarray:
"""Get the order of indices for sorting.
Order orders the genomic ranges by chromosome and strand.
Strand is ordered by reverse first (-1), any strand (0) and
forward strand (-1). Then by the start positions and width if
two regions have the same start.
Args:
decreasing:
Whether to sort in descending order. Defaults to False.
Returns:
NumPy vector containing index positions in the sorted order.
"""
intvals = sorted(self._get_ranges_as_list(), reverse=decreasing)
order = [o[4] for o in intvals]
return np.asarray(order)
[docs]
def sort(self, decreasing: bool = False, in_place: bool = False) -> "GenomicRanges":
"""Get the order of indices for sorting.
Args:
decreasing:
Whether to sort in descending order. Defaults to False.
in_place:
Whether to modify the object in place. Defaults to False.
Returns:
A modified ``GenomicRanges`` object with the trimmed regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
order = self.order(decreasing=decreasing)
output = self._define_output(in_place)
return output[list(order)]
[docs]
def rank(self) -> List[int]:
"""Get rank of the ``GenomicRanges`` object.
For each range identifies its position is a sorted order.
Returns:
Numpy vector containing rank.
"""
intvals = sorted(self._get_ranges_as_list())
order = [o[4] for o in intvals]
rank = [order.index(x) for x in range(len(order))]
return rank
##############################
######>> misc methods <<######
##############################
[docs]
def sample(self, k: int = 5) -> "GenomicRanges":
"""Randomly sample ``k`` intervals.
Args:
k:
Number of intervals to sample. Defaults to 5.
Returns:
A new ``GenomicRanges`` with randomly sampled ranges.
"""
from random import sample
sample = sample(range(len(self)), k=k)
return self[sample]
[docs]
def invert_strand(self, in_place: bool = False) -> "GenomicRanges":
"""Invert strand for each range.
Conversion map:
- "+" map to "-"
- "-" becomes "+"
- "*" stays the same
Args:
in_place:
Whether to modify the object in place. Defaults to False.
Returns:
A modified ``GenomicRanges`` object with the trimmed regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
convertor = {"1": "-1", "-1": "1", "0": "0"}
inverts = [convertor[str(idx)] for idx in self._strand]
output = self._define_output(in_place)
output._strand = inverts
return output
################################
######>> window methods <<######
################################
[docs]
def tile_by_range(self, n: Optional[int] = None, width: Optional[int] = None) -> "GenomicRanges":
"""Split each sequence length into chunks by ``n`` (number of intervals) or ``width`` (intervals with equal
width).
Note: Either ``n`` or ``width`` must be provided, but not both.
Also, checkout :py:meth:`~genomicranges.io.tiling.tile_genome` for splitting
the genome into chunks.
Args:
n:
Number of intervals to split into.
Defaults to None.
width:
Width of each interval. Defaults to None.
Raises:
ValueError:
If both ``n`` or ``width`` are provided.
Returns:
A new ``GenomicRanges`` with the split ranges.
"""
if n is not None and width is not None:
raise ValueError("Either `n` or `width` must be provided but not both.")
ranges = self.range()
seqnames = []
strand = []
starts = []
widths = []
for _, val in ranges:
twidth = None
if n is not None:
twidth = int((val._ranges.width[0]) / n)
elif width is not None:
twidth = width
all_intervals = split_intervals(val._ranges._start[0], val._ranges.end[0] - 1, twidth)
seqnames.extend([val.get_seqnames()[0]] * len(all_intervals))
strand.extend([int(val.strand[0])] * len(all_intervals))
starts.extend([x[0] for x in all_intervals])
widths.extend(x[1] for x in all_intervals)
return GenomicRanges(seqnames=seqnames, strand=strand, ranges=IRanges(start=starts, width=widths))
[docs]
def tile(self, n: Optional[int] = None, width: Optional[int] = None) -> "GenomicRanges":
"""Split each interval by ``n`` (number of sub intervals) or ``width`` (intervals with equal width).
Note: Either ``n`` or ``width`` must be provided but not both.
Also, checkout :py:func:`~genomicranges.io.tiling.tile_genome` for splitting
a genome into chunks, or
:py:meth:`~genomicranges.GenomicRanges.GenomicRanges.tile_by_range`.
Args:
n:
Number of intervals to split into.
Defaults to None.
width:
Width of each interval. Defaults to None.
Raises:
ValueError:
If both ``n`` and ``width`` are provided.
Returns:
A new ``GenomicRanges`` with the split ranges.
"""
if n is not None and width is not None:
raise ValueError("either `n` or `width` must be provided but not both")
import math
seqnames = []
strand = []
starts = []
widths = []
counter = 0
for _, val in self:
twidth = None
if n is not None:
twidth = math.ceil((val._ranges._width + 1) / (n))
if twidth < 1:
raise RuntimeError(f"'width' of region is less than 'n' for range in: {counter}.")
elif width is not None:
twidth = width
all_intervals = split_intervals(val._ranges._start[0], val._ranges.end[0] - 1, twidth)
seqnames.extend([val.get_seqnames()[0]] * len(all_intervals))
strand.extend([int(val.strand[0])] * len(all_intervals))
starts.extend([x[0] for x in all_intervals])
widths.extend(x[1] for x in all_intervals)
counter += 1
return GenomicRanges(seqnames=seqnames, strand=strand, ranges=IRanges(start=starts, width=widths))
[docs]
def sliding_windows(self, width: int, step: int = 1) -> "GenomicRanges":
"""Slide along each range by ``width`` (intervals with equal ``width``) and ``step``.
Also, checkout :py:func:`~genomicranges.io.tiling.tile_genome` for splitting
a gneomic into chunks, or
:py:meth:`~genomicranges.GenomicRanges.GenomicRanges.tile_by_range`.
Args:
n:
Number of intervals to split into.
Defaults to None.
width:
Width of each interval. Defaults to None.
Returns:
A new ``GenomicRanges`` with the sliding ranges.
"""
seqnames = []
strand = []
starts = []
widths = []
for _, val in self:
all_intervals = slide_intervals(
val._ranges._start[0],
val._ranges.end[0] - 1,
width=width,
step=step,
)
seqnames.extend([val.get_seqnames()[0]] * len(all_intervals))
strand.extend([int(val.strand[0])] * len(all_intervals))
starts.extend([x[0] for x in all_intervals])
widths.extend(x[1] for x in all_intervals)
return GenomicRanges(seqnames=seqnames, strand=strand, ranges=IRanges(start=starts, width=widths))
[docs]
@classmethod
def tile_genome(
cls,
seqlengths: Union[Dict, SeqInfo],
n: Optional[int] = None,
width: Optional[int] = None,
) -> "GenomicRanges":
"""Create a new ``GenomicRanges`` by partitioning a specified genome.
If ``n`` is provided, the region is split into ``n`` intervals. The last interval may
not contain the same 'width' as the other regions.
Alternatively, ``width`` may be provided for each interval. Similarly, the last region
may be less than ``width``.
Either ``n`` or ``width`` must be provided but not both.
Args:
seqlengths:
Sequence lengths of each chromosome.
``seqlengths`` may be a dictionary, where keys specify the chromosome
and the value is the length of each chromosome in the genome.
Alternatively, ``seqlengths`` may be an instance of
:py:class:`~genomicranges.SeqInfo.SeqInfo`.
n:
Number of intervals to split into.
Defaults to None, then 'width' of each interval is computed from ``seqlengths``.
width:
Width of each interval. Defaults to None.
Raises:
ValueError:
If both ``n`` and ``width`` are provided.
Returns:
A new ``GenomicRanges`` with the tiled regions.
"""
import math
if n is not None and width is not None:
raise ValueError("Both `n` or `width` are provided!")
seqlen_ = seqlengths
if isinstance(seqlengths, SeqInfo):
seqlen_ = dict(zip(seqlengths.get_seqnames(), seqlengths.seqlengths))
seqnames = []
strand = []
starts = []
widths = []
for chrm, chrlen in seqlen_.items():
twidth = None
if n is not None:
twidth = math.ceil(chrlen / n)
elif width is not None:
twidth = width
all_intervals = split_intervals(1, chrlen, twidth)
seqnames.extend([chrm] * len(all_intervals))
strand.extend(["*"] * len(all_intervals))
starts.extend([x[0] for x in all_intervals])
widths.extend(x[1] for x in all_intervals)
return GenomicRanges(seqnames=seqnames, strand=strand, ranges=IRanges(start=starts, width=widths))
[docs]
def binned_average(
self,
scorename: str,
bins: "GenomicRanges",
outname: str = "binned_average",
in_place: bool = False,
) -> "GenomicRanges":
"""Calculate average for a column across all regions in ``bins``, then set a column specified by 'outname' with
those values.
Args:
scorename:
Score column to compute averages on.
bins:
Bins you want to use.
outname:
New column name to add to the object.
in_place:
Whether to modify ``bins`` in place.
Raises:
ValueError:
If ``scorename`` column does not exist.
``scorename`` is not all ints or floats.
TypeError:
If ``bins`` is not of type `GenomicRanges`.
Returns:
A modified ``bins`` object with the computed averages,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
import statistics
if not isinstance(bins, GenomicRanges):
raise TypeError("'bins' is not a `GenomicRanges` object.")
if scorename not in self._mcols.column_names:
raise ValueError(f"'{scorename}' is not a valid column name")
values = self._mcols.get_column(scorename)
if not all(isinstance(x, (int, float)) for x in values):
raise ValueError(f"'{scorename}' values must be either `ints` or `floats`.")
outvec = []
for _, val in bins:
overlap = self.subset_by_overlaps(query=val)
outvec.append(statistics.mean(overlap._mcols.get_column(scorename)))
output = bins._define_output(in_place=in_place)
output._mcols.set_column(outname, outvec, in_place=True)
return output
#######################
######>> split <<######
#######################
[docs]
def split(self, groups: list) -> "GenomicRangesList":
"""Split the `GenomicRanges` object into a :py:class:`~genomicranges.GenomicRangesList.GenomicRangesList`.
Args:
groups:
A list specifying the groups or factors to split by.
Must have the same length as the number of genomic elements
in the object.
Returns:
A `GenomicRangesList` containing the groups and their
corresponding elements.
"""
if len(groups) != len(self):
raise ValueError("Number of groups must match the number of genomic elements.")
gdict = group_by_indices(groups=groups)
_names = []
_grs = []
for k, v in gdict.items():
_names.append(k)
_grs.append(self[v])
from .GenomicRangesList import GenomicRangesList
return GenomicRangesList(ranges=_grs, names=_names)
#######################
######>> empty <<######
#######################
[docs]
@classmethod
def empty(cls):
"""Create an zero-length `GenomicRanges` object.
Returns:
same type as caller, in this case a `GenomicRanges`.
"""
return cls([], IRanges.empty())
##########################
######>> subtract <<######
##########################
[docs]
def subtract(self, x: "GenomicRanges", min_overlap: int = 1, ignore_strand: bool = False) -> "GenomicRangesList":
"""Subtract searches for features in ``x`` that overlap ``self`` by at least the number of base pairs given by
``min_overlap``.
Args:
x:
Object to subtract.
min_overlap:
Minimum overlap with query.
Defaults to 1.
ignore_strand:
Whether to ignore strands.
Defaults to False.
Returns:
`GenomicRangesList` with the same size as ``self`` containing
the subtracted regions.
"""
_x_reduce = x.reduce(ignore_strand=ignore_strand)
hits = self.find_overlaps(_x_reduce, min_overlap=min_overlap, ignore_strand=ignore_strand)
gr_idxs = [[] for _ in range(len(self))]
for ii, ix in enumerate(hits):
for hix in ix:
gr_idxs[hix].append(ii)
gresults = {}
for idx, iids in enumerate(gr_idxs):
_name = str(idx)
if self._names is not None:
_name = self._names[idx]
if len(iids) > 0:
gresults[_name] = self[idx].setdiff(x[iids])
else:
gresults[_name] = self[idx]
from .GenomicRangesList import GenomicRangesList
return GenomicRangesList.from_dict(gresults)
def _fast_combine_GenomicRanges(*x: GenomicRanges) -> GenomicRanges:
return GenomicRanges(
ranges=ut.combine_sequences(*[y._ranges for y in x]),
seqnames=ut.combine_sequences(*[y.get_seqnames() for y in x]),
strand=ut.combine_sequences(*[y._strand for y in x]),
names=None,
mcols=None,
seqinfo=merge_SeqInfo([y._seqinfo for y in x]),
metadata=None,
validate=False,
)
@ut.combine_sequences.register
def _combine_GenomicRanges(*x: GenomicRanges) -> GenomicRanges:
has_names = False
for y in x:
if y._names is not None:
has_names = True
break
all_names = None
if has_names:
all_names = []
for y in x:
if y._names is not None:
all_names += y._names
else:
all_names += [""] * len(y)
return GenomicRanges(
ranges=ut.combine_sequences(*[y._ranges for y in x]),
seqnames=ut.combine_sequences(*[y.get_seqnames() for y in x]),
strand=ut.combine_sequences(*[y._strand for y in x]),
names=all_names,
mcols=ut.relaxed_combine_rows(*[y._mcols for y in x]),
seqinfo=merge_SeqInfo([y._seqinfo for y in x]),
metadata=x[0]._metadata,
validate=False,
)