from typing import Optional, Tuple, Union
import numpy as np
__author__ = "Jayaram Kancherla"
__copyright__ = "jkanche"
__license__ = "MIT"
[docs]
def normalize_array(
x: Optional[Union[int, float, np.number, np.ndarray]], length: int, dtype: np.dtype = np.int32
) -> np.ma.MaskedArray:
"""Normalize input to masked array with proper length and type.
Args:
x:
Input value (scalar, array, or None).
length:
Expected length for output array.
dtype:
Expected numpy dtype.
Returns:
Normalized masked array.
"""
# If None, return a masked array of the expected length
# so the downstream code is less complicated
if x is None:
return np.ma.masked_array(np.zeros(length, dtype=dtype), mask=True)
# scalars get converted into array of length n
if np.isscalar(x):
return np.ma.masked_array([x] * length, dtype=dtype, mask=False)
# everything else becomes a ndarray
if not isinstance(x, np.ndarray):
x = np.asarray(x, dtype=dtype)
# Handle length mismatch
if len(x) < length:
# recycle values
repeats = length // len(x) + (1 if length % len(x) else 0)
x = np.tile(x, repeats)[:length]
elif len(x) > length:
raise Exception(f"input length {len(x)} exceeds expected length {length}")
return np.ma.masked_array(x, dtype=dtype, mask=False)
[docs]
def handle_negative_coords(coords: np.ma.MaskedArray, ref_len: np.ndarray) -> np.ma.MaskedArray:
"""Convert negative coordinates to positive using reference length.
Args:
coords:
Coordinate array (can have negative values).
ref_len:
Reference lengths for conversion.
Returns:
Array with negative coordinates converted to positive.
"""
return np.ma.where((~coords.mask) & (coords < 0), ref_len + coords + 1, coords)
[docs]
def clip_ranges(
starts: np.ndarray, widths: np.ndarray, min_val: Optional[int] = None, max_val: Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
"""Clip ranges to specified bounds.
Args:
starts:
Start positions.
widths:
Widths.
min_val:
Minimum allowed position.
Defaults to None for no lower bound.
max_val:
Maximum allowed position.
Defaults to None for no upper bound.
Returns:
Tuple of clipped (starts, widths) ranges.
"""
ends = starts + widths - 1
if min_val is not None:
starts = np.maximum(starts, min_val)
if max_val is not None:
ends = np.minimum(ends, max_val)
new_widths = np.maximum(ends - starts + 1, 0)
return starts, new_widths
[docs]
def compute_up_down(
starts: np.ndarray,
widths: np.ndarray,
upstream: Union[int, np.ndarray],
downstream: Union[int, np.ndarray],
site: str,
) -> Tuple[np.ndarray, np.ndarray]:
"""Helper for promoters/terminators."""
length = len(starts)
if len(starts) == 0:
return starts, widths
upstream_arr = normalize_array(upstream, length)
downstream_arr = normalize_array(downstream, length)
if upstream_arr.mask.any() or downstream_arr.mask.any():
raise ValueError("'upstream' and 'downstream' cannot be NA")
if np.any(upstream_arr < 0) or np.any(downstream_arr < 0):
raise ValueError("'upstream' and 'downstream' must be >= 0")
site_pos = starts if site == "TSS" else starts + widths - 1
new_starts = site_pos - upstream_arr.data
new_ends = site_pos + downstream_arr.data - 1
new_widths = new_ends - new_starts + 1
return new_starts, new_widths
[docs]
def calc_gap_and_overlap(start1: int, width1: int, start2: int, width2: int):
"""Calculate gap, overlap and relative position between two intervals."""
end1 = start1 + width1 - 1
end2 = start2 + width2 - 1
overlap_start = max(start1, start2)
overlap_end = min(end1, end2)
overlap = max(0, overlap_end - overlap_start + 1)
if end1 < start2:
# First interval precedes second
gap = start2 - end1 - 1
position = "start"
elif end2 < start1:
# First interval follows second
gap = start1 - end2 - 1
position = "end"
else:
# Intervals overlap
gap = -overlap
position = "overlap"
return gap, overlap, position
[docs]
def find_interval(x: np.ndarray, vec: np.ndarray) -> np.ndarray:
"""Python implementation of R's findInterval function.
Args:
x:
Values to find intervals for.
vec:
Sorted vector to find intervals in.
Returns:
NumPy array of indices indicating which interval each x value falls into.
"""
if len(vec) == 0:
return np.zeros(len(x), dtype=np.int32)
indices = np.searchsorted(vec, x, side="right") - 1
return indices