Multi-Condition Causal Signaling
In [ ]:
Copied!
"""
UC1: Multi-Condition Causal Signaling with AnnNet as the Hub
Dependencies:
pip install annnet cptac decoupler omnipath corneto torch-geometric polars narwhals
"""
import time
import polars as pl
import pandas as pd
import numpy as np
import cptac
import decoupler as dc
import corneto as cn
from corneto.methods.signalling.carnival import multi_carnival
import sys
import os
sys.path.insert(0, os.path.abspath('..'))
from annnet.io.omnipath import from_omnipath
"""
UC1: Multi-Condition Causal Signaling with AnnNet as the Hub
Dependencies:
pip install annnet cptac decoupler omnipath corneto torch-geometric polars narwhals
"""
import time
import polars as pl
import pandas as pd
import numpy as np
import cptac
import decoupler as dc
import corneto as cn
from corneto.methods.signalling.carnival import multi_carnival
import sys
import os
sys.path.insert(0, os.path.abspath('..'))
from annnet.io.omnipath import from_omnipath
In [ ]:
Copied!
# =============================================================================
# STEP 1 — Load OmniPath into AnnNet
# All metadata into vertex_attributes / edge_attributes.
# =============================================================================
# Load the core signed directed PPI + kinase-substrate network.
# from_omnipath handles fetch, cache (~114MB one-time), and bulk loading.
G = from_omnipath(
dataset='omnipath',
query={'organism': 'human', 'genesymbols': True},
source_col='source_genesymbol',
target_col='target_genesymbol',
edge_attr_cols=[
'is_stimulation',
'is_inhibition',
'consensus_direction',
'consensus_stimulation',
'consensus_inhibition',
'n_sources',
'n_references',
'sources',
],
slice='omnipath_pkn',
vertex_annotation_sources=[
'HGNC',
'CancerGeneCensus',
'SignaLink_function',
'SignaLink_pathway',
'UniProt_location',
'IntOGen',
'kinase.com',
'Phosphatome',
],
load_vertex_annotations=True,
)
# =============================================================================
# STEP 1 — Load OmniPath into AnnNet
# All metadata into vertex_attributes / edge_attributes.
# =============================================================================
# Load the core signed directed PPI + kinase-substrate network.
# from_omnipath handles fetch, cache (~114MB one-time), and bulk loading.
G = from_omnipath(
dataset='omnipath',
query={'organism': 'human', 'genesymbols': True},
source_col='source_genesymbol',
target_col='target_genesymbol',
edge_attr_cols=[
'is_stimulation',
'is_inhibition',
'consensus_direction',
'consensus_stimulation',
'consensus_inhibition',
'n_sources',
'n_references',
'sources',
],
slice='omnipath_pkn',
vertex_annotation_sources=[
'HGNC',
'CancerGeneCensus',
'SignaLink_function',
'SignaLink_pathway',
'UniProt_location',
'IntOGen',
'kinase.com',
'Phosphatome',
],
load_vertex_annotations=True,
)
In [ ]:
Copied!
print(G.shape)
print(G.vertex_attributes.head(2))
print(G.edge_attributes.head(2))
print(G.shape)
print(G.vertex_attributes.head(2))
print(G.edge_attributes.head(2))
In [ ]:
Copied!
# AnnNet value prop: one Polars filter that would be a messy join in other network packages.
trusted_edges = (
G.edge_attributes.filter((pl.col('n_sources') >= 3) & pl.col('consensus_direction'))
.select('edge_id')
.to_series()
.to_list()
)
print(f'Trusted edges (>=3 sources, consensus direction): {len(trusted_edges)}')
# AnnNet value prop: one Polars filter that would be a messy join in other network packages.
trusted_edges = (
G.edge_attributes.filter((pl.col('n_sources') >= 3) & pl.col('consensus_direction'))
.select('edge_id')
.to_series()
.to_list()
)
print(f'Trusted edges (>=3 sources, consensus direction): {len(trusted_edges)}')
In [ ]:
Copied!
# Load kinase-substrate network separately for kinase activity inference.
# We load it into a plain DataFrame for decoupler.
import omnipath as op
kin_df = op.interactions.KinaseExtra.get(genesymbols=True)
kin_net = kin_df[['source_genesymbol', 'target_genesymbol', 'is_stimulation']].copy()
kin_net.columns = ['source', 'target', 'weight']
kin_net['weight'] = kin_net['weight'].map({True: 1, False: -1}).fillna(1)
# Load kinase-substrate network separately for kinase activity inference.
# We load it into a plain DataFrame for decoupler.
import omnipath as op
kin_df = op.interactions.KinaseExtra.get(genesymbols=True)
kin_net = kin_df[['source_genesymbol', 'target_genesymbol', 'is_stimulation']].copy()
kin_net.columns = ['source', 'target', 'weight']
kin_net['weight'] = kin_net['weight'].map({True: 1, False: -1}).fillna(1)
In [ ]:
Copied!
# CollecTRI for TF regulons from decoupler
collectri = dc.op.collectri()
# CollecTRI for TF regulons from decoupler
collectri = dc.op.collectri()
In [ ]:
Copied!
# =============================================================================
# STEP 2 — Load CPTAC LUAD transcriptomics, infer TF and kinase activities
# =============================================================================
luad = cptac.Luad()
rna = luad.get_transcriptomics('bcm')
rna = rna.dropna(axis=1, how='all').transpose()
# =============================================================================
# STEP 2 — Load CPTAC LUAD transcriptomics, infer TF and kinase activities
# =============================================================================
luad = cptac.Luad()
rna = luad.get_transcriptomics('bcm')
rna = rna.dropna(axis=1, how='all').transpose()
In [ ]:
Copied!
# =============================================================================
# STEP 2.1 — Define AnnNet (Kivelä) layers: one aspect (patient)
# Expression + decoupler outputs will live as vertex–layer attributes (u, (patient,))
# =============================================================================
# Ensure CPTAC sample IDs are flat strings (CPTAC can use MultiIndex sometimes)
if hasattr(rna.columns, 'levels'): # pandas MultiIndex
rna.columns = rna.columns.get_level_values(0)
rna.columns = rna.columns.astype(str)
# rna is currently genes x patients
patients = rna.columns.tolist()
# Configure AnnNet multilayer structure: 1 aspect ("patient"), layers = CPTAC sample ids
G.set_aspects(['patient'], {'patient': patients})
print(f'Configured aspects: {G.aspects}')
print(f' Number of patient layers: {len(G.elem_layers["patient"])}')
# =============================================================================
# STEP 2.1 — Define AnnNet (Kivelä) layers: one aspect (patient)
# Expression + decoupler outputs will live as vertex–layer attributes (u, (patient,))
# =============================================================================
# Ensure CPTAC sample IDs are flat strings (CPTAC can use MultiIndex sometimes)
if hasattr(rna.columns, 'levels'): # pandas MultiIndex
rna.columns = rna.columns.get_level_values(0)
rna.columns = rna.columns.astype(str)
# rna is currently genes x patients
patients = rna.columns.tolist()
# Configure AnnNet multilayer structure: 1 aspect ("patient"), layers = CPTAC sample ids
G.set_aspects(['patient'], {'patient': patients})
print(f'Configured aspects: {G.aspects}')
print(f' Number of patient layers: {len(G.elem_layers["patient"])}')
In [ ]:
Copied!
# =============================================================================
# STEP 2.2 — Add vertex presence + write expression as vertex–layer attributes
# Contract: (gene, (patient,)) -> {"expr": float}
# =============================================================================
# CPTAC transcriptomics rows are MultiIndex -> extract gene symbols
if isinstance(rna.index, pd.MultiIndex):
genes = rna.index.get_level_values(0).astype(str)
rna = rna.copy()
rna.index = genes
else:
genes = rna.index.astype(str)
# Keep only genes that exist as vertices in the PKN graph
genes_in_graph = [g for g in genes if (g in G.entity_to_idx and G.entity_types.get(g) == 'vertex')]
print(f'Genes in CPTAC rna: {len(genes)}')
print(f'Genes present as vertices in AnnNet: {len(genes_in_graph)}')
# Collect all vertex–layer attrs into a dict, then write once (avoids
# per-call DataFrame reconstruction which is O(n²) for large data)
_vla: dict = {}
for p in patients:
aa = (p,)
col = rna.loc[genes_in_graph, p]
for g, v in col.items():
if not G.has_presence(g, aa):
G.add_presence(g, aa)
_vla[(g, aa)] = {'expr': float(v)}
G._vertex_layer_attrs = _vla # single DataFrame build via property setter
print('Expression injection done (vertex–layer attrs).')
# Sanity check
_test_gene = genes_in_graph[0]
_test_patient = patients[0]
print(
'Example vertex–layer attrs:',
_test_gene,
_test_patient,
G.get_vertex_layer_attrs(_test_gene, (_test_patient,)),
)
# =============================================================================
# STEP 2.2 — Add vertex presence + write expression as vertex–layer attributes
# Contract: (gene, (patient,)) -> {"expr": float}
# =============================================================================
# CPTAC transcriptomics rows are MultiIndex -> extract gene symbols
if isinstance(rna.index, pd.MultiIndex):
genes = rna.index.get_level_values(0).astype(str)
rna = rna.copy()
rna.index = genes
else:
genes = rna.index.astype(str)
# Keep only genes that exist as vertices in the PKN graph
genes_in_graph = [g for g in genes if (g in G.entity_to_idx and G.entity_types.get(g) == 'vertex')]
print(f'Genes in CPTAC rna: {len(genes)}')
print(f'Genes present as vertices in AnnNet: {len(genes_in_graph)}')
# Collect all vertex–layer attrs into a dict, then write once (avoids
# per-call DataFrame reconstruction which is O(n²) for large data)
_vla: dict = {}
for p in patients:
aa = (p,)
col = rna.loc[genes_in_graph, p]
for g, v in col.items():
if not G.has_presence(g, aa):
G.add_presence(g, aa)
_vla[(g, aa)] = {'expr': float(v)}
G._vertex_layer_attrs = _vla # single DataFrame build via property setter
print('Expression injection done (vertex–layer attrs).')
# Sanity check
_test_gene = genes_in_graph[0]
_test_patient = patients[0]
print(
'Example vertex–layer attrs:',
_test_gene,
_test_patient,
G.get_vertex_layer_attrs(_test_gene, (_test_patient,)),
)
In [ ]:
Copied!
# =============================================================================
# STEP 2.3 — Prepare decoupler input (genes x samples) from the same rna table
# We'll run decoupler next, then write outputs back as vertex–layer attrs:
# (TF, patient) -> tf_activity
# (kinase, patient) -> kinase_activity
# =============================================================================
expr = rna.loc[genes_in_graph, patients].copy()
# decoupler likes string indices/columns
expr.index = expr.index.astype(str)
expr.columns = expr.columns.astype(str)
print('expr shape (genes x patients):', expr.shape)
# =============================================================================
# STEP 2.3 — Prepare decoupler input (genes x samples) from the same rna table
# We'll run decoupler next, then write outputs back as vertex–layer attrs:
# (TF, patient) -> tf_activity
# (kinase, patient) -> kinase_activity
# =============================================================================
expr = rna.loc[genes_in_graph, patients].copy()
# decoupler likes string indices/columns
expr.index = expr.index.astype(str)
expr.columns = expr.columns.astype(str)
print('expr shape (genes x patients):', expr.shape)
In [ ]:
Copied!
# Full transcriptome for TF inference
expr_full = rna.copy() # genes x patients (≈59k genes)
# PKN-restricted for kinase inference + CORNETO
expr_pkn = expr.loc[genes_in_graph, patients]
# Full transcriptome for TF inference
expr_full = rna.copy() # genes x patients (≈59k genes)
# PKN-restricted for kinase inference + CORNETO
expr_pkn = expr.loc[genes_in_graph, patients]
In [ ]:
Copied!
expr_genes = set(expr_full.index)
collectri_targets = set(collectri['target'])
print('Expr genes:', len(expr_genes))
print('CollecTRI targets:', len(collectri_targets))
print('Intersection:', len(expr_genes & collectri_targets))
expr_genes = set(expr_full.index)
collectri_targets = set(collectri['target'])
print('Expr genes:', len(expr_genes))
print('CollecTRI targets:', len(collectri_targets))
print('Intersection:', len(expr_genes & collectri_targets))
In [ ]:
Copied!
shared_genes = sorted(expr_genes & collectri_targets)
print('Shared genes used for TF inference:', len(shared_genes))
expr_tf = expr_full.loc[shared_genes]
shared_genes = sorted(expr_genes & collectri_targets)
print('Shared genes used for TF inference:', len(shared_genes))
expr_tf = expr_full.loc[shared_genes]
In [ ]:
Copied!
# =============================================================================
# STEP 2.4 — TF activity inference (decoupler ULM)
# =============================================================================
tf_es, tf_pv = dc.mt.ulm(
expr_tf.T, # samples x genes
net=collectri,
)
print('TF activity matrix shape:', tf_es.shape)
print('TF p-value matrix shape:', tf_pv.shape)
tf_es.head()
# =============================================================================
# STEP 2.4 — TF activity inference (decoupler ULM)
# =============================================================================
tf_es, tf_pv = dc.mt.ulm(
expr_tf.T, # samples x genes
net=collectri,
)
print('TF activity matrix shape:', tf_es.shape)
print('TF p-value matrix shape:', tf_pv.shape)
tf_es.head()
In [ ]:
Copied!
# =============================================================================
# STEP 2.5 — Kinase activity inference (decoupler ULM)
# =============================================================================
# Deduplicate kinase network/dataframe (required by decoupler)
kin_net = kin_net.drop_duplicates(subset=['source', 'target'])
kin_es, kin_pv = dc.mt.ulm(
expr_pkn.T, # samples x genes
net=kin_net,
)
print('Kinase activity matrix shape:', kin_es.shape)
print('Kinase p-value matrix shape:', kin_pv.shape)
kin_es.head()
# =============================================================================
# STEP 2.5 — Kinase activity inference (decoupler ULM)
# =============================================================================
# Deduplicate kinase network/dataframe (required by decoupler)
kin_net = kin_net.drop_duplicates(subset=['source', 'target'])
kin_es, kin_pv = dc.mt.ulm(
expr_pkn.T, # samples x genes
net=kin_net,
)
print('Kinase activity matrix shape:', kin_es.shape)
print('Kinase p-value matrix shape:', kin_pv.shape)
kin_es.head()
In [ ]:
Copied!
# =============================================================================
# STEP 2.6a — Store TF activities as vertex–layer attributes
# =============================================================================
tfs_in_graph = [
tf for tf in tf_es.columns if tf in G.entity_to_idx and G.entity_types.get(tf) == 'vertex'
]
print(f'TFs inferred: {tf_es.shape[1]}')
print(f'TFs present in AnnNet: {len(tfs_in_graph)}')
for patient in tf_es.index:
aa = (patient,)
row = tf_es.loc[patient, tfs_in_graph]
for tf, val in row.items():
if not G.has_presence(tf, aa):
G.add_presence(tf, aa)
G.set_vertex_layer_attrs(tf, aa, tf_activity=float(val))
print('TF activity written to AnnNet.')
# =============================================================================
# STEP 2.6a — Store TF activities as vertex–layer attributes
# =============================================================================
tfs_in_graph = [
tf for tf in tf_es.columns if tf in G.entity_to_idx and G.entity_types.get(tf) == 'vertex'
]
print(f'TFs inferred: {tf_es.shape[1]}')
print(f'TFs present in AnnNet: {len(tfs_in_graph)}')
for patient in tf_es.index:
aa = (patient,)
row = tf_es.loc[patient, tfs_in_graph]
for tf, val in row.items():
if not G.has_presence(tf, aa):
G.add_presence(tf, aa)
G.set_vertex_layer_attrs(tf, aa, tf_activity=float(val))
print('TF activity written to AnnNet.')
In [ ]:
Copied!
# =============================================================================
# STEP 2.6b — Store kinase activities as vertex–layer attributes
# =============================================================================
kinases_in_graph = [
k for k in kin_es.columns if k in G.entity_to_idx and G.entity_types.get(k) == 'vertex'
]
print(f'Kinases inferred: {kin_es.shape[1]}')
print(f'Kinases present in AnnNet: {len(kinases_in_graph)}')
for patient in kin_es.index:
aa = (patient,)
row = kin_es.loc[patient, kinases_in_graph]
for k, val in row.items():
if not G.has_presence(k, aa):
G.add_presence(k, aa)
G.set_vertex_layer_attrs(k, aa, kinase_activity=float(val))
print('Kinase activity written to AnnNet.')
# =============================================================================
# STEP 2.6b — Store kinase activities as vertex–layer attributes
# =============================================================================
kinases_in_graph = [
k for k in kin_es.columns if k in G.entity_to_idx and G.entity_types.get(k) == 'vertex'
]
print(f'Kinases inferred: {kin_es.shape[1]}')
print(f'Kinases present in AnnNet: {len(kinases_in_graph)}')
for patient in kin_es.index:
aa = (patient,)
row = kin_es.loc[patient, kinases_in_graph]
for k, val in row.items():
if not G.has_presence(k, aa):
G.add_presence(k, aa)
G.set_vertex_layer_attrs(k, aa, kinase_activity=float(val))
print('Kinase activity written to AnnNet.')
In [ ]:
Copied!
# =============================================================================
# STEP 3 — CORNETO
# =============================================================================
# --- 3.0 Differential profiles (tumor vs matched normal) -------------------
def compute_diff(input_df):
cols = list(input_df.columns)
normals = {s[:-2]: s for s in cols if s.endswith('.N')}
tumors = [s for s in cols if not s.endswith('.N')]
matched = {t: normals[t] for t in tumors if t in normals}
print(f' Matched tumor/normal pairs: {len(matched)}')
diffs = [input_df[t] / input_df[n] for t, n in matched.items()]
diff_df = pd.concat(diffs, axis=1)
diff_df.columns = list(matched.keys())
diff_df = diff_df.replace([np.inf, -np.inf], np.nan).replace(0, np.nan)
return np.log2(diff_df)
rna = luad.get_transcriptomics('bcm')
pp = luad.get_phosphoproteomics('umich')
print('Computing RNA differential profiles...')
rna_diff = compute_diff(rna.T)
rna_diff = rna_diff.groupby(level=0).mean()
print('Computing phospho differential profiles...')
pp_diff = compute_diff(pp.T)
pp_diff = pp_diff.groupby(level=[0, 1]).mean()
pp_diff.index = pp_diff.index.map('_'.join)
# --- 3.1 Kinase activity from phosphoproteomics ----------------------------
# op.interactions.post_translational doesn't exist in all versions — use direct URL
psiteplus = pd.read_csv('https://omnipathdb.org/enz_sub?genesymbols=1', sep='\t')
psiteplus_kin = psiteplus[psiteplus['modification'] == 'phosphorylation'][
['enzyme_genesymbol', 'substrate_genesymbol', 'residue_type', 'residue_offset']
].copy()
psiteplus_kin['target'] = (
psiteplus_kin['substrate_genesymbol']
+ '_'
+ psiteplus_kin['residue_type']
+ psiteplus_kin['residue_offset'].astype(str)
)
psiteplus_kin = (
psiteplus_kin[['enzyme_genesymbol', 'target']]
.rename(columns={'enzyme_genesymbol': 'source'})
.drop_duplicates()
.assign(weight=1)
)
pp_dc_input = pp_diff.fillna(0)
kin_es_new, _ = dc.mt.ulm(pp_dc_input.T, net=psiteplus_kin)
print(f'Kinase activity from phospho: {kin_es_new.shape}')
# =============================================================================
# STEP 3 — CORNETO
# =============================================================================
# --- 3.0 Differential profiles (tumor vs matched normal) -------------------
def compute_diff(input_df):
cols = list(input_df.columns)
normals = {s[:-2]: s for s in cols if s.endswith('.N')}
tumors = [s for s in cols if not s.endswith('.N')]
matched = {t: normals[t] for t in tumors if t in normals}
print(f' Matched tumor/normal pairs: {len(matched)}')
diffs = [input_df[t] / input_df[n] for t, n in matched.items()]
diff_df = pd.concat(diffs, axis=1)
diff_df.columns = list(matched.keys())
diff_df = diff_df.replace([np.inf, -np.inf], np.nan).replace(0, np.nan)
return np.log2(diff_df)
rna = luad.get_transcriptomics('bcm')
pp = luad.get_phosphoproteomics('umich')
print('Computing RNA differential profiles...')
rna_diff = compute_diff(rna.T)
rna_diff = rna_diff.groupby(level=0).mean()
print('Computing phospho differential profiles...')
pp_diff = compute_diff(pp.T)
pp_diff = pp_diff.groupby(level=[0, 1]).mean()
pp_diff.index = pp_diff.index.map('_'.join)
# --- 3.1 Kinase activity from phosphoproteomics ----------------------------
# op.interactions.post_translational doesn't exist in all versions — use direct URL
psiteplus = pd.read_csv('https://omnipathdb.org/enz_sub?genesymbols=1', sep='\t')
psiteplus_kin = psiteplus[psiteplus['modification'] == 'phosphorylation'][
['enzyme_genesymbol', 'substrate_genesymbol', 'residue_type', 'residue_offset']
].copy()
psiteplus_kin['target'] = (
psiteplus_kin['substrate_genesymbol']
+ '_'
+ psiteplus_kin['residue_type']
+ psiteplus_kin['residue_offset'].astype(str)
)
psiteplus_kin = (
psiteplus_kin[['enzyme_genesymbol', 'target']]
.rename(columns={'enzyme_genesymbol': 'source'})
.drop_duplicates()
.assign(weight=1)
)
pp_dc_input = pp_diff.fillna(0)
kin_es_new, _ = dc.mt.ulm(pp_dc_input.T, net=psiteplus_kin)
print(f'Kinase activity from phospho: {kin_es_new.shape}')
In [ ]:
Copied!
# --- 3.2 TF activity from RNA differential ----------------------------------
rna_dc_input = rna_diff.fillna(0)
tf_es_new, _ = dc.mt.ulm(rna_dc_input.T, net=collectri)
print(f'TF activity from RNA diff: {tf_es_new.shape}')
# --- 3.3 Patient selection: top mutational burden (top 10%) -----------------
soms = luad.get_somatic_mutation('harmonized')
int_muts = ['Missense_Mutation', 'Nonsense_Mutation', 'Frame_Shift_Del', 'Frame_Shift_Ins']
mut_mat = (
soms[soms.Mutation.isin(int_muts)][['Gene', 'Tumor_Sample_Barcode']]
.drop_duplicates()
.assign(mutated=1)
.pivot(index='Gene', columns='Tumor_Sample_Barcode', values='mutated')
.fillna(0)
)
mut_mat.columns = mut_mat.columns.str.replace('_T', '')
common = set(tf_es_new.index) & set(kin_es_new.index) & set(mut_mat.columns)
mut_sum = mut_mat[list(common)].sum(axis=0).sort_values(ascending=False)
top_patients = mut_sum[mut_sum >= mut_sum.quantile(0.9)].index.tolist()
print(f'Top mutational burden patients: {len(top_patients)}')
# --- 3.2 TF activity from RNA differential ----------------------------------
rna_dc_input = rna_diff.fillna(0)
tf_es_new, _ = dc.mt.ulm(rna_dc_input.T, net=collectri)
print(f'TF activity from RNA diff: {tf_es_new.shape}')
# --- 3.3 Patient selection: top mutational burden (top 10%) -----------------
soms = luad.get_somatic_mutation('harmonized')
int_muts = ['Missense_Mutation', 'Nonsense_Mutation', 'Frame_Shift_Del', 'Frame_Shift_Ins']
mut_mat = (
soms[soms.Mutation.isin(int_muts)][['Gene', 'Tumor_Sample_Barcode']]
.drop_duplicates()
.assign(mutated=1)
.pivot(index='Gene', columns='Tumor_Sample_Barcode', values='mutated')
.fillna(0)
)
mut_mat.columns = mut_mat.columns.str.replace('_T', '')
common = set(tf_es_new.index) & set(kin_es_new.index) & set(mut_mat.columns)
mut_sum = mut_mat[list(common)].sum(axis=0).sort_values(ascending=False)
top_patients = mut_sum[mut_sum >= mut_sum.quantile(0.9)].index.tolist()
print(f'Top mutational burden patients: {len(top_patients)}')
In [ ]:
Copied!
# --- 3.4 PKN pre-filtered to kinases -> (kinases∪TFs) ------------
kinase_set = set(kin_es_new.columns)
tf_set = set(tf_es_new.columns)
pkn_raw = G.edge_attributes.to_pandas().merge(
pd.DataFrame(
[
(src, tgt, eid)
for eid, (src, tgt, _) in G.edge_definitions.items()
if src is not None and tgt is not None
],
columns=['source', 'target', 'edge_id'],
),
on='edge_id',
)
pkn_filt = pkn_raw[
pkn_raw['source'].isin(kinase_set) & pkn_raw['target'].isin(kinase_set | tf_set)
].copy()
pkn_filt['sign'] = np.where(
pkn_filt['consensus_stimulation'], 1, np.where(pkn_filt['consensus_inhibition'], -1, 0)
)
pkn_filt = pkn_filt[pkn_filt['sign'] != 0]
C_filt = cn.Graph.from_tuples(pkn_filt[['source', 'sign', 'target']].apply(tuple, axis=1).values)
print(f'Pre-filtered PKN: {C_filt.num_vertices} nodes, {C_filt.num_edges} edges')
# --- 3.4 PKN pre-filtered to kinases -> (kinases∪TFs) ------------
kinase_set = set(kin_es_new.columns)
tf_set = set(tf_es_new.columns)
pkn_raw = G.edge_attributes.to_pandas().merge(
pd.DataFrame(
[
(src, tgt, eid)
for eid, (src, tgt, _) in G.edge_definitions.items()
if src is not None and tgt is not None
],
columns=['source', 'target', 'edge_id'],
),
on='edge_id',
)
pkn_filt = pkn_raw[
pkn_raw['source'].isin(kinase_set) & pkn_raw['target'].isin(kinase_set | tf_set)
].copy()
pkn_filt['sign'] = np.where(
pkn_filt['consensus_stimulation'], 1, np.where(pkn_filt['consensus_inhibition'], -1, 0)
)
pkn_filt = pkn_filt[pkn_filt['sign'] != 0]
C_filt = cn.Graph.from_tuples(pkn_filt[['source', 'sign', 'target']].apply(tuple, axis=1).values)
print(f'Pre-filtered PKN: {C_filt.num_vertices} nodes, {C_filt.num_edges} edges')
In [ ]:
Copied!
# --- 3.5 per-patient CORNETO loop, top 10% cohort ---
KIN_TH = 1.5
TF_TH = 1.5
TOP_K = 10
TOP_TF_POS = 8
TOP_TF_NEG = 7
patient_data = {}
for p in top_patients:
if p not in kin_es_new.index or p not in tf_es_new.index:
continue
kin_row = kin_es_new.loc[p]
tf_row = tf_es_new.loc[p]
kin_top = kin_row[kin_row.abs() >= KIN_TH].abs().nlargest(TOP_K).index.tolist()
tf_pos = tf_row[tf_row >= TF_TH].nlargest(TOP_TF_POS).index.tolist()
tf_neg = tf_row[tf_row <= -TF_TH].nsmallest(TOP_TF_NEG).index.tolist()
inputs = {k: float(np.sign(kin_row[k])) for k in kin_top}
outputs = {tf: float(tf_row[tf]) for tf in tf_pos + tf_neg}
if inputs and outputs:
patient_data[p] = {'input': inputs, 'output': outputs}
print(f'Patients with valid data: {len(patient_data)}')
results = {}
t_start = time.time()
for idx, (p, data) in enumerate(patient_data.items()):
t0 = time.time()
P, G_p, summary = multi_carnival(C_filt, {p: data}, lambd=0)
sol = P.solve(solver='scip')
elapsed = time.time() - t0
print(
f'[{idx + 1}/{len(patient_data)}] {p}: status={sol.status} | '
f'obj={sol.value:.1f} | {elapsed:.1f}s'
)
results[p] = {'P': P, 'G': G_p, 'sol': sol}
print(f'\nDone in {time.time() - t_start:.1f}s')
# --- 3.5 per-patient CORNETO loop, top 10% cohort ---
KIN_TH = 1.5
TF_TH = 1.5
TOP_K = 10
TOP_TF_POS = 8
TOP_TF_NEG = 7
patient_data = {}
for p in top_patients:
if p not in kin_es_new.index or p not in tf_es_new.index:
continue
kin_row = kin_es_new.loc[p]
tf_row = tf_es_new.loc[p]
kin_top = kin_row[kin_row.abs() >= KIN_TH].abs().nlargest(TOP_K).index.tolist()
tf_pos = tf_row[tf_row >= TF_TH].nlargest(TOP_TF_POS).index.tolist()
tf_neg = tf_row[tf_row <= -TF_TH].nsmallest(TOP_TF_NEG).index.tolist()
inputs = {k: float(np.sign(kin_row[k])) for k in kin_top}
outputs = {tf: float(tf_row[tf]) for tf in tf_pos + tf_neg}
if inputs and outputs:
patient_data[p] = {'input': inputs, 'output': outputs}
print(f'Patients with valid data: {len(patient_data)}')
results = {}
t_start = time.time()
for idx, (p, data) in enumerate(patient_data.items()):
t0 = time.time()
P, G_p, summary = multi_carnival(C_filt, {p: data}, lambd=0)
sol = P.solve(solver='scip')
elapsed = time.time() - t0
print(
f'[{idx + 1}/{len(patient_data)}] {p}: status={sol.status} | '
f'obj={sol.value:.1f} | {elapsed:.1f}s'
)
results[p] = {'P': P, 'G': G_p, 'sol': sol}
print(f'\nDone in {time.time() - t_start:.1f}s')
In [ ]:
Copied!
# =============================================================================
# STEP 3.6 — Extract CORNETO results into AnnNet vertex-layer attributes
# =============================================================================
for p, res in results.items():
aa = (p,)
P = res['P']
G_p = res['G']
vvals = P.expr.vertex_value.value # (n_nodes x 1)
nodes = list(G_p.V)
n_written = 0
for i, node in enumerate(nodes):
if node.startswith('_'):
continue
val = float(vvals[i, 0])
if abs(val) < 0.5:
continue
if not G.has_vertex(node):
continue
if not G.has_presence(node, aa):
G.add_presence(node, aa)
G.set_vertex_layer_attrs(node, aa, corneto_signal=val)
n_written += 1
print(f' {p}: {n_written} nodes with corneto_signal')
print('Extraction done.')
# =============================================================================
# STEP 3.6 — Extract CORNETO results into AnnNet vertex-layer attributes
# =============================================================================
for p, res in results.items():
aa = (p,)
P = res['P']
G_p = res['G']
vvals = P.expr.vertex_value.value # (n_nodes x 1)
nodes = list(G_p.V)
n_written = 0
for i, node in enumerate(nodes):
if node.startswith('_'):
continue
val = float(vvals[i, 0])
if abs(val) < 0.5:
continue
if not G.has_vertex(node):
continue
if not G.has_presence(node, aa):
G.add_presence(node, aa)
G.set_vertex_layer_attrs(node, aa, corneto_signal=val)
n_written += 1
print(f' {p}: {n_written} nodes with corneto_signal')
print('Extraction done.')
In [ ]:
Copied!
# =============================================================================
# STEP 3.7 — Extract CORNETO results into AnnNet as intra-layer edges + node attrs
# Each patient layer is their active inferred causal subgraph
# =============================================================================
for p, res in results.items():
aa = (p,)
P = res['P']
G_p = res['G']
vvals = P.expr.vertex_value.value # (n_nodes, 1) or (n_nodes,)
evals = P.expr.edge_value.value # (n_edges, 1) or (n_edges,)
nodes = list(G_p.V)
edges = list(G_p.E) # list of (frozenset_src, frozenset_tgt)
# flatten shape
if vvals.ndim == 2:
vvals = vvals[:, 0]
if evals.ndim == 2:
evals = evals[:, 0]
# 1 — node signals as vertex-layer attrs (keep for cross-layer queries)
active_nodes = set()
for i, node in enumerate(nodes):
if node.startswith('_'):
continue
val = float(vvals[i])
if abs(val) < 0.5:
continue
if not G.has_vertex(node):
continue
if not G.has_presence(node, aa):
G.add_presence(node, aa)
G.set_vertex_layer_attrs(node, aa, corneto_signal=val)
active_nodes.add(node)
# 2 — active edges as intra-layer edges (the actual Kivelä representation)
n_edges_added = 0
for j, (src_fs, tgt_fs) in enumerate(edges):
val = float(evals[j])
if abs(val) < 0.5:
continue
# flow graph frozensets — extract single node
src_list = [n for n in src_fs if not str(n).startswith('_')]
tgt_list = [n for n in tgt_fs if not str(n).startswith('_')]
if not src_list or not tgt_list:
continue
src, tgt = src_list[0], tgt_list[0]
if not G.has_vertex(src) or not G.has_vertex(tgt):
continue
# ensure presence before adding intra-layer edge
if not G.has_presence(src, aa):
G.add_presence(src, aa)
G.set_vertex_layer_attrs(
src, aa, corneto_signal=float(vvals[nodes.index(src)]) if src in nodes else 0.0
)
if not G.has_presence(tgt, aa):
G.add_presence(tgt, aa)
G.set_vertex_layer_attrs(
tgt, aa, corneto_signal=float(vvals[nodes.index(tgt)]) if tgt in nodes else 0.0
)
G.add_edge((src, aa), (tgt, aa), weight=val)
n_edges_added += 1
print(f' {p}: {len(active_nodes)} nodes, {n_edges_added} intra-layer edges')
print('Extraction done.')
# =============================================================================
# STEP 3.7 — Extract CORNETO results into AnnNet as intra-layer edges + node attrs
# Each patient layer is their active inferred causal subgraph
# =============================================================================
for p, res in results.items():
aa = (p,)
P = res['P']
G_p = res['G']
vvals = P.expr.vertex_value.value # (n_nodes, 1) or (n_nodes,)
evals = P.expr.edge_value.value # (n_edges, 1) or (n_edges,)
nodes = list(G_p.V)
edges = list(G_p.E) # list of (frozenset_src, frozenset_tgt)
# flatten shape
if vvals.ndim == 2:
vvals = vvals[:, 0]
if evals.ndim == 2:
evals = evals[:, 0]
# 1 — node signals as vertex-layer attrs (keep for cross-layer queries)
active_nodes = set()
for i, node in enumerate(nodes):
if node.startswith('_'):
continue
val = float(vvals[i])
if abs(val) < 0.5:
continue
if not G.has_vertex(node):
continue
if not G.has_presence(node, aa):
G.add_presence(node, aa)
G.set_vertex_layer_attrs(node, aa, corneto_signal=val)
active_nodes.add(node)
# 2 — active edges as intra-layer edges (the actual Kivelä representation)
n_edges_added = 0
for j, (src_fs, tgt_fs) in enumerate(edges):
val = float(evals[j])
if abs(val) < 0.5:
continue
# flow graph frozensets — extract single node
src_list = [n for n in src_fs if not str(n).startswith('_')]
tgt_list = [n for n in tgt_fs if not str(n).startswith('_')]
if not src_list or not tgt_list:
continue
src, tgt = src_list[0], tgt_list[0]
if not G.has_vertex(src) or not G.has_vertex(tgt):
continue
# ensure presence before adding intra-layer edge
if not G.has_presence(src, aa):
G.add_presence(src, aa)
G.set_vertex_layer_attrs(
src, aa, corneto_signal=float(vvals[nodes.index(src)]) if src in nodes else 0.0
)
if not G.has_presence(tgt, aa):
G.add_presence(tgt, aa)
G.set_vertex_layer_attrs(
tgt, aa, corneto_signal=float(vvals[nodes.index(tgt)]) if tgt in nodes else 0.0
)
G.add_edge((src, aa), (tgt, aa), weight=val)
n_edges_added += 1
print(f' {p}: {len(active_nodes)} nodes, {n_edges_added} intra-layer edges')
print('Extraction done.')
In [ ]:
Copied!
# =============================================================================
# STEP 3.8 — Consensus layer: nodes active in >= 50% of patients
# =============================================================================
from collections import Counter
min_freq = 0.5
n_pts = len(results)
signal_counter = Counter()
for p in results:
aa = (p,)
for node in G.vertices():
if abs(G.get_vertex_layer_attrs(node, aa).get('corneto_signal', 0.0)) >= 0.5:
signal_counter[node] += 1
consensus_nodes = [n for n, cnt in signal_counter.items() if cnt >= min_freq * n_pts]
if 'consensus' not in G.elem_layers.get('patient', []):
G.elem_layers['patient'].append('consensus')
consensus_aa = ('consensus',)
for node in consensus_nodes:
if not G.has_presence(node, consensus_aa):
G.add_presence(node, consensus_aa)
sigs = [
G.get_vertex_layer_attrs(node, (p,)).get('corneto_signal', 0.0)
for p in results
if abs(G.get_vertex_layer_attrs(node, (p,)).get('corneto_signal', 0.0)) >= 0.5
]
G.set_vertex_layer_attrs(
node,
consensus_aa,
corneto_signal=float(np.mean(sigs)),
patient_frequency=signal_counter[node] / n_pts,
)
print(
f'Consensus layer: {len(consensus_nodes)} nodes active in >={min_freq * 100:.0f}% of patients'
)
print('Sample consensus nodes:', consensus_nodes[:10])
# =============================================================================
# STEP 3.8 — Consensus layer: nodes active in >= 50% of patients
# =============================================================================
from collections import Counter
min_freq = 0.5
n_pts = len(results)
signal_counter = Counter()
for p in results:
aa = (p,)
for node in G.vertices():
if abs(G.get_vertex_layer_attrs(node, aa).get('corneto_signal', 0.0)) >= 0.5:
signal_counter[node] += 1
consensus_nodes = [n for n, cnt in signal_counter.items() if cnt >= min_freq * n_pts]
if 'consensus' not in G.elem_layers.get('patient', []):
G.elem_layers['patient'].append('consensus')
consensus_aa = ('consensus',)
for node in consensus_nodes:
if not G.has_presence(node, consensus_aa):
G.add_presence(node, consensus_aa)
sigs = [
G.get_vertex_layer_attrs(node, (p,)).get('corneto_signal', 0.0)
for p in results
if abs(G.get_vertex_layer_attrs(node, (p,)).get('corneto_signal', 0.0)) >= 0.5
]
G.set_vertex_layer_attrs(
node,
consensus_aa,
corneto_signal=float(np.mean(sigs)),
patient_frequency=signal_counter[node] / n_pts,
)
print(
f'Consensus layer: {len(consensus_nodes)} nodes active in >={min_freq * 100:.0f}% of patients'
)
print('Sample consensus nodes:', consensus_nodes[:10])
In [ ]:
Copied!
# =============================================================================
# STEP 4 — Cross-layer query: AnnNet
# Consensus-active stimulatory edges with both endpoints signaling
# ==============================================================================
consensus_set = set(consensus_nodes)
rows = []
edge_attr = G.edge_attributes.to_pandas().set_index('edge_id')
for eid, (src, tgt, _) in G.edge_definitions.items():
if src not in consensus_set or tgt not in consensus_set:
continue
if eid not in edge_attr.index:
continue
row = edge_attr.loc[eid]
src_attr = G.get_vertex_layer_attrs(src, consensus_aa)
tgt_attr = G.get_vertex_layer_attrs(tgt, consensus_aa)
rows.append(
{
'source': src,
'target': tgt,
'edge_sign': 1 if row.get('consensus_stimulation', False) else -1,
'n_sources': int(
row.get('n_sources', 0) if not pd.isna(row.get('n_sources', 0)) else 0
),
'src_signal': src_attr.get('corneto_signal', 0.0),
'tgt_signal': tgt_attr.get('corneto_signal', 0.0),
'src_freq': src_attr.get('patient_frequency', 0.0),
'tgt_freq': tgt_attr.get('patient_frequency', 0.0),
}
)
query_result = (
pl.DataFrame(rows)
.filter((pl.col('n_sources') >= 3) & (pl.col('src_freq') >= 0.5) & (pl.col('tgt_freq') >= 0.5))
.sort('n_sources', descending=True)
)
print(f'High-confidence consensus edges: {len(query_result)}')
print(query_result.head(20))
# =============================================================================
# STEP 4 — Cross-layer query: AnnNet
# Consensus-active stimulatory edges with both endpoints signaling
# ==============================================================================
consensus_set = set(consensus_nodes)
rows = []
edge_attr = G.edge_attributes.to_pandas().set_index('edge_id')
for eid, (src, tgt, _) in G.edge_definitions.items():
if src not in consensus_set or tgt not in consensus_set:
continue
if eid not in edge_attr.index:
continue
row = edge_attr.loc[eid]
src_attr = G.get_vertex_layer_attrs(src, consensus_aa)
tgt_attr = G.get_vertex_layer_attrs(tgt, consensus_aa)
rows.append(
{
'source': src,
'target': tgt,
'edge_sign': 1 if row.get('consensus_stimulation', False) else -1,
'n_sources': int(
row.get('n_sources', 0) if not pd.isna(row.get('n_sources', 0)) else 0
),
'src_signal': src_attr.get('corneto_signal', 0.0),
'tgt_signal': tgt_attr.get('corneto_signal', 0.0),
'src_freq': src_attr.get('patient_frequency', 0.0),
'tgt_freq': tgt_attr.get('patient_frequency', 0.0),
}
)
query_result = (
pl.DataFrame(rows)
.filter((pl.col('n_sources') >= 3) & (pl.col('src_freq') >= 0.5) & (pl.col('tgt_freq') >= 0.5))
.sort('n_sources', descending=True)
)
print(f'High-confidence consensus edges: {len(query_result)}')
print(query_result.head(20))
In [ ]:
Copied!
# =============================================================================
# STEP 5 — PyG export using AnnNet's native adapter
# =============================================================================
from annnet.adapters.pyg_adapter import to_pyg
pyg_data = to_pyg(
G,
node_features={
'default': ['corneto_signal', 'patient_frequency', 'tf_activity', 'kinase_activity']
},
edge_features={
('default', 'edge', 'default'): [
'consensus_stimulation',
'consensus_inhibition',
'n_sources',
]
},
slice_id=None,
hyperedge_mode='skip',
device='cpu',
)
print(f'Node types : {pyg_data.node_types}')
print(f'Edge types : {pyg_data.edge_types}')
# =============================================================================
# STEP 5 — PyG export using AnnNet's native adapter
# =============================================================================
from annnet.adapters.pyg_adapter import to_pyg
pyg_data = to_pyg(
G,
node_features={
'default': ['corneto_signal', 'patient_frequency', 'tf_activity', 'kinase_activity']
},
edge_features={
('default', 'edge', 'default'): [
'consensus_stimulation',
'consensus_inhibition',
'n_sources',
]
},
slice_id=None,
hyperedge_mode='skip',
device='cpu',
)
print(f'Node types : {pyg_data.node_types}')
print(f'Edge types : {pyg_data.edge_types}')
In [ ]:
Copied!
# =============================================================================
# STEP 7 — Save AnnNet (containing all information from workflow) as .annnet
# =============================================================================
G.write('OP-dR-CN.annnet', overwrite=True)
# =============================================================================
# STEP 7 — Save AnnNet (containing all information from workflow) as .annnet
# =============================================================================
G.write('OP-dR-CN.annnet', overwrite=True)
In [ ]:
Copied!