r"""
Useful functions
"""
import math
import time
from typing import List, Optional, Union
import faiss
import numpy as np
import torch
from anndata import AnnData
from joblib import Parallel, delayed
from sklearn.metrics.pairwise import euclidean_distances
from ..utils import get_free_gpu
from .graphmodel import LGCN, LGCN_mlp, ReconDNN, WDiscriminator
from .loaddata import load_anndatas
from .preprocess import Cal_Spatial_Net
from .train import train_GAN, train_reconstruct
[docs]def run_LGCN(features: List[np.ndarray], edges: List[torch.Tensor], LGCN_layer: int = 2):
"""
Run LGCN model
Parameters
----------
features
list of graph node features
edges
list of graph edges
LGCN_layer
LGCN layer number, we suggest set 2 for barcode based and 4 for fluorescence based
"""
if torch.cuda.is_available():
gpu_index = get_free_gpu()
print(f"Choose GPU:{gpu_index} as device")
device = torch.device(f"cuda:{gpu_index}")
else:
device = torch.device("cpu")
print("GPU is not available")
for i in range(len(features)):
features[i] = features[i].to(device)
for j in range(len(edges)):
edges[j] = edges[j].to(device)
LGCN_model = LGCN(input_size=features[0].size(1), K=LGCN_layer).to(device=device)
time1 = time.time()
embd0 = LGCN_model(features[0], edges[0])
embd1 = LGCN_model(features[1], edges[1])
run_time = time.time() - time1
print(f"LGCN time: {run_time}")
return embd0, embd1, run_time
[docs]def run_SLAT(
features: List[np.ndarray],
edges: List[torch.Tensor],
epochs: Optional[int] = 6,
LGCN_layer: Optional[int] = 1,
mlp_hidden: Optional[int] = 256,
hidden_size: Optional[int] = 2048,
alpha: Optional[float] = 0.01,
anchor_scale: Optional[float] = 0.8,
lr_mlp: Optional[float] = 0.0001,
lr_wd: Optional[float] = 0.0001,
lr_recon: Optional[float] = 0.01,
batch_d_per_iter: Optional[int] = 5,
batch_r_per_iter: Optional[int] = 10,
verbose: Optional[bool] = False,
) -> tuple:
r"""
Run SLAT model
Parameters
----------
features
list of graph node features
edges
list of graph edges
epochs
epoch number of SLAT (not exceed 10)
LGCN_layer
LGCN layer number, we suggest set 1 for barcode based and 4 for fluorescence based
mlp_hidden
MLP hidden layer size
hidden_size
size of LGCN output
alpha
scale of loss
anchor_scale
ratio of cells selected as pairs
lr_mlp
learning rate of MLP
lr_wd
learning rate of WGAN discriminator
lr_recon
learning rate of reconstruction
batch_d_per_iter
batch number for WGAN train per iter
batch_r_per_iter
batch number for reconstruct train per iter
verbose
if print log
Return
----------
embd0
cell embedding of dataset1
embd1
cell embedding of dataset2
time
run time of SLAT model
"""
feature_size = features[0].size(1)
feature_output_size = hidden_size
if torch.cuda.is_available():
gpu_index = get_free_gpu()
print(f"Choose GPU:{gpu_index} as device")
device = torch.device(f"cuda:{gpu_index}")
else:
device = torch.device("cpu")
print("GPU is not available")
for i in range(len(features)):
features[i] = features[i].to(device)
for j in range(len(edges)):
edges[j] = edges[j].to(device)
feature_size = features[0].size(1)
feature_output_size = hidden_size
LGCN_model = LGCN_mlp(feature_size, hidden_size, K=LGCN_layer, hidden_size=mlp_hidden).to(
device
)
optimizer_LGCN = torch.optim.Adam(LGCN_model.parameters(), lr=lr_mlp, weight_decay=5e-4)
wdiscriminator = WDiscriminator(feature_output_size).to(device)
optimizer_wd = torch.optim.Adam(wdiscriminator.parameters(), lr=lr_wd, weight_decay=5e-4)
recon_model0 = ReconDNN(feature_output_size, feature_size).to(device)
recon_model1 = ReconDNN(feature_output_size, feature_size).to(device)
optimizer_recon0 = torch.optim.Adam(recon_model0.parameters(), lr=lr_recon, weight_decay=5e-4)
optimizer_recon1 = torch.optim.Adam(recon_model1.parameters(), lr=lr_recon, weight_decay=5e-4)
print("Running")
time1 = time.time()
for i in range(1, epochs + 1):
if verbose:
print(f"Epochs: {i} -----------")
LGCN_model.train()
optimizer_LGCN.zero_grad()
embd0 = LGCN_model(features[0], edges[0])
embd1 = LGCN_model(features[1], edges[1])
loss = train_GAN(
wdiscriminator,
optimizer_wd,
[embd0, embd1],
batch_d_per_iter=batch_d_per_iter,
anchor_scale=anchor_scale,
)
loss_feature = train_reconstruct(
[recon_model0, recon_model1],
[optimizer_recon0, optimizer_recon1],
[embd0, embd1],
features,
batch_r_per_iter=batch_r_per_iter,
)
loss = (1 - alpha) * loss + alpha * loss_feature
loss.backward()
optimizer_LGCN.step()
LGCN_model.eval()
embd0 = LGCN_model(features[0], edges[0])
embd1 = LGCN_model(features[1], edges[1])
time2 = time.time()
print("Training model time: %.2f" % (time2 - time1))
return embd0, embd1, time2 - time1
[docs]def spatial_match(
embds: List[torch.Tensor],
reorder: Optional[bool] = True,
smooth: Optional[bool] = True,
smooth_range: Optional[int] = 20,
scale_coord: Optional[bool] = True,
adatas: Optional[List[AnnData]] = None,
return_euclid: Optional[bool] = False,
verbose: Optional[bool] = False,
get_null_distri: Optional[bool] = False,
) -> List[Union[np.ndarray, torch.Tensor]]:
r"""
Use embedding to match cells from different datasets based on cosine similarity
Parameters
----------
embds
list of embeddings
reorder
if reorder embedding by cell numbers
smooth
if smooth the mapping by Euclid distance
smooth_range
use how many candidates to do smooth
scale_coord
if scale the coordinate to [0,1]
adatas
list of adata object
verbose
if print log
get_null_distri
if get null distribution of cosine similarity
Note
----------
Automatically use larger dataset as source
Return
----------
Best matching, Top n matching and cosine similarity matrix of top n
Note
----------
Use faiss to accelerate, refer https://github.com/facebookresearch/faiss/issues/95
"""
if reorder and embds[0].shape[0] < embds[1].shape[0]:
embd0 = embds[1]
embd1 = embds[0]
adatas = adatas[::-1] if adatas is not None else None
else:
embd0 = embds[0]
embd1 = embds[1]
if get_null_distri:
embd0 = torch.tensor(embd0)
embd1 = torch.tensor(embd1)
sample1_index = torch.randint(0, embd0.shape[0], (1000,))
sample2_index = torch.randint(0, embd1.shape[0], (1000,))
cos = torch.nn.CosineSimilarity(dim=1)
null_distri = cos(embd0[sample1_index], embd1[sample2_index])
index = faiss.index_factory(embd1.shape[1], "Flat", faiss.METRIC_INNER_PRODUCT)
embd0_np = embd0.detach().cpu().numpy() if torch.is_tensor(embd0) else embd0
embd1_np = embd1.detach().cpu().numpy() if torch.is_tensor(embd1) else embd1
embd0_np = embd0_np.copy().astype("float32")
embd1_np = embd1_np.copy().astype("float32")
faiss.normalize_L2(embd0_np)
faiss.normalize_L2(embd1_np)
index.add(embd0_np)
similarity, order = index.search(embd1_np, smooth_range)
best = []
if smooth and adatas is not None:
if verbose:
print("Smoothing mapping, make sure object is in same direction")
if scale_coord:
# scale spatial coordinate of every adata to [0,1]
adata1_coord = adatas[0].obsm["spatial"].copy()
adata2_coord = adatas[1].obsm["spatial"].copy()
for i in range(2):
adata1_coord[:, i] = (adata1_coord[:, i] - np.min(adata1_coord[:, i])) / (
np.max(adata1_coord[:, i]) - np.min(adata1_coord[:, i])
)
adata2_coord[:, i] = (adata2_coord[:, i] - np.min(adata2_coord[:, i])) / (
np.max(adata2_coord[:, i]) - np.min(adata2_coord[:, i])
)
dis_list = []
for query in range(embd1_np.shape[0]):
ref_list = order[query, :smooth_range]
dis = euclidean_distances(
adata2_coord[query, :].reshape(1, -1), adata1_coord[ref_list, :]
)
dis_list.append(dis)
best.append(ref_list[np.argmin(dis)])
else:
best = order[:, 0]
if return_euclid and smooth and adatas is not None:
dis_array = np.squeeze(np.array(dis_list))
if get_null_distri:
return np.array(best), order, similarity, dis_array, null_distri
else:
return np.array(best), order, similarity, dis_array
else:
return np.array(best), order, similarity
[docs]def probabilistic_match(cos_cutoff: float = 0.6, euc_cutoff: int = 5, **kargs) -> List[List[int]]:
best, index, similarity, eucli_array, null_distri = spatial_match(
**kargs, return_euclid=True, get_null_distri=True
)
# filter the cosine similarity via p_value
# mask1 = similarity > cos_cutoff
null_distri = np.sort(null_distri)
p_val = 1 - np.searchsorted(null_distri, similarity) / null_distri.shape[0]
mask1 = p_val < 0.05
# filter the euclidean distance
sorted_indices = np.argpartition(eucli_array, euc_cutoff, axis=1)[:, :euc_cutoff]
mask2 = np.full(eucli_array.shape, False, dtype=bool)
mask2[np.arange(eucli_array.shape[0])[:, np.newaxis], sorted_indices] = True
mask_mat = np.logical_and(mask1, mask2)
filter_list = [row[mask].tolist() for row, mask in zip(index, mask_mat)]
matching = [[i, j] for i, j in zip(np.arange(index.shape[0]), filter_list)]
return matching
[docs]def run_SLAT_multi(
adatas: List[AnnData],
order: Optional[list] = None,
k_cutoff: Optional[int] = 10,
feature: Optional[str] = "DPCA",
cos_cutoff: Optional[float] = 0.85,
n_jobs: Optional[int] = -1,
top_K: Optional[int] = 50,
) -> List[np.ndarray]:
r"""
Run SLAT on multi-dataset for 3D re-construct
Parameters
-----------
adatas
list of adatas
order
biological order of the slides
k_cutoff
k nearest neighbor
feature
feature to use, one of ['DPCA', 'PCA', 'harmony']
cos_cutoff
cosine similarity cutoff of mapping results
n_jobs
cpu cores to use
top_K
top K smooth mapping results
Return
----------
matching_list
list of precise mapping results
index_list
list of top mapping index
"""
order = range(len(adatas)) if order is None else order
n_jobs = len(adatas) + 1 if n_jobs < 0 else n_jobs
# for adata in adatas:
# Cal_Spatial_Net(adata, k_cutoff=k_cutoff, model='KNN')
adatas = Parallel(n_jobs=n_jobs)(
delayed(Cal_Spatial_Net)(adata, k_cutoff=k_cutoff, model="KNN", return_data=True)
for adata in adatas
)
matching_list = []
def parall_SLAT(a1, a2, i):
print(f"Parallel mapping dataset:{i} --- dataset:{i+1}")
edges, features = load_anndatas([a1, a2], feature=feature, check_order=False)
embd0, embd1, _ = run_SLAT(features, edges)
best, index, distance = spatial_match(
[embd0, embd1], reorder=False, smooth_range=top_K, adatas=[a1, a2]
)
return best, index, distance
unfiltered_zip_res = Parallel(n_jobs=n_jobs)(
delayed(parall_SLAT)(a1, a2, i) for i, (a1, a2) in enumerate(zip(adatas, adatas[1:]))
)
matching_list = []
for best, index, distance in unfiltered_zip_res:
matching = np.array([range(index.shape[0]), best])
matching_filter = matching[:, distance[:, 0] > cos_cutoff]
matching_list.append(matching_filter)
# assert [x.shape[1] for x in matching_list] == [y.shape[0] for y in adatas[1:]] # check order
return matching_list, unfiltered_zip_res
[docs]def add_noise(adata, noise: Optional[str] = "nb", inverse_noise: Optional[float] = 5) -> AnnData:
r"""
Add poisson / negative binomial noise on raw counts also run scanpy pipeline to PCA step
Parameters
----------
adata
anndata object
noise
type of noise, one of 'poisson' or 'nb'
inverse_noise
if noise is 'nb', control the noise level
(smaller means larger variance)
"""
if "counts" not in adata.layers.keys():
adata.layers["counts"] = adata.X.copy()
mu = torch.tensor(adata.X.todense())
if noise.lower() == "poisson":
adata.X = torch.distributions.poisson.Poisson(mu).sample().numpy()
elif noise.lower() == "nb":
adata.X = (
torch.distributions.negative_binomial.NegativeBinomial(
inverse_noise, logits=(mu.log() - math.log(inverse_noise))
)
.sample()
.numpy()
)
else:
raise NotImplementedError("Can not add this type noise")
return adata.copy()