Source code for sceleto._process

"""Preprocessing functions (ported from scjp, modernized)."""

from __future__ import annotations

from typing import Optional

import numpy as np
import scanpy as sc


def _load_cc_genes():
    """Load cell cycle gene list from gene_categories.json."""
    import json
    from pathlib import Path
    path = Path(__file__).parent / "data" / "gene_categories.json"
    with open(path) as f:
        d = json.load(f)
    return d["Cell_Cycle"]


[docs] def remove_geneset(adata, geneset): """Remove genes in *geneset* from *adata* and return a copy.""" return adata[:, ~adata.var_names.isin(list(geneset))].copy()
[docs] def sc_process(adata, steps: str = "fspkuc", n_pcs: int = 50): """Scanpy preprocessing pipeline controlled by a step string. Each letter in *steps* triggers one preprocessing step, executed in order: === ============================ n normalize_total (1e4) l log1p + store .raw f highly_variable_genes + filter r remove cell-cycle genes s scale (max_value=10) p PCA k kNN neighbors u UMAP c leiden clustering === ============================ Parameters ---------- adata : AnnData steps : str Letters selecting which steps to run. Default ``"fspkuc"``. n_pcs : int Number of PCs for neighbor search. Default 50. """ if "n" in steps: sc.pp.normalize_total(adata, target_sum=1e4) if "l" in steps: sc.pp.log1p(adata) adata.raw = adata print("adding raw...") if "f" in steps: if adata.raw is None: adata.raw = adata print("adding raw...") sc.pp.highly_variable_genes(adata) adata = adata[:, adata.var.highly_variable].copy() if "r" in steps: cc_genes = _load_cc_genes() adata = remove_geneset(adata, cc_genes) print("removing cc_genes...") if "s" in steps: sc.pp.scale(adata, max_value=10) if "p" in steps: sc.pp.pca(adata) if "k" in steps: sc.pp.neighbors(adata, n_pcs=n_pcs) if "u" in steps: sc.tl.umap(adata) if "c" in steps: sc.tl.leiden(adata) return adata
[docs] def read_process( adata, version: str, *, species: str = "human", sample: Optional[str] = None, define_var: bool = True, call_doublet: bool = True, write: bool = True, min_n_counts: int = 1000, min_n_genes: int = 500, max_n_genes: int = 7000, max_pct_mito: float = 0.5, ): """QC filtering + optional doublet detection + write. Parameters ---------- adata : AnnData Raw count matrix. version : str Version tag for the output filename. species : str ``"human"`` or ``"mouse"`` (determines mito gene prefix). sample : str, optional Sample name stored in ``adata.obs["Sample"]``. define_var : bool If True, copy gene names / Ensembl IDs into ``adata.var``. call_doublet : bool If True, run scrublet for doublet detection (lazy import). write : bool If True, save filtered adata as h5ad. min_n_counts, min_n_genes, max_n_genes : int Cell-level count / gene number thresholds. max_pct_mito : float Maximum mitochondrial fraction (0–1). """ if sample: adata.obs["Sample"] = sample if define_var: adata.var["GeneName"] = list(adata.var.gene_ids.index) adata.var["EnsemblID"] = list(adata.var.gene_ids) # QC metrics via scanpy mito_prefix = {"human": "MT-", "mouse": "mt-"} if species not in mito_prefix: raise ValueError(f"Unknown species: {species!r}. Use 'human' or 'mouse'.") adata.var["mt"] = adata.var_names.str.startswith(mito_prefix[species]) sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], inplace=True) print(f"calculating mito... as species = {species}") print( f"filtering cells... >{min_n_counts} counts, " f"{min_n_genes}{max_n_genes} genes, <{max_pct_mito} pct_mito..." ) keep = ( (adata.obs["total_counts"] > min_n_counts) & (adata.obs["n_genes_by_counts"] > min_n_genes) & (adata.obs["n_genes_by_counts"] < max_n_genes) & (adata.obs["pct_counts_mt"] / 100 < max_pct_mito) ) adata = adata[keep].copy() if call_doublet: import scrublet as scr print("calling doublets using scrublet...") scrub = scr.Scrublet(adata.X) doublet_scores, predicted_doublets = scrub.scrub_doublets(verbose=False) adata.obs["doublet_scores"] = doublet_scores adata.obs["predicted_doublets"] = predicted_doublets if write: out_path = f"{version}{sample}_filtered.h5ad" print(f"writing output to {out_path} ...") adata.write_h5ad(out_path) return adata