# Source code for uchrom.auto_discovery.runner

"""Runnable auto-discovery pipeline for ChromData."""

from __future__ import annotations

import asyncio
import json
import math
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Awaitable

from .ideas import DiscoveryIdea, review_idea_against_schema
from .notebooks import (
    create_exploration_notebook,
    execute_notebook_python,
    upsert_structured_conclusion,
)


@dataclass
class DiscoveryRunConfig:
    """Configuration for a local auto-discovery run.

    Only ``h5cd_path`` and ``output_dir`` are required; every other field has a
    default. ``run_auto_discovery`` also accepts a plain mapping with these keys
    and converts it into this class.
    """

    # Input dataset and the directory that receives notebooks and reports.
    h5cd_path: str | Path
    output_dir: str | Path
    # Idea selection: cap on ideas kept, an optional JSONL of pre-written ideas
    # (read instead of calling an idea backend), and the complexity limit
    # handed to the schema review.
    max_ideas: int = 12
    ideas_path: str | Path | None = None
    max_complexity: int = 5
    # Backends for idea generation and analysis-code generation
    # ('pantheon' or 'openai').
    idea_source: str = "pantheon"
    code_source: str = "pantheon"
    # LLM settings; ``reasoning_effort`` is only forwarded to the OpenAI calls.
    model: str | None = None
    reasoning_effort: str | None = None
    llm_timeout: int = 420
    # Fan-out of the Pantheon idea agents and notebook agents.
    idea_agent_count: int = 3
    notebook_agent_concurrency: int = 3
    # Optional schematic image generation, forwarded to the Pantheon notebook agents.
    generate_schematic_image: bool = False
    schematic_image_model: str | None = None
    schematic_image_model_args: dict[str, Any] | None = None
    # Execution and persistence switches.
    execute: bool = True
    stop_on_error: bool = False
    store_schema: bool = False
    dataset_name: str | None = None
@dataclass
class DiscoveryRunResult:
    """Summary of an auto-discovery run.

    The counts follow the pipeline funnel (generated -> accepted -> executed ->
    verified). ``run_auto_discovery`` fills the ``*_path`` fields with the string
    paths of the artefacts it writes under ``output_dir``.
    """

    # Directory the run wrote into.
    output_dir: str
    # Funnel counts.
    n_generated: int
    n_accepted: int
    n_executed: int
    n_verified: int
    # Artefact locations.
    report_path: str
    ideas_path: str
    reviews_path: str
    results_path: str
    # Notebook paths created for accepted ideas (fresh list per instance).
    notebooks: list[str] = field(default_factory=list)
    agent_records_path: str | None = None
[docs] def run_auto_discovery(config: DiscoveryRunConfig | dict[str, Any]) -> DiscoveryRunResult: """Run schema → idea → notebook → verification for one ``.h5cd``.""" if not isinstance(config, DiscoveryRunConfig): config = DiscoveryRunConfig(**dict(config)) from uchrom import ChromData h5cd_path = Path(config.h5cd_path) output_dir = Path(config.output_dir) notebooks_dir = output_dir / "notebooks" output_dir.mkdir(parents=True, exist_ok=True) notebooks_dir.mkdir(parents=True, exist_ok=True) cdata = ChromData.read(h5cd_path) schema = cdata.build_discovery_schema(store=config.store_schema, dataset_name=config.dataset_name) if config.store_schema: cdata.write(h5cd_path) agent_records_path = output_dir / "agent_records.jsonl" agent_records: list[dict[str, Any]] = [] if config.ideas_path is not None: ideas = [ DiscoveryIdea.from_dict(row) for row in _read_jsonl(Path(config.ideas_path)) ][:config.max_ideas] elif config.idea_source == "pantheon": from .pantheon import generate_pantheon_ideas ideas, records = _run_async(generate_pantheon_ideas( schema, output_dir=output_dir, max_ideas=config.max_ideas, model=config.model, timeout=config.llm_timeout, idea_agent_count=config.idea_agent_count, )) agent_records.extend(_agent_record_rows(records)) elif config.idea_source == "openai": from .llm import generate_openai_ideas ideas = generate_openai_ideas( schema, max_ideas=config.max_ideas, model=config.model, reasoning_effort=config.reasoning_effort, timeout=config.llm_timeout, ) else: raise ValueError("idea_source must be 'pantheon' or 'openai'") ideas_path = output_dir / "ideas.jsonl" reviews_path = output_dir / "reviews.jsonl" results_path = output_dir / "results.jsonl" report_path = output_dir / "report.md" reviews = [] results = [] notebooks = [] accepted: list[DiscoveryIdea] = [] for idea in ideas: review = review_idea_against_schema(idea, schema, max_complexity=config.max_complexity) reviews.append({"idea_id": idea.idea_id, **review.to_dict()}) if review.accepted: 
accepted.append(idea) # Persist generation/review outputs before notebook agents run. Long # notebook batches may hit transient provider errors, and these files make # the generated idea set auditable even if exploration is interrupted. _write_jsonl(ideas_path, [idea.to_dict() for idea in ideas]) _write_jsonl(reviews_path, reviews) if config.code_source == "pantheon": for idea in accepted: notebook_path = notebooks_dir / f"{idea.idea_id}.ipynb" verification_code = _verification_code_for_idea(idea, output_dir) create_exploration_notebook( idea, notebook_path, h5cd_path=h5cd_path, run_output_dir=output_dir, analysis_code=_pantheon_analysis_placeholder(), verification_code=verification_code, ) notebooks.append(str(notebook_path)) if accepted: from .pantheon import run_pantheon_notebook_agents records = _run_async(run_pantheon_notebook_agents( accepted, schema=schema, h5cd_path=h5cd_path, output_dir=output_dir, notebooks_dir=notebooks_dir, model=config.model, timeout=config.llm_timeout, concurrency=config.notebook_agent_concurrency, generate_schematic_image=config.generate_schematic_image, schematic_image_model=config.schematic_image_model, schematic_image_model_args=config.schematic_image_model_args, )) agent_records.extend(_agent_record_rows(records)) elif config.code_source != "openai": raise ValueError("code_source must be 'pantheon' or 'openai'") for idea in accepted: notebook_path = notebooks_dir / f"{idea.idea_id}.ipynb" if config.code_source == "openai": from .llm import generate_openai_analysis_code analysis_code = generate_openai_analysis_code( idea, schema, model=config.model, reasoning_effort=config.reasoning_effort, timeout=config.llm_timeout, ) verification_code = _verification_code_for_idea(idea, output_dir) create_exploration_notebook( idea, notebook_path, h5cd_path=h5cd_path, run_output_dir=output_dir, analysis_code=analysis_code, verification_code=verification_code, ) notebooks.append(str(notebook_path)) if config.execute: started = time.time() 
execution = execute_notebook_python(notebook_path, stop_on_error=config.stop_on_error) elapsed = time.time() - started verification = execution.get("verification") or {} _complete_runner_verification( notebook_path, verification, elapsed_sec=elapsed, stop_on_error=config.stop_on_error, ) upsert_structured_conclusion(notebook_path, idea, verification) result = { "idea_id": idea.idea_id, "notebook": str(notebook_path), "execution_ok": bool(execution.get("ok")), "elapsed_sec": round(elapsed, 3), "verification": verification, "errors": execution.get("errors", []), } else: result = { "idea_id": idea.idea_id, "notebook": str(notebook_path), "execution_ok": None, "elapsed_sec": None, "verification": {"status": "not_executed"}, "errors": [], } results.append(result) _write_jsonl(ideas_path, [idea.to_dict() for idea in ideas]) _write_jsonl(reviews_path, reviews) _write_jsonl(results_path, results) _write_jsonl(agent_records_path, agent_records) _write_report(report_path, config, schema, ideas, reviews, results) verified = [ r for r in results if (r.get("verification") or {}).get("status") == "pass" ] return DiscoveryRunResult( output_dir=str(output_dir), n_generated=len(ideas), n_accepted=len(accepted), n_executed=len(results) if config.execute else 0, n_verified=len(verified), report_path=str(report_path), ideas_path=str(ideas_path), reviews_path=str(reviews_path), results_path=str(results_path), notebooks=notebooks, agent_records_path=str(agent_records_path), )
def _run_async(coro: Awaitable[Any]) -> Any: """Run an async backend call from scripts or active notebook loops.""" try: asyncio.get_running_loop() except RuntimeError: return asyncio.run(_await_with_cleanup(coro)) result: dict[str, Any] = {} def _target() -> None: try: result["value"] = asyncio.run(_await_with_cleanup(coro)) except BaseException as exc: # pragma: no cover - active loop fallback result["error"] = exc thread = threading.Thread(target=_target, daemon=True) thread.start() thread.join() if "error" in result: raise result["error"] return result.get("value") async def _await_with_cleanup(coro: Awaitable[Any]) -> Any: loop = asyncio.get_running_loop() previous_handler = loop.get_exception_handler() def _ignore_async_client_close(loop: asyncio.AbstractEventLoop, context: dict[str, Any]) -> None: exc = context.get("exception") future = context.get("future") future_name = "" if future is not None and hasattr(future, "get_coro"): future_name = getattr(future.get_coro(), "__qualname__", "") is_httpx_close = "AsyncClient.aclose" in future_name is_closed_loop = isinstance(exc, RuntimeError) and str(exc) == "Event loop is closed" if is_httpx_close and is_closed_loop: return if previous_handler is not None: previous_handler(loop, context) else: loop.default_exception_handler(context) loop.set_exception_handler(_ignore_async_client_close) try: return await coro finally: # Pantheon/OpenAI clients may schedule async close tasks just after the # final response. Let them drain before asyncio.run closes the loop. 
await asyncio.sleep(0.5) loop.set_exception_handler(previous_handler) def _agent_record_rows(records: list[Any]) -> list[dict[str, Any]]: return [ { "agent_name": record.agent_name, "role": record.role, "prompt_path": record.prompt_path, "content": record.content, } for record in records ] def _pantheon_analysis_placeholder() -> str: return ( "# PantheonOS notebook agent must edit this cell with executable analysis.\n" "# It may also insert Markdown and additional code cells around it.\n" "raise RuntimeError('PantheonOS notebook agent did not replace the exploration placeholder')\n" ) def _verification_code_for_idea(idea: DiscoveryIdea, output_dir: Path) -> str: return """ checks = {check: 'not_run' for check in IDEA.validation_checks} notes = [] checks.setdefault('statistical_hypothesis_test', 'not_run') def _check_keys(prefix): return [key for key in checks if key == prefix or key.startswith(prefix + ':')] def _set_check(prefix, value): keys = _check_keys(prefix) if not keys: checks[prefix] = value return for key in keys: checks[key] = value def _check_status(prefix): values = [checks[key] for key in _check_keys(prefix)] if not values: return None if 'fail' in values: return 'fail' if all(value == 'pass' for value in values): return 'pass' return values[0] _set_check('required_fields_exist', 'pass' if review is not None and review.accepted else 'fail') if _check_keys('cell_id_alignment'): aligned = True if cdata is not None and adata is not None and len(cdata.cells) == len(adata.obs_names): aligned = list(map(str, cdata.cells.index)) == list(map(str, adata.obs_names)) _set_check('cell_id_alignment', 'pass' if aligned else 'fail') if _check_keys('minimum_cell_count'): n_cells = analysis_summary.get('n_selected_cells') if n_cells is None and 'cell_type' in getattr(result_table, 'columns', []): n_cells = len(result_table) if n_cells is None: n_cells = len(cdata.cells) if cdata is not None and getattr(cdata, 'n_cells', 0) else 0 _set_check('minimum_cell_count', 
'pass' if n_cells >= 1 else 'fail') if _check_keys('minimum_spot_or_trace_count'): n_rows = analysis_summary.get('n_rows') if n_rows is None: n_rows = len(result_table) if result_table is not None else 0 _set_check('minimum_spot_or_trace_count', 'pass' if n_rows >= 1 else 'fail') if _check_keys('finite_numeric_output'): value = analysis_summary.get('parameter_value') _set_check('finite_numeric_output', 'pass' if value is not None and np.isfinite(value) else 'fail') if _check_keys('statistical_hypothesis_test'): p_value = analysis_summary.get('p_value') test_method = analysis_summary.get('test_method') null_hypothesis = analysis_summary.get('null_hypothesis') alternative_hypothesis = analysis_summary.get('alternative_hypothesis') observed_statistic = analysis_summary.get('observed_statistic') effect_size = analysis_summary.get('effect_size') hypothesis_test_status = analysis_summary.get('hypothesis_test_status', 'pass') try: p_float = float(p_value) except Exception: p_float = np.nan try: stat_float = float(observed_statistic) except Exception: stat_float = np.nan try: effect_float = float(effect_size) except Exception: effect_float = np.nan has_required_test = ( test_method is not None and str(test_method).strip() != '' and null_hypothesis is not None and str(null_hypothesis).strip() != '' and alternative_hypothesis is not None and str(alternative_hypothesis).strip() != '' and np.isfinite(p_float) and 0.0 <= p_float <= 1.0 and np.isfinite(stat_float) and np.isfinite(effect_float) and hypothesis_test_status != 'insufficient_data' ) if result_table is not None and hasattr(result_table, 'columns'): has_required_test = has_required_test and 'p_value' in result_table.columns and 'test_method' in result_table.columns else: has_required_test = False _set_check('statistical_hypothesis_test', 'pass' if has_required_test else 'fail') if not has_required_test: notes.append('statistical_hypothesis_test failed: analysis_summary must include null_hypothesis, 
alternative_hypothesis, test_method, observed_statistic, effect_size, finite p_value in [0,1], and result_table columns p_value/test_method') if _check_keys('negative_control_or_permutation'): test_method_text = str(analysis_summary.get('test_method', '')).lower() summary_keys_text = ' '.join(str(key).lower() for key in analysis_summary.keys()) result_columns_text = '' if result_table is not None and hasattr(result_table, 'columns'): result_columns_text = ' '.join(str(col).lower() for col in result_table.columns) control_text = ' '.join([test_method_text, summary_keys_text, result_columns_text]) has_control_or_permutation = any( token in control_text for token in ['permutation', 'randomization', 'shuffle', 'negative_control', 'null_distribution', 'control'] ) _set_check( 'negative_control_or_permutation', 'pass' if has_control_or_permutation else 'not_implemented', ) for check in list(checks): if checks[check] == 'not_run' and ('negative_control' in check or check.endswith('_control')): checks[check] = 'not_implemented' required_for_pass = ['required_fields_exist', 'minimum_cell_count', 'finite_numeric_output', 'statistical_hypothesis_test'] status = 'pass' for check in required_for_pass: if _check_status(check) == 'fail': status = 'fail' notes.append(f'{check} failed') n_rows_for_status = analysis_summary.get('n_rows') if n_rows_for_status is None: n_rows_for_status = len(result_table) if result_table is not None else 0 if n_rows_for_status == 0: status = 'fail' notes.append('analysis produced no result rows') verification = { 'idea_id': IDEA.idea_id, 'status': status, 'checks': checks, 'parameter_value': analysis_summary.get('parameter_value'), 'p_value': analysis_summary.get('p_value'), 'test_method': analysis_summary.get('test_method'), 'effect_size': analysis_summary.get('effect_size'), 'observed_statistic': analysis_summary.get('observed_statistic'), 'null_hypothesis': analysis_summary.get('null_hypothesis'), 'alternative_hypothesis': 
analysis_summary.get('alternative_hypothesis'), 'hypothesis_test_status': analysis_summary.get('hypothesis_test_status'), 'n_selected_cells': analysis_summary.get('n_selected_cells'), 'n_rows': analysis_summary.get('n_rows'), 'result_path': analysis_summary.get('result_path'), 'notes': notes + analysis_summary.get('notes', []), } print(json.dumps(verification, indent=2)) """ def _complete_runner_verification( notebook_path: Path, verification: dict[str, Any], *, elapsed_sec: float, stop_on_error: bool, ) -> None: checks = verification.setdefault("checks", {}) def _check_keys(prefix: str) -> list[str]: return [key for key in checks if key == prefix or key.startswith(prefix + ":")] def _set_check(prefix: str, value: str) -> None: keys = _check_keys(prefix) if not keys: return for key in keys: checks[key] = value runtime_keys = _check_keys("runtime_under_budget") if runtime_keys and any(checks.get(key) == "not_run" for key in runtime_keys): _set_check("runtime_under_budget", "pass" if elapsed_sec <= 60 else "fail") rerun_keys = _check_keys("deterministic_rerun") if rerun_keys and any(checks.get(key) == "not_run" for key in rerun_keys): first_value = verification.get("parameter_value") rerun = execute_notebook_python(notebook_path, stop_on_error=stop_on_error) second = rerun.get("verification") or {} second_value = second.get("parameter_value") same = _same_numeric(first_value, second_value) _set_check("deterministic_rerun", "pass" if rerun.get("ok") and same else "fail") verification.setdefault("notes", []).append( f"deterministic_rerun parameter_value={second_value!r}" ) if any(v == "fail" for v in checks.values()): verification["status"] = "fail" def _same_numeric(a: Any, b: Any, *, tol: float = 1e-9) -> bool: try: af = float(a) bf = float(b) except Exception: return a == b if math.isnan(af) and math.isnan(bf): return True return abs(af - bf) <= tol def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: with path.open("w") as fh: for row in rows: 
fh.write(json.dumps(row, default=_json_default, sort_keys=True) + "\n") def _read_jsonl(path: Path) -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] with path.open() as fh: for line in fh: line = line.strip() if line: rows.append(json.loads(line)) return rows def _write_report( path: Path, config: DiscoveryRunConfig, schema: dict[str, Any], ideas: list[DiscoveryIdea], reviews: list[dict[str, Any]], results: list[dict[str, Any]], ) -> None: verified = [ r for r in results if (r.get("verification") or {}).get("status") == "pass" ] lines = [ "# U-Chrom Auto-Discovery Run", "", f"h5cd: `{config.h5cd_path}`", f"schema_hash: `{schema.get('schema_hash')}`", f"generated ideas: {len(ideas)}", f"accepted ideas: {sum(1 for r in reviews if r.get('accepted'))}", f"executed notebooks: {len(results) if config.execute else 0}", f"verified ideas: {len(verified)}", "", "## Verified Ideas", ] if not verified: lines.append("") lines.append("None.") for result in verified: verification = result.get("verification") or {} idea = next((i for i in ideas if i.idea_id == result["idea_id"]), None) lines.extend([ "", f"### {idea.idea_title if idea else result['idea_id']}", f"- idea_id: `{result['idea_id']}`", f"- parameter_value: `{verification.get('parameter_value')}`", f"- test_method: `{verification.get('test_method')}`", f"- p_value: `{verification.get('p_value')}`", f"- effect_size: `{verification.get('effect_size')}`", f"- notebook: `{result.get('notebook')}`", f"- result_path: `{verification.get('result_path')}`", ]) rejected = [r for r in reviews if not r.get("accepted")] if rejected: lines.extend(["", "## Rejected Ideas"]) for review in rejected: lines.append(f"- `{review['idea_id']}`: {'; '.join(review.get('errors', []))}") path.write_text("\n".join(lines) + "\n") def _json_default(value: Any) -> Any: if isinstance(value, Path): return str(value) if isinstance(value, float) and (math.isnan(value) or math.isinf(value)): return None return str(value)