Source code for scSLAT.metrics

r"""
Align metrics
"""
from typing import List, Optional, Union

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from anndata import AnnData
from scipy import sparse
from torch_geometric.utils import to_scipy_sparse_matrix

from .model.prematch import rotate_via_numpy


[docs]def hit_k( features: List[torch.Tensor], ground_truth: torch.Tensor, k: Optional[int] = 10 ) -> pd.DataFrame: r""" Calculate first and top k-th hit ratio of matching Parameters ---------- features features of embed ground_truth in shape of (2, n_node) k top k """ embd0 = features[0] embd1 = features[1] g_map = {} for i in range(ground_truth.size(1)): g_map[ground_truth[1, i].item()] = ground_truth[0, i].item() g_list = list(g_map.keys()) cossim = torch.zeros(embd1.size(0), embd0.size(0)) for i in range(embd1.size(0)): cossim[i] = F.cosine_similarity( embd0, embd1[i : i + 1].expand(embd0.size(0), embd1.size(1)), dim=-1 ).view(-1) ind = cossim.argsort(dim=1, descending=True)[:, :k] # adata2 = adata.copy() a1 = 0 ak = 0 df = pd.DataFrame(columns=["node", "target", "trans", "h1", f"h{k}"]) for i, node in enumerate(g_list): # print(i, node) node = int(node) df.loc[i] = [-1, -1, -1, None, None] df.iloc[i, 0] = node df.iloc[i, 1] = ind[node, 0].item() df.iloc[i, 2] = g_map[node] df.iloc[i, 3] = False df.iloc[i, 4] = False if ind[node, 0].item() == g_map[node]: a1 += 1 ak += 1 df.iloc[i, 3] = True df.iloc[i, 4] = True else: for j in range(1, ind.shape[1]): if ind[node, j].item() == g_map[node]: ak += 1 df.iloc[i, 4] = True break a1 /= len(g_list) ak /= len(g_list) print("H@1 %.2f%% H@%d %.2f%%" % (a1 * 100, k, ak * 100)) return df
# TODO: modify to Calculate from adata directly
[docs]def hit_celltype( source_df: pd.DataFrame, target_df: pd.DataFrame, matching: np.ndarray, meta: Optional[str] = "celltype", ) -> np.ndarray: r""" Statistics of cell type correct mapping ratio Parameters ---------- source_df source dataset meta dataframe target_df target dataset meta dataframe matching cell correspondence array meta column name in source and target specify the celltype info """ assert all(item in ["index", "x", "y", meta] for item in source_df.columns.values) assert all(item in ["index", "x", "y", meta] for item in matching.columns.values) graph_map = {} for i in range(matching.shape[1]): graph_map[str(matching[0, i])] = str(matching[1, i]) source_df["target"] = "unknown" source_df["target_celltype"] = "unknown" for index in source_df["index"]: index = str(index) target = graph_map[index] target_celltype = target_df[target_df["index"] == int(target)]["celltype"].astype(str).values # print(dataset_B[dataset_B['index']==int(index)]['target'] ) source_df.loc[source_df["index"] == int(index), "target"] = int(target) source_df.loc[source_df["index"] == int(index), "target_celltype"] = target_celltype result = (source_df["celltype"] == source_df["target_celltype"]).value_counts() print(result) return result
[docs]def global_score( adatas: List[AnnData], matching: Union[np.ndarray, List], biology_meta: Optional[str] = "", topology_meta: Optional[str] = "", ) -> float: r""" Calculate global score, which consider celltype and histology, in two aligned graph (higher means better) Parameters ---------- adatas list of anndata object matching matching result biology_meta colname of adata.obs which keeps the biology info such as celltype of cells topology_meta colname of adata.obs which keeps the topology info such as histology region of cells Return ---------- global score of mapping, higher means better """ assert len(adatas) == 2 for adata in adatas: assert biology_meta in adata.obs.columns or topology_meta in adata.obs.columns if biology_meta not in adata.obs.columns: adata.obs[biology_meta] = "Unknown" print(f"Warning! column {biology_meta} not in adata.obs ") if topology_meta not in adata.obs.columns: adata.obs[topology_meta] = "Unknown" print(f"Warning! column {topology_meta} not in adata.obs ") adata.obs["global_meta"] = ( adata.obs[biology_meta].astype(str) + "-" + adata.obs[topology_meta].astype(str) ) count = 0 if isinstance(matching, list): print("Using probabilistic matching") for i in range(len(matching)): # query dataset query_meta = adatas[1].obs.iloc[i].loc["global_meta"] ref_meta = adatas[0].obs.iloc[matching[i][1]].loc[:, "global_meta"].to_list() if len(ref_meta) == 0: continue # find the most frequent meta in ref_meta vote = max(ref_meta, key=ref_meta.count) count = count + 1 if query_meta == vote else count elif isinstance(matching, np.ndarray): for i in range(matching.shape[0]): # query dataset query_meta = adatas[1].obs.iloc[i].loc["global_meta"] ref_meta = adatas[0].obs.iloc[matching[i, 1]].loc["global_meta"] count = count + 1 if query_meta == ref_meta else count else: raise ValueError("matching should be list or np.ndarray") score = count / adatas[1].shape[0] del adatas[0].obs["global_meta"] return score
[docs]def edge_score( edges: List[torch.Tensor], matching: torch.Tensor, score: Optional[List[float]] = [1, -1], ) -> float: r""" Calculate edge score in two aligned graph (higher means better) Parameters ---------- edges list of edge of every dataset matching matching result score score of [match_edge, mismatch_edge] Reference ---------- Joel Douglas et al. "Metrics for Evaluating Network Alignment" """ a0 = to_scipy_sparse_matrix(edges[0]).tocsc() a1 = to_scipy_sparse_matrix(edges[1]).tocsc() a0_reindex = a0[matching[1, :], :][:, matching[1, :]] res = a0_reindex + a1 - 2 * sparse.eye(a0_reindex.shape[0]) res.data[np.where(res.data == 2)] = score[0] # matched edge res.data[np.where(res.data == 1)] = score[1] # mismatch edge score = res.sum() / res.shape[0] return float(score)
[docs]def euclidean_dis( adata1: AnnData, adata2: AnnData, matching: np.ndarray, spatial_key: Optional[str] = "spatial" ) -> float: r""" Calculate euclidean distance between two datasets with ground truth (lower means better) Parameters ---------- adata1 adata1 with spatial adata2 adata2 with spatial matching matching result spatial_key key of spatial data in adata.obsm """ # reindex adata1 and adata2 by matching then calculate the pairwise euclidean distance for adata in [adata1, adata2]: coord = adata.obsm[spatial_key] if abs(coord.ptp()) > 1 or abs(coord.max()) > 1: adata.obsm["scale_spatial"] = (coord - coord.min(0)) / coord.ptp(0) else: adata.obsm["scale_spatial"] = coord coord1 = adata1.obsm["scale_spatial"][matching[1, :]] coord2 = adata2.obsm["scale_spatial"] distance = np.sqrt((coord1[:, 0] - coord2[:, 0]) ** 2 + (coord1[:, 1] - coord2[:, 1]) ** 2) return float(distance.sum() / distance.shape[0])
def __interval_statistics(data, intervals: List[str]) -> None: r""" Print the distribution of the value in intervals Parameters: ----------- data array or list intervals number of intervals """ if len(data) == 0: return for num in data: for interval in intervals: lr = tuple(interval.split("~")) left, right = float(lr[0]), float(lr[1]) if left <= num <= right: intervals[interval] += 1 for key, value in intervals.items(): print("%10s" % key, end="") print("%10s" % value, end="") print("%16s" % f"{value * 1.0 / len(data):.3%}")
[docs]def region_statistics( input, step: Optional[float] = 0.05, start: Optional[float] = 0.5, number_of_interval: Optional[int] = 10, ) -> None: r""" Print the region statistic results Parameters ----------- input list, 1D np.array or 1D torch.Tensor step stride size start start point number_of_interval number of interval """ intervals = { f"{step * x + start:.3f}~{step * (x + 1) + start:.3f}": 0 for x in range(number_of_interval) } __interval_statistics(input, intervals)
[docs]def rotation_angle( X, Y, pi, ground_truth: float = 0, output_angle: bool = True, output_matrix: bool = False ) -> tuple: r""" Finds and applies optimal rotation between spatial coordinates of two layers (may also do a reflection). Parameters ---------- X np array of spatial coordinates (ex: sliceA.obs['spatial']) Y np array of spatial coordinates (ex: sliceB.obs['spatial']) pi mapping between the two layers output in PASTE format (N x M matching matrix) ground_truth If known, the ground truth rotation angle to use for calculating error. output_angle Boolean of whether to return rotation angle. output_matrix Boolean of whether to return the rotation as a matrix or an angle. Returns ---------- Aligned spatial coordinates of X, Y, rotation angle, translation of X, translation of Y. Reference ---------- Modify from https://github.com/raphael-group/paste/blob/a9b10b24ba33e94a89dd89e8ee5e4900e18b1886/src/paste/visualization.py#L157 """ assert X.shape[1] == 2 and Y.shape[1] == 2 rad = np.deg2rad(ground_truth) X = rotate_via_numpy(X, rad) tX = pi.sum(axis=1).dot(X) tY = pi.sum(axis=0).dot(Y) X = X - tX Y = Y - tY H = Y.T.dot(pi.T.dot(X)) U, S, Vt = np.linalg.svd(H) R = Vt.T.dot(U.T) Y = R.dot(Y.T).T M = np.array([[0, -1], [1, 0]]) theta = np.arctan2(np.trace(M.dot(H)), np.trace(H)) theta = -np.degrees(theta) delta = np.absolute(theta - ground_truth) if output_angle and not output_matrix: return delta elif output_angle and output_angle: return delta, X, Y, R, tX, tY else: return X, Y