Source code for uchrom.recon.fish._hic

"""Hi-C helpers for GEM-FISH.

- :func:`load_contact_matrix` pulls a chromosome-restricted, square
  contact matrix from a ``.cool`` / ``.mcool`` file at the requested
  resolution, returning ``(matrix, bin_df)`` with ``bin_df`` carrying
  ``(chrom, start, end)`` for each row / column.
- :func:`aggregate_over_tads` collapses a bin-resolution matrix into a
  TAD-resolution matrix given a list of ``(start, end)`` TAD windows.
- :func:`contact_to_distance` applies Abbas et al. 2019 Eqn. 10 to
  convert inter-TAD contact counts to initial distance estimates used
  by the Stage-3 assembly step.
"""

from __future__ import annotations

from typing import List, Optional, Tuple

import numpy as np
import pandas as pd


[docs] def load_contact_matrix( path: str, chrom: str, resolution: Optional[int] = None, normalize: bool = True, ) -> Tuple[np.ndarray, pd.DataFrame]: """Load a single-chromosome contact matrix from ``.cool`` / ``.mcool``. Parameters ---------- path : str Path to a ``.cool`` (single resolution) or ``.mcool`` file. For ``.mcool``, pass ``resolution`` to pick the resolution group; the function prepends ``::resolutions/{resolution}`` as needed. chrom : str Chromosome name as stored in the cooler. resolution : int, optional Required for ``.mcool`` files; ignored for ``.cool`` files that already fix the resolution. normalize : bool Apply cooler's weight column (KR / ICE as stored) when available. Falls back to raw counts if no weights are present. Returns ------- matrix : ndarray (n_bins, n_bins), float64 bin_df : DataFrame with ``chrom, start, end`` per row. """ try: from cooler import Cooler except ImportError as exc: # pragma: no cover raise ImportError( "cooler is required to read .cool/.mcool files" ) from exc if path.endswith(".mcool"): if resolution is None: raise ValueError("resolution must be provided for .mcool files") uri = f"{path}::/resolutions/{int(resolution)}" else: uri = path c = Cooler(uri) if chrom not in c.chromnames: raise ValueError( f"chromosome '{chrom}' not found in {uri}; " f"available: {list(c.chromnames)[:10]}…" ) mat = c.matrix(balance=normalize).fetch(chrom) mat = np.asarray(mat, dtype=np.float64) mat[~np.isfinite(mat)] = 0.0 bins = c.bins().fetch(chrom)[["chrom", "start", "end"]].reset_index(drop=True) return mat, bins
[docs] def aggregate_over_tads( matrix: np.ndarray, bin_df: pd.DataFrame, tads: pd.DataFrame, ) -> Tuple[np.ndarray, List[Tuple[int, int]]]: """Collapse a bin-resolution matrix into a TAD-resolution matrix. Parameters ---------- matrix : ndarray (n_bins, n_bins) Row / column ordering matches ``bin_df``. bin_df : DataFrame with ``chrom, start, end``. tads : DataFrame with ``start, end`` (bp). Assumed sorted by start and on the same chromosome as the matrix. Returns ------- tad_matrix : ndarray (n_tads, n_tads) Sum of bin-level contacts over each ``(TAD_i, TAD_j)`` window. tad_bin_indices : list of (start_idx, end_idx) into ``bin_df`` Bin-index range covered by each TAD (end exclusive). """ starts = bin_df["start"].to_numpy() ends = bin_df["end"].to_numpy() tad_bins = [] for _, row in tads.iterrows(): lo = int(row["start"]) hi = int(row["end"]) s_idx = int(np.searchsorted(ends, lo + 1, side="left")) e_idx = int(np.searchsorted(starts, hi, side="left")) if e_idx <= s_idx: e_idx = s_idx + 1 # keep at least one bin tad_bins.append((s_idx, e_idx)) n_tads = len(tad_bins) agg = np.zeros((n_tads, n_tads), dtype=np.float64) for i, (si, ei) in enumerate(tad_bins): for j, (sj, ej) in enumerate(tad_bins): agg[i, j] = matrix[si:ei, sj:ej].sum() return agg, tad_bins
[docs] def contact_to_distance( contacts: np.ndarray, genomic_distances: Optional[np.ndarray] = None, alpha: float = 0.25, fallback_c: float = 1.0, fallback_beta: float = 0.5, ) -> np.ndarray: """Abbas 2019 Eqn. 10 — convert contact counts to initial distances. ``d_ij = f_ij^{-α}`` when ``f_ij > 0``. For zero-contact pairs the paper falls back to a genomic-distance-based estimate ``d_ij = c · g_ij^{β}`` where ``g_ij`` is the genomic separation in bp. We use the two fallback parameters in the same sense. Parameters ---------- contacts : ndarray (N, N) genomic_distances : ndarray (N, N), optional Absolute genomic separation of each TAD pair in bp. Must be supplied if any zero contacts need the fallback estimate. alpha : float Power-law exponent; 0.25 from Wang 2016 (empirical for IMR90). fallback_c, fallback_beta : float Parameters of the power-law fallback for zero-contact pairs. Returns ------- d : ndarray (N, N) Symmetric, non-negative. Diagonal is 0. """ f = np.asarray(contacts, dtype=np.float64) n = f.shape[0] d = np.zeros_like(f) mask_pos = f > 0 with np.errstate(invalid="ignore", divide="ignore"): d[mask_pos] = f[mask_pos] ** (-alpha) if (~mask_pos).any(): if genomic_distances is None: # Best we can do: leave at 0 and let the caller handle. pass else: g = np.asarray(genomic_distances, dtype=np.float64) fallback = fallback_c * np.power( np.maximum(g, 1.0), fallback_beta, ) d[~mask_pos] = fallback[~mask_pos] np.fill_diagonal(d, 0.0) # Symmetrise defensively d = 0.5 * (d + d.T) return d
__all__ = [ "load_contact_matrix", "aggregate_over_tads", "contact_to_distance", ]