"""ChromData — core data container for U-Chrom.
Purpose
-------
``ChromData`` is the project-wide interface for chromatin 3D structure
data. It unifies the two kinds of input U-Chrom consumes —
* **sequencing-based** (Hi-C, Dip-C) → produced by
:mod:`uchrom.recon` reconstruction
* **imaging-based** (ORCA, MERFISH, DNA seqFISH+) → produced by
:mod:`uchrom.im` + chromatin tracing
— into a single flat "structure table" (*genomic bins → 3-D coordinates*)
plus hierarchical metadata for cells, traces, and population-level
aggregate results. Every downstream analysis module (``uchrom.strc``,
``uchrom.fea``, ``uchrom.emb``, ``uchrom.pl``, ``uchrom.browser``)
consumes or produces ``ChromData`` — the format is the contract that
decouples tools along the pipeline.
Conceptually it is the chromatin-structure analogue of
`AnnData <https://anndata.readthedocs.io/>`_ for scRNA-seq. The main
difference is an extra hierarchical axis: AnnData has
``observations × variables``; ChromData has
``Cell → Trace → Spot`` with coordinates in 3-D.
Hierarchy
---------
Cell → Trace → Spot::
* Spot — one genomic bin observed in one trace, with an (x, y, z).
* Trace — an ordered polymer of spots along a chromatin fibre. In
a diploid cell one chromosome contributes two traces
(maternal + paternal, when haplotype-resolved).
* Cell — a physical cell that contains one or more traces.
Attributes summary
------------------
coords (n_spots, 3) ndarray x, y, z per spot
spots DataFrame (n_spots, ≥4) chrom, start, end, trace_id,
[cell_id, spot_id, ...]
cells DataFrame (n_cells, ?) cell_type, position, ...
cellm dict[str, ndarray] per-cell multi-dim
(embedding, UMAP, ...)
tracks DataFrame (n_spots, n_track) epigenomic signals aligned
to spots (ATAC, ChIP, ...)
traces DataFrame (n_traces, ?) trace-level metadata
layers dict[str, (n_spots, 3)] alt. coordinate sets
(raw / corrected / aligned)
results dict analysis outputs
(loops, tads, compartments)
uns dict unstructured metadata
(genome_assembly, xyz_unit,
fofct_header, ...)
See :attr:`_SPOTS_REQUIRED_COLUMNS` below for the contract on ``spots``.
Spot required columns
---------------------
chrom str (stored as ``pd.Categorical``)
start int, 0-based BED-style start
end int, non-inclusive end
trace_id int or str (stored as ``pd.Categorical``)
Optional but conventional columns:
cell_id int or str (categorical)
spot_id int or str (globally unique; FOF-CT Spot_ID)
sub_cell_roi_id see FOF-CT
extra_cell_roi_id see FOF-CT
Any extra column is preserved verbatim (e.g. ``Readout`` in Takei / Hu
FOF-CT files). String-heavy columns are auto-converted to
``pd.Categorical`` in ``__init__`` for ~10× memory savings on large
datasets.
FOF-CT compatibility
--------------------
:meth:`ChromData.from_fofct` parses a `4DN FOF-CT core table
<https://fish-omics-format.readthedocs.io/>`_:
FOF-CT field → ChromData location
---------------------- ---------------------------------
Spot_ID spots['spot_id']
Trace_ID spots['trace_id']
X, Y, Z coords[:, 0], coords[:, 1], coords[:, 2]
Chrom spots['chrom']
Chrom_Start spots['start']
Chrom_End spots['end']
Cell_ID spots['cell_id']
Sub_Cell_ROI_ID spots['sub_cell_roi_id']
Extra_Cell_ROI_ID spots['extra_cell_roi_id']
Header fields (``##Genome_Assembly``, ``##XYZ_Unit``, ``#Lab_Name``,
``#Software_*``, ...) are preserved verbatim in
``uns['fofct_header']``; the two standard keys ``genome_assembly`` and
``xyz_unit`` are also promoted to top-level ``uns`` entries. The parser
tolerates several real-world FOF-CT quirks (case-insensitive
``##columns=``, header lines wrapped in double quotes with trailing
commas, a CSV column header line after ``##columns=``, etc).
On-disk format — ``.h5cd``
--------------------------
HDF5 with two root-group attributes:
uchrom_format_version "MAJOR.MINOR" contract of the file layout
uchrom_version uchrom package version that wrote the file
Semantics:
* **Same MAJOR** — readable. Higher MINOR triggers a warning but
unknown fields are ignored (forward-compatible additive changes).
* **Different MAJOR** — :exc:`ValueError` on read, with guidance to
upgrade uchrom or convert the file.
* **Missing attribute** (pre-versioning files) — warn and assume
legacy 1.0 layout.
Layout::
data.h5cd
├── @uchrom_format_version = "1.0"
├── @uchrom_version = "0.2.0"
├── coords (n_spots, 3) float64
├── spots/ group — one dataset per DataFrame column
├── cells/ group
├── tracks/ group
├── traces/ group
├── cellm/ group of (n_cells, ...) ndarrays per key
├── layers/ group of (n_spots, 3) ndarrays per key
├── results/ group — DataFrames as sub-groups,
│ arrays as datasets
└── uns/ nested group — scalars as attrs,
dicts as sub-groups
New MAJOR versions should add a ``_read_vN`` function at the bottom of
this module and extend the dispatch in :meth:`ChromData.read`.
Design decisions (why this shape)
---------------------------------
* **Flat storage + hierarchical access.** ``spots`` holds every
observation in one DataFrame; ``get_cell`` / ``get_trace`` /
``get_chrom`` return new ``ChromData`` instances. Mutation is
non-destructive.
* **No global pairwise matrix.** Pairwise distances / contacts are
biologically meaningful per-trace, not globally across cells, and
would be O(n²) to store. Use
:meth:`ChromData.compute_distances` on demand.
* **Three "core tables"** distinguishing *where the data comes from*
drives the module layout:
* ``core`` (coords + spots) — *what* (3-D structure)
* ``cell`` (cells + cellm) — *who* (cell identity / embedding)
* ``tracks`` — *what else per bin*
(ATAC / ChIP signal)
* **Categorical dtypes** on string columns save ~10× memory without
changing behaviour.
* **Versioned on-disk format** lets future changes break cleanly —
downstream tools can refuse unknown MAJOR versions rather than
silently deserialise wrong data.
See also :mod:`uchrom.core.spec` for a short human-oriented description
of the format, and ``docs/source/guide/chromdata.md`` for tutorial-style
material.
"""
from __future__ import annotations
from copy import deepcopy
from pathlib import Path
from typing import Union, Optional, Dict, List
import numpy as np
import pandas as pd
PathLike = Union[str, Path]
_SPOTS_REQUIRED_COLUMNS = ("chrom", "start", "end", "trace_id")
_CATEGORICAL_COLUMNS = ("chrom", "trace_id", "cell_id")
# ``.h5cd`` on-disk format version. Semantics (MAJOR.MINOR):
# MAJOR bump — breaking change: a reader for the old MAJOR cannot read the
# new file without running a migration.
# MINOR bump — additive/backwards-compatible change (e.g. a new optional
# group or attribute). Older readers should ignore unknown
# fields and keep working.
#
# 1.1: per-DataFrame ``_index`` dataset + ``_index_name`` attr. Files with
# a non-trivial DataFrame index (e.g. ``cells`` keyed by ``cell_id``)
# previously lost that index on round-trip; 1.1 preserves it. 1.0
# readers ignore the extra dataset and recover the old behaviour.
FORMAT_VERSION = "1.1"
_SUPPORTED_MAJOR = 1
[docs]
class ChromData:
"""Chromatin Data — the core container for U-Chrom.
See the module docstring of :mod:`uchrom.core.cdata` for the full
purpose, hierarchy, FOF-CT mapping, and on-disk format contract.
Summary:
* The central abstraction is a "structure table" — genomic bins
mapped to 3D coordinates. Each row of ``spots`` (with the
corresponding row of ``coords``) is one **Spot**.
* Spots are grouped hierarchically as **Cell → Trace → Spot**. A
Trace is an ordered chromatin-fibre polymer; a Cell contains one
or more traces.
* All analysis in U-Chrom consumes or produces ``ChromData``.
Reconstruction modules (``uchrom.recon``) emit it, structure
callers (``uchrom.strc``) decorate ``cd.results[...]`` with TADs /
loops / compartments, and the browser / plotters render it.
Parameters
----------
coords : ndarray, shape (n_spots, 3)
3-D coordinates (x, y, z) per spot.
spots : DataFrame, shape (n_spots, ≥4)
Per-spot metadata. **Required columns:**
``chrom`` (str, will be categorified),
``start`` (int, 0-based BED-style),
``end`` (int, non-inclusive),
``trace_id`` (int or str, will be categorified).
Optional: ``cell_id`` (int or str), ``spot_id``, FOF-CT
``sub_cell_roi_id`` / ``extra_cell_roi_id``, and any
experiment-specific annotation column (carried through
verbatim).
cells : DataFrame, optional
Per-cell metadata indexed by cell_id.
cellm : dict[str, ndarray], optional
Per-cell multi-dimensional annotations (embeddings, UMAP, …).
Each array's first axis length = ``n_cells``.
tracks : DataFrame, optional
Epigenomic signals (ATAC, ChIP-seq, …) row-aligned to ``spots``.
Length must equal ``n_spots``.
traces : DataFrame, optional
Per-trace metadata indexed by trace_id.
layers : dict[str, ndarray], optional
Alternative coordinate sets, each with shape ``(n_spots, 3)`` —
e.g. raw / drift-corrected / aligned.
results : dict, optional
Analysis outputs. Conventional keys:
``'loops'`` → DataFrame (chrom1, start1, end1, …),
``'tads'`` → DataFrame (chrom, start, end, …),
``'compartments'`` → ndarray or DataFrame per bin.
uns : dict, optional
Unstructured metadata preserved on disk. Conventional keys:
``'genome_assembly'``, ``'xyz_unit'``, ``'fofct_header'``.
validate : bool
If True (default), validate internal consistency on
construction (coords shape, spots required columns, tracks /
layers alignment).
Attributes
----------
n_spots, n_traces, n_cells, chroms : derived accessors
Key methods
-----------
from_dataframe, from_fofct, read, write, to_dataframe,
get_cell, get_trace, get_chrom, compute_distances
On-disk format — ``.h5cd``
--------------------------
Versioned HDF5. See :mod:`uchrom.core.cdata` module docstring for
the full layout and the :meth:`read` / :meth:`write` round-trip
contract.
Notes
-----
* Subsetting (``cd[mask]``, ``get_chrom`` etc.) always returns a new
``ChromData``; the source is not mutated.
* Global pairwise distance matrices are intentionally not stored —
they are biologically meaningful per-trace, not across cells, and
would be O(n²) memory. Compute on demand via
:meth:`compute_distances(trace_id=...)`.
* String columns (``chrom``, ``trace_id``, ``cell_id``) are
auto-converted to ``pd.Categorical`` for ~10× memory savings.
"""
def __init__(
self,
coords: np.ndarray,
spots: pd.DataFrame,
*,
cells: Optional[pd.DataFrame] = None,
cellm: Optional[Dict[str, np.ndarray]] = None,
tracks: Optional[pd.DataFrame] = None,
traces: Optional[pd.DataFrame] = None,
layers: Optional[Dict[str, np.ndarray]] = None,
results: Optional[dict] = None,
uns: Optional[dict] = None,
linked_adata=None,
validate: bool = True,
):
self.coords = np.asarray(coords, dtype=np.float64)
self.spots = spots.reset_index(drop=True)
self.cells = cells if cells is not None else pd.DataFrame()
self.cellm = cellm if cellm is not None else {}
self.tracks = tracks
self.traces = traces if traces is not None else pd.DataFrame()
self.layers = layers if layers is not None else {}
self.results = results if results is not None else {}
self.uns = uns if uns is not None else {}
self._linked_adata = linked_adata
# Convert string-heavy columns to categorical for memory efficiency
self._categorify()
if validate:
self._validate()
# ------------------------------------------------------------------
# Properties
# ------------------------------------------------------------------
@property
def n_spots(self) -> int:
return self.coords.shape[0]
@property
def n_traces(self) -> int:
return self.spots["trace_id"].nunique()
@property
def n_cells(self) -> int:
if "cell_id" in self.spots.columns:
return self.spots["cell_id"].nunique()
return len(self.cells) if len(self.cells) > 0 else 0
@property
def chroms(self) -> List[str]:
return list(self.spots["chrom"].cat.categories
if hasattr(self.spots["chrom"], "cat")
else self.spots["chrom"].unique())
@property
def linked_adata(self):
"""Linked AnnData object, loaded lazily from ``uns`` metadata if possible."""
if self._linked_adata is None:
path = self.uns.get("linked_anndata", {}).get("path")
if path:
path = Path(path)
if path.exists():
import anndata as ad
self._linked_adata = ad.read_h5ad(path)
return self._linked_adata
@linked_adata.setter
def linked_adata(self, value) -> None:
self._linked_adata = value
[docs]
def load_linked_anndata(self, path: Optional[PathLike] = None):
"""Load, cache, and return the linked AnnData object."""
if path is None:
path = self.uns.get("linked_anndata", {}).get("path")
if path is None:
raise ValueError("No linked AnnData path is recorded in cd.uns['linked_anndata'].")
import anndata as ad
self._linked_adata = ad.read_h5ad(path)
return self._linked_adata
@property
def discovery_schema(self) -> dict:
"""Agent-readable auto-discovery schema for this ``ChromData``.
If a schema is stored in ``uns['auto_discovery_schema']`` it is
parsed and returned. Otherwise a fresh in-memory schema is built
without mutating ``uns``.
"""
from uchrom.auto_discovery.schema import (
DISCOVERY_SCHEMA_KEY,
build_discovery_schema,
unpack_schema,
)
raw = self.uns.get(DISCOVERY_SCHEMA_KEY)
if raw is not None:
return unpack_schema(raw)
return build_discovery_schema(self)
@property
def auto_discovery_schema(self) -> dict:
"""Alias for :attr:`discovery_schema`."""
return self.discovery_schema
[docs]
def build_discovery_schema(self, *, store: bool = True, **kwargs) -> dict:
"""Build the auto-discovery schema, optionally storing it in ``uns``.
The stored representation is an HDF5-friendly JSON payload under
``uns['auto_discovery_schema']``, so it round-trips with ``.h5cd``.
"""
from uchrom.auto_discovery.schema import (
DISCOVERY_SCHEMA_KEY,
build_discovery_schema,
pack_schema,
)
schema = build_discovery_schema(self, **kwargs)
if store:
self.uns[DISCOVERY_SCHEMA_KEY] = pack_schema(schema)
return schema
[docs]
def update_discovery_schema(self, schema: Optional[dict] = None, **kwargs) -> dict:
"""Store a supplied or newly built auto-discovery schema in ``uns``."""
from uchrom.auto_discovery.schema import DISCOVERY_SCHEMA_KEY, pack_schema
if schema is None:
return self.build_discovery_schema(store=True, **kwargs)
self.uns[DISCOVERY_SCHEMA_KEY] = pack_schema(schema)
return schema
[docs]
def validate_discovery_schema(
self,
schema: Optional[dict] = None,
*,
raise_on_error: bool = False,
) -> List[str]:
"""Validate a discovery schema against this ``ChromData``.
Returns a list of issues. If ``raise_on_error=True``, raises
``ValueError`` when any issue is found.
"""
from uchrom.auto_discovery.schema import validate_discovery_schema
issues = validate_discovery_schema(schema or self.discovery_schema, cdata=self)
if issues and raise_on_error:
raise ValueError("; ".join(issues))
return issues
[docs]
def describe_for_agent(self, *, max_items: int = 40) -> str:
"""Return a compact prompt-ready description of available data."""
from uchrom.auto_discovery.schema import schema_to_agent_context
return schema_to_agent_context(self.discovery_schema, max_items=max_items)
# ------------------------------------------------------------------
# Validation
# ------------------------------------------------------------------
def _validate(self):
if self.coords.ndim != 2 or self.coords.shape[1] != 3:
raise ValueError(
f"coords must have shape (n, 3), got {self.coords.shape}"
)
if self.coords.shape[0] != len(self.spots):
raise ValueError(
f"coords rows ({self.coords.shape[0]}) != spots rows ({len(self.spots)})"
)
for col in _SPOTS_REQUIRED_COLUMNS:
if col not in self.spots.columns:
raise ValueError(f"spots missing required column: '{col}'")
if self.tracks is not None and len(self.tracks) != self.n_spots:
raise ValueError(
f"tracks rows ({len(self.tracks)}) != n_spots ({self.n_spots})"
)
cell_axis_len = len(self.cells) if len(self.cells) > 0 else self.n_cells
for key, arr in self.cellm.items():
arr = np.asarray(arr)
if arr.ndim == 0:
raise ValueError(f"cellm['{key}'] must have at least one dimension")
if cell_axis_len > 0 and arr.shape[0] != cell_axis_len:
raise ValueError(
f"cellm['{key}'] rows ({arr.shape[0]}) != n_cells ({cell_axis_len})"
)
for key, arr in self.layers.items():
if arr.shape != self.coords.shape:
raise ValueError(
f"layers['{key}'] shape {arr.shape} != coords shape {self.coords.shape}"
)
def _categorify(self):
for col in _CATEGORICAL_COLUMNS:
if col in self.spots.columns and not hasattr(self.spots[col].dtype, "categories"):
self.spots[col] = self.spots[col].astype("category")
# ------------------------------------------------------------------
# Subsetting
# ------------------------------------------------------------------
def __getitem__(self, index) -> "ChromData":
if isinstance(index, (pd.Series, np.ndarray)):
if index.dtype == bool:
index = np.where(index)[0]
idx = np.asarray(index)
new_coords = self.coords[idx]
new_spots = self.spots.iloc[idx].reset_index(drop=True)
new_tracks = self.tracks.iloc[idx].reset_index(drop=True) if self.tracks is not None else None
new_layers = {k: v[idx] for k, v in self.layers.items()}
# Filter traces/cells to those still referenced
new_traces = self._filter_traces(new_spots)
new_cells, new_cellm = self._filter_cells(new_spots)
return ChromData(
new_coords,
new_spots,
cells=new_cells,
cellm=new_cellm,
tracks=new_tracks,
traces=new_traces,
layers=new_layers,
results=self.results,
uns=self.uns,
linked_adata=None,
validate=False,
)
def _filter_traces(self, spots: pd.DataFrame) -> pd.DataFrame:
if len(self.traces) == 0:
return self.traces
keep = spots["trace_id"].unique()
return self.traces[self.traces.index.isin(keep)]
def _filter_cells(self, spots: pd.DataFrame):
if len(self.cells) == 0:
return self.cells, self.cellm
if "cell_id" not in spots.columns:
return self.cells, self.cellm
keep = {str(x) for x in spots["cell_id"].unique()}
axis_ids = self._cells_axis_ids()
if axis_ids is None:
return self.cells, self.cellm
mask = axis_ids.isin(keep)
new_cells = self.cells[mask]
new_cellm = {}
for k, v in self.cellm.items():
new_cellm[k] = v[mask.values] if hasattr(mask, "values") else v[mask]
return new_cells, new_cellm
def _spot_cell_ids(self) -> pd.Index:
if "cell_id" not in self.spots.columns:
return pd.Index([], name="cell_id")
return pd.Index([str(x) for x in self.spots["cell_id"].unique()], name="cell_id")
def _cells_axis_ids(self) -> Optional[pd.Index]:
"""Return the cell ids that define rows of cells/cellm, if known."""
if len(self.cells) == 0:
return None
spot_ids = set(self._spot_cell_ids())
candidates = []
if not isinstance(self.cells.index, pd.RangeIndex) or self.cells.index.name is not None:
candidates.append(self.cells.index)
for col in ("cell_id", "cell_name"):
if col in self.cells.columns:
candidates.append(self.cells[col])
for values in candidates:
ids = pd.Index([str(x) for x in values], name="cell_id")
if not spot_ids or spot_ids.intersection(set(ids)):
return ids
if len(self.cells) == len(self._spot_cell_ids()):
return self._spot_cell_ids()
return pd.Index([str(x) for x in self.cells.index], name="cell_id")
[docs]
def get_cell(self, cell_id) -> "ChromData":
if "cell_id" not in self.spots.columns:
raise KeyError("spots has no 'cell_id' column")
mask = self.spots["cell_id"] == cell_id
return self[mask.values]
[docs]
def get_trace(self, trace_id) -> "ChromData":
mask = self.spots["trace_id"] == trace_id
return self[mask.values]
[docs]
def get_chrom(self, chrom: str) -> "ChromData":
mask = self.spots["chrom"] == chrom
return self[mask.values]
# ------------------------------------------------------------------
# Pairwise computation (on-demand, per-trace)
# ------------------------------------------------------------------
[docs]
def compute_distances(self, trace_id=None) -> np.ndarray:
"""Compute pairwise Euclidean distance matrix.
Parameters
----------
trace_id : optional
If given, compute only for spots in that trace.
If None, compute for all spots (use with caution on large data).
Returns
-------
np.ndarray, shape (n, n)
"""
if trace_id is not None:
sub = self.get_trace(trace_id)
return _pdist_matrix(sub.coords)
return _pdist_matrix(self.coords)
# ------------------------------------------------------------------
# AnnData integration
# ------------------------------------------------------------------
[docs]
def link_anndata(
self,
adata,
*,
cell_id_col: Optional[str] = None,
copy_obs: bool = True,
copy_obsm: bool = True,
) -> int:
"""Import cell-level metadata from an AnnData into this ChromData.
Matches cells by ``cell_id``: each unique value in
``spots['cell_id']`` is looked up in ``adata.obs`` (by index, or
by the column *cell_id_col* if given). Matched cells get their
``adata.obs`` columns merged into ``self.cells`` and their
``adata.obsm`` arrays copied into ``self.cellm``.
If ``self.cells`` already exists, its row order is preserved and
AnnData rows are aligned onto that cell axis. This is important
for multi-omics loaders such as Takei 2025, where chromatin
tracing coordinates live in ``coords/spots``, RNA/IF signals live
in spot-level ``tracks``, and mRNA clustering/UMAP already live
in ``cells`` / ``cellm``.
Parameters
----------
adata : anndata.AnnData
The single-cell dataset to link (e.g. scRNA-seq).
cell_id_col : str, optional
Column in ``adata.obs`` that holds cell identifiers matching
``spots['cell_id']``. If ``None``, ``adata.obs.index`` is
used as the key.
copy_obs : bool
If True (default), copy ``adata.obs`` columns into
``self.cells``.
copy_obsm : bool
If True (default), copy ``adata.obsm`` arrays into
``self.cellm``.
Returns
-------
int
Number of cells matched.
Raises
------
KeyError
If ``spots`` has no ``cell_id`` column.
"""
if "cell_id" not in self.spots.columns:
raise KeyError(
"spots has no 'cell_id' column — cannot match cells. "
"Add cell_id to spots before calling link_anndata()."
)
cd_cell_ids = self._spot_cell_ids()
if cell_id_col is not None:
if cell_id_col not in adata.obs.columns:
raise KeyError(f"adata.obs has no column '{cell_id_col}'")
adata_index = pd.Index(adata.obs[cell_id_col].values)
else:
adata_index = adata.obs.index
# Cast both sides to string for robust matching.
cd_set = set(cd_cell_ids)
adata_str = pd.Index([str(x) for x in adata_index])
if adata_str.has_duplicates:
dupes = adata_str[adata_str.duplicated()].unique().tolist()
raise ValueError(
"AnnData cell identifiers are not unique; duplicate ids: "
f"{dupes[:5]}"
)
matched_set = cd_set & set(adata_str)
if len(self.cells) > 0:
existing_axis = self._cells_axis_ids()
if existing_axis is not None:
matched = [c for c in existing_axis if c in matched_set]
else:
matched = [c for c in cd_cell_ids if c in matched_set]
else:
matched = [c for c in cd_cell_ids if c in matched_set]
if not matched:
import warnings
warnings.warn(
"No cell_id overlap between ChromData and AnnData. "
"Check that cell_id values match.",
stacklevel=2,
)
return 0
adata_pos = {cell_id: i for i, cell_id in enumerate(adata_str)}
order = [adata_pos[cell_id] for cell_id in matched]
# Build / update the cell table keyed by the matched ids.
new_index = pd.Index(matched, name="cell_id")
if len(self.cells) > 0:
old_axis = self._cells_axis_ids()
if old_axis is None:
cells = pd.DataFrame(index=new_index)
else:
cells = self.cells.copy()
cells.index = old_axis
cells = cells.loc[new_index].copy()
cells.index.name = "cell_id"
else:
cells = pd.DataFrame(index=new_index)
old_cellm = self.cellm
if len(self.cells) > 0 and old_cellm:
old_axis = self._cells_axis_ids()
if old_axis is not None:
old_pos = {cell_id: i for i, cell_id in enumerate(old_axis)}
keep = [old_pos[cell_id] for cell_id in matched]
old_cellm = {k: np.asarray(v)[keep].copy() for k, v in old_cellm.items()}
if copy_obs:
obs = adata.obs.iloc[order].copy()
obs.index = new_index
for col in obs.columns:
cells[col] = obs[col].values
self.cells = cells
self.cellm = old_cellm
if copy_obsm:
for key in adata.obsm:
arr = np.asarray(adata.obsm[key])
if arr.shape[0] != adata.n_obs:
raise ValueError(
f"adata.obsm['{key}'] rows ({arr.shape[0]}) != adata.n_obs ({adata.n_obs})"
)
self.cellm[key] = arr[order].copy()
return len(matched)
[docs]
def to_anndata(self):
"""Export cell-level data as an AnnData object.
Creates an AnnData where each observation is a cell, ``obs`` is
``self.cells``, and ``obsm`` is ``self.cellm``. The ``X`` matrix
is left empty (zeros) because ChromData has no cell-by-feature
expression matrix. Spot-level RNA-FISH / IF / epigenomic
signals, such as Takei 2025 tracks, remain in ``self.tracks`` and
are not flattened into AnnData.X.
Returns
-------
anndata.AnnData
Raises
------
ImportError
If anndata is not installed.
"""
import anndata as ad
if len(self.cells) > 0:
obs = self.cells.copy()
axis_ids = self._cells_axis_ids()
if axis_ids is not None:
obs.index = axis_ids
obs.index.name = "cell_id"
elif self.n_cells > 0:
obs = pd.DataFrame(index=self._spot_cell_ids())
else:
raise ValueError(
"No cell-level data to export. "
"Use link_anndata() first or populate cd.cells."
)
obsm = {}
for key, value in self.cellm.items():
arr = np.asarray(value)
if arr.shape[0] != len(obs):
raise ValueError(
f"cellm['{key}'] rows ({arr.shape[0]}) != exported cells ({len(obs)})"
)
obsm[key] = arr.copy()
adata = ad.AnnData(
X=np.zeros((len(obs), 0)),
obs=obs,
obsm=obsm,
)
return adata
# ------------------------------------------------------------------
# I/O
# ------------------------------------------------------------------
[docs]
def write(self, path: PathLike) -> None:
"""Write to HDF5 (.h5cd) file."""
import h5py
try:
from uchrom import __version__ as _uchrom_version
except Exception:
_uchrom_version = "unknown"
path = Path(path)
with h5py.File(path, "w") as f:
f.attrs["uchrom_format_version"] = FORMAT_VERSION
f.attrs["uchrom_version"] = _uchrom_version
f.create_dataset("coords", data=self.coords)
_write_dataframe(f, "spots", self.spots)
_write_dataframe(f, "cells", self.cells)
_write_dataframe(f, "tracks", self.tracks)
_write_dataframe(f, "traces", self.traces)
_write_dict_of_arrays(f, "layers", self.layers)
_write_dict_of_arrays(f, "cellm", self.cellm)
_write_results(f, "results", self.results)
_write_uns(f, "uns", self.uns)
[docs]
@classmethod
def read(cls, path: PathLike) -> "ChromData":
"""Read from HDF5 (.h5cd) file.
Dispatches to a version-specific reader based on
``f.attrs['uchrom_format_version']``. Files written before
versioning was introduced are read with a warning using the v1.0
reader (the on-disk layout has been stable from the start).
Forward compatibility:
- Same MAJOR, higher MINOR → read with a warning; unknown fields
are ignored silently by the lower-level helpers.
- Different MAJOR → raise ``ValueError`` with guidance.
"""
import warnings
import h5py
path = Path(path)
with h5py.File(path, "r") as f:
version_str = f.attrs.get("uchrom_format_version")
if version_str is None:
warnings.warn(
f"{path}: missing uchrom_format_version attr; "
f"assuming legacy 1.0 layout.",
stacklevel=2,
)
version_str = "1.0"
elif isinstance(version_str, bytes):
version_str = version_str.decode("utf-8")
major, minor = _parse_version(version_str)
if major != _SUPPORTED_MAJOR:
raise ValueError(
f"{path}: format version {version_str} has incompatible "
f"MAJOR with this uchrom (supports MAJOR={_SUPPORTED_MAJOR}). "
f"Upgrade uchrom or convert the file."
)
cur_major, cur_minor = _parse_version(FORMAT_VERSION)
if minor > cur_minor:
warnings.warn(
f"{path}: written with format version {version_str} "
f"(reader supports up to {FORMAT_VERSION}). "
f"Forward-compatible — unknown fields will be ignored.",
stacklevel=2,
)
return _read_v1(cls, f)
[docs]
@classmethod
def from_dataframe(cls, df: pd.DataFrame, *, cell_id=None, **kwargs) -> "ChromData":
"""Create from a reconstruction output DataFrame.
Expects columns: chrom, start, end, x, y, z.
Each chromosome becomes one trace.
Parameters
----------
df : DataFrame with columns chrom, start, end, x, y, z.
cell_id : hashable, optional
If given, tag every spot with this cell identifier (e.g. derived
from the output filename for single-cell reconstruction). The
DataFrame's own ``cell_id`` column, if any, takes precedence.
**kwargs
Forwarded to the :class:`ChromData` constructor.
"""
required = {"chrom", "start", "end", "x", "y", "z"}
missing = required - set(df.columns)
if missing:
raise ValueError(f"DataFrame missing columns: {missing}")
coords = df[["x", "y", "z"]].values.astype(np.float64)
# Assign trace_id: one trace per chromosome
chroms = df["chrom"].values
chrom_to_id = {c: i for i, c in enumerate(sorted(set(chroms)))}
trace_ids = [chrom_to_id[c] for c in chroms]
spots = pd.DataFrame({
"chrom": df["chrom"].values,
"start": df["start"].values,
"end": df["end"].values,
"trace_id": trace_ids,
})
if "cell_id" in df.columns:
spots["cell_id"] = df["cell_id"].values
elif cell_id is not None:
spots["cell_id"] = cell_id
# Carry over extra columns (e.g. contact_count)
skip = required | {"index", "cell_id"}
for col in df.columns:
if col not in skip and col not in spots.columns:
spots[col] = df[col].values
return cls(coords, spots, **kwargs)
[docs]
@classmethod
def from_fofct(cls, core_path: PathLike, **kwargs) -> "ChromData":
"""Read from FOF-CT core table file.
Parameters
----------
core_path : path
Path to the FOF-CT core table (CSV/TSV/TXT).
**kwargs
Additional keyword arguments passed to ChromData constructor
(e.g. cells, tracks, uns).
"""
path = Path(core_path)
header_meta = {}
data_lines = []
columns = None # from ``##columns=`` meta line if present
with open(path, "r") as fh:
for line in fh:
raw = line.strip()
if not raw:
continue
# Some FOF-CT writers wrap the whole header line in quotes and
# pad with trailing commas so every line has the same column
# count. Strip those so we can recognise ``#`` headers.
stripped = raw.rstrip(",").strip()
if stripped.startswith('"') and stripped.endswith('"') and len(stripped) >= 2:
stripped = stripped[1:-1]
if stripped.lower().startswith("##columns="):
cols_str = stripped.split("=", 1)[1].strip().strip("()")
columns = [c.strip() for c in cols_str.split(",")]
elif stripped.startswith("##"):
key, _, val = stripped[2:].partition("=")
header_meta[key.strip()] = val.strip()
elif stripped.startswith("#"):
key, _, val = stripped[1:].partition(":")
header_meta[key.strip()] = val.strip()
else:
data_lines.append(raw)
if not data_lines:
raise ValueError(f"No data rows found in {path}")
# Some FOF-CT files repeat the column names as a CSV header row right
# before the data (in addition to the ``##columns=`` meta line). If
# the first non-comment line has non-numeric values in positions where
# numbers are expected, treat it as a CSV header.
first_token = data_lines[0].split(",", 1)[0].strip()
has_csv_header = bool(first_token) and not (
first_token[0].isdigit() or first_token[0] in ("-", "+", ".")
)
if has_csv_header:
header_row = data_lines[0]
data_lines = data_lines[1:]
if columns is None:
columns = [c.strip() for c in header_row.split(",")]
if not data_lines:
raise ValueError(f"No data rows found in {path}")
# Parse data
from io import StringIO as _StringIO
text = "\n".join(data_lines)
df = pd.read_csv(_StringIO(text), header=None, skipinitialspace=True)
if columns is not None and len(columns) == len(df.columns):
df.columns = columns
elif columns is not None:
# Column count mismatch — trust the data and label the extras
df.columns = columns + [f"col_{i}" for i in range(len(columns), len(df.columns))]
else:
default_cols = ["Spot_ID", "Trace_ID", "X", "Y", "Z",
"Chrom", "Chrom_Start", "Chrom_End"]
df.columns = default_cols[:len(df.columns)]
# Map FOF-CT columns to ChromData — case-insensitive.
canonical_map = {
"x": "x", "y": "y", "z": "z",
"chrom": "chrom",
"chrom_start": "start", "chromstart": "start",
"chrom_end": "end", "chromend": "end",
"trace_id": "trace_id", "traceid": "trace_id",
"spot_id": "spot_id", "spotid": "spot_id",
"cell_id": "cell_id", "cellid": "cell_id",
"sub_cell_roi_id": "sub_cell_roi_id",
"extra_cell_roi_id": "extra_cell_roi_id",
}
rename = {c: canonical_map[c.lower()] for c in df.columns if c.lower() in canonical_map}
df = df.rename(columns=rename)
missing = {"x", "y", "z"} - set(df.columns)
if missing:
raise ValueError(
f"FOF-CT core table missing coordinate columns {missing}. "
f"Parsed columns: {list(df.columns)}"
)
coords = df[["x", "y", "z"]].values.astype(np.float64)
# Spot-level columns: the required set first, then any extras carried
# through verbatim (custom FOF-CT columns like ``Readout``).
spot_cols = ["chrom", "start", "end", "trace_id"]
for extra in ("spot_id", "cell_id", "sub_cell_roi_id", "extra_cell_roi_id"):
if extra in df.columns and extra not in spot_cols:
spot_cols.append(extra)
coord_cols = {"x", "y", "z"}
for col in df.columns:
if col not in spot_cols and col not in coord_cols:
spot_cols.append(col)
spots = df[spot_cols].copy()
# Store FOF-CT header metadata (case-insensitive lookup for promotion).
uns = kwargs.pop("uns", {})
if header_meta:
uns["fofct_header"] = header_meta
lower_meta = {k.lower(): v for k, v in header_meta.items()}
if "xyz_unit" in lower_meta:
uns["xyz_unit"] = lower_meta["xyz_unit"]
if "genome_assembly" in lower_meta:
uns["genome_assembly"] = lower_meta["genome_assembly"]
return cls(coords, spots, uns=uns, **kwargs)
[docs]
@classmethod
def from_pyhim_trace(
cls,
ecsv_path: PathLike,
barcode_dict: dict | pd.DataFrame | None = None,
**kwargs,
) -> "ChromData":
"""Read a PyHiM chromatin-trace ECSV table into a ChromData.
PyHiM (Devos et al. 2024) emits one ECSV file per trace-building
run. Schema (from ``chromatin_trace_table.py`` upstream):
Spot_ID, Trace_ID, x, y, z, Chrom, Chrom_Start, Chrom_End,
ROI #, Mask_id, Barcode #, label
``meta['comments']`` carries ``xyz_unit=...`` and
``genome_assembly=...``.
Parameters
----------
ecsv_path : path
Path to the ECSV file written by PyHiM.
barcode_dict : dict[int, (chrom, start, end)] or DataFrame, optional
Required when ``Chrom``/``Chrom_Start``/``Chrom_End`` are empty
in the ECSV (PyHiM does not always populate them). As a
DataFrame, expects columns ``barcode, chrom, start, end``.
If ``Chrom`` is populated, ``barcode_dict`` is ignored.
**kwargs
Additional keyword arguments passed to the ChromData
constructor (``cells``, ``tracks``, ``uns``, ...).
Notes
-----
- ``Mask_id`` becomes ``cell_id`` (PyHiM convention).
- ECSV header comments are captured in
``cd.uns['pyhim']['ecsv_comments']`` and any
``xyz_unit`` / ``genome_assembly`` entries are also promoted
to ``cd.uns`` directly (matching :meth:`from_fofct`).
"""
try:
from astropy.table import Table
except ImportError as e:
raise ImportError(
"astropy is required to read PyHiM ECSV trace tables. "
"Install with `pip install u-chrom[im]` or `pip install astropy`."
) from e
path = Path(ecsv_path)
table = Table.read(path, format="ascii.ecsv")
df = table.to_pandas()
# Astropy can decode byte-string columns as bytes — decode to str.
for col in df.columns:
if df[col].dtype == object and df[col].apply(
lambda v: isinstance(v, bytes)
).any():
df[col] = df[col].apply(
lambda v: v.decode() if isinstance(v, bytes) else v
)
missing_coord = {"x", "y", "z"} - set(df.columns)
if missing_coord:
raise ValueError(
f"PyHiM ECSV missing coordinate columns {missing_coord}. "
f"Got: {list(df.columns)}"
)
# Decide whether Chrom is filled. PyHiM writes empty strings (which
# astropy round-trips as NaN) when the barcode→genome mapping wasn't
# supplied at trace-building time.
chrom_filled = (
"Chrom" in df.columns
and df["Chrom"].notna().any()
and df["Chrom"].astype(str).str.strip().ne("").any()
)
if not chrom_filled:
if barcode_dict is None:
raise ValueError(
"PyHiM ECSV has no Chrom/Chrom_Start/Chrom_End data; "
"pass `barcode_dict={barcode: (chrom, start, end)}` "
"or a DataFrame[barcode, chrom, start, end]."
)
df = _apply_pyhim_barcode_dict(df, barcode_dict)
# Canonical column names for ChromData.
rename = {
"Chrom": "chrom",
"Chrom_Start": "start",
"Chrom_End": "end",
"Trace_ID": "trace_id",
"Spot_ID": "spot_id",
"Mask_id": "cell_id",
"ROI #": "roi_id",
"Barcode #": "barcode",
}
df = df.rename(columns={k: v for k, v in rename.items() if k in df.columns})
coords = df[["x", "y", "z"]].values.astype(np.float64)
spot_cols = ["chrom", "start", "end", "trace_id"]
for extra in ("spot_id", "cell_id", "roi_id", "barcode", "label"):
if extra in df.columns:
spot_cols.append(extra)
spots = df[spot_cols].copy()
# Capture ECSV meta into uns.
uns = kwargs.pop("uns", None) or {}
ecsv_comments = list(table.meta.get("comments") or [])
uns.setdefault("pyhim", {})["ecsv_comments"] = ecsv_comments
for c in ecsv_comments:
if "=" in c:
k, _, v = c.partition("=")
k = k.strip(); v = v.strip()
if k in ("xyz_unit", "genome_assembly"):
uns.setdefault(k, v)
return cls(coords, spots, uns=uns, **kwargs)
[docs]
@classmethod
def from_seqfish_multiomics(cls, spot_glob, **kwargs) -> "ChromData":
"""Load Takei 2025 DNA seqFISH+ cerebellum data.
Thin shim around :func:`uchrom.io.seqfish_multiomics.read_seqfish_multiomics`.
See that function for the full parameter list.
"""
from uchrom.io.seqfish_multiomics import read_seqfish_multiomics
return read_seqfish_multiomics(spot_glob, **kwargs)
[docs]
@classmethod
def from_seqfish_multiomics_linked(cls, spot_glob, **kwargs):
"""Load linked Takei 2025 DNA tracing + RNA AnnData artifacts.
Thin shim around
:func:`uchrom.io.seqfish_multiomics.load_seqfish_multiomics_linked`.
Returns a ``ChromData`` with RNA expression available at
``cd.linked_adata`` and can write paired ``.h5cd`` / ``.h5ad``
files.
"""
from uchrom.io.seqfish_multiomics import load_seqfish_multiomics_linked
return load_seqfish_multiomics_linked(spot_glob, **kwargs)
[docs]
@classmethod
def from_takei2025_cerebellum(cls, **kwargs):
"""Load linked Takei 2025 cerebellum data.
Thin shim around
:func:`uchrom.io.seqfish_multiomics.load_takei2025_cerebellum`.
Returns a ``ChromData`` with RNA expression available at
``cd.linked_adata``.
"""
from uchrom.io.seqfish_multiomics import load_takei2025_cerebellum
return load_takei2025_cerebellum(**kwargs)
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Export as a flat DataFrame with coords + spots columns."""
df = self.spots.copy()
df["x"] = self.coords[:, 0]
df["y"] = self.coords[:, 1]
df["z"] = self.coords[:, 2]
# Reorder so coords are prominent
front = ["chrom", "start", "end", "x", "y", "z"]
rest = [c for c in df.columns if c not in front]
return df[front + rest]
# ------------------------------------------------------------------
# Copy
# ------------------------------------------------------------------
[docs]
def copy(self) -> "ChromData":
return ChromData(
self.coords.copy(),
self.spots.copy(),
cells=self.cells.copy() if len(self.cells) > 0 else None,
cellm={k: v.copy() for k, v in self.cellm.items()} or None,
tracks=self.tracks.copy() if self.tracks is not None else None,
traces=self.traces.copy() if len(self.traces) > 0 else None,
layers={k: v.copy() for k, v in self.layers.items()} or None,
results=deepcopy(self.results) or None,
uns=deepcopy(self.uns) or None,
linked_adata=self._linked_adata.copy() if self._linked_adata is not None else None,
validate=False,
)
# ------------------------------------------------------------------
# Repr
# ------------------------------------------------------------------
def __repr__(self) -> str:
parts = [
f"ChromData: n_spots={self.n_spots}, n_traces={self.n_traces}",
]
if self.n_cells > 0:
parts[0] += f", n_cells={self.n_cells}"
spot_cols = list(self.spots.columns)
parts.append(f" spots: {spot_cols}")
if len(self.cells) > 0:
parts.append(f" cells: {list(self.cells.columns)} ({len(self.cells)} cells)")
if self.cellm:
shapes = {k: v.shape for k, v in self.cellm.items()}
parts.append(f" cellm: {shapes}")
if self.tracks is not None:
parts.append(f" tracks: {list(self.tracks.columns)}")
if len(self.traces) > 0:
parts.append(f" traces: {list(self.traces.columns)} ({len(self.traces)} traces)")
if self.layers:
parts.append(f" layers: {list(self.layers.keys())}")
if self.results:
parts.append(f" results: {list(self.results.keys())}")
if self.uns:
parts.append(f" uns: {list(self.uns.keys())}")
if self._linked_adata is not None:
parts.append(f" linked_adata: {self._linked_adata.shape}")
elif self.uns.get("linked_anndata", {}).get("path"):
parts.append(" linked_adata: lazy")
return "\n".join(parts)
def __len__(self) -> int:
return self.n_spots
# ======================================================================
# Utilities
# ======================================================================
def _pdist_matrix(coords: np.ndarray) -> np.ndarray:
"""Compute pairwise Euclidean distance matrix from (n, 3) coords."""
diff = coords[:, None, :] - coords[None, :, :]
return np.sqrt((diff ** 2).sum(axis=-1))
def _apply_pyhim_barcode_dict(
df: pd.DataFrame,
barcode_dict,
) -> pd.DataFrame:
"""Fill Chrom / Chrom_Start / Chrom_End from a Barcode # → (chrom, start, end) map.
Accepts a Python ``dict`` keyed by integer barcode, or a DataFrame
with columns ``barcode, chrom, start, end``. Unmapped barcodes
raise ``KeyError`` listing the offenders.
"""
if isinstance(barcode_dict, pd.DataFrame):
bdf = barcode_dict
required = {"barcode", "chrom", "start", "end"}
missing_cols = required - set(bdf.columns)
if missing_cols:
raise ValueError(
f"barcode_dict DataFrame missing columns: {sorted(missing_cols)}"
)
mapping = {
int(row.barcode): (str(row.chrom), int(row.start), int(row.end))
for row in bdf.itertuples(index=False)
}
else:
mapping = {int(k): tuple(v) for k, v in dict(barcode_dict).items()}
if "Barcode #" not in df.columns:
raise ValueError(
"PyHiM ECSV has no 'Barcode #' column — cannot apply barcode_dict."
)
barcodes = df["Barcode #"].astype(int)
unknown = sorted(set(barcodes) - set(mapping))
if unknown:
raise KeyError(
f"barcode_dict missing entries for barcodes: {unknown}"
)
df = df.copy()
df["Chrom"] = barcodes.map(lambda b: mapping[b][0])
df["Chrom_Start"] = barcodes.map(lambda b: mapping[b][1]).astype("int64")
df["Chrom_End"] = barcodes.map(lambda b: mapping[b][2]).astype("int64")
return df
def _parse_version(s: str):
"""Parse 'MAJOR.MINOR' into a (major, minor) int tuple."""
parts = str(s).split(".")
try:
major = int(parts[0])
minor = int(parts[1]) if len(parts) > 1 else 0
except (ValueError, IndexError):
raise ValueError(f"invalid format version string: {s!r}")
return major, minor
# ======================================================================
# Version-specific readers
# ======================================================================
# Add new `_read_vN` functions as MAJOR bumps occur; dispatch from
# ChromData.read(). Each reader is responsible for its on-disk layout
# and may call a migration helper to produce the current in-memory form.
def _read_v1(cls, f) -> "ChromData":
"""Reader for .h5cd format MAJOR version 1."""
coords = f["coords"][:]
spots = _read_dataframe(f, "spots")
cells = _read_dataframe(f, "cells")
tracks = _read_dataframe(f, "tracks")
traces = _read_dataframe(f, "traces")
layers = _read_dict_of_arrays(f, "layers")
cellm = _read_dict_of_arrays(f, "cellm")
results = _read_results(f, "results")
uns = _read_uns(f, "uns")
return cls(
coords,
spots,
cells=cells if len(cells) > 0 else None,
cellm=cellm or None,
tracks=tracks if len(tracks) > 0 else None,
traces=traces if len(traces) > 0 else None,
layers=layers or None,
results=results or None,
uns=uns or None,
)
# ======================================================================
# HDF5 I/O helpers
# ======================================================================
def _write_dataframe(f, name: str, df: Optional[pd.DataFrame]):
"""Write a DataFrame to an HDF5 group, one dataset per column.
Format 1.1: when the index is not a trivial ``RangeIndex`` or has a
name, it is written to a reserved ``_index`` dataset with the name
stored in ``_index_name``. Old (1.0) readers ignore both and
reconstruct a default RangeIndex — the historical behaviour.
"""
if df is None or len(df) == 0:
f.create_group(name)
return
grp = f.create_group(name)
grp.attrs["_n_rows"] = len(df)
grp.attrs["_columns"] = list(df.columns)
for col in df.columns:
series = df[col]
if hasattr(series.dtype, "categories"):
# Store categorical as string
data = series.astype(str).values
elif series.dtype == object:
data = series.astype(str).values
else:
data = series.values
if data.dtype.kind in ("U", "S", "O"):
grp.create_dataset(col, data=data.astype("S"))
else:
grp.create_dataset(col, data=data)
# Preserve a non-trivial index — see format 1.1 note above.
idx = df.index
is_trivial_range = (
isinstance(idx, pd.RangeIndex)
and idx.start == 0
and idx.step == 1
and idx.name is None
)
if not is_trivial_range:
idx_vals = idx.to_numpy()
if idx_vals.dtype == object or idx_vals.dtype.kind in ("U", "S"):
data = np.array([str(x) for x in idx_vals]).astype("S")
else:
data = idx_vals
grp.create_dataset("_index", data=data)
grp.attrs["_index_name"] = idx.name if idx.name is not None else ""
def _read_dataframe(f, name: str) -> pd.DataFrame:
"""Read a DataFrame from an HDF5 group."""
if name not in f:
return pd.DataFrame()
grp = f[name]
if "_columns" not in grp.attrs:
return pd.DataFrame()
columns = list(grp.attrs["_columns"])
data = {}
for col in columns:
if col not in grp:
continue
arr = grp[col][:]
if arr.dtype.kind == "S":
arr = np.array([x.decode("utf-8") for x in arr])
data[col] = arr
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
# Format 1.1: restore the index if it was persisted.
if "_index" in grp:
idx_arr = grp["_index"][:]
if idx_arr.dtype.kind == "S":
idx_arr = np.array([x.decode("utf-8") for x in idx_arr])
name_attr = grp.attrs.get("_index_name", "")
if isinstance(name_attr, bytes):
name_attr = name_attr.decode("utf-8")
idx_name = name_attr or None
df.index = pd.Index(idx_arr, name=idx_name)
return df
def _write_dict_of_arrays(f, name: str, d: dict):
"""Write a dict of numpy arrays to an HDF5 group."""
grp = f.create_group(name)
for key, arr in d.items():
grp.create_dataset(key, data=np.asarray(arr))
def _read_dict_of_arrays(f, name: str) -> dict:
"""Read a dict of numpy arrays from an HDF5 group."""
if name not in f:
return {}
grp = f[name]
return {key: grp[key][:] for key in grp}
def _write_results(f, name: str, results: dict):
"""Write results dict (DataFrames and arrays) to HDF5."""
grp = f.create_group(name)
for key, val in results.items():
if isinstance(val, pd.DataFrame):
_write_dataframe(grp, key, val)
elif isinstance(val, np.ndarray):
grp.create_dataset(key, data=val)
def _read_results(f, name: str) -> dict:
"""Read results dict from HDF5."""
if name not in f:
return {}
grp = f[name]
results = {}
for key in grp:
item = grp[key]
if isinstance(item, type(grp)): # is a group → DataFrame
results[key] = _read_dataframe(grp, key)
else:
results[key] = item[:]
return results
def _write_uns(f, name: str, uns: dict):
"""Write uns dict to HDF5 attrs and sub-groups."""
grp = f.create_group(name)
for key, val in uns.items():
if isinstance(val, (str, int, float, bool)):
grp.attrs[key] = val
elif isinstance(val, np.ndarray):
grp.create_dataset(key, data=val)
elif isinstance(val, dict):
_write_uns(grp, key, val)
elif isinstance(val, (list, tuple)):
try:
grp.create_dataset(key, data=np.array(val))
except (TypeError, ValueError):
grp.attrs[key] = str(val)
else:
grp.attrs[key] = str(val)
def _read_uns(f, name: str) -> dict:
"""Read uns dict from HDF5."""
if name not in f:
return {}
grp = f[name]
result = dict(grp.attrs)
for key in grp:
item = grp[key]
if isinstance(item, type(grp)):
result[key] = _read_uns(grp, key)
else:
result[key] = item[:]
return result