r"""
Extract cell features from anndata
"""
from typing import List, Optional
import numpy as np
import scanpy as sc
import scipy
import scipy.sparse
import torch
from anndata import AnnData
from torch_geometric.data import Data
from .batch import dual_pca
from .preprocess import scanpy_workflow
[docs]def Transfer_pyg_Data(adata: AnnData, feature: Optional[str] = "PCA") -> Data:
r"""
Transfer an adata with spatial info into PyG dataset (only for test)
Parameters
----------
adata
Anndata object
feature
use which data to build graph
- PCA (default)
Note:
----------
Only support 'Spatial_Net' which store in `adata.uns` yet
"""
adata = adata.copy()
G_df = adata.uns["Spatial_Net"].copy()
cells = np.array(adata.obs_names)
cells_id_tran = dict(zip(cells, range(cells.shape[0])))
G_df["Cell1"] = G_df["Cell1"].map(cells_id_tran)
G_df["Cell2"] = G_df["Cell2"].map(cells_id_tran)
# build Adjacent Matrix
G = scipy.sparse.coo_matrix(
(np.ones(G_df.shape[0]), (G_df["Cell1"], G_df["Cell2"])), shape=(adata.n_obs, adata.n_obs)
)
G = G + scipy.sparse.eye(G.shape[0])
edgeList = np.nonzero(G)
# select feature
assert feature.lower() in ["hvg", "pca", "raw"]
if feature.lower() == "raw":
if type(adata.X) == np.ndarray:
data = Data(
edge_index=torch.LongTensor(np.array([edgeList[0], edgeList[1]])),
x=torch.FloatTensor(adata.X),
) # .todense()
else:
data = Data(
edge_index=torch.LongTensor(np.array([edgeList[0], edgeList[1]])),
x=torch.FloatTensor(adata.X.todense()),
) # .todense()
return data
elif feature.lower() in ["pca", "hvg"]:
sc.pp.normalize_total(adata, target_sum=1e4)
sc.pp.log1p(adata)
sc.pp.highly_variable_genes(adata, min_mean=0.0125, max_mean=3, min_disp=0.5)
adata = adata[:, adata.var.highly_variable]
if feature.lower() == "hvg":
data = Data(
edge_index=torch.LongTensor(np.array([edgeList[0], edgeList[1]])),
x=torch.FloatTensor(adata.X.todense()),
)
sc.pp.scale(adata, max_value=10)
print("Use PCA to format graph")
sc.tl.pca(adata, svd_solver="arpack")
data = Data(
edge_index=torch.LongTensor(np.array([edgeList[0], edgeList[1]])),
x=torch.FloatTensor(adata.obsm["X_pca"].copy()),
)
return data, adata.varm["PCs"]
[docs]def load_anndata(
adata: AnnData,
feature: Optional[str] = "PCA",
noise_level: Optional[float] = 0,
noise_type: Optional[str] = "uniform",
edge_homo_ratio: Optional[float] = 0.9,
return_PCs: Optional[bool] = False,
) -> List:
r"""
Create 2 graphs from single anndata (only for test)
Parameters
----------
adata
Anndata object
feature
feature to use to build graph and align, now support
- `PCA`
- `Harmony` (default)
noise_level
node noise
noise_type
type of noise, support 'uniform' and 'normal'
edge_homo_ratio
ratio of edge in graph2
return_PCs
if return adata.varm['PCs'] if use feature 'PCA' (just for benchmark)
Warning
----------
This function is only for test. It generates `two` graphs
from single anndata by data augmentation
"""
if feature == "PCA":
dataset, PCs = Transfer_pyg_Data(adata, feature=feature)
else:
dataset = Transfer_pyg_Data(adata, feature=feature)
edge1 = dataset.edge_index
feature1 = dataset.x
edge2 = edge1.clone()
ledge = edge2.size(1) # get edge numbers
edge2 = edge2[:, torch.randperm(ledge)[: int(ledge * edge_homo_ratio)]]
perm = torch.randperm(feature1.size(0))
perm_back = torch.tensor(list(range(feature1.size(0))))
perm_mapping = torch.stack([perm_back, perm])
edge2 = perm[edge2.view(-1)].view(2, -1) # reset edge order
edge2 = edge2[:, torch.argsort(edge2[0])]
feature2 = torch.zeros(feature1.size())
feature2[perm] = feature1.clone()
if noise_type == "uniform":
feature2 = feature2 + 2 * (torch.rand(feature2.size()) - 0.5) * noise_level
elif noise_type == "normal":
feature2 = feature2 + torch.randn(feature2.size()) * noise_level
if feature == "PCA" and return_PCs:
return edge1, feature1, edge2, feature2, perm_mapping, PCs
return edge1, feature1, edge2, feature2, perm_mapping
[docs]def load_anndatas(
adatas: List[AnnData],
feature: Optional[str] = "DPCA",
dim: Optional[int] = 50,
self_loop: Optional[bool] = False,
join: Optional[str] = "inner",
backend: Optional[str] = "sklearn",
singular: Optional[bool] = True,
check_order: Optional[bool] = True,
n_top_genes: Optional[int] = 2500,
) -> List[Data]:
r"""
Transfer adatas with spatial info into PyG datasets
Parameters
----------
adatas
List of Anndata objects
feature
use which data to build graph
- `PCA` (default)
- `DPCA` (For batch effect correction)
- `Harmony` (For batch effect correction)
- `GLUE` (**NOTE**: only suitable for multi-omics integration)
dim
dimension of embedding, works for ['PCA', 'DPCA', 'Harmony', 'GLUE']
self_loop
whether to add self loop on graph
join
how to concatenate two adata
backend
backend to calculate DPCA
singular
whether to multiple singular value in DPCA
check_order
whether to check the order of adata1 and adata2
n_top_genes
number of highly variable genes
Note:
----------
Only support 'Spatial_Net' which store in `adata.uns` yet
"""
assert len(adatas) == 2
assert feature.lower() in ["raw", "hvg", "pca", "dpca", "harmony", "glue", "scglue"]
if check_order and adatas[0].shape[0] < adatas[1].shape[0]:
raise ValueError("Please change the order of adata1 and adata2 or set `check_order=False`")
gpu_flag = True if torch.cuda.is_available() else False
adatas = [adata.copy() for adata in adatas] # May consume more memory
# Edge
edgeLists = []
for adata in adatas:
G_df = adata.uns["Spatial_Net"].copy()
cells = np.array(adata.obs_names)
cells_id_tran = dict(zip(cells, range(cells.shape[0])))
G_df["Cell1"] = G_df["Cell1"].map(cells_id_tran)
G_df["Cell2"] = G_df["Cell2"].map(cells_id_tran)
# build adjacent matrix
G = scipy.sparse.coo_matrix(
(np.ones(G_df.shape[0]), (G_df["Cell1"], G_df["Cell2"])),
shape=(adata.n_obs, adata.n_obs),
)
if self_loop:
G = G + scipy.sparse.eye(G.shape[0])
edgeList = np.nonzero(G)
edgeLists.append(edgeList)
# Feature
datas = []
print(f"Use {feature} feature to format graph")
if feature.lower() == "raw":
for i, adata in enumerate(adatas):
if type(adata.X) == np.ndarray:
data = Data(
edge_index=torch.LongTensor(np.array([edgeLists[i][0], edgeLists[i][1]])),
x=torch.FloatTensor(adata.X),
) # .todense()
else:
data = Data(
edge_index=torch.LongTensor(np.array([edgeLists[i][0], edgeLists[i][1]])),
x=torch.FloatTensor(adata.X.todense()),
)
datas.append(data)
elif feature.lower() in ["glue", "scglue"]:
for i, adata in enumerate(adatas):
assert "X_glue" in adata.obsm.keys()
data = Data(
edge_index=torch.LongTensor(np.array([edgeLists[i][0], edgeLists[i][1]])),
x=torch.FloatTensor(adata.obsm["X_glue"][:, :dim]),
)
datas.append(data)
elif feature.lower() in ["hvg", "pca", "harmony"]:
adata_all = adatas[0].concatenate(adatas[1], join=join) # join can not be 'outer'!
adata_all = scanpy_workflow(adata_all, n_top_genes=n_top_genes, n_comps=-1)
if feature.lower() == "hvg":
if adata_all.var.highly_variable is not None:
adata_all = adata_all[:, adata_all.var.highly_variable]
for i in len(adatas):
adata = adata_all[adata_all.obs["batch"] == str(i)]
data = Data(
edge_index=torch.LongTensor(np.array([edgeLists[i][0], edgeLists[i][1]])),
x=torch.FloatTensor(adata.X.todense()),
)
datas.append(data)
sc.tl.pca(adata_all, svd_solver="auto")
if feature.lower() == "pca":
for i in range(len(adatas)):
adata = adata_all[adata_all.obs["batch"] == str(i)]
data = Data(
edge_index=torch.LongTensor(np.array([edgeLists[i][0], edgeLists[i][1]])),
x=torch.FloatTensor(adata.obsm["X_pca"][:, :dim]),
)
datas.append(data)
elif feature.lower() == "harmony":
from harmony import harmonize
if gpu_flag:
print("Harmony is using GPU!")
Z = harmonize(
adata_all.obsm["X_pca"],
adata_all.obs,
random_state=0,
max_iter_harmony=30,
batch_key="batch",
use_gpu=gpu_flag,
)
adata_all.obsm["X_harmony"] = Z[:, :dim]
for i in range(len(adatas)):
adata = adata_all[adata_all.obs["batch"] == str(i)]
data = Data(
edge_index=torch.LongTensor(np.array([edgeLists[i][0], edgeLists[i][1]])),
x=torch.FloatTensor(adata.obsm["X_harmony"][:, :dim]),
)
datas.append(data)
elif feature.lower() == "dpca":
adata_all = adatas[0].concatenate(adatas[1], join=join)
sc.pp.highly_variable_genes(adata_all, n_top_genes=12000, flavor="seurat_v3")
adata_all = adata_all[:, adata_all.var.highly_variable]
sc.pp.normalize_total(adata_all)
sc.pp.log1p(adata_all)
adata_1 = adata_all[adata_all.obs["batch"] == "0"]
adata_2 = adata_all[adata_all.obs["batch"] == "1"]
sc.pp.scale(adata_1)
sc.pp.scale(adata_2)
# adata_1, adata_2 = Parallel(n_jobs=2)(delayed(sc.pp.scale)(adata)
# for adata in [adata_1, adata_2])
if gpu_flag:
print(
"Warning! Dual PCA is using GPU, which may lead to OUT OF GPU MEMORY in big dataset!"
)
Z_x, Z_y = dual_pca(
adata_1.X, adata_2.X, dim=dim, singular=singular, backend=backend, use_gpu=gpu_flag
)
data_x = Data(
edge_index=torch.LongTensor(np.array([edgeLists[0][0], edgeLists[0][1]])), x=Z_x
)
data_y = Data(
edge_index=torch.LongTensor(np.array([edgeLists[1][0], edgeLists[1][1]])), x=Z_y
)
datas = [data_x, data_y]
edges = [dataset.edge_index for dataset in datas]
features = [dataset.x for dataset in datas]
return edges, features