"""Distance-based aggregate statistics over a population of traces.
Input convention: a flat DataFrame with columns
``chrom, start, end, x, y, z, trace_id`` (what ``ChromData.to_dataframe()``
produces, or what the browser's :class:`ChromatinLayer.df` stores).
The core helper :func:`_bin_coord_cube` pivots the flat table into a
``(n_traces, n_bins, 3)`` array with NaN for missing spots, which lets
every aggregate statistic be computed as a straightforward NaN-aware
reduction.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
def _bin_coord_cube(df: pd.DataFrame, chrom=None):
"""Pivot a flat spots+coords table into a per-trace coordinate cube.
Returns
-------
cube : ndarray, shape (n_traces, n_bins, 3)
Coordinates with NaN where a trace is missing a bin.
bin_ids : list of tuples
``(start, end)`` for each bin, sorted by ``start``.
trace_ids : list
Trace identifiers in cube axis-0 order.
"""
required = {"chrom", "start", "end", "x", "y", "z", "trace_id"}
missing = required - set(df.columns)
if missing:
raise ValueError(f"DataFrame missing columns: {missing}")
if chrom is not None:
df = df[df["chrom"].astype(str) == str(chrom)]
if df.empty:
raise ValueError("No rows after filtering; cannot compute cube.")
# Coerce start/end to int so dict-key lookups don't fall over float.
df = df.assign(
start=df["start"].astype(np.int64),
end=df["end"].astype(np.int64),
)
bins = (
df[["start", "end"]]
.drop_duplicates()
.sort_values("start", kind="mergesort")
.reset_index(drop=True)
)
bin_ids = [(int(s), int(e)) for s, e in zip(bins["start"], bins["end"])]
bin_to_idx = {k: i for i, k in enumerate(bin_ids)}
n_bins = len(bin_ids)
trace_ids = list(df["trace_id"].unique())
tid_to_idx = {t: i for i, t in enumerate(trace_ids)}
n_traces = len(trace_ids)
cube = np.full((n_traces, n_bins, 3), np.nan, dtype=np.float64)
ti = np.asarray(
[tid_to_idx[t] for t in df["trace_id"]], dtype=np.int64
)
bi = np.asarray(
[bin_to_idx[(int(s), int(e))] for s, e in zip(df["start"], df["end"])],
dtype=np.int64,
)
cube[ti, bi, 0] = df["x"].to_numpy()
cube[ti, bi, 1] = df["y"].to_numpy()
cube[ti, bi, 2] = df["z"].to_numpy()
return cube, bin_ids, trace_ids
def _pairwise_distance_per_trace(cube: np.ndarray) -> np.ndarray:
"""Return a (n_traces, n_bins, n_bins) Euclidean distance tensor.
Missing spots propagate as NaN distances.
"""
diff = cube[:, :, None, :] - cube[:, None, :, :]
return np.sqrt(np.sum(diff ** 2, axis=-1))
[docs]
def mean_distance_matrix(
df: pd.DataFrame,
chrom=None,
reduce: str = "median",
):
"""Population-level mean/median pairwise distance matrix.
For each pair of genomic bins ``(i, j)``, the distance is computed
per-trace and then reduced across traces with ``np.nanmedian`` (the
Bintu 2018 convention) or ``np.nanmean``.
Parameters
----------
df : DataFrame with spots + coords.
chrom : optional chromosome filter.
reduce : 'median' (default) or 'mean'.
Returns
-------
matrix : ndarray (n_bins, n_bins)
bin_ids : list of (start, end)
n_traces : int
"""
cube, bin_ids, trace_ids = _bin_coord_cube(df, chrom=chrom)
dist = _pairwise_distance_per_trace(cube)
if reduce == "mean":
agg = np.nanmean(dist, axis=0)
else:
agg = np.nanmedian(dist, axis=0)
return agg, bin_ids, len(trace_ids)
[docs]
def radius_of_gyration(df: pd.DataFrame, chrom=None) -> pd.Series:
"""Per-trace radius of gyration.
``Rg = sqrt(mean over spots of ||r - centroid||²)``. Traces with
fewer than 2 spots contribute NaN.
"""
if chrom is not None:
df = df[df["chrom"].astype(str) == str(chrom)]
rgs = {}
for tid, sdf in df.groupby("trace_id", observed=True):
pos = sdf[["x", "y", "z"]].to_numpy()
if pos.shape[0] < 2:
rgs[tid] = np.nan
continue
centroid = pos.mean(axis=0)
rgs[tid] = float(np.sqrt(np.mean(np.sum((pos - centroid) ** 2, axis=1))))
return pd.Series(rgs, name="rg").sort_index()
__all__ = [
"mean_distance_matrix",
"contact_frequency",
"radius_of_gyration",
]