from itertools import groupby
from typing import List, Optional, Sequence, Tuple, Union
import biocutils as ut
import numpy as np
__author__ = "jkanche"
__copyright__ = "jkanche"
__license__ = "MIT"
STRAND_MAP = {"+": 1, "-": -1, "*": 0}
REV_STRAND_MAP = {"1": "+", "-1": "-", "0": "*"}
[docs]
def sanitize_strand_vector(
strand: Union[Sequence[str], Sequence[int], np.ndarray]
) -> np.ndarray:
"""Create a numpy representation for ``strand``.
Mapping: 1 for "+" (forward strand), 0 for "*" (any strand) and -1 for "-" (reverse strand).
Args:
strand: List of strand.
Raises:
ValueError:
If strand is None.
If strand contains values other than +,- and *.
If strand is not a numpy vector, string of integers or strings.
Returns:
A numpy vector.
"""
if strand is None:
raise ValueError("'strand' cannot be None.")
if isinstance(strand, np.ndarray):
if len(strand.shape) > 1:
raise ValueError("'strand' must be a 1-dimensional vector.")
if not set(np.unique(strand)).issubset([-1, 0, 1]):
raise ValueError(
"'strand' must only contain values 1 (forward strand), -1 (reverse strand) or 0 (reverse strand)."
)
return strand.astype(np.int8)
elif ut.is_list_of_type(strand, str):
if not set(strand).issubset(["+", "-", "*"]):
raise ValueError("Values in 'strand' must be either +, - or *.")
return np.asarray([STRAND_MAP[x] for x in strand], dtype=np.int8)
elif ut.is_list_of_type(strand, (int, float)):
if not set(strand).issubset([1, 0, -1]):
raise ValueError(
"'strand' must only contain values 1 (forward strand), -1 (reverse strand) or 0 (reverse strand)."
)
return np.asarray(strand, dtype=np.int8)
else:
raise TypeError(
"'strand' must be either a numpy vector, a list of integers or strings representing strand."
)
def _sanitize_vec(x: Sequence):
if isinstance(x, np.ma.MaskedArray):
x.filled(fill_value=None)
return x.tolist()
return list(x)
def _sanitize_strand_search_ops(query_strand, subject_strand):
query_strand = REV_STRAND_MAP[query_strand]
subject_strand = REV_STRAND_MAP[subject_strand]
out = None
if query_strand == "+":
if subject_strand == "+":
out = "+"
elif subject_strand == "-":
out = None
elif subject_strand == "*":
out = "+"
elif query_strand == "-":
if subject_strand == "+":
out = None
elif subject_strand == "-":
out = "-"
elif subject_strand == "*":
out = "-"
elif query_strand == "*":
if subject_strand == "*":
out = "+"
elif subject_strand == "-":
out = "-"
elif subject_strand == "*":
out = "-"
if out is None:
return None
return STRAND_MAP[out]
[docs]
def split_intervals(start: int, end: int, step: int) -> List:
"""Split an interval range into equal bins.
Args:
start:
Start interval.
end:
End interval.
step:
Width or step of each interval.
Returns:
List of intervals split into bins.
"""
bins = []
for i in range(start, end + 1, step):
bins.append((i, min(i + step - 1, end) - i))
return bins
[docs]
def slide_intervals(start: int, end: int, width: int, step: int) -> List:
"""Sliding intervals.
Args:
start:
Start interval.
end:
End interval.
step:
Step of each interval.
width:
Width of each interval.
Returns:
List of intervals split into bins.
"""
bins = []
if end - width < start:
bins.append((start, end - start))
else:
for i in range(start, end - width + 2, step):
bins.append((i, min(i + width - 1, end) - i))
return bins
[docs]
def create_np_vector(
intervals: List[Tuple[int, int]],
with_reverse_map: bool = False,
force_size: Optional[int] = None,
dont_sum: bool = False,
value: int = 1,
) -> Tuple[np.ndarray, Optional[List]]:
"""Represent intervals and calculate coverage.
Args:
intervals:
Input interval vector.
with_reverse_map:
Return map of indices? Defaults to False.
force_size:
Force size of the array.
dont_sum:
Do not sum. Defaults to False.
value:
Default value to increment. Defaults to 1.
Returns:
A numpy array representing coverage from the
intervals and optionally a reverse index map.
"""
if len(intervals) < 1:
return intervals
max_end = force_size
if max_end is None:
max_end = max([x[1] for x in intervals])
cov = np.zeros(max_end)
revmap = None
if with_reverse_map:
revmap = [[] for _ in range(max_end)]
for idx in range(len(intervals)):
i = intervals[idx]
if dont_sum:
cov[i[0] - 1 : i[1]] = value
else:
cov[i[0] - 1 : i[1]] += value
if with_reverse_map:
_ = [revmap[x].append(idx + 1) for x in range(i[0] - 1, i[1])]
return cov, revmap
[docs]
def group_by_indices(groups: list) -> dict:
return {
k: [x[0] for x in v]
for k, v in groupby(
sorted(enumerate(groups), key=lambda x: x[1]), lambda x: x[1]
)
}