from typing import List, Optional, Sequence, Tuple, Union
import numpy
import tiledb
from delayedarray import (
DelayedArray,
SparseNdarray,
chunk_grid,
chunk_shape_to_grid,
extract_dense_array,
extract_sparse_array,
is_masked,
is_sparse,
wrap,
)
__author__ = "jkanche"
__copyright__ = "jkanche"
__license__ = "MIT"
[docs]
class TileDbArraySeed:
    """Seed object exposing one attribute of a TileDB array to ``DelayedArray``.

    The array schema is read once at construction time; the sparsity flag,
    shape, attribute dtype, dimension names and per-dimension ``tile`` values
    are cached on the instance.
    """

    def __init__(self, path: str, attribute_name: str) -> None:
        """
        Args:
            path:
                Path or URI to the TileDB array.

            attribute_name:
                Name of the attribute inside the TileDB array that holds the
                values.

        Raises:
            ValueError:
                If ``attribute_name`` is not an attribute of the schema.
        """
        self._path = path
        self._attribute_name = attribute_name

        schema = tiledb.ArraySchema.load(self._path)
        self._is_sparse = schema.sparse
        self._shape = schema.shape

        # Attributes are enumerated by position to collect their names.
        known_attributes = [schema.attr(pos).name for pos in range(schema.nattr)]
        if self._attribute_name not in known_attributes:
            raise ValueError(
                f"Attribute '{self._attribute_name}' not in the tiledb schema."
            )
        self._dtype = schema.attr(self._attribute_name).dtype

        # One entry per dimension, in schema order.
        dimensions = [schema.domain.dim(pos) for pos in range(schema.domain.ndim)]
        self._dimnames = [dimension.name for dimension in dimensions]
        self._tiles = [dimension.tile for dimension in dimensions]

    @property
    def dtype(self) -> numpy.dtype:
        """
        Returns:
            Data type of the selected attribute, as reported by the schema.
        """
        return self._dtype

    @property
    def shape(self) -> Tuple[int, ...]:
        """
        Returns:
            Shape of the array, as reported by the schema.
        """
        return self._shape

    @property
    def path(self) -> str:
        """
        Returns:
            Path or URI of the TileDB array this seed was created from.
        """
        return self._path

    @property
    def attribute_name(self) -> str:
        """
        Returns:
            Name of the TileDB attribute that holds the array values.
        """
        return self._attribute_name

    @property
    def is_sparse(self) -> bool:
        """
        Returns:
            Sparsity flag taken from the TileDB schema.
        """
        return self._is_sparse

    @property
    def dimnames(self) -> List[str]:
        """
        Returns:
            Name of every dimension, in schema order.
        """
        return self._dimnames
[docs]
@chunk_grid.register
def chunk_grid_TileDbArraySeed(x: TileDbArraySeed):
    """See :py:meth:`~delayedarray.chunk_grid.chunk_grid`.

    The grid is built from the per-dimension ``tile`` values cached on the
    seed together with the array shape. A cost factor of 20 is used to reflect
    the computational work involved in extracting data from disk.
    """
    tile_extents = x._tiles
    array_shape = x._shape
    return chunk_shape_to_grid(tile_extents, array_shape, cost_factor=20)
[docs]
@is_sparse.register
def is_sparse_TileDbArraySeed(x: TileDbArraySeed):
    """See :py:meth:`~delayedarray.is_sparse.is_sparse`.

    Reports the sparsity flag stored on the seed.
    """
    sparse_flag = x.is_sparse
    return sparse_flag
[docs]
@is_masked.register
def is_masked_TileDbArraySeed(x: TileDbArraySeed):
    """See :py:meth:`~delayedarray.is_masked.is_masked`.

    Always reports ``False`` for a :py:class:`~TileDbArraySeed`.
    """
    masked = False
    return masked
def _sanitize_subset(subset, dimlength):
if isinstance(subset, slice):
if subset == slice(None):
subset = slice(dimlength)
subset = list(range(*subset.indices(dimlength)))
elif isinstance(subset, range):
subset = list(subset)
return sorted(subset)
def _extract_array(x: TileDbArraySeed, subset: Tuple[Sequence[int], ...]):
    """Read the requested rows and columns of a two-dimensional TileDB array.

    Args:
        x:
            Seed describing the TileDB array and the attribute to read.

        subset:
            Tuple holding the requested indices of the first dimension and,
            optionally, of the second dimension. Each entry may be anything
            accepted by ``_sanitize_subset``. When the second entry is
            missing, the whole second dimension is read.

    Returns:
        A ``(shape, payload)`` tuple where ``shape`` is the
        ``(n_rows, n_columns)`` extent of the request. For a sparse seed
        ``payload`` is ``(row_coords, col_coords, values, parsed_subset)``,
        with ``parsed_subset`` always holding both the row and the column
        index lists; otherwise it is a NumPy array built from the attribute
        data.
    """
    # NOTE(review): only the first two dimensions are consulted, so this
    # presumably expects a two-dimensional array - confirm against callers.
    row_subset = _sanitize_subset(subset[0], x._shape[0])

    if len(subset) > 1:
        col_subset = _sanitize_subset(subset[1], x._shape[1])
        query = (row_subset, col_subset)
    else:
        # No column selection was supplied: query on rows only, but still
        # materialise the full column list so that the shape and the
        # returned parsed subset describe both dimensions.
        col_subset = _sanitize_subset(slice(None), x._shape[1])
        query = (row_subset,)

    with tiledb.open(x._path, "r") as mat:
        data = mat.multi_index[query]

    extracted_shape = (len(row_subset), len(col_subset))

    if x.is_sparse:
        # Consumers translate coordinates into offsets through both entries
        # of the parsed subset, hence it must always contain two lists.
        return extracted_shape, (
            data[x._dimnames[0]],
            data[x._dimnames[1]],
            data[x._attribute_name],
            [row_subset, col_subset],
        )

    return extracted_shape, numpy.array(data[x._attribute_name])
def _SparseNdarray_contents_from_coordinates(
rows, cols, vals, shape, val_dtype, parsed_subset
):
output = [None] * shape[-1]
for i, val in enumerate(vals):
_offset_col = parsed_subset[1].index(cols[i])
_offset_row = parsed_subset[0].index(rows[i])
if output[_offset_col] is None:
output[_offset_col] = [
numpy.array([], dtype=numpy.int32),
numpy.array([], dtype=val_dtype),
]
output[_offset_col][0] = numpy.append(output[_offset_col][0], _offset_row)
output[_offset_col][1] = numpy.append(output[_offset_col][1], val)
for i, o in enumerate(output):
if o is not None:
_idx_order = numpy.argsort(o[0])
_indices = o[0][_idx_order].astype(numpy.int32)
_vals = o[1][_idx_order]
output[i] = (_indices, _vals)
if all([x is None for x in output]):
output = None
return output
[docs]
class TileDbArray(DelayedArray):
    """``DelayedArray`` subclass backed by a sparse or dense TileDB array.

    Having a dedicated subclass lets developers attach custom methods for
    TileDB-backed matrices.
    """

    def __init__(
        self,
        path: Union[str, TileDbArraySeed],
        attribute_name: Optional[str],
    ):
        """To construct a ``TileDbArray`` from an existing
        :py:class:`~TileDbArraySeed`, use :py:meth:`~delayedarray.wrap.wrap`
        instead.

        Args:
            path:
                Path to the TileDB array, or an already constructed
                :class:`~TileDbArraySeed`.

            attribute_name:
                Name of the attribute containing the array. Ignored when
                ``path`` is already a seed.

        Raises:
            ValueError:
                If ``path`` is not a seed and ``attribute_name`` is ``None``.
        """
        seed = path
        if not isinstance(seed, TileDbArraySeed):
            if attribute_name is None:
                raise ValueError("'attribute_name' cannot be 'None'.")
            seed = TileDbArraySeed(seed, attribute_name)

        super().__init__(seed)

    @property
    def path(self) -> str:
        """
        Returns:
            Path of the TileDB array, as stored on the underlying seed.
        """
        backing_seed = self.seed
        return backing_seed.path

    @property
    def attribute_name(self) -> Optional[str]:
        """
        Returns:
            Name of the TileDB attribute holding the matrix contents, as
            stored on the underlying seed.
        """
        backing_seed = self.seed
        return backing_seed.attribute_name
[docs]
@wrap.register
def wrap_Hdf5CompressedSparseMatrixSeed(x: TileDbArraySeed):
    """See :py:meth:`~delayedarray.wrap.wrap`.

    Wraps a :py:class:`~TileDbArraySeed` in a :py:class:`~TileDbArray`.
    """
    # NOTE(review): the function name mentions an HDF5 seed although it is
    # registered for TileDbArraySeed; it is kept as-is because it is a
    # module-level name that other code may reference.
    wrapped = TileDbArray(x, None)
    return wrapped