Source code for uchrom.io._io

import gzip
import pandas as pd
import fire
import h5py

from cooler import Cooler


def open_file(path, mode="r"):
    if path.endswith(".gz"):
        return gzip.open(path, mode + "t")
    return open(path, mode)


[docs] def read_pairs(path): assert path.endswith(".pairs") or path.endswith(".pairs.gz") with open_file(path) as f: df = pd.read_csv(f, sep="\t", header=None, comment="#") columns = ["read", "chr1", "pos1", "chr2", "pos2", "strand1", "strand2"] df = df.iloc[:, :len(columns)] df.columns = columns df['chr1'] = df['chr1'].astype(str) df['chr2'] = df['chr2'].astype(str) return df
[docs] def read_particles(path): """Read reconstruction output. Auto-detects format by extension. - ``.h5cd`` → ChromData HDF5 (returned as flat DataFrame) - ``.csv`` or other → CSV """ path_str = str(path) if path_str.endswith(".h5cd"): from uchrom.core import ChromData cd = ChromData.read(path) df = cd.to_dataframe() df['chrom'] = df['chrom'].astype(str) return df df = pd.read_csv(path, index_col=0, sep=",") df['chrom'] = df['chrom'].astype(str) return df
[docs] def save_particles(df, path, cell_id=None): """Save reconstruction output. Default format is ``.h5cd`` (ChromData). Format inferred from file extension: - ``.csv`` → CSV via ``df.to_csv`` - ``.h5cd`` or other → ChromData HDF5 (default) ``cell_id`` is only propagated into the ``.h5cd`` branch (CSV has no dedicated cell column). When writing ``.h5cd`` it is passed through to :meth:`ChromData.from_dataframe`. """ path_str = str(path) if path_str.endswith(".csv"): df.to_csv(path) return from uchrom.core import ChromData cd = ChromData.from_dataframe(df, cell_id=cell_id) cd.write(path)
def read_3dg(path): df = pd.read_csv(path, sep="\t", comment="#", header=None) df.columns = ["chrom", "start", "x", 'y', 'z'] return df
[docs] def load_cool(path, pos='mid'): """Load pixels from cooler.""" c = Cooler(path) with h5py.File(c.store) as f: c_root = f[c.uri.split("::")[1]] if "::" in c.uri else f c_bins = c_root['bins'] bin_df = pd.DataFrame({ 'chrom': c.chromsizes.index[c_bins['chrom'][:]], 'start': c_bins['start'][:], 'end': c_bins['end'][:], }) c_pixels = c_root['pixels'] bin1 = bin_df.loc[c_pixels['bin1_id'][:]].reset_index(drop=True) bin2 = bin_df.loc[c_pixels['bin2_id'][:]].reset_index(drop=True) pixel_df = pd.DataFrame({ 'chr1': bin1.chrom, 'start1': bin1.start, 'end1': bin1.end, 'chr2': bin2.chrom, 'start2': bin2.start, 'end2': bin2.end, 'count': c_pixels['count'][:], }) if pos == 'mid': pixel_df['pos1'] = (pixel_df['start1'] + pixel_df['end1']) / 2 pixel_df['pos2'] = (pixel_df['start2'] + pixel_df['end2']) / 2 return pixel_df
def load_hic(path, resolution, chrom=None, norm='KR'): """Load contact matrix from .hic file using hicstraw.""" try: import hicstraw except ImportError: raise ImportError( "hicstraw is required to read .hic files. " "Install it with: pip install hic-straw" ) hic = hicstraw.HiCFile(path) chromosomes = hic.getChromosomes() # build chrom name -> length lookup (skip 'All') chrom_info = {c.name: c.length for c in chromosomes if c.name != 'All'} if chrom is not None: # strip 'chr' prefix if needed to match hic file convention chrom_key = chrom if chrom_key not in chrom_info: chrom_key = chrom.replace('chr', '') if chrom_key not in chrom_info: raise ValueError( f"Chromosome {chrom} not found. " f"Available: {list(chrom_info.keys())}" ) import numpy as np chrom_len = chrom_info[chrom_key] n_bins = (chrom_len + resolution - 1) // resolution # use getRecords API (strawAsMatrix segfaults on large chromosomes) mzd = hic.getMatrixZoomData( chrom_key, chrom_key, 'observed', norm, 'BP', resolution ) records = mzd.getRecords(0, chrom_len, 0, chrom_len) matrix = np.zeros((n_bins, n_bins)) for rec in records: i = rec.binX // resolution j = rec.binY // resolution if i < n_bins and j < n_bins: val = rec.counts if np.isfinite(val): matrix[i, j] = val matrix[j, i] = val # build bins dataframe starts = [i * resolution for i in range(n_bins)] ends = [min((i + 1) * resolution, chrom_len) for i in range(n_bins)] bins_df = pd.DataFrame({ 'chrom': chrom, 'start': starts, 'end': ends, 'idx': range(n_bins), }) return bins_df, matrix else: raise ValueError("Whole-genome mode not yet supported for .hic files. " "Please specify --chrom.") def load_hic_inter(path, resolution, chrom1, chrom2, norm='KR'): """Load inter-chromosomal contact matrix from .hic file.""" try: import hicstraw except ImportError: raise ImportError( "hicstraw is required to read .hic files. " "Install it with: pip install hic-straw" ) import numpy as np hic = hicstraw.HiCFile(path) chromosomes = hic.getChromosomes() chrom_info = {c.name: c.length for c in chromosomes if c.name != 'All'} # resolve chrom names def resolve(name): if name in chrom_info: return name alt = name.replace('chr', '') if alt in chrom_info: return alt raise ValueError(f"Chromosome {name} not found. Available: {list(chrom_info.keys())}") key1, key2 = resolve(chrom1), resolve(chrom2) len1, len2 = chrom_info[key1], chrom_info[key2] n1 = (len1 + resolution - 1) // resolution n2 = (len2 + resolution - 1) // resolution mzd = hic.getMatrixZoomData(key1, key2, 'observed', norm, 'BP', resolution) records = mzd.getRecords(0, len1, 0, len2) matrix = np.zeros((n1, n2)) for rec in records: i = rec.binX // resolution j = rec.binY // resolution if i < n1 and j < n2: val = rec.counts if np.isfinite(val): matrix[i, j] = val return matrix def load_hic_genome(path, resolution, chroms=None, norm='KR'): """Load whole-genome contact matrix (intra + inter) from .hic or .mcool. Returns: bins_df: DataFrame with chrom, start, end, idx for all bins matrix: N x N whole-genome contact matrix offsets: dict mapping chrom name -> start index in the matrix chrom_sizes: dict mapping chrom name -> number of bins """ import numpy as np if path.endswith(('.mcool', '.cool')): return _load_genome_mcool(path, resolution, chroms) elif path.endswith('.hic'): return _load_genome_hic(path, resolution, chroms, norm) else: raise ValueError(f"Unsupported format: {path}. Use .hic, .mcool, or .cool") def _load_genome_mcool(path, resolution, chroms=None): """Load whole-genome contact matrix from .mcool/.cool file.""" import numpy as np from cooler import Cooler if '::' in path: clr = Cooler(path) else: clr = Cooler(f"{path}::resolutions/{resolution}") if chroms is None: skip = {'chrY', 'Y', 'chrMT', 'MT', 'chrM', 'M'} chroms = [c for c in clr.chromnames if c not in skip] # compute offsets and bins offsets = {} all_bins = [] total = 0 chrom_sizes = {} for c in chroms: chrom_bins = clr.bins().fetch(c) n_bins = len(chrom_bins) offsets[c] = total chrom_sizes[c] = n_bins for i in range(n_bins): all_bins.append({ 'chrom': c, 'start': int(chrom_bins['start'].iloc[i]), 'end': int(chrom_bins['end'].iloc[i]), 'idx': total + i, }) total += n_bins bins_df = pd.DataFrame(all_bins) matrix = np.zeros((total, total)) # fill matrix block by block for i, c1 in enumerate(chroms): o1 = offsets[c1] n1 = chrom_sizes[c1] # intra-chromosomal intra = clr.matrix(balance=True).fetch(c1) intra = np.nan_to_num(intra, nan=0.0) matrix[o1:o1+n1, o1:o1+n1] = intra # inter-chromosomal for j in range(i + 1, len(chroms)): c2 = chroms[j] o2 = offsets[c2] n2 = chrom_sizes[c2] try: inter = clr.matrix(balance=True).fetch(c1, c2) inter = np.nan_to_num(inter, nan=0.0) matrix[o1:o1+n1, o2:o2+n2] = inter matrix[o2:o2+n2, o1:o1+n1] = inter.T except Exception: pass return bins_df, matrix, offsets, chrom_sizes def _load_genome_hic(path, resolution, chroms=None, norm='KR'): """Load whole-genome contact matrix from .hic file.""" try: import hicstraw except ImportError: raise ImportError("hicstraw is required. Install with: pip install hic-straw") import numpy as np hic = hicstraw.HiCFile(path) chrom_info_raw = {c.name: c.length for c in hic.getChromosomes() if c.name != 'All'} if chroms is None: skip = {'Y', 'MT', 'M'} chroms = [c for c in chrom_info_raw if c not in skip and c.replace('chr', '') not in skip] chroms = sorted(chroms, key=lambda x: ( 0 if x.replace('chr', '').isdigit() else 1, int(x.replace('chr', '')) if x.replace('chr', '').isdigit() else ord(x.replace('chr', '')) )) # compute offsets and bins offsets = {} all_bins = [] total = 0 chrom_sizes = {} for c in chroms: key = c if c in chrom_info_raw else c.replace('chr', '') clen = chrom_info_raw[key] n_bins = (clen + resolution - 1) // resolution offsets[c] = total chrom_sizes[c] = n_bins display_name = c if c.startswith('chr') else f'chr{c}' for i in range(n_bins): all_bins.append({ 'chrom': display_name, 'start': i * resolution, 'end': min((i + 1) * resolution, clen), 'idx': total + i, }) total += n_bins bins_df = pd.DataFrame(all_bins) matrix = np.zeros((total, total)) # resolve chrom keys chrom_keys = {} for c in chroms: chrom_keys[c] = c if c in chrom_info_raw else c.replace('chr', '') # fill matrix (reuse single HiCFile object) for i, c1 in enumerate(chroms): key1 = chrom_keys[c1] n1 = chrom_sizes[c1] o1 = offsets[c1] len1 = chrom_info_raw[key1] # intra-chromosomal (diagonal block) mzd = hic.getMatrixZoomData(key1, key1, 'observed', norm, 'BP', resolution) records = mzd.getRecords(0, len1, 0, len1) for rec in records: ri = rec.binX // resolution rj = rec.binY // resolution if ri < n1 and rj < n1: val = rec.counts if np.isfinite(val): matrix[o1+ri, o1+rj] = val matrix[o1+rj, o1+ri] = val # inter-chromosomal (off-diagonal blocks) for j in range(i + 1, len(chroms)): c2 = chroms[j] key2 = chrom_keys[c2] n2 = chrom_sizes[c2] o2 = offsets[c2] len2 = chrom_info_raw[key2] try: mzd = hic.getMatrixZoomData( key1, key2, 'observed', norm, 'BP', resolution) records = mzd.getRecords(0, len1, 0, len2) for rec in records: ri = rec.binX // resolution rj = rec.binY // resolution if ri < n1 and rj < n2: val = rec.counts if np.isfinite(val): matrix[o1+ri, o2+rj] = val matrix[o2+rj, o1+ri] = val except Exception: pass # some pairs may not have data return bins_df, matrix, offsets, chrom_sizes def export_pdb_coords(file_path, particles, scale=1.0, extended=True): """ Write chromosome particle coordinates as a PDB format file """ alc = ' ' ins = ' ' prefix = 'HETATM' line_format = '%-80.80s\n' if extended: pdb_format = '%-6.6s%5.1d %4.4s%s%3.3s %s%4.1d%s %8.3f%8.3f%8.3f%6.2f%6.2f %2.2s %10d\n' ter_format = '%-6.6s%5.1d %s %s%4.1d%s %10d\n' else: pdb_format = '%-6.6s%5.1d %4.4s%s%3.3s %s%4.1d%s %8.3f%8.3f%8.3f%6.2f%6.2f %2.2s \n' ter_format = '%-6.6s%5.1d %s %s%4.1d%s \n' file_obj = open(file_path, 'w') write = file_obj.write chromosomes = list(particles['chrom'].unique()) sort_chromos = [] for chromo in chromosomes: if chromo[:3].lower() == 'chr': key = chromo[3:] else: key = chromo if key.isdigit(): key = '%03d' % int(key) sort_chromos.append((key, chromo)) sort_chromos.sort() sort_chromos = [x[1] for x in sort_chromos] particle_size = particles.iloc[0]['end'] - particles.iloc[0]['start'] title = 'Genome structure export' write(line_format % 'TITLE %s' % title) write(line_format % 'REMARK 210') write(line_format % 'REMARK 210 Atom type C is used for all particles') write(line_format % 'REMARK 210 Atom number increases every %s bases' % particle_size) write(line_format % 'REMARK 210 Residue code indicates chromosome') write(line_format % 'REMARK 210 Residue number represents which sequence Mb the atom is in') if extended: file_obj.write(line_format % 'REMARK 210 Extended PDB format with particle seq. pos. in last column') file_obj.write(line_format % 'REMARK 210') pos_chromo = {} m = 0 line = 'MODEL %4d' % (m+1) write(line_format % line) c = 0 j = 1 seqPrev = None for k, chromo in enumerate(sort_chromos): chain_code = chr(ord('A')+k) tlc = chromo if tlc.lower().startswith("chr"): tlc = tlc[3:] while len(tlc) < 2: tlc = '_' + tlc if len(tlc) == 2: tlc = 'C' + tlc if len(tlc) > 3: tlc = tlc[:3] chrom_df = particles[particles['chrom'] == chromo] if chrom_df.shape[0] == 0: continue pos = chrom_df['start'] for i, seqPos in enumerate(list(pos.values)): c += 1 seqMb = int(seqPos//1e6) + 1 if seqMb == seqPrev: j += 1 else: j = 1 el = 'C' a = 'C%d' % j aName = '%-3s' % a x, y, z = chrom_df[['x', 'y', 'z']].iloc[i] #XYZ coordinates if scale != 1.0: x, y, z = [v*scale for v in [x, y, z]] seqPrev = seqMb pos_chromo[c] = chromo if extended: line = pdb_format % (prefix,c,aName,alc,tlc,chain_code,seqMb,ins,x,y,z,0.0,0.0,el,seqPos) else: line = pdb_format % (prefix,c,aName,alc,tlc,chain_code,seqMb,ins,x,y,z,0.0,0.0,el) write(line) write(line_format % 'ENDMDL') for i in range(c - 1): if pos_chromo[i+1] == pos_chromo[i+2]: line = 'CONECT%5.1d%5.1d' % (i+1, i+2) write(line_format % line) write(line_format % 'END') file_obj.close() def read_bintu_tracing( path, chrom: str, start_bp: int, resolution_bp: int = 30_000, skip_header_line: bool = True, cell_id_per_trace: bool = True, ): """Read a Bintu 2018-style chromatin-tracing CSV into a ChromData. Bintu et al. 2018 (*Science* 362, eaau1783) distribute their FISH imaging as simple CSVs with columns ``Chromosome index, Segment index, Z, X, Y`` (coordinates in nm). Each ``Chromosome index`` is one imaged chromatin fiber in one cell (they do not distinguish multiple alleles per cell), and segments are numbered 1..N along the genomic region. Parameters ---------- path : str Path to the CSV (e.g. ``IMR90_chr21-18-20Mb.csv``). chrom : str Chromosome name for the output (e.g. ``'chr21'``). start_bp : int Genomic start of segment 1 (from the dataset's README — for ``IMR90_chr21-18-20Mb`` this is ``18_627_714`` in hg38). resolution_bp : int Genomic size of each segment (30 kb in Bintu 2018). skip_header_line : bool If True, skip the first descriptive line before the CSV header. cell_id_per_trace : bool If True, assign each trace to its own ``cell_id`` (one trace per cell — matches how Bintu 2018 published the data). Returns ------- ChromData """ import pandas as pd import numpy as np from uchrom.core import ChromData raw = pd.read_csv(path, skiprows=1 if skip_header_line else 0) raw = raw.dropna(subset=['X', 'Y', 'Z']) raw = raw.rename(columns={ 'Chromosome index': 'trace_id', 'Segment index': 'segment', 'X': 'x', 'Y': 'y', 'Z': 'z', }) raw['trace_id'] = raw['trace_id'].astype(int) raw['segment'] = raw['segment'].astype(int) raw['chrom'] = chrom raw['start'] = (raw['segment'] - 1) * resolution_bp + start_bp raw['end'] = raw['start'] + resolution_bp if cell_id_per_trace: raw['cell_id'] = raw['trace_id'].astype(str) coords = raw[['x', 'y', 'z']].to_numpy(dtype=np.float64) spots_cols = ['chrom', 'start', 'end', 'trace_id'] if 'cell_id' in raw.columns: spots_cols.append('cell_id') spots = raw[spots_cols].reset_index(drop=True) return ChromData( coords=coords, spots=spots, uns={'xyz_unit': 'nm', 'genome_assembly': 'hg38', 'source': 'Bintu 2018 (Science 362, eaau1783)'}, ) def particles_to_pdb(path_particles, path_pdb, split_by_chrom=False): df = read_particles(path_particles) if not split_by_chrom: export_pdb_coords(path_pdb, df) else: from os.path import splitext chroms = list(df['chrom'].unique()) prefix, suffix = splitext(path_pdb) for chr in chroms: path = f"{prefix}.{chr}{suffix}" sdf = df[df['chrom'] == chr] export_pdb_coords(path, sdf) if __name__ == "__main__": fire.Fire({ "particles_to_pdb": particles_to_pdb })