from collections import defaultdict
from typing import Dict, List, Literal, Optional, Sequence, Tuple, Union
from warnings import warn
import biocutils as ut
import numpy as np
from biocframe import BiocFrame
from iranges import IRanges
from .SeqInfo import SeqInfo, merge_SeqInfo
from .utils import (
compute_up_down,
group_by_indices,
sanitize_strand_vector,
)
__author__ = "jkanche"
__copyright__ = "jkanche"
__license__ = "MIT"
_granges_delim = "__"
def _guess_num_ranges(seqnames, ranges):
if len(seqnames) != len(ranges):
raise ValueError("Number of 'seqnames' and 'ranges' do not match!")
return len(seqnames)
def _validate_seqnames(seqnames, seqinfo, num_ranges):
if seqnames is None:
raise ValueError("'seqnames' cannot be None!")
if len(seqnames) != num_ranges:
raise ValueError(
"Length of 'seqnames' does not match the number of ranges.",
f"Need to be {num_ranges}, provided {len(seqnames)}.",
)
if np.isnan(seqnames).any():
raise ValueError("'seqnames' cannot contain None values.")
if not isinstance(seqinfo, SeqInfo):
raise TypeError("'seqinfo' is not an instance of `SeqInfo` class.")
_l = len(seqinfo)
if (seqnames > _l).any():
raise ValueError("'seqnames' contains sequence name not represented in 'seqinfo'.")
def _validate_ranges(ranges, num_ranges):
if ranges is None:
raise ValueError("'ranges' cannot be None.")
if not isinstance(ranges, IRanges):
raise TypeError("'ranges' is not an instance of `IRanges`.")
if len(ranges) != num_ranges:
raise ValueError(
"Length of 'ranges' does not match the number of seqnames.",
f"Need to be {num_ranges}, provided {len(ranges)}.",
)
def _validate_optional_attrs(strand, mcols, names, num_ranges):
if strand is not None:
if len(strand) != num_ranges:
raise ValueError("Length of 'strand' does not match the number of ranges.")
if np.isnan(strand).any():
raise ValueError("'strand' cannot contain None values.")
if mcols is not None:
if not isinstance(mcols, BiocFrame):
raise TypeError("'mcols' is not a `BiocFrame` object.")
if mcols.shape[0] != num_ranges:
raise ValueError("Length of 'mcols' does not match the number of ranges.")
if names is not None:
if len(names) != num_ranges:
raise ValueError("Length of 'names' does not match the number of ranges.")
if any(x is None for x in names):
raise ValueError("'names' cannot contain None values.")
[docs]
class GenomicRangesIter:
"""An iterator to a :py:class:`~GenomicRanges` object."""
[docs]
def __init__(self, obj: "GenomicRanges") -> None:
"""Initialize the iterator.
Args:
obj:
Source object to iterate.
"""
self._gr = obj
self._current_index = 0
[docs]
def __iter__(self):
return self
[docs]
def __next__(self):
if self._current_index < len(self._gr):
iter_row_index = self._gr.names[self._current_index] if self._gr.names is not None else None
iter_slice = self._gr[self._current_index]
self._current_index += 1
return (iter_row_index, iter_slice)
raise StopIteration
[docs]
class GenomicRanges:
"""``GenomicRanges`` provides a container class to represent and operate over genomic regions and annotations.
Note: The documentation for some of the methods are derived from the
`GenomicRanges R/Bioconductor package <https://github.com/Bioconductor/GenomicRanges>`_.
"""
[docs]
def __init__(
self,
seqnames: Sequence[str],
ranges: IRanges,
strand: Optional[Union[Sequence[str], Sequence[int], np.ndarray]] = None,
names: Optional[Sequence[str]] = None,
mcols: Optional[BiocFrame] = None,
seqinfo: Optional[SeqInfo] = None,
metadata: Optional[dict] = None,
validate: bool = True,
):
"""Initialize a ``GenomicRanges`` object.
Args:
seqnames:
List of sequence or chromosome names.
ranges:
Genomic positions and widths of each position. Must have the same length as ``seqnames``.
strand:
Strand information for each genomic range. This should be 0 (any strand),
1 (forward strand) or -1 (reverse strand). If None, all genomic ranges
are assumed to be 0 (any strand).
May be provided as a list of strings representing the strand;
"+" for forward strand, "-" for reverse strand, or "*" for any strand and will be mapped
to 1, -1 or 0 respectively.
names:
Names for each genomic range.
Defaults to None, which means the ranges are unnamed.
mcols:
A ~py:class:`~biocframe.BiocFrame.BiocFrame` with the number of rows same as
number of genomic ranges, containing per-range annotation.
Defaults to None, in which case an empty BiocFrame object is created.
seqinfo:
Sequence information. Defaults to None, in which case a
:py:class:`~genomicranges.SeqInfo.SeqInfo` object is created with the unique set of
chromosome names from ``seqnames``.
metadata:
Additional metadata.
Defaults to None, and is assigned to an empty dictionary.
validate:
Internal use only.
"""
if seqinfo is None:
seqinfo = SeqInfo(seqnames=sorted(list(set(seqnames))))
self._seqinfo = seqinfo
self._reverse_seqindex = None
self._seqnames = self._sanitize_seqnames(seqnames, self._seqinfo)
self._ranges = ranges
if strand is None:
strand = np.repeat(0, len(self._seqnames))
self._strand = np.asarray(strand, dtype=np.int8)
else:
self._strand = sanitize_strand_vector(strand)
if names is not None and not isinstance(names, ut.Names):
names = ut.Names(names)
self._names = names
if mcols is None:
mcols = BiocFrame({}, number_of_rows=len(self._seqnames))
self._mcols = mcols
self._metadata = metadata if metadata is not None else {}
if validate is True:
_num_ranges = _guess_num_ranges(self._seqnames, self._ranges)
_validate_ranges(self._ranges, _num_ranges)
_validate_seqnames(self._seqnames, self._seqinfo, _num_ranges)
_validate_optional_attrs(self._strand, self._mcols, self._names, _num_ranges)
def _build_reverse_seqindex(self, seqinfo: SeqInfo):
self._reverse_seqindex = ut.reverse_index.build_reverse_index(seqinfo.get_seqnames())
def _remove_reverse_seqindex(self):
del self._reverse_seqindex
def _sanitize_seqnames(self, seqnames, seqinfo):
if self._reverse_seqindex is None:
self._build_reverse_seqindex(seqinfo)
if not isinstance(seqnames, np.ndarray):
seqnames = np.asarray([self._reverse_seqindex[x] for x in seqnames])
if len(seqnames) == 0:
seqnames = seqnames.astype(np.uint8)
else:
num_uniq = np.max(seqnames)
_types = [np.uint8, np.uint16, np.uint32, np.uint64]
for _dtype in _types:
if num_uniq < np.iinfo(_dtype).max:
seqnames = seqnames.astype(_dtype)
break
return seqnames
def _define_output(self, in_place: bool = False) -> "GenomicRanges":
if in_place is True:
return self
else:
return self.__copy__()
#########################
######>> Copying <<######
#########################
[docs]
def __deepcopy__(self, memo=None, _nil=[]):
"""
Returns:
A deep copy of the current ``GenomicRanges``.
"""
from copy import deepcopy
_ranges_copy = deepcopy(self._ranges)
_seqnames_copy = deepcopy(self._seqnames)
_strand_copy = deepcopy(self._strand)
_names_copy = deepcopy(self._names)
_mcols_copy = deepcopy(self._mcols)
_seqinfo_copy = deepcopy(self._seqinfo)
_metadata_copy = deepcopy(self.metadata)
current_class_const = type(self)
return current_class_const(
seqnames=_seqnames_copy,
ranges=_ranges_copy,
strand=_strand_copy,
names=_names_copy,
mcols=_mcols_copy,
seqinfo=_seqinfo_copy,
metadata=_metadata_copy,
)
[docs]
def __copy__(self):
"""
Returns:
A shallow copy of the current ``GenomicRanges``.
"""
current_class_const = type(self)
return current_class_const(
seqnames=self._seqnames,
ranges=self._ranges,
strand=self._strand,
names=self._names,
mcols=self._mcols,
seqinfo=self._seqinfo,
metadata=self._metadata,
)
[docs]
def copy(self):
"""Alias for :py:meth:`~__copy__`."""
return self.__copy__()
######################################
######>> length and iterators <<######
######################################
[docs]
def __len__(self) -> int:
"""
Returns:
Number of rows.
"""
return len(self._ranges)
[docs]
def __iter__(self) -> GenomicRangesIter:
"""Iterator over rows."""
return GenomicRangesIter(self)
##########################
######>> Printing <<######
##########################
[docs]
def __repr__(self) -> str:
"""
Returns:
A string representation of this ``GenomicRanges``.
"""
output = "GenomicRanges(number_of_ranges=" + str(len(self))
output += ", seqnames=" + ut.print_truncated_list(self._seqnames)
output += ", ranges=" + repr(self._ranges)
if self._strand is not None:
output += ", strand=" + ut.print_truncated_list(self._strand)
if self._names is not None:
output += ", names=" + ut.print_truncated_list(self._names)
if self._mcols is not None:
output += ", mcols=" + repr(self._mcols)
if self._seqinfo is not None:
output += ", seqinfo" + repr(self._seqinfo)
if len(self._metadata) > 0:
output += ", metadata=" + ut.print_truncated_dict(self._metadata)
output += ")"
return output
def __str__(self) -> str:
"""
Returns:
A pretty-printed string containing the contents of this ``GenomicRanges``.
"""
output = f"GenomicRanges with {len(self)} range{'s' if len(self) != 1 else ''}"
output += f" and {len(self._mcols.get_column_names())} metadata column{'s' if len(self._mcols.get_column_names()) != 1 else ''}\n"
nr = len(self)
added_table = False
if nr:
if nr <= 10:
indices = range(nr)
insert_ellipsis = False
else:
indices = [0, 1, 2, nr - 3, nr - 2, nr - 1]
insert_ellipsis = True
raw_floating = ut.create_floating_names(self._names, indices)
if insert_ellipsis:
raw_floating = raw_floating[:3] + [""] + raw_floating[3:]
floating = ["", ""] + raw_floating
columns = []
header = ["seqnames", "<str>"]
_seqnames = []
for x in self._seqnames[indices]:
_seqnames.append(self._seqinfo.get_seqnames()[x])
showed = _seqnames
if insert_ellipsis:
showed = showed[:3] + ["..."] + showed[3:]
columns.append(header + showed)
header = ["ranges", "<IRanges>"]
showed = [f"{x._start[0]} - {x.end[0]}" for _, x in self._ranges[indices]]
if insert_ellipsis:
showed = showed[:3] + ["..."] + showed[3:]
columns.append(header + showed)
header = ["strand", f"<{ut.print_type(self._strand)}>"]
_strand = []
for x in self._strand[indices]:
if x == 1:
_strand.append("+")
elif x == -1:
_strand.append("-")
else:
_strand.append("*")
showed = _strand
if insert_ellipsis:
showed = showed[:3] + ["..."] + showed[3:]
columns.append(header + showed)
if self._mcols.shape[1] > 0:
spacer = ["|"] * (len(indices) + insert_ellipsis)
columns.append(["", ""] + spacer)
for col in self._mcols.get_column_names():
data = self._mcols.column(col)
showed = ut.show_as_cell(data, indices)
header = [col, "<" + ut.print_type(data) + ">"]
showed = ut.truncate_strings(showed, width=max(40, len(header[0]), len(header[1])))
if insert_ellipsis:
showed = showed[:3] + ["..."] + showed[3:]
columns.append(header + showed)
output += ut.print_wrapped_table(columns, floating_names=floating)
added_table = True
footer = []
if self._seqinfo is not None and len(self._seqinfo):
footer.append(
"seqinfo("
+ str(len(self._seqinfo))
+ " sequences): "
+ ut.print_truncated_list(
self._seqinfo.get_seqnames(),
sep=" ",
include_brackets=False,
transform=lambda y: y,
)
)
if len(self._metadata):
footer.append(
"metadata("
+ str(len(self.metadata))
+ "): "
+ ut.print_truncated_list(
list(self.metadata.keys()),
sep=" ",
include_brackets=False,
transform=lambda y: y,
)
)
if len(footer):
if added_table:
output += "\n------\n"
output += "\n".join(footer)
return output
##########################
######>> seqnames <<######
##########################
[docs]
def get_seqnames(self, as_type: Literal["factor", "list"] = "list") -> Union[ut.Factor, List[str]]:
"""Access sequence names.
Args:
as_type:
Access seqnames as factor tuple, in which case, levels and codes
are returned.
If ``list``, then codes are mapped to levels and returned.
Returns:
A :py:class:`biocutils.Factor` if `as_type="factor"`.
Otherwise a list of sequence names.
"""
if as_type == "factor":
return ut.Factor(codes=self._seqnames, levels=self._seqinfo.get_seqnames())
elif as_type == "list":
return [self._seqinfo.get_seqnames()[x] for x in self._seqnames]
else:
raise ValueError("Argument 'as_type' must be 'factor' or 'list'.")
[docs]
def set_seqnames(self, seqnames: Union[Sequence[str], np.ndarray], in_place: bool = False) -> "GenomicRanges":
"""Set new sequence names.
Args:
seqnames:
List of sequence or chromosome names.
Optionally can be a numpy array with indices mapped
to :py:attr:``~seqinfo``.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
_validate_seqnames(seqnames, len(self))
if not isinstance(seqnames, np.ndarray):
seqnames = np.asarray([self._seqinfo.get_seqnames().index(x) for x in list(seqnames)])
output = self._define_output(in_place)
output._seqnames = seqnames
return output
@property
def seqnames(self) -> Union[Union[np.ndarray, List[str]], np.ndarray]:
"""Alias for :py:meth:`~get_seqnames`."""
return self.get_seqnames()
@seqnames.setter
def seqnames(self, seqnames: Union[Sequence[str], np.ndarray]):
"""Alias for :py:meth:`~set_seqnames` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'seqnames' is an in-place operation, use 'set_seqnames' instead",
UserWarning,
)
self.set_seqnames(seqnames, in_place=True)
########################
######>> ranges <<######
########################
[docs]
def get_ranges(self) -> IRanges:
"""
Returns:
An ``IRanges`` object containing the positions.
"""
return self._ranges
[docs]
def set_ranges(self, ranges: IRanges, in_place: bool = False) -> "GenomicRanges":
"""Set new ranges.
Args:
ranges:
IRanges containing the genomic positions and widths.
Must have the same length as ``seqnames``.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
_validate_ranges(ranges, len(self))
output = self._define_output(in_place)
output._ranges = ranges
return output
@property
def ranges(self) -> IRanges:
"""Alias for :py:meth:`~get_ranges`."""
return self.get_ranges()
@ranges.setter
def ranges(self, ranges: IRanges):
"""Alias for :py:meth:`~set_ranges` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'ranges' is an in-place operation, use 'set_ranges' instead",
UserWarning,
)
self.set_ranges(ranges, in_place=True)
########################
######>> strand <<######
########################
[docs]
def get_strand(
self, as_type: Literal["numpy", "factor", "list"] = "numpy"
) -> Union[Tuple[np.ndarray, dict], List[str]]:
"""Access strand information.
Args:
as_type:
Access seqnames as factor codes, in which case, a numpy
vector is retuned.
If ``factor``, a tuple with codes as the strand vector
and levels a dictionary containing the mapping.
If ``list``, then codes are mapped to levels and returned.
Returns:
A numpy vector representing strand, 0
for any strand, -1 for reverse strand
and 1 for forward strand.
A tuple of codes and levels.
A list of "+", "-", or "*" for each range.
"""
if as_type == "numpy":
return self._strand
elif as_type == "factor":
return self._strand, {"-1": "-", "0": "*", "1": "+"}
elif as_type == "list":
_strand = []
for x in self._strand:
if x == 1:
_strand.append("+")
elif x == -1:
_strand.append("-")
else:
_strand.append("*")
return _strand
else:
raise ValueError("Argument 'as_type' must be 'factor' or 'list'.")
[docs]
def set_strand(
self, strand: Optional[Union[Sequence[str], Sequence[int], np.ndarray]], in_place: bool = False
) -> "GenomicRanges":
"""Set new strand information.
Args:
strand:
Strand information for each genomic range. This should be 0 (any strand),
1 (forward strand) or -1 (reverse strand).
Alternatively, may provide a list of strings representing the strand;
"+" for forward strand, "-" for reverse strand, or "*" for any strand
and will be automatically mapped to 1, -1 or 0 respectively.
May be set to None; in which case all genomic ranges are assumed to be 0.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
if strand is None:
strand = np.repeat(0, len(self))
strand = sanitize_strand_vector(strand)
_validate_optional_attrs(strand, None, None, len(self))
output = self._define_output(in_place)
output._strand = strand
return output
@property
def strand(self) -> np.ndarray:
"""Alias for :py:meth:`~get_strand`."""
return self.get_strand()
@strand.setter
def strand(self, strand: Optional[Union[Sequence[str], Sequence[int], np.ndarray]]):
"""Alias for :py:meth:`~set_strand` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'strand' is an in-place operation, use 'set_strand' instead",
UserWarning,
)
self.set_strand(strand, in_place=True)
########################
######>> names <<#######
########################
[docs]
def get_names(self) -> ut.Names:
"""
Returns:
A list of names for each genomic range.
"""
return self._names
[docs]
def set_names(self, names: Optional[Sequence[str]], in_place: bool = False) -> "GenomicRanges":
"""Set new names.
Args:
names:
Names for each genomic range.
May be `None` to remove names.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
if names is not None:
_validate_optional_attrs(None, None, names, len(self))
if not isinstance(names, ut.Names):
names = ut.Names(names)
output = self._define_output(in_place)
output._names = names
return output
@property
def names(self) -> ut.Names:
"""Alias for :py:meth:`~get_names`."""
return self.get_names()
@names.setter
def names(self, names: Optional[Sequence[str]]):
"""Alias for :py:meth:`~set_names` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'names' is an in-place operation, use 'set_names' instead",
UserWarning,
)
self.set_names(names, in_place=True)
########################
######>> mcols <<#######
########################
[docs]
def get_mcols(self) -> BiocFrame:
"""
Returns:
A ~py:class:`~biocframe.BiocFrame.BiocFrame` containing per-range annotations.
"""
return self._mcols
[docs]
def set_mcols(self, mcols: Optional[BiocFrame], in_place: bool = False) -> "GenomicRanges":
"""Set new range metadata.
Args:
mcols:
A ~py:class:`~biocframe.BiocFrame.BiocFrame` with length same as the number
of ranges, containing per-range annotations.
May be None to remove range metadata.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
if mcols is None:
mcols = BiocFrame({}, number_of_rows=len(self))
_validate_optional_attrs(None, mcols, None, len(self))
output = self._define_output(in_place)
output._mcols = mcols
return output
@property
def mcols(self) -> BiocFrame:
"""Alias for :py:meth:`~get_mcols`."""
return self.get_mcols()
@mcols.setter
def mcols(self, mcols: Optional[BiocFrame]):
"""Alias for :py:meth:`~set_mcols` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'mcols' is an in-place operation, use 'set_mcols' instead",
UserWarning,
)
self.set_mcols(mcols, in_place=True)
##########################
######>> seqinfo <<#######
##########################
[docs]
def get_seqinfo(self) -> SeqInfo:
"""
Returns:
A ~py:class:`~genomicranges.SeqInfo.SeqInfo` containing sequence information.
"""
return self._seqinfo
[docs]
def set_seqinfo(self, seqinfo: Optional[SeqInfo], in_place: bool = False) -> "GenomicRanges":
"""Set new sequence information.
Args:
seqinfo:
A ~py:class:`~genomicranges.SeqInfo.SeqInfo` object containing information
about sequences in :py:attr:`~seqnames`.
May be None to remove sequence information. This would then generate a
new sequence information based on the current sequence names.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
seqnames = self.get_seqnames(as_type="list")
if seqinfo is None:
seqinfo = SeqInfo(seqnames=seqnames)
self._reverse_seqindex = None
new_seqnames = self._sanitize_seqnames(seqnames, seqinfo)
_validate_seqnames(new_seqnames, seqinfo, len(self))
output = self._define_output(in_place)
output._seqinfo = seqinfo
output._seqnames = new_seqnames
return output
@property
def seqinfo(self) -> np.ndarray:
"""Alias for :py:meth:`~get_seqinfo`."""
return self.get_seqinfo()
@seqinfo.setter
def seqinfo(
self,
seqinfo: Optional[SeqInfo],
):
"""Alias for :py:meth:`~set_seqinfo` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'seqinfo' is an in-place operation, use 'set_seqinfo' instead",
UserWarning,
)
self.set_seqinfo(seqinfo, in_place=True)
###########################
######>> metadata <<#######
###########################
@property
def metadata(self) -> dict:
"""Alias for :py:attr:`~get_metadata`."""
return self.get_metadata()
@metadata.setter
def metadata(self, metadata: dict):
"""Alias for :py:attr:`~set_metadata` with ``in_place = True``.
As this mutates the original object, a warning is raised.
"""
warn(
"Setting property 'metadata' is an in-place operation, use 'set_metadata' instead",
UserWarning,
)
self.set_metadata(metadata, in_place=True)
################################
######>> Single getters <<######
################################
[docs]
def get_start(self) -> np.ndarray:
"""Get all start positions.
Returns:
NumPy array of 32-bit signed integers containing the start
positions for all ranges.
"""
return self._ranges.get_start()
@property
def start(self) -> np.ndarray:
"""Alias for :py:attr:`~get_start`."""
return self.get_start()
[docs]
def get_end(self) -> np.ndarray:
"""Get all end positions.
Returns:
NumPy array of 32-bit signed integers containing the end
positions for all ranges.
"""
return self._ranges.get_end()
@property
def end(self) -> np.ndarray:
"""Alias for :py:attr:`~get_end`."""
return self.get_end()
[docs]
def get_width(self) -> np.ndarray:
"""Get width of each genomic range.
Returns:
NumPy array of 32-bit signed integers containing the width
for all ranges.
"""
return self._ranges.get_width()
@property
def width(self) -> np.ndarray:
"""Alias for :py:attr:`~get_width`."""
return self.get_width()
[docs]
def get_seqlengths(self) -> np.ndarray:
"""Get sequence lengths for each genomic range.
Returns:
An ndarray containing the sequence lengths.
"""
seqlenths = self._seqinfo.get_seqlengths()
return np.asarray([seqlenths[x] for x in self._seqnames])
#########################
######>> Slicers <<######
#########################
[docs]
def get_subset(self, subset: Union[str, int, bool, Sequence]) -> "GenomicRanges":
"""Subset ``GenomicRanges``, based on their indices or names.
Args:
subset:
Indices to be extracted. This may be an integer, boolean, string,
or any sequence thereof, as supported by
:py:meth:`~biocutils.normalize_subscript.normalize_subscript`.
Scalars are treated as length-1 sequences.
Strings may only be used if :py:attr:``~names`` are available (see
:py:meth:`~get_names`). The first occurrence of each string
in the names is used for extraction.
Returns:
A new ``GenomicRanges`` object with the ranges of interest.
"""
if len(self) == 0:
return GenomicRanges.empty()
idx, _ = ut.normalize_subscript(subset, len(self), self._names)
current_class_const = type(self)
return current_class_const(
seqnames=ut.subset_sequence(self._seqnames, idx),
ranges=ut.subset_sequence(self._ranges, idx),
strand=ut.subset_sequence(self._strand, idx),
names=ut.subset_sequence(self._names, idx) if self._names is not None else None,
mcols=ut.subset(self._mcols, idx),
seqinfo=self._seqinfo,
metadata=self._metadata,
)
[docs]
def __getitem__(self, subset: Union[str, int, bool, Sequence]) -> "GenomicRanges":
"""Alias to :py:attr:`~get_subset`."""
return self.get_subset(subset)
[docs]
def set_subset(
self,
args: Union[Sequence, int, str, bool, slice, range],
value: "GenomicRanges",
in_place: bool = False,
) -> "GenomicRanges":
"""Udate positions.
Args:
args:
Integer indices, a boolean filter, or (if the current object is
named) names specifying the ranges to be replaced, see
:py:meth:`~biocutils.normalize_subscript.normalize_subscript`.
value:
An ``GenomicRanges`` object of length equal to the number of ranges
to be replaced, as specified by ``subset``.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object, either as a copy of the original
or as a reference to the (in-place-modified) original.
"""
idx, _ = ut.normalize_subscript(args, len(self), self._names)
output = self._define_output(in_place)
output._seqnames[idx] = value._seqnames
output._ranges[idx] = value._ranges
output._strand[idx] = value._strand
if value._names is not None:
if output._names is None:
output._names = [""] * len(output)
for i, j in enumerate(idx):
output._names[j] = value._names[i]
elif output._names is not None:
for i, j in enumerate(idx):
output._names[j] = ""
if value._mcols is not None:
output._mcols[idx, :] = value._mcols
return output
[docs]
def __setitem__(
self,
args: Union[Sequence, int, str, bool, slice, range],
value: "GenomicRanges",
) -> "GenomicRanges":
"""Alias to :py:attr:`~set_subset`.
This operation modifies object in-place.
"""
warn(
"Modifying a subset of the object is an in-place operation, use 'set_subset' instead",
UserWarning,
)
return self.set_subset(args, value, in_place=True)
################################
######>> pandas interop <<######
################################
[docs]
def to_pandas(self):
"""Convert this ``GenomicRanges`` to a :py:class:`~pandas.DataFrame` object.
Returns:
A :py:class:`~pandas.DataFrame` object.
"""
import pandas as pd
_rdf = self._ranges.to_pandas()
_rdf["seqnames"] = self.get_seqnames()
_rdf["strand"] = self.get_strand(as_type="list")
if self._names is not None:
_rdf.index = self._names
if self._mcols is not None:
if self._mcols.shape[1] > 0:
_rdf = pd.concat([_rdf, self._mcols.to_pandas()], axis=1)
return _rdf
[docs]
@classmethod
def from_pandas(cls, input) -> "GenomicRanges":
"""Create an ``GenomicRanges`` object from a :py:class:`~pandas.DataFrame`.
Args:
input:
Input data. Must contain columns 'seqnames', 'starts' and 'widths' or "ends".
Returns:
A ``GenomicRanges`` object.
"""
from pandas import DataFrame
if not isinstance(input, DataFrame):
raise TypeError("`input` is not a pandas `DataFrame` object.")
if "starts" not in input.columns:
raise ValueError("'input' must contain column 'starts'.")
start = input["starts"].tolist()
if "widths" not in input.columns and "ends" not in input.columns:
raise ValueError("'input' must contain either 'widths' or 'ends' columns.")
drops = []
if "widths" in input.columns:
drops.append("widths")
width = input["widths"].tolist()
else:
drops.append("ends")
width = input["ends"] - input["starts"] + 1
if "seqnames" not in input.columns:
raise ValueError("'input' must contain column 'seqnames'.")
seqnames = input["seqnames"].tolist()
ranges = IRanges(start, width)
strand = None
if "strand" in input.columns:
strand = input["strand"].tolist()
drops.append("strand")
# mcols
drops.extend(["starts", "seqnames"])
mcols_df = input.drop(columns=drops)
mcols = None
if (not mcols_df.empty) or len(mcols_df.columns) > 0:
mcols = BiocFrame.from_pandas(mcols_df)
names = None
if input.index is not None:
names = [str(i) for i in input.index.to_list()]
return cls(ranges=ranges, seqnames=seqnames, strand=strand, names=names, mcols=mcols)
################################
######>> polars interop <<######
################################
[docs]
def to_polars(self):
"""Convert this ``GenomicRanges`` to a :py:class:`~polars.DataFrame` object.
Returns:
A :py:class:`~polars.DataFrame` object.
"""
import polars as pl
_rdf = self._ranges.to_polars()
_rdf = _rdf.with_columns(seqnames=self.get_seqnames(), strand=self.get_strand(as_type="list"))
if self._names is not None:
_rdf = _rdf.with_columns(rownames=self._names)
if self._mcols is not None:
if self._mcols.shape[1] > 0:
_rdf = pl.concat([_rdf, self._mcols.to_polars()], how="horizontal")
return _rdf
[docs]
@classmethod
def from_polars(cls, input) -> "GenomicRanges":
"""Create an ``GenomicRanges`` object from a :py:class:`~polars.DataFrame`.
Args:
input:
Input polars DataFrame. Must contain columns 'seqnames', 'starts' and 'widths' or "ends".
Returns:
A ``GenomicRanges`` object.
"""
from polars import DataFrame
if not isinstance(input, DataFrame):
raise TypeError("`input` is not a polars `DataFrame` object.")
if "starts" not in input.columns:
raise ValueError("'input' must contain column 'starts'.")
start = input["starts"].to_list()
if "widths" not in input.columns and "ends" not in input.columns:
raise ValueError("'input' must contain either 'widths' or 'ends' columns.")
drops = []
if "widths" in input.columns:
drops.append("widths")
width = input["widths"].to_list()
else:
drops.append("ends")
width = input["ends"] - input["starts"] + 1
if "seqnames" not in input.columns:
raise ValueError("'input' must contain column 'seqnames'.")
seqnames = input["seqnames"].to_list()
ranges = IRanges(start, width)
strand = None
if "strand" in input.columns:
strand = input["strand"].to_list()
drops.append("strand")
# mcols
drops.extend(["starts", "seqnames"])
mcols_df = input.drop(drops)
mcols = None
if (not mcols_df.is_empty()) or len(mcols_df.columns) > 0:
mcols = BiocFrame.from_polars(mcols_df)
names = None
return cls(ranges=ranges, seqnames=seqnames, strand=strand, names=names, mcols=mcols)
#####################################
######>> intra-range methods <<######
#####################################
[docs]
def flank(
self,
width: int,
start: Union[bool, np.ndarray, List[bool]] = True,
both: bool = False,
ignore_strand: bool = False,
in_place: bool = False,
) -> "GenomicRanges":
"""Compute flanking ranges for each range. The logic for this comes from the `R/GenomicRanges` & `IRanges`
packages.
If ``start`` is ``True`` for a given range, the flanking occurs at the `start`,
otherwise the `end`.
The `widths` of the flanks are given by the ``width`` parameter.
``width`` can be negative, in which case the flanking region is
reversed so that it represents a prefix or suffix of the range.
Usage:
`gr.flank(3, True)`, where "x" indicates a range in ``gr`` and "-" indicates the
resulting flanking region:
---xxxxxxx
If ``start`` were ``False``, the range in ``gr`` becomes
xxxxxxx---
For negative width, i.e. `gr.flank(x, -3, FALSE)`, where "*" indicates the overlap
between "x" and the result:
xxxx***
If ``both`` is ``True``, then, for all ranges in "x", the flanking regions are
extended into (or out of, if ``width`` is negative) the range, so that the result
straddles the given endpoint and has twice the width given by width.
This is illustrated below for `gr.flank(3, both=TRUE)`:
---***xxxx
Args:
width:
Width to flank by. May be negative.
start:
Whether to only flank starts.
Defaults to True.
Alternatively, you may provide a list of start values,
whose length is the same as the number of ranges.
both:
Whether to flank both starts and ends. Defaults to False.
ignore_strand:
Whether to ignore strands. Defaults to False.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the flanked regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
if not isinstance(ignore_strand, bool):
raise TypeError("'ignore_strand' must be True or False.")
output = self._define_output(in_place)
if isinstance(start, bool):
start_arr = np.full(len(output), start, dtype=bool)
else:
start_arr = np.asarray(start, dtype=bool)
if len(start_arr) != len(output):
start_arr = np.resize(start_arr, len(output)) # may be throw an error?
if not ignore_strand:
start_arr = np.asarray(start_arr != (output.get_strand() == -1))
new_ranges = output._ranges.flank(width=width, start=start_arr, both=both, in_place=False)
output._ranges = new_ranges
return output
[docs]
def resize(
self,
width: Union[int, List[int], np.ndarray],
fix: Union[Literal["start", "end", "center"], List[Literal["start", "end", "center"]]] = "start",
ignore_strand: bool = False,
in_place: bool = False,
) -> "GenomicRanges":
"""Resize ranges to the specified ``width`` where either the ``start``, ``end``, or ``center`` is used as an
anchor.
Args:
width:
Width to resize, cannot be negative!
fix:
Fix positions by "start", "end", or "center".
Defaults to "start".
Alternatively, may provide a list of fix positions
for each genomic range.
ignore_strand:
Whether to ignore strands. Defaults to False.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Raises:
ValueError:
If parameter ``fix`` is neither `start`, `end`, nor `center`.
If ``width`` is negative.
Returns:
A modified ``GenomicRanges`` object with the resized regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
_REV_FIX = {"start": "end", "end": "start", "center": "center"}
if not isinstance(ignore_strand, bool):
raise TypeError("'ignore_strand' must be True or False.")
output = self._define_output(in_place)
if isinstance(fix, str):
fix_arr = [fix] * len(output)
else:
fix_arr = fix
if len(output) > 0 and (len(fix_arr) > len(output) or len(output) % len(fix_arr) != 0):
raise ValueError("Number of ranges is not a multiple of 'fix' length")
if ignore_strand:
fix_arr = [fix_arr[i % len(fix_arr)] for i in range(len(output))]
else:
if len(output) == 0:
fix_arr = []
else:
fix_arr = [_REV_FIX[f] if strand == -1 else f for f, strand in zip(fix_arr, output.get_strand())]
output._ranges = self._ranges.resize(width=width, fix=fix_arr)
return output
[docs]
def shift(self, shift: Union[int, List[int], np.ndarray] = 0, in_place: bool = False) -> "GenomicRanges":
"""Shift all intervals.
Args:
shift:
Shift interval. If shift is 0, the current
object is returned. Defaults to 0.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the shifted regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
output = self._define_output(in_place)
output._ranges = self._ranges.shift(shift=shift)
return output
[docs]
def terminators(self, upstream: int = 2000, downstream: int = 200, in_place: bool = False) -> "GenomicRanges":
"""Extend ranges to termiantor regions.
Generates terminator ranges relative to the transcription end site (TES).
Args:
upstream:
Number of positions to extend in the 5' direction.
Defaults to 2000.
downstream:
Number of positions to extend in the 3' direction.
Defaults to 200.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the promoter regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
output = self._define_output(in_place)
new_starts, new_widths = compute_up_down(
output.get_start(), output.get_end(), output.get_strand(), upstream, downstream, site="TES"
)
output._ranges = IRanges(start=new_starts, width=new_widths)
return output
[docs]
def restrict(
self,
start: Optional[Union[int, Dict[str, int], np.ndarray]] = None,
end: Optional[Union[int, Dict[str, int], np.ndarray]] = None,
keep_all_ranges: bool = False,
) -> "GenomicRanges":
"""Restrict ranges to a given start and end positions.
Args:
start:
Start position. Defaults to None.
Alternatively may provide a dictionary
mapping sequence names to starts, or array of starts.
end:
End position. Defaults to None.
Alternatively may provide a dictionary
mapping sequence names to starts, or array of starts.
keep_all_ranges:
Whether to keep intervals that do not overlap with start and end.
Defaults to False.
Returns:
A new ``GenomicRanges`` object with the restricted regions.
"""
start_is_dict = isinstance(start, dict)
end_is_dict = isinstance(end, dict)
if not (start_is_dict or end_is_dict):
# directly restrict ranges
new_ranges = self._ranges.restrict(start=start, end=end, keep_all_ranges=keep_all_ranges)
new_seqnames = self._seqnames
new_strand = self.strand
new_names = self._names
new_mcols = self._mcols
# Get indices of kept ranges for filtering seqnames and strand
if not keep_all_ranges:
keep_idx = new_ranges.get_mcols().get_column("revmap")
new_seqnames = ut.subset_sequence(self._seqnames, keep_idx)
new_strand = ut.subset_sequence(self._strand, keep_idx)
new_names = ut.subset_sequence(self._names, keep_idx) if self._names is not None else None
new_mcols = ut.subset(self._mcols, keep_idx)
return GenomicRanges(
seqnames=new_seqnames,
ranges=new_ranges,
strand=new_strand,
names=new_names,
mcols=new_mcols,
seqinfo=self._seqinfo,
metadata=self._metadata,
)
if start is None:
start = {}
elif not start_is_dict:
start = {seq: start for seq in np.unique(self._seqinfo._seqnames)}
if end is None:
end = {}
elif not end_is_dict:
end = {seq: end for seq in np.unique(self._seqinfo._seqnames)}
split_indices = defaultdict(list)
seqnames = self._seqnames
for i, seq in enumerate(seqnames):
split_indices[seq].append(i)
all_indices = []
new_granges_list = []
for seq in np.unique(seqnames):
idx = split_indices[seq]
seq_ranges = ut.subset_sequence(self._ranges, idx)
seq_seqnames = ut.subset_sequence(seqnames, idx)
seq_strand = ut.subset_sequence(self._strand, idx)
seq_names = ut.subset_sequence(self._names, idx) if self._names is not None else None
seq_mcols = ut.subset(self._mcols, idx)
seq_start = start.get(self._seqinfo._seqnames[seq], None)
seq_end = end.get(self._seqinfo._seqnames[seq], None)
new_seq_ranges = seq_ranges.restrict(start=seq_start, end=seq_end, keep_all_ranges=keep_all_ranges)
# Only keep metadata for non-dropped ranges; IRanges restrict returns a revmap
if not keep_all_ranges:
keep_idx = new_seq_ranges.get_mcols().get_column("revmap")
new_seqnames = ut.subset_sequence(self._seqnames, keep_idx)
new_strand = ut.subset_sequence(self._strand, keep_idx)
new_names = ut.subset_sequence(self._names, keep_idx) if self._names is not None else None
new_mcols = ut.subset(self._mcols, keep_idx)
all_indices.extend([idx[i] for i, k in enumerate(keep_idx)])
else:
new_seqnames = seq_seqnames
new_strand = seq_strand
new_names = seq_names
new_mcols = seq_mcols
all_indices.extend(idx)
new_granges_list.append(
GenomicRanges(
seqnames=new_seqnames,
ranges=new_seq_ranges,
strand=new_strand,
names=new_names,
mcols=new_mcols,
seqinfo=self._seqinfo,
metadata=self._metadata,
)
)
combined_granges = _combine_GenomicRanges(*new_granges_list)
order = np.argsort(all_indices)
ordered_granges = combined_granges[order]
return ordered_granges
[docs]
def get_out_of_bound_index(self) -> np.ndarray:
"""Find indices of genomic ranges that are out of bounds.
Returns:
A numpy array of integer indices where ranges are out of bounds.
"""
if len(self) == 0:
return []
# incase it contains NA's
seqlevel_is_circ = [val is True for val in self._seqinfo._is_circular]
seqlength_is_na = [slength is None for slength in self._seqinfo._seqlengths]
seqlevel_has_bounds = [not (circ or is_na) for circ, is_na in zip(seqlevel_is_circ, seqlength_is_na)]
out_of_bounds = []
starts = self.get_start()
ends = self.get_end()
for i, seq_id in enumerate(self._seqnames):
if seqlevel_has_bounds[seq_id] and (starts[i] < 1 or ends[i] > self._seqinfo._seqlengths[seq_id]):
out_of_bounds.append(i)
return np.asarray(out_of_bounds, dtype=np.int32)
[docs]
def trim(self, in_place: bool = False) -> "GenomicRanges":
"""Trim sequences outside of bounds for non-circular chromosomes.
Args:
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A new ``GenomicRanges`` object with the trimmed regions.
"""
if self.seqinfo is None:
raise ValueError("Cannot trim ranges. `seqinfo` is not available.")
seqlengths = self.seqinfo._seqlengths
is_circular = self.seqinfo._is_circular
if seqlengths is None:
raise ValueError("Cannot trim ranges. `seqlengths` is not available.")
if is_circular is None:
warn("considering all sequences as non-circular...")
out_of_bounds = self.get_out_of_bound_index()
output = self._define_output(in_place=in_place)
if len(out_of_bounds) == 0:
return output.__copy__()
filtered_seqs = self._seqnames[out_of_bounds]
new_ends = self.get_seqlengths()[filtered_seqs]
output._ranges[out_of_bounds] = output._ranges[out_of_bounds].restrict(
start=1, end=new_ends, keep_all_ranges=True
)
return output
[docs]
def narrow(
self,
start: Optional[Union[int, List[int], np.ndarray]] = None,
width: Optional[Union[int, List[int], np.ndarray]] = None,
end: Optional[Union[int, List[int], np.ndarray]] = None,
in_place: bool = False,
) -> "GenomicRanges":
"""Narrow genomic positions by provided ``start``, ``width`` and ``end`` parameters.
Important: these parameters are relative shift in positions for each range.
Args:
start:
Relative start position. Defaults to None.
width:
Relative end position. Defaults to None.
end:
Relative width of the interval. Defaults to None.
in_place:
Whether to modify the ``GenomicRanges`` object in place.
Returns:
A modified ``GenomicRanges`` object with the trimmed regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
if start is not None and end is not None and width is not None:
raise ValueError("Only provide two of the three parameters - `start`, `end` and `width` but not all!")
if width is not None:
if start is None and end is None:
raise ValueError("If width is provided, either start or end must be provided.")
narrow_ir = self._ranges.narrow(start=start, end=end, width=width)
output = self._define_output(in_place)
output._ranges = narrow_ir
return output
#####################################
######>> inter-range methods <<######
#####################################
def _group_indices_by_chrm(self, ignore_strand: bool = False) -> dict:
__strand = self._strand.copy()
if ignore_strand:
__strand = np.zeros(len(self), dtype=np.int8)
# else:
# __strand[__strand == 0] = 1
_seqnames = [self._seqinfo._seqnames[i] for i in self._seqnames]
grp_keys = np.char.add(np.char.add(_seqnames, f"{_granges_delim}"), __strand.astype(str))
unique_grps, inverse_indices = np.unique(grp_keys, return_inverse=True)
chrm_grps = {str(grp): np.where(inverse_indices == i)[0].tolist() for i, grp in enumerate(unique_grps)}
return chrm_grps
[docs]
def reduce(
self,
with_reverse_map: bool = False,
drop_empty_ranges: bool = False,
min_gap_width: int = 1,
ignore_strand: bool = False,
) -> "GenomicRanges":
"""Reduce orders the ranges, then merges overlapping or adjacent ranges.
Args:
with_reverse_map:
Whether to return map of indices back to
original object. Defaults to False.
drop_empty_ranges:
Whether to drop empty ranges. Defaults to False.
min_gap_width:
Ranges separated by a gap of
at least ``min_gap_width`` positions are not merged. Defaults to 1.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` object with reduced intervals.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
_new_mcols = self._ranges._mcols.set_column("reduceindices", range(len(self)))
_new_ranges = self._ranges.set_mcols(_new_mcols)
_new_self = self.set_ranges(_new_ranges)
all_grp_ranges = []
rev_map = []
groups = []
for seq in _new_self._seqinfo._seqnames:
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{seq}{_granges_delim}{strd}"
if _key in chrm_grps:
_grp_subset = _new_self[chrm_grps[_key]]
_oindices = _grp_subset._ranges._mcols.get_column("reduceindices")
res_ir = _grp_subset._ranges.reduce(
with_reverse_map=True,
drop_empty_ranges=drop_empty_ranges,
min_gap_width=min_gap_width,
)
groups.extend([_key] * len(res_ir))
all_grp_ranges.append(res_ir)
_rev_map = []
for j in res_ir._mcols.get_column("revmap"):
_rev_map.append([_oindices[x] for x in j])
rev_map.extend(_rev_map)
all_merged_ranges = ut.combine_sequences(*all_grp_ranges)
all_merged_ranges._mcols.remove_column("revmap", in_place=True)
splits = [x.split(_granges_delim) for x in groups]
new_seqnames = [x[0] for x in splits]
new_strand = np.asarray([int(x[1]) for x in splits])
output = GenomicRanges(seqnames=new_seqnames, strand=new_strand, ranges=all_merged_ranges)
if with_reverse_map is True:
output._mcols.set_column("revmap", rev_map, in_place=True)
return output
[docs]
def range(self, with_reverse_map: bool = False, ignore_strand: bool = False) -> "GenomicRanges":
"""Calculate range bounds for each distinct (seqname, strand) pair.
Args:
with_reverse_map:
Whether to return map of indices back to
original object. Defaults to False.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` object with the range bounds.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
all_grp_ranges = []
rev_map = []
groups = []
for seq in self._seqinfo.get_seqnames():
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{seq}{_granges_delim}{strd}"
if _key in chrm_grps:
_grp_subset = self[chrm_grps[_key]]
res_ir = _grp_subset._ranges.range()
groups.extend([_key] * len(res_ir))
all_grp_ranges.append(res_ir)
rev_map.extend(chrm_grps[_key] * len(res_ir))
all_merged_ranges = ut.combine_sequences(*all_grp_ranges)
splits = [x.split(_granges_delim) for x in groups]
new_seqnames = [x[0] for x in splits]
new_strand = np.asarray([int(x[1]) for x in splits])
output = GenomicRanges(seqnames=new_seqnames, strand=new_strand, ranges=all_merged_ranges)
if with_reverse_map is True:
output._mcols.set_column("revmap", rev_map, in_place=True)
return output
[docs]
def gaps(
self,
start: int = 1,
end: Optional[Union[int, Dict[str, int]]] = None,
ignore_strand: bool = False,
) -> "GenomicRanges":
"""Identify complemented ranges for each distinct (seqname, strand) pair.
Args:
start:
Restrict chromosome start position. Defaults to 1.
end:
Restrict end position for each chromosome.
Defaults to None. If None, extracts sequence information from
:py:attr:`~seqinfo` object if available.
Alternatively, you may provide a dictionary with seqnames as
keys and the values specifying the ends.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` with complement ranges.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
all_grp_ranges = []
groups = []
for i, chrm in enumerate(self._seqinfo.get_seqnames()):
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{chrm}{_granges_delim}{strd}"
_end = None
if isinstance(end, dict):
_end = end[chrm]
elif isinstance(end, int):
_end = end
gaps = None
if _key in chrm_grps:
_grp_subset = self[chrm_grps[_key]]
gaps = _grp_subset._ranges.gaps(
start=start,
end=_end, # - 1 if _end is not None else _end
)
else:
if _end is None:
_end = self._seqinfo.seqlengths[i]
if _end is not None:
gaps = IRanges(start=[start], width=[_end - start + 1])
if gaps is not None:
all_grp_ranges.append(gaps)
groups.extend([_key] * len(gaps))
all_merged_ranges = ut.combine_sequences(*all_grp_ranges)
splits = [x.split(_granges_delim) for x in groups]
new_seqnames = [x[0] for x in splits]
new_strand = np.asarray([int(x[1]) for x in splits])
output = GenomicRanges(seqnames=new_seqnames, strand=new_strand, ranges=all_merged_ranges)
return output
[docs]
def disjoin(self, with_reverse_map: bool = False, ignore_strand: bool = False) -> "GenomicRanges":
"""Calculate disjoint genomic positions for each distinct (seqname, strand) pair.
Args:
with_reverse_map:
Whether to return a map of indices back to the original object.
Defaults to False.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` containing disjoint ranges.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
all_grp_ranges = []
rev_map = []
groups = []
for seq in self._seqinfo.get_seqnames():
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{seq}{_granges_delim}{strd}"
if _key in chrm_grps:
_grp_subset = self[chrm_grps[_key]]
res_ir = _grp_subset._ranges.disjoin(with_reverse_map=True)
groups.extend([_key] * len(res_ir))
all_grp_ranges.append(res_ir)
_rev_map = []
for j in res_ir._mcols.get_column("revmap"):
_rev_map.append([chrm_grps[_key][x] for x in j])
rev_map.append(_rev_map[0])
all_merged_ranges = ut.combine_sequences(*all_grp_ranges)
splits = [x.split(_granges_delim) for x in groups]
new_seqnames = [x[0] for x in splits]
new_strand = np.asarray([int(x[1]) for x in splits])
output = GenomicRanges(seqnames=new_seqnames, strand=new_strand, ranges=all_merged_ranges)
if with_reverse_map is True:
output._mcols.set_column("revmap", rev_map, in_place=True)
return output
[docs]
def is_disjoint(self, ignore_strand: bool = False) -> bool:
"""Calculate disjoint genomic positions for each distinct (seqname, strand) pair.
Args:
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
True if all ranges are disjoint, otherwise False.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
is_disjoint = None
for seq in self._seqinfo.get_seqnames():
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{seq}{_granges_delim}{strd}"
if _key in chrm_grps:
_grp_subset = self[chrm_grps[_key]]
res_ir = _grp_subset._ranges.is_disjoint()
if is_disjoint is None:
is_disjoint = res_ir
else:
is_disjoint = is_disjoint and res_ir
return is_disjoint
[docs]
def disjoint_bins(self, ignore_strand: bool = False) -> np.ndarray:
"""Split ranges into a set of bins so that the ranges in each bin are disjoint.
Args:
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
An ndarray indicating the bin index for each range.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
all_results = []
order = []
for seq in self._seqinfo.get_seqnames():
_iter_strands = [0] if ignore_strand is True else [1, -1, 0]
for strd in _iter_strands:
_key = f"{seq}{_granges_delim}{strd}"
if _key in chrm_grps:
_grp_subset = self[chrm_grps[_key]]
res_ir = _grp_subset._ranges.disjoint_bins()
order.extend(chrm_grps[_key])
all_results.extend(res_ir)
merged = np.asarray(all_results).flatten()
return merged[np.argsort(order, stable=True)]
[docs]
def coverage(self, shift: int = 0, width: Optional[int] = None, weight: int = 1) -> Dict[str, np.ndarray]:
"""Calculate coverage for each chromosome, For each position, counts the number of ranges that cover it.
Args:
shift:
Shift all genomic positions. Defaults to 0.
width:
Restrict the width of all chromosomes. Defaults to None.
weight:
Weight to use. Defaults to 1.
Returns:
A dictionary with chromosome names as keys and the
coverage vector.
"""
chrm_grps = self._group_indices_by_chrm(ignore_strand=True)
result = {}
for chrm, group in chrm_grps.items():
_grp_subset = self[group]
cov = _grp_subset._ranges.coverage(shift=shift, width=width, weight=weight)
result[chrm.split(_granges_delim)[0]] = cov
return result
################################
######>> set operations <<######
################################
[docs]
def union(self, other: "GenomicRanges", ignore_strand: bool = False) -> "GenomicRanges":
"""Find union of genomic intervals with `other`.
Args:
other:
The other ``GenomicRanges`` object.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` object with all ranges.
"""
if not isinstance(other, GenomicRanges):
raise TypeError("'other' is not a `GenomicRanges` object.")
grs = [self, other]
if ignore_strand is True:
grs = [self.set_strand(strand=None), other.set_strand(strand=None)]
all_combined = _combine_GenomicRanges(*grs)
output = all_combined.reduce(drop_empty_ranges=True)
return output
[docs]
def setdiff(self, other: "GenomicRanges", ignore_strand: bool = False) -> "GenomicRanges":
"""Find set difference of genomic intervals with `other`.
Args:
other:
The other ``GenomicRanges`` object.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` object with the diff ranges.
"""
if not isinstance(other, GenomicRanges):
raise TypeError("'other' is not a `GenomicRanges` object.")
x = self.__copy__()
y = other.__copy__()
if ignore_strand is True:
x = self.set_strand(strand=None)
y = other.set_strand(strand=None)
x.set_seqinfo(merge_SeqInfo((x.get_seqinfo(), y.get_seqinfo())), in_place=True)
y.set_seqinfo(merge_SeqInfo((y.get_seqinfo(), x.get_seqinfo())), in_place=True)
seqlengths = dict(zip(x._seqinfo.get_seqnames(), x._seqinfo.get_seqlengths()))
all_combs = _fast_combine_GenomicRanges(x, y).range(ignore_strand=True)
all_combs_ends = dict(zip(all_combs.get_seqnames(), all_combs.get_end()))
for seq, seqlen in seqlengths.items():
if seqlen is None:
_val = all_combs_ends.get(seq, None)
seqlengths[seq] = int(_val) if _val is not None else None
x_gaps = x.gaps(end=seqlengths)
x_gaps_u = x_gaps.union(y)
diff = x_gaps_u.gaps(end=seqlengths)
return diff
[docs]
def intersect(self, other: "GenomicRanges", ignore_strand: bool = False) -> "GenomicRanges":
"""Find intersecting genomic intervals with `other`.
Args:
other:
The other ``GenomicRanges`` object.
ignore_strand:
Whether to ignore strands. Defaults to False.
Returns:
A new ``GenomicRanges`` object with intersecting ranges.
"""
if not isinstance(other, GenomicRanges):
raise TypeError("'other' is not a `GenomicRanges` object.")
x = self.__copy__()
y = other.__copy__()
if ignore_strand is True:
x = self.set_strand(strand=None)
y = other.set_strand(strand=None)
x.set_seqinfo(merge_SeqInfo((x.get_seqinfo(), y.get_seqinfo())), in_place=True)
y.set_seqinfo(merge_SeqInfo((y.get_seqinfo(), x.get_seqinfo())), in_place=True)
seqlengths = dict(zip(x._seqinfo.get_seqnames(), x._seqinfo.get_seqlengths()))
all_combs = _fast_combine_GenomicRanges(x, y).range(ignore_strand=True)
all_combs_ends = dict(zip(all_combs.get_seqnames(), all_combs.get_end()))
for seq, seqlen in seqlengths.items():
if seqlen is None:
seqlengths[seq] = int(all_combs_ends[seq])
y_gaps = y.gaps(end=seqlengths)
diff = x.setdiff(y_gaps)
return diff
[docs]
def intersect_ncls(self, other: "GenomicRanges") -> "GenomicRanges":
"""Find intersecting genomic intervals with `other` (uses NCLS index).
Args:
other:
The other ``GenomicRanges`` object.
Returns:
A new ``GenomicRanges`` object with intersecting ranges.
"""
if not isinstance(other, GenomicRanges):
raise TypeError("'other' is not a `GenomicRanges` object.")
if not ut.package_utils.is_package_installed("ncls"):
raise ImportError("package: 'ncls' is not installed.")
from ncls import NCLS
self_end = self.end
other_end = other.end
other_ncls = NCLS(other.start, other_end, np.arange(len(other)))
_self_indexes, _other_indexes = other_ncls.all_overlaps_both(self.start, self_end, np.arange(len(self)))
other_chrms = np.array([other._seqinfo._seqnames[other._seqnames[i]] for i in _other_indexes])
self_chrms = np.array([self._seqinfo._seqnames[self._seqnames[i]] for i in _self_indexes])
other_strands = other._strand[_other_indexes]
self_strands = self._strand[_self_indexes]
filtered_indexes = np.logical_and(other_chrms == self_chrms, other_strands == self_strands)
self_starts = self.start[_self_indexes][filtered_indexes]
other_starts = other.start[_other_indexes][filtered_indexes]
new_starts = np.where(self_starts > other_starts, self_starts, other_starts)
self_ends = self_end[_self_indexes][filtered_indexes]
other_ends = other_end[_other_indexes][filtered_indexes]
new_ends = np.where(self_ends < other_ends, self_ends, other_ends)
# filtered_keys = self_grp_keys[filtered_indexes]
# splits = [x.split(_granges_delim) for x in filtered_keys]
new_seqnames = self_chrms[filtered_indexes].tolist()
new_strands = self_strands[filtered_indexes]
output = GenomicRanges(
seqnames=new_seqnames,
ranges=IRanges(new_starts, new_ends - new_starts),
strand=np.array(new_strands).astype(np.int8),
)
return output
###################################
######>> search operations <<######
###################################
[docs]
def find_overlaps(
self,
query: "GenomicRanges",
query_type: Literal["any", "start", "end", "within"] = "any",
select: Literal["all", "first", "last", "arbitrary"] = "all",
max_gap: int = -1,
min_overlap: int = 0,
ignore_strand: bool = False,
) -> BiocFrame:
"""Find overlaps between subject (self) and a ``query`` ``GenomicRanges`` object.
Args:
query:
Query `GenomicRanges`.
query_type:
Overlap query type, must be one of
- "any": Any overlap is good
- "start": Overlap at the beginning of the intervals
- "end": Must overlap at the end of the intervals
- "within": Fully contain the query interval
Defaults to "any".
select:
Determine what hit to choose when
there are multiple hits for an interval in ``subject``.
max_gap:
Maximum gap allowed in the overlap.
Defaults to -1 (no gap allowed).
min_overlap:
Minimum overlap with query. Defaults to 0.
ignore_strand:
Whether to ignore strands. Defaults to False.
Raises:
TypeError: If ``query`` is not of type `GenomicRanges`.
Returns:
A BiocFrame with two columns:
- query_hits: Indices into query ranges
- self_hits: Corresponding indices into self ranges that are upstream
Each row represents a query-self pair where self overlaps query.
"""
OVERLAP_QUERY_TYPES = ["any", "start", "end", "within"]
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
if query_type not in OVERLAP_QUERY_TYPES:
raise ValueError(f"'{query_type}' must be one of {', '.join(OVERLAP_QUERY_TYPES)}.")
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
all_qhits = np.array([], dtype=np.int32)
all_shits = np.array([], dtype=np.int32)
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.find_overlaps(
query=_query_subset._ranges,
query_type=query_type,
select=select,
max_gap=max_gap,
min_overlap=min_overlap,
delete_index=False,
)
all_qhits = np.concatenate(
(all_qhits, [indices[j] for j in res_idx.get_column("query_hits")]), dtype=np.int32
)
all_shits = np.concatenate(
(all_shits, [_subset[x] for x in res_idx.get_column("self_hits")]), dtype=np.int32
)
order = np.argsort(all_qhits, stable=True)
return BiocFrame({"query_hits": all_qhits, "self_hits": all_shits})[order, :]
[docs]
def count_overlaps(
self,
query: "GenomicRanges",
query_type: Literal["any", "start", "end", "within"] = "any",
max_gap: int = -1,
min_overlap: int = 0,
ignore_strand: bool = False,
) -> np.ndarray:
"""Count overlaps between subject (self) and a ``query`` ``GenomicRanges`` object.
Args:
query:
Query `GenomicRanges`.
query_type:
Overlap query type, must be one of
- "any": Any overlap is good
- "start": Overlap at the beginning of the intervals
- "end": Must overlap at the end of the intervals
- "within": Fully contain the query interval
Defaults to "any".
max_gap:
Maximum gap allowed in the overlap.
Defaults to -1 (no gap allowed).
min_overlap:
Minimum overlap with query. Defaults to 0.
ignore_strand:
Whether to ignore strands. Defaults to False.
Raises:
TypeError: If ``query`` is not of type `GenomicRanges`.
Returns:
NumPy vector with length matching query,
value represents the number of overlaps in `self` for each query.
"""
_overlaps = self.find_overlaps(
query, query_type=query_type, max_gap=max_gap, min_overlap=min_overlap, ignore_strand=ignore_strand
)
result = np.zeros(len(query))
_ucounts = np.unique_counts(_overlaps.get_column("query_hits"))
result[_ucounts.values] = _ucounts.counts
return result
[docs]
def subset_by_overlaps(
self,
query: "GenomicRanges",
query_type: Literal["any", "start", "end", "within"] = "any",
max_gap: int = -1,
min_overlap: int = 0,
ignore_strand: bool = False,
) -> "GenomicRanges":
"""Subset ``subject`` (self) with overlaps in ``query`` `GenomicRanges` object.
Args:
query:
Query `GenomicRanges`.
query_type:
Overlap query type, must be one of
- "any": Any overlap is good
- "start": Overlap at the beginning of the intervals
- "end": Must overlap at the end of the intervals
- "within": Fully contain the query interval
Defaults to "any".
max_gap:
Maximum gap allowed in the overlap.
Defaults to -1 (no gap allowed).
min_overlap:
Minimum overlap with query. Defaults to 0.
ignore_strand:
Whether to ignore strands. Defaults to False.
Raises:
TypeError:
If ``query`` is not of type `GenomicRanges`.
Returns:
A ``GenomicRanges`` object containing overlapping ranges.
"""
_overlaps = self.find_overlaps(
query, query_type=query_type, max_gap=max_gap, min_overlap=min_overlap, ignore_strand=ignore_strand
)
_all_indices = np.unique(_overlaps.get_column("self_hits"))
return self[_all_indices]
#################################
######>> nearest methods <<######
#################################
[docs]
def nearest(
self,
query: "GenomicRanges",
select: Literal["all", "arbitrary"] = "arbitrary",
ignore_strand: bool = False,
) -> Union[np.ndarray, BiocFrame]:
"""Search nearest positions both upstream and downstream that overlap with each range in ``query``.
Args:
query:
Query ``GenomicRanges`` to find nearest positions.
select:
Determine what hit to choose when there are
multiple hits for an interval in ``query``.
ignore_strand:
Whether to ignore strand. Defaults to False.
Returns:
If select="arbitrary":
A numpy array of integers with length matching query, containing indices
into self for the closest for each query range. Value may be None if there
are no matches.
If select="all":
A BiocFrame with two columns:
- query_hits: Indices into query ranges
- self_hits: Corresponding indices into self ranges that are upstream
Each row represents a query-self pair where subject is nearest to query.
"""
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
if select == "arbitrary":
result = np.full(len(query), None)
else:
result = {"all_qhits": np.array([], dtype=np.int32), "all_shits": np.array([], dtype=np.int32)}
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.nearest(query=_query_subset._ranges, select=select, delete_index=False)
if select == "arbitrary":
for i, x in enumerate(res_idx):
result[indices[i]] = _subset[x]
else:
result["all_qhits"] = np.concatenate(
(result["all_qhits"], [indices[j] for j in res_idx.get_column("query_hits")]), dtype=np.int32
)
result["all_shits"] = np.concatenate(
(result["all_shits"], [_subset[x] for x in res_idx.get_column("self_hits")]), dtype=np.int32
)
if select == "all":
result = BiocFrame({"query_hits": result["all_qhits"], "self_hits": result["all_shits"]})
return result
[docs]
def precede(
self,
query: "GenomicRanges",
select: Literal["all", "first"] = "first",
ignore_strand: bool = False,
) -> Union[np.ndarray, BiocFrame]:
"""Search nearest positions only downstream that overlap with each range in ``query``.
Args:
query:
Query ``GenomicRanges`` to find nearest positions.
select:
Whether to return "all" hits or just "first".
Defaults to "first".
ignore_strand:
Whether to ignore strand. Defaults to False.
Returns:
If select="first":
A numpy array of integers with length matching query, containing indices
into self for the closest upstream position of each query range. Value may be
None if there are no matches.
If select="all":
A BiocFrame with two columns:
- query_hits: Indices into query ranges
- self_hits: Corresponding indices into self ranges that are upstream
Each row represents a query-self pair where self precedes query.
"""
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
if select == "first":
result = np.full(len(query), None)
else:
result = {"all_qhits": np.array([], dtype=np.int32), "all_shits": np.array([], dtype=np.int32)}
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.precede(query=_query_subset._ranges, select=select)
if select == "first":
matches = res_idx != None # noqa: E711
not_none = res_idx[matches]
if len(not_none) > 0:
for i, x in enumerate(not_none):
result[indices[i]] = _subset[x]
else:
result["all_qhits"] = np.concatenate(
(result["all_qhits"], [indices[j] for j in res_idx.get_column("query_hits")])
)
result["all_shits"] = np.concatenate(
(result["all_shits"], [_subset[x] for x in res_idx.get_column("self_hits")])
)
if select == "all":
result = BiocFrame({"query_hits": result["all_qhits"], "self_hits": result["all_shits"]})
return result
[docs]
def follow(
self,
query: "GenomicRanges",
select: Literal["all", "last"] = "last",
ignore_strand: bool = False,
) -> Union[np.ndarray, BiocFrame]:
"""Search nearest positions only upstream that overlap with each range in ``query``.
Args:
query:
Query ``GenomicRanges`` to find nearest positions.
select:
Whether to return "all" hits or just "last".
Defaults to "last".
ignore_strand:
Whether to ignore strand. Defaults to False.
Returns:
If select="last":
A numpy array of integers with length matching query, containing indices
into self for the closest downstream position of each query range. Value may be
None if there are no matches.
If select="all":
A BiocFrame with two columns:
- query_hits: Indices into query ranges
- self_hits: Corresponding indices into self ranges that are upstream
Each row represents a query-self pair where self follows query.
"""
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
subject_chrm_grps = self._group_indices_by_chrm(ignore_strand=ignore_strand)
query_chrm_grps = query._group_indices_by_chrm(ignore_strand=ignore_strand)
if select == "last":
result = np.full(len(query), None)
else:
result = {"all_qhits": np.array([], dtype=np.int32), "all_shits": np.array([], dtype=np.int32)}
for group, indices in query_chrm_grps.items():
_subset = []
if group in subject_chrm_grps:
_subset.extend(subject_chrm_grps[group])
_grp_split = group.split(_granges_delim)
if _grp_split[1] != "0":
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}0"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
else:
_any_grp_fwd = f"{_grp_split[0]}{_granges_delim}1"
if _any_grp_fwd in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_fwd])
_any_grp_rev = f"{_grp_split[0]}{_granges_delim}-1"
if _any_grp_rev in subject_chrm_grps:
_subset.extend(subject_chrm_grps[_any_grp_rev])
if len(_subset) > 0:
_sub_subset = self[_subset]
_query_subset = query[indices]
res_idx = _sub_subset._ranges.follow(query=_query_subset._ranges, select=select)
if select == "last":
matches = res_idx != None # noqa: E711
not_none = res_idx[matches]
if len(not_none) > 0:
for i, x in enumerate(not_none):
result[indices[i]] = _subset[x]
else:
result["all_qhits"] = np.concatenate(
(result["all_qhits"], [indices[j] for j in res_idx.get_column("query_hits")])
)
result["all_shits"] = np.concatenate(
(result["all_shits"], [_subset[x] for x in res_idx.get_column("self_hits")])
)
if select == "all":
result = BiocFrame({"query_hits": result["all_qhits"], "self_hits": result["all_shits"]})
return result
[docs]
def distance(self, query: Union["GenomicRanges", IRanges]) -> np.ndarray:
"""Compute the pair-wise distance with intervals in query.
Args:
query:
Query `GenomicRanges` or `IRanges`.
Returns:
Numpy vector containing distances for each interval in query.
"""
if not isinstance(query, (IRanges, GenomicRanges)):
raise TypeError("'query' is neither a `GenomicRanges` nor `IRanges` object.")
if len(self) != len(query):
raise ValueError("'query' does not contain the same number of ranges.")
_qranges = query
if isinstance(query, GenomicRanges):
_qranges = query.get_ranges()
return self._ranges.distance(_qranges)
#############################
######>> comparisons <<######
#############################
[docs]
def match(self, query: "GenomicRanges", ignore_strand: bool = False) -> np.ndarray:
"""Element wise comparison to find exact match ranges.
Args:
query:
Query ``GenomicRanges`` to search for matches.
ignore_strand:
Whether to ignore strand. Defaults to False.
Returns:
A NumPy array with length matching query
containing the matched indices.
"""
if not isinstance(query, GenomicRanges):
raise TypeError("'query' is not a `GenomicRanges` object.")
result = np.full(len(query), None)
all_overlaps = self.find_overlaps(query, ignore_strand=ignore_strand)
ol_query = all_overlaps.get_column("query_hits")
ol_subject = all_overlaps.get_column("self_hits")
unique_queries = np.unique(ol_query)
for q in unique_queries:
q_ctx = query[q]
matches = np.where(ol_query == q)[0]
self_indices = ol_subject[matches]
self_matches = self[ol_subject[matches]]
tgt = self_indices[
(np.array(self_matches.get_start()) == q_ctx.get_start()[0])
& (np.array(self_matches.get_end()) == q_ctx.get_end()[0])
]
if result[q] is None:
result[q] = tgt[0]
return result
def _get_ranges_as_list(self) -> List[Tuple[int, int, int]]:
"""Internal method to get ranges as a list of tuples.
Returns:
List of tuples containing the start, end and the index.
"""
ranges = []
strands = self._strand.copy()
strands[strands == 1] = 6
strands[strands == -1] = 7
strands[strands == 0] = 8
for i in range(len(self)):
ranges.append(
(
self._seqnames[i],
strands[i],
self._ranges._start[i],
self._ranges.end[i],
i,
)
)
return ranges
[docs]
def order(self, decreasing: bool = False) -> np.ndarray:
"""Get the order of indices for sorting.
Order orders the genomic ranges by chromosome and strand.
Strand is ordered by reverse first (-1), any strand (0) and
forward strand (-1). Then by the start positions and width if
two regions have the same start.
Args:
decreasing:
Whether to sort in descending order. Defaults to False.
Returns:
NumPy vector containing index positions in the sorted order.
"""
intvals = sorted(self._get_ranges_as_list(), reverse=decreasing)
order = [o[4] for o in intvals]
return np.asarray(order)
[docs]
def sort(self, decreasing: bool = False, in_place: bool = False) -> "GenomicRanges":
"""Get the order of indices for sorting.
Args:
decreasing:
Whether to sort in descending order. Defaults to False.
in_place:
Whether to modify the object in place. Defaults to False.
Returns:
A modified ``GenomicRanges`` object with the trimmed regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
order = self.order(decreasing=decreasing)
output = self._define_output(in_place)
return output[list(order)]
[docs]
def rank(self) -> List[int]:
"""Get rank of the ``GenomicRanges`` object.
For each range identifies its position is a sorted order.
Returns:
Numpy vector containing rank.
"""
intvals = sorted(self._get_ranges_as_list())
order = [o[4] for o in intvals]
rank = [order.index(x) for x in range(len(order))]
return rank
##############################
######>> misc methods <<######
##############################
[docs]
def sample(self, k: int = 5) -> "GenomicRanges":
"""Randomly sample ``k`` intervals.
Args:
k:
Number of intervals to sample. Defaults to 5.
Returns:
A new ``GenomicRanges`` with randomly sampled ranges.
"""
from random import sample
sample = sample(range(len(self)), k=k)
return self[sample]
[docs]
def invert_strand(self, in_place: bool = False) -> "GenomicRanges":
"""Invert strand for each range.
Conversion map:
- "+" map to "-"
- "-" becomes "+"
- "*" stays the same
Args:
in_place:
Whether to modify the object in place. Defaults to False.
Returns:
A modified ``GenomicRanges`` object with the trimmed regions,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
convertor = {"1": "-1", "-1": "1", "0": "0"}
inverts = [convertor[str(idx)] for idx in self._strand]
output = self._define_output(in_place)
output._strand = inverts
return output
################################
######>> window methods <<######
################################
[docs]
def tile(self, n: Optional[int] = None, width: Optional[int] = None) -> List["GenomicRanges"]:
"""Split each interval by ``n`` (number of sub intervals) or ``width`` (intervals with equal width).
Note: Either ``n`` or ``width`` must be provided but not both.
Also, checkout :py:func:`~genomicranges.io.tiling.tile_genome` for splitting
a genome into chunks.
Args:
n:
Number of intervals to split into.
Defaults to None.
width:
Width of each interval. Defaults to None.
Raises:
ValueError:
If both ``n`` and ``width`` are provided.
Returns:
List of ``GenomicRanges`` with the split ranges.
"""
if n is not None and width is not None:
raise ValueError("either `n` or `width` must be provided but not both")
range_tiles = self._ranges.tile(n=n, width=width)
seqnames = self.get_seqnames("list")
result = []
for i in range(len(self)):
num_tiles = len(range_tiles[i])
result.append(
GenomicRanges(
seqnames=[seqnames[i]] * num_tiles,
ranges=range_tiles[i],
strand=np.repeat(self._strand[i], num_tiles),
names=self._names[i] * num_tiles if self._names is not None else None,
seqinfo=self._seqinfo,
)
)
return result
[docs]
def sliding_windows(self, width: int, step: int = 1) -> List["GenomicRanges"]:
"""Slide along each range by ``width`` (intervals with equal ``width``) and ``step``.
Also, checkout :py:func:`~genomicranges.io.tiling.tile_genome` for splitting
a gneomic into chunks, or
:py:meth:`~genomicranges.GenomicRanges.GenomicRanges.tile_by_range`.
Args:
n:
Number of intervals to split into.
Defaults to None.
width:
Width of each interval. Defaults to None.
Returns:
A new ``GenomicRanges`` with the sliding ranges.
"""
range_windows = self._ranges.sliding_windows(width=width, step=step)
seqnames = self.get_seqnames("list")
result = []
for i in range(len(self)):
num_tiles = len(range_windows[i])
result.append(
GenomicRanges(
seqnames=[seqnames[i]] * num_tiles,
ranges=range_windows[i],
strand=np.repeat(self._strand[i], num_tiles),
names=self._names[i] * num_tiles if self._names is not None else None,
seqinfo=self._seqinfo,
)
)
return result
# TODO: should really be a genomicrangeslist
[docs]
@classmethod
def tile_genome(
cls,
seqlengths: Dict[str, int],
ntile: Optional[int] = None,
tilewidth: Optional[int] = None,
cut_last_tile_in_chrom: bool = False,
) -> "GenomicRanges":
"""Tile genome into approximately equal-sized regions.
Args:
seqlengths:
Dictionary of sequence lengths by chromosome.
ntile:
Number of tiles (exclusive with tilewidth).
tilewidth:
Width of tiles (exclusive with ntile).
cut_last_tile_in_chrom:
Whether to cut the last tile in each chromosome.
Returns:
`GenomicRanges` object with ranges and bin numbers in mcols.
"""
if ntile is not None and tilewidth is not None:
raise ValueError("only one of 'ntile' and 'tilewidth' can be specified")
seqlen_ = seqlengths
if isinstance(seqlengths, SeqInfo):
seqlen_ = dict(zip(seqlengths.get_seqnames(), seqlengths.seqlengths))
if not seqlen_:
raise ValueError("seqlengths must be non-empty")
if any(length <= 0 for length in seqlen_.values()):
raise ValueError("seqlengths contains zero or negative values")
chroms = list(seqlen_.keys())
lengths = np.array([seqlen_[c] for c in chroms], dtype=np.int32)
chrom_ends = np.cumsum(lengths)
# chrom_starts = np.r_[0, chrom_ends[:-1]]
genome_size = chrom_ends[-1]
if ntile is not None:
if cut_last_tile_in_chrom:
raise ValueError("cut_last_tile_in_chrom must be FALSE when ntile is supplied")
if not isinstance(ntile, (int, np.integer)) or ntile < 1 or ntile > genome_size:
raise ValueError("ntile must be an integer >= 1 and <= genome size")
tilewidth = genome_size / ntile
genome_breaks = np.floor(tilewidth * np.arange(1, ntile + 1)).astype(np.int32)
all_chroms = []
all_starts = []
all_widths = []
all_abs_starts = [] # track absolute starts
all_abs_ends = [] # track absolute ends
prev_end = 0
for i, length in enumerate(lengths):
curr_end = prev_end + length
chr_mask = (genome_breaks > prev_end) & (genome_breaks <= curr_end)
chr_breaks = genome_breaks[chr_mask]
if len(chr_breaks) > 0:
# Handle first tile in chromosome
if not all_starts or genome_breaks[chr_mask][0] > prev_end + 1:
all_chroms.append(chroms[i])
start = 1
end = chr_breaks[0] - prev_end
all_starts.append(start)
all_widths.append(end)
all_abs_starts.append(prev_end + 1)
all_abs_ends.append(chr_breaks[0])
# Handle remaining tiles in chromosome
for j in range(1, len(chr_breaks)):
all_chroms.append(chroms[i])
start = chr_breaks[j - 1] - prev_end + 1
end = chr_breaks[j] - prev_end
all_starts.append(start)
all_widths.append(end - start + 1)
all_abs_starts.append(chr_breaks[j - 1] + 1)
all_abs_ends.append(chr_breaks[j])
prev_end = curr_end
if i < len(lengths) - 1 and (len(chr_breaks) == 0 or chr_breaks[-1] < curr_end):
all_chroms.append(chroms[i])
if len(chr_breaks) > 0:
start = chr_breaks[-1] - prev_end + length + 1
else:
start = 1
all_starts.append(start)
width = length - start + 1
all_widths.append(width)
if len(chr_breaks) > 0:
all_abs_starts.append(chr_breaks[-1] + 1)
else:
all_abs_starts.append(prev_end + 1)
all_abs_ends.append(curr_end)
all_abs_starts = np.array(all_abs_starts)
all_abs_ends = np.array(all_abs_ends)
bin_width = genome_size / ntile
bin_starts = np.arange(0, genome_size, bin_width)
bin_ends = np.append(bin_starts[1:], genome_size)
all_bins = np.zeros(len(all_abs_starts), dtype=np.int32)
for i in range(len(all_abs_starts)):
range_start = all_abs_starts[i]
range_end = all_abs_ends[i]
overlaps = np.minimum(bin_ends, range_end) - np.maximum(bin_starts, range_start)
overlaps = np.maximum(overlaps, 0)
all_bins[i] = np.argmax(overlaps) + 1
else:
if not isinstance(tilewidth, (int, np.integer)) or tilewidth < 1 or tilewidth > genome_size:
raise ValueError("tilewidth must be an integer >= 1 and <= genome size")
ntile = int(np.ceil(genome_size / tilewidth))
return GenomicRanges.tile_genome(seqlen_, ntile=ntile, cut_last_tile_in_chrom=False)
ranges = IRanges(start=np.array(all_starts, dtype=np.int32), width=np.array(all_widths, dtype=np.int32))
mcols = BiocFrame({"bin": np.array(all_bins, dtype=np.int32)})
return GenomicRanges(seqnames=all_chroms, ranges=ranges, strand=["*"] * len(all_starts), mcols=mcols)
[docs]
def binned_average(
self,
scorename: str,
bins: "GenomicRanges",
outname: str = "binned_average",
in_place: bool = False,
) -> "GenomicRanges":
"""Calculate average for a column across all regions in ``bins``, then set a column specified by 'outname' with
those values.
Args:
scorename:
Score column to compute averages on.
bins:
Bins you want to use.
outname:
New column name to add to the object.
in_place:
Whether to modify ``bins`` in place.
Raises:
ValueError:
If ``scorename`` column does not exist.
``scorename`` is not all ints or floats.
TypeError:
If ``bins`` is not of type `GenomicRanges`.
Returns:
A modified ``bins`` object with the computed averages,
either as a copy of the original or as a reference to the
(in-place-modified) original.
"""
import statistics
if not isinstance(bins, GenomicRanges):
raise TypeError("'bins' is not a `GenomicRanges` object.")
if scorename not in self._mcols.column_names:
raise ValueError(f"'{scorename}' is not a valid column name")
values = self._mcols.get_column(scorename)
if not all(isinstance(x, (int, float)) for x in values):
raise ValueError(f"'{scorename}' values must be either `ints` or `floats`.")
outvec = []
for _, val in bins:
overlap = self.subset_by_overlaps(query=val)
outvec.append(statistics.mean(overlap._mcols.get_column(scorename)))
output = bins._define_output(in_place=in_place)
output._mcols.set_column(outname, outvec, in_place=True)
return output
#######################
######>> split <<######
#######################
[docs]
def split(self, groups: list) -> "GenomicRangesList":
"""Split the `GenomicRanges` object into a :py:class:`~genomicranges.GenomicRangesList.GenomicRangesList`.
Args:
groups:
A list specifying the groups or factors to split by.
Must have the same length as the number of genomic elements
in the object.
Returns:
A `GenomicRangesList` containing the groups and their
corresponding elements.
"""
if len(groups) != len(self):
raise ValueError("Number of groups must match the number of genomic elements.")
gdict = group_by_indices(groups=groups)
_names = []
_grs = []
for k, v in gdict.items():
_names.append(k)
_grs.append(self[v])
from .GenomicRangesList import GenomicRangesList
return GenomicRangesList(ranges=_grs, names=_names)
#######################
######>> empty <<######
#######################
[docs]
@classmethod
def empty(cls):
"""Create an zero-length `GenomicRanges` object.
Returns:
same type as caller, in this case a `GenomicRanges`.
"""
return cls([], IRanges.empty())
##########################
######>> subtract <<######
##########################
[docs]
def subtract(
self, other: "GenomicRanges", min_overlap: int = 1, ignore_strand: bool = False
) -> "GenomicRangesList":
"""Subtract searches for features in ``x`` that overlap ``self`` by at least the number of base pairs given by
``min_overlap``.
Args:
other:
Object to subtract.
min_overlap:
Minimum overlap with query.
Defaults to 1.
ignore_strand:
Whether to ignore strands.
Defaults to False.
Returns:
A `GenomicRangesList` with the same size as ``self`` containing
the subtracted regions.
"""
_other_reduce = other.reduce(ignore_strand=ignore_strand)
hits = _other_reduce.find_overlaps(self, min_overlap=min_overlap, ignore_strand=ignore_strand)
mapper = {}
for i in range(len(self)):
mapper[i] = []
for i in range(len(hits)):
s_hit = int(hits.get_column("self_hits")[i])
q_hit = int(hits.get_column("query_hits")[i])
mapper[q_hit].append(s_hit)
mapper_with_y = {}
for idx, val in mapper.items():
mapper_with_y[idx] = other[val]
psetdiff = {}
for idx, val in mapper_with_y.items():
if len(val) == 0:
psetdiff[idx] = self[idx]
else:
psetdiff[idx] = self[idx].setdiff(val)
from .GenomicRangesList import GenomicRangesList
return GenomicRangesList.from_dict(psetdiff)
def _fast_combine_GenomicRanges(*x: GenomicRanges) -> GenomicRanges:
return GenomicRanges(
ranges=ut.combine_sequences(*[y._ranges for y in x]),
seqnames=ut.combine_sequences(*[y.get_seqnames() for y in x]),
strand=ut.combine_sequences(*[y._strand for y in x]),
names=None,
mcols=None,
seqinfo=merge_SeqInfo([y._seqinfo for y in x]),
metadata=None,
validate=False,
)
@ut.combine_sequences.register(GenomicRanges)
def _combine_GenomicRanges(*x: GenomicRanges) -> GenomicRanges:
has_names = False
for y in x:
if y._names is not None:
has_names = True
break
all_names = None
if has_names:
all_names = []
for y in x:
if y._names is not None:
all_names += y._names
else:
all_names += [""] * len(y)
return GenomicRanges(
ranges=ut.combine_sequences(*[y._ranges for y in x]),
seqnames=ut.combine_sequences(*[y.get_seqnames() for y in x]),
strand=ut.combine_sequences(*[y._strand for y in x]),
names=all_names,
mcols=ut.relaxed_combine_rows(*[y._mcols for y in x]),
seqinfo=merge_SeqInfo([y._seqinfo for y in x]),
metadata=x[0]._metadata,
validate=False,
)
@ut.extract_row_names.register(GenomicRanges)
def _rownames_gr(x: GenomicRanges):
return x.get_names()