"""Structural feature enrichment helpers for ``ChromData``.
This module wraps existing structure callers and projects their interval
outputs back onto the spot axis. It is deliberately independent of
``uchrom.auto_discovery`` so browser, plotting, notebooks, and discovery can
share the same structural feature layer.
"""
from __future__ import annotations
from typing import Sequence
import numpy as np
import pandas as pd
from uchrom.fea.project import INTERVAL_COLUMNS, project_interval_features_to_spots, unique_spot_intervals
from uchrom.fea.registry import append_feature_registry_entry, table_provenance
STRUCTURAL_FEATURE_COLUMNS = (
"distance_to_tad_boundary",
"inside_tad",
"loop_anchor_overlap",
"loop_anchor_count",
"distance_to_loop_anchor",
"compartment_label",
"compartment_pc2",
)
[docs]
def call_tads_multi(
cdata,
*,
chroms: Sequence[str] | None = None,
params=None,
device: str = "auto",
result_key: str = "tads",
store: bool = True,
verbose: bool = False,
) -> pd.DataFrame:
"""Call TADs on multiple chromosomes and optionally store one merged table."""
from uchrom.strc.tad import call_tads_by_pval
rows = []
for chrom in _chroms(cdata, chroms):
df = call_tads_by_pval(
cdata,
chrom=chrom,
params=params,
device=device,
store=False,
verbose=verbose,
)
if not df.empty:
rows.append(df)
out = _concat_or_empty(rows, _empty_tads())
if store:
cdata.results[result_key] = out
return out
[docs]
def call_loops_multi(
cdata,
*,
chroms: Sequence[str] | None = None,
params=None,
device: str = "auto",
result_key: str = "loops",
store: bool = True,
verbose: bool = False,
) -> pd.DataFrame:
"""Call loops on multiple chromosomes and optionally store one merged table."""
from uchrom.strc.loop import call_loops_axiswise_f
rows = []
for chrom in _chroms(cdata, chroms):
df = call_loops_axiswise_f(
cdata,
chrom=chrom,
params=params,
device=device,
store=False,
verbose=verbose,
)
if not df.empty:
rows.append(df)
out = _concat_or_empty(rows, _empty_loops())
if store:
cdata.results[result_key] = out
return out
[docs]
def call_compartments_multi(
cdata,
*,
chroms: Sequence[str] | None = None,
params=None,
device: str = "auto",
result_key: str = "compartments",
store: bool = True,
verbose: bool = False,
) -> pd.DataFrame:
"""Call compartments on multiple chromosomes and store one merged table."""
from uchrom.strc.comp import call_compartments_axes_pc
rows = []
for chrom in _chroms(cdata, chroms):
df = call_compartments_axes_pc(
cdata,
chrom=chrom,
params=params,
device=device,
store=False,
verbose=verbose,
)
if not df.empty:
rows.append(df)
out = _concat_or_empty(rows, _empty_compartments())
if store:
cdata.results[result_key] = out
return out
[docs]
def call_structures_multi(
cdata,
*,
structures: Sequence[str] = ("tads", "loops", "compartments"),
chroms: Sequence[str] | None = None,
device: str = "auto",
store: bool = True,
verbose: bool = False,
) -> dict[str, pd.DataFrame]:
"""Run selected structural callers on multiple chromosomes."""
selected = {str(name) for name in structures}
unknown = selected - {"tads", "loops", "compartments"}
if unknown:
raise ValueError(f"unknown structural callers: {sorted(unknown)}")
out: dict[str, pd.DataFrame] = {}
if "tads" in selected:
out["tads"] = call_tads_multi(cdata, chroms=chroms, device=device, store=store, verbose=verbose)
if "loops" in selected:
out["loops"] = call_loops_multi(cdata, chroms=chroms, device=device, store=store, verbose=verbose)
if "compartments" in selected:
out["compartments"] = call_compartments_multi(
cdata,
chroms=chroms,
device=device,
store=store,
verbose=verbose,
)
return out
[docs]
def compute_structural_features(
intervals: pd.DataFrame,
*,
tads: pd.DataFrame | None = None,
loops: pd.DataFrame | None = None,
compartments: pd.DataFrame | None = None,
) -> pd.DataFrame:
"""Compute spot-bin structural features from interval result tables."""
out = _normalise_intervals(intervals)
if tads is not None:
distance, inside = _tad_features(out, tads)
out["distance_to_tad_boundary"] = distance
out["inside_tad"] = inside
if loops is not None:
overlap, count, distance = _loop_anchor_features(out, loops)
out["loop_anchor_overlap"] = overlap
out["loop_anchor_count"] = count
out["distance_to_loop_anchor"] = distance
if compartments is not None:
labels, pc2 = _compartment_features(out, compartments)
out["compartment_label"] = labels
out["compartment_pc2"] = pc2
return out
[docs]
def add_structural_features(
cdata,
*,
tads: pd.DataFrame | None = None,
loops: pd.DataFrame | None = None,
compartments: pd.DataFrame | None = None,
tads_key: str = "tads",
loops_key: str = "loops",
compartments_key: str = "compartments",
prefix: str = "strc",
result_key: str = "bin_features",
project: bool = True,
store: bool = True,
overwrite: bool = False,
) -> pd.DataFrame:
"""Project TAD/loop/compartment outputs into ``cdata.tracks``."""
tads = _result_or_arg(cdata, tads, tads_key)
loops = _result_or_arg(cdata, loops, loops_key)
compartments = _result_or_arg(cdata, compartments, compartments_key)
if tads is None and loops is None and compartments is None:
raise ValueError("no structural result tables were provided or found in cdata.results")
table = compute_structural_features(
unique_spot_intervals(cdata),
tads=tads,
loops=loops,
compartments=compartments,
)
value_columns = [c for c in STRUCTURAL_FEATURE_COLUMNS if c in table.columns]
if store:
cdata.results[result_key] = _merge_feature_table(
cdata.results.get(result_key),
table,
value_columns=value_columns,
overwrite=overwrite,
)
if project:
cdata.tracks = project_interval_features_to_spots(
cdata,
table,
prefix=prefix,
value_columns=value_columns,
into=cdata.tracks,
overwrite=overwrite,
)
append_feature_registry_entry(cdata, {
"feature_group": "structure",
"features": [_prefixed_name(c, prefix) for c in value_columns],
"result_key": result_key if store else None,
"source_results": [
key for key, value in (
(tads_key, tads),
(loops_key, loops),
(compartments_key, compartments),
)
if value is not None
],
"coordinate_convention": "0-based half-open",
"parameters": {
"projected_to_tracks": bool(project),
"track_prefix": prefix,
},
"outputs": table_provenance(table, value_columns=value_columns),
"created_by": "uchrom.strc.enrichment",
})
return table
def _tad_features(intervals: pd.DataFrame, tads: pd.DataFrame):
if tads.empty:
return np.full(len(intervals), np.nan), np.zeros(len(intervals), dtype=float)
tads = _normalise_intervals(tads)
distances = np.full(len(intervals), np.nan, dtype=float)
inside = np.zeros(len(intervals), dtype=float)
for chrom, idx in intervals.groupby("chrom").groups.items():
idx_array = idx.to_numpy()
sub = tads[tads["chrom"] == str(chrom)]
if sub.empty:
continue
boundaries = np.unique(np.concatenate([sub["start"].to_numpy(), sub["end"].to_numpy()]))
mids = _midpoints(intervals.loc[idx_array])
distances[idx_array] = np.min(np.abs(mids[:, None] - boundaries[None, :]), axis=1)
start = intervals.loc[idx_array, "start"].to_numpy()
end = intervals.loc[idx_array, "end"].to_numpy()
tad_start = sub["start"].to_numpy()
tad_end = sub["end"].to_numpy()
overlaps = (np.minimum(end[:, None], tad_end[None, :]) - np.maximum(start[:, None], tad_start[None, :])) > 0
inside[idx_array] = overlaps.any(axis=1).astype(float)
return distances, inside
def _loop_anchor_features(intervals: pd.DataFrame, loops: pd.DataFrame):
anchors = _loop_anchor_table(loops)
overlap = np.zeros(len(intervals), dtype=float)
count = np.zeros(len(intervals), dtype=float)
distance = np.full(len(intervals), np.nan, dtype=float)
if anchors.empty:
return overlap, count, distance
for chrom, idx in intervals.groupby("chrom").groups.items():
idx_array = idx.to_numpy()
sub = anchors[anchors["chrom"] == str(chrom)]
if sub.empty:
continue
start = intervals.loc[idx_array, "start"].to_numpy()
end = intervals.loc[idx_array, "end"].to_numpy()
anchor_start = sub["start"].to_numpy()
anchor_end = sub["end"].to_numpy()
bp = np.maximum(0, np.minimum(end[:, None], anchor_end[None, :]) - np.maximum(start[:, None], anchor_start[None, :]))
hit = bp > 0
count[idx_array] = hit.sum(axis=1).astype(float)
overlap[idx_array] = hit.any(axis=1).astype(float)
mids = _midpoints(intervals.loc[idx_array])
anchor_mids = _midpoints(sub)
distance[idx_array] = np.min(np.abs(mids[:, None] - anchor_mids[None, :]), axis=1)
return overlap, count, distance
def _compartment_features(intervals: pd.DataFrame, compartments: pd.DataFrame):
compartments = _normalise_intervals(compartments)
labels = np.full(len(intervals), "", dtype=object)
pc2 = np.full(len(intervals), np.nan, dtype=float)
if compartments.empty:
return labels, pc2
for chrom, idx in intervals.groupby("chrom").groups.items():
idx_array = idx.to_numpy()
sub = compartments[compartments["chrom"] == str(chrom)].reset_index(drop=True)
if sub.empty:
continue
start = intervals.loc[idx_array, "start"].to_numpy()
end = intervals.loc[idx_array, "end"].to_numpy()
comp_start = sub["start"].to_numpy()
comp_end = sub["end"].to_numpy()
overlaps = np.maximum(0, np.minimum(end[:, None], comp_end[None, :]) - np.maximum(start[:, None], comp_start[None, :]))
best = np.argmax(overlaps, axis=1)
ok = overlaps[np.arange(len(idx_array)), best] > 0
if "compartment" in sub.columns:
comp_labels = sub["compartment"].astype(str).to_numpy()
labels[idx_array[ok]] = comp_labels[best[ok]]
if "pc2" in sub.columns:
comp_pc2 = sub["pc2"].to_numpy(dtype=float)
pc2[idx_array[ok]] = comp_pc2[best[ok]]
return labels, pc2
def _loop_anchor_table(loops: pd.DataFrame) -> pd.DataFrame:
if loops is None or loops.empty:
return pd.DataFrame(columns=list(INTERVAL_COLUMNS))
required = {"chrom1", "start1", "end1", "chrom2", "start2", "end2"}
missing = required - set(loops.columns)
if missing:
raise ValueError(f"loop table missing columns: {sorted(missing)}")
left = loops.rename(columns={"chrom1": "chrom", "start1": "start", "end1": "end"})[
list(INTERVAL_COLUMNS)
]
right = loops.rename(columns={"chrom2": "chrom", "start2": "start", "end2": "end"})[
list(INTERVAL_COLUMNS)
]
anchors = pd.concat([left, right], ignore_index=True)
return _normalise_intervals(anchors).drop_duplicates(list(INTERVAL_COLUMNS)).reset_index(drop=True)
def _normalise_intervals(frame: pd.DataFrame) -> pd.DataFrame:
missing = [c for c in INTERVAL_COLUMNS if c not in frame.columns]
if missing:
raise ValueError(f"interval table missing columns: {missing}")
out = frame.loc[:, list(frame.columns)].copy()
out["chrom"] = out["chrom"].astype(str)
out["start"] = out["start"].astype(np.int64)
out["end"] = out["end"].astype(np.int64)
bad = out["end"] <= out["start"]
if bad.any():
rows = out.loc[bad, list(INTERVAL_COLUMNS)].head(3).to_dict("records")
raise ValueError(f"intervals must satisfy end > start: {rows}")
return out
def _merge_feature_table(existing, table: pd.DataFrame, *, value_columns: Sequence[str], overwrite: bool) -> pd.DataFrame:
if existing is None:
return table[list(INTERVAL_COLUMNS) + list(value_columns)].copy()
out = existing.copy()
conflicts = [c for c in value_columns if c in out.columns]
if conflicts and not overwrite:
raise ValueError(f"result table already contains columns: {conflicts}")
if conflicts:
out = out.drop(columns=conflicts)
return out.merge(table[list(INTERVAL_COLUMNS) + list(value_columns)], on=list(INTERVAL_COLUMNS), how="outer")
def _result_or_arg(cdata, value, key: str):
if value is not None:
return value
if key not in cdata.results:
return None
found = cdata.results.get(key)
return found if isinstance(found, pd.DataFrame) else None
def _chroms(cdata, chroms: Sequence[str] | None) -> list[str]:
return [str(c) for c in (chroms if chroms is not None else cdata.chroms)]
def _concat_or_empty(frames: list[pd.DataFrame], empty: pd.DataFrame) -> pd.DataFrame:
if not frames:
return empty
return pd.concat(frames, ignore_index=True)
def _midpoints(frame: pd.DataFrame) -> np.ndarray:
return (frame["start"].to_numpy(dtype=float) + frame["end"].to_numpy(dtype=float)) * 0.5
def _prefixed_name(name: str, prefix: str | None) -> str:
if not prefix:
return str(name)
return f"{str(prefix).rstrip('.')}.{name}"
def _empty_tads() -> pd.DataFrame:
return pd.DataFrame(columns=["chrom", "start", "end", "level", "score", "pval", "fdr"])
def _empty_loops() -> pd.DataFrame:
return pd.DataFrame(columns=[
"chrom1", "start1", "end1", "chrom2", "start2", "end2",
"score", "pval", "fdr", "cluster_size", "contact_freq", "summit_i", "summit_j",
])
def _empty_compartments() -> pd.DataFrame:
return pd.DataFrame(columns=["chrom", "start", "end", "bin_index", "cluster", "compartment", "pc2"])
__all__ = [
"STRUCTURAL_FEATURE_COLUMNS",
"add_structural_features",
"call_compartments_multi",
"call_loops_multi",
"call_structures_multi",
"call_tads_multi",
"compute_structural_features",
]