Source code for eegrasp.graph

r"""Graph Creation.
===================

Contains the functions used in EEGrasp to create Graphs 
"""

import numpy as np
from pygsp2 import graph_learning, graphs
from tqdm import tqdm

from .interpolate import interpolate_channel


[docs] def gaussian_kernel(x, sigma=0.1): """Gaussian Kernel Weighting function. Notes ----- This function is supposed to be used in the PyGSP2 module but is repeated here since there is an error in the available version of the toolbox. References ---------- * D. I. Shuman, S. K. Narang, P. Frossard, A. Ortega and P. Vandergheynst, "The emerging field of signal processing on graphs: Extending high-dimensional data analysis to networks and other irregular domains," in IEEE Signal Processing Magazine, vol. 30, no. 3, pp. 83-98, May 2013, doi: 10.1109/MSP.2012.2235192. """ return np.exp(-np.power(x, 2.) / (2. * np.power(float(sigma), 2)))
[docs] def compute_graph(W=None, epsilon=.5, sigma=.1, distances=None, graph=None, coordinates=None, method='enn', k=5): """Parameters ---------- W : numpy ndarray | None If W is passed, then the graph is computed. Otherwise the graph will be computed with `self.W`. `W` should correspond to a non-sparse 2-D array. If None, the function will use the distance matrix computed in the instance of the class (`self.W`). epsilon : float Any distance greater than epsilon will be set to zero on the adjacency matrix. sigma : float Sigma parameter for the gaussian kernel. method : str, default="enn" Method to determine graph connectivity. - "enn": Use ε-neighbourhood with Gaussian kernel and distance thresholding. - "knn": Use K-Nearest Neighbors. Each node connects to its `k` nearest neighbors with Gaussian weighting. k : int, default=5 Number of neighbors to connect when using method="knn". Ignored if method="epsilon". Returns ------- G: PyGSP2 Graph object. """ if W is None: if distances is None: raise TypeError( 'No distances found. Distances have to be computed if W is not provided' ) if method == "enn": graph_weights = gaussian_kernel(distances, sigma=sigma) graph_weights[distances > epsilon] = 0 np.fill_diagonal(graph_weights, 0) elif method == "knn": from sklearn.neighbors import NearestNeighbors N = distances.shape[0] graph_weights = np.zeros_like(distances) nbrs = NearestNeighbors(n_neighbors=k+1).fit(distances) _, indices = nbrs.kneighbors(distances) for i in range(N): for j in indices[i][1:]: # skip self graph_weights[i, j] = gaussian_kernel(distances[i, j], sigma=sigma) else: raise ValueError(f"Unknown method '{method}'. Use 'enn' or 'knn'.") graph = graphs.Graph(graph_weights) else: graph_weights = W graph = graphs.Graph(W) if coordinates is not None: graph.set_coordinates(coordinates) return graph, graph_weights
[docs] def learn_graph(Z=None, a=0.1, b=0.1, gamma=0.04, maxiter=1000, w_max=np.inf, mode='Average', data=None, **kwargs): """Learn the graph based on smooth signals. Parameters ---------- Z : ndarray Distance between the nodes. If not passed, the function will try to compute the euclidean distance using `self.data`. If `self.data` is a 2d array it will compute the euclidean distance between the channels. If the data is a 3d array it will compute a Z matrix per trial, assuming the first dimension in data is trials/epochs. Depending on the mode parameter, the function will average distance matrizes and learn the graph on the average distance or return a collection of adjacency matrices. Default is None. a : float Parameter for the graph learning algorithm, this controls the weights of the learned graph. Bigger a -> bigger weights in W. Default is 0.1. b : float Parameter for the graph learning algorithm, this controls the density of the learned graph. Bigger b -> more dense W. Default is 0.1. mode : string Options are: 'Average', 'Trials'. If 'average', the function returns a single W and Z. If 'Trials' the function returns a generator list of Ws and Zs. Default is 'Average'. data : ndarray | None 2d array of channels by samples. If None, the function will use the data computed in the instance of the class (`self.data`). Returns ------- W : ndarray Weighted adjacency matrix or matrices depending on mode parameter used. If run in 'Trials' mode then Z is a 3d array where the first dim corresponds to trials. Z : ndarray. Used distance matrix or matrices depending on mode parameter used. If run in 'Trials' mode then Z is a 3d array where the first dim corresponds to trials. """ from .utils import euc_dist # If no distance matrix is given compute based on # data's euclidean distance # Check if data contains trials if data.ndim == 3: Zs = np.zeros((data.shape[0], data.shape[1], data.shape[1])) # Check if we want to return average or trials if mode == 'Trials': Ws = np.zeros((data.shape[0], data.shape[1], data.shape[1])) for i, d in enumerate(tqdm(data)): # Compute euclidean distance Z = euc_dist(d) W = graph_learning.graph_log_degree(Z, a, b, gamma=gamma, w_max=w_max, maxiter=maxiter, **kwargs) W[W < 1e-5] = 0 Ws[i, :, :] = W.copy() Zs[i, :, :] = Z.copy() return Ws, Zs elif mode == 'Average': for i, d in enumerate(tqdm(data)): # Compute euclidean distance Zs[i, :, :] = euc_dist(d) Z = np.mean(Zs, axis=0) W = graph_learning.graph_log_degree(Z, a, b, gamma=gamma, w_max=w_max, maxiter=maxiter, **kwargs) W[W < 1e-5] = 0 return W, Z else: Z = euc_dist(data) W = graph_learning.graph_log_degree(Z, a, b, gamma=gamma, w_max=w_max, maxiter=maxiter, **kwargs) W = graph_learning.graph_log_degree( Z, a, b, gamma=gamma, w_max=w_max, maxiter=maxiter, **kwargs) W[W < 1e-5] = 0 return W, Z
[docs] def fit_sigma(missing_idx: int | list[int] | tuple[int], data=None, distances=None, epsilon=0.5, min_sigma=0.1, max_sigma=1., step=0.1): """Find the best parameter for the gaussian kernel. Parameters ---------- missing_idx : int | list | tuple Index of the missing channel. data : ndarray | None 2d array of channels by samples. If None, the function will use the data computed in the instance of the class (`self.data`). distances : ndarray | None Distance matrix (2-dimensional array). It can be passed to the instance of the class or as an argument of the method. If None, the function will use the distance computed in the instance of the class (`self.distances`). epsilon : float Maximum distance to threshold the array. Default is 0.5. min_sigma : float Minimum value for the sigma parameter. Default is 0.1. max_sigma : float Maximum value for the sigma parameter. Default is 1. step : float Step for the sigma parameter. Default is 0.1. Notes ----- Look for the best parameter of sigma for the gaussian kernel. This is done by interpolating a channel and comparing the interpolated data to the real data. After finding the parameter the graph is saved and computed in the instance class. The distance threshold is maintained. """ # Create array of parameter values vsigma = np.arange(min_sigma, max_sigma, step=step) # Create time array time = np.arange(data.shape[1]) # Mask to ignore missing channel ch_mask = np.ones(data.shape[0]).astype(bool) ch_mask[missing_idx] = False # Simulate eliminating the missing channel signal = data.copy() signal[missing_idx, :] = np.nan # Allocate array to reconstruct the signal all_reconstructed = np.zeros([len(vsigma), len(time)]) # Allocate Error array error = np.zeros([len(vsigma)]) # Loop to look for the best parameter for i, sigma in enumerate(tqdm(vsigma)): # Compute thresholded weight matrix graph, W = compute_graph(epsilon=epsilon, sigma=sigma, distances=distances) # Interpolate signal, iterating over time reconstructed = interpolate_channel(missing_idx=missing_idx, graph=graph, data=signal) all_reconstructed[i, :] = reconstructed[missing_idx, :] # Calculate error error[i] = np.linalg.norm(data[missing_idx, :] - all_reconstructed[i, :]) # Eliminate invalid trials valid_idx = ~np.isnan(error) error = error[valid_idx] vsigma = vsigma[valid_idx] all_reconstructed = all_reconstructed[valid_idx, :] # Find best reconstruction best_idx = np.argmin(np.abs(error)) best_sigma = vsigma[np.argmin(np.abs(error))] # Save best result in the signal array signal[missing_idx, :] = all_reconstructed[best_idx, :] # Compute the graph with the best result graph = compute_graph(distances, epsilon=epsilon, sigma=best_sigma) results = _return_results(error, signal, vsigma, 'sigma') return results
[docs] def fit_epsilon(missing_idx: int | list[int] | tuple[int], data=None, distances=None, sigma=0.1): """Find the best distance to use as threshold. Parameters ---------- missing_idx : int Index of the missing channel. Not optional. data : ndarray | None 2d array of channels by samples. If None, the function will use the data computed in the instance of the class (`self.data`). Default is `None`. distances : ndarray | None. Unthresholded distance matrix (2-dimensional array). It can be passed to the instance of the class or as an argument of the method. If None, the function will use the distance computed in the instance of the class (`self.distances`). Default is `None`. sigma : float Parameter of the Gaussian Kernel transformation. Default is 0.1. Returns ------- results : dict Dictionary containing the error, signal, best_epsilon and epsilon values. Notes ----- It will iterate through all the unique values of the distance matrix. data : 2-dimensional array. The first dim. is Channels and second is time. It can be passed to the instance class or the method """ # Vectorize the distance matrix dist_tril = _vectorize_matrix(distances) # Sort and extract unique values vdistances = np.sort(np.unique(dist_tril)) # Create time array time = np.arange(data.shape[1]) # Mask to ignore missing channel ch_mask = np.ones(data.shape[0]).astype(bool) ch_mask[missing_idx] = False # Simulate eliminating the missing channel signal = data.copy() signal[missing_idx, :] = np.nan # Allocate array to reconstruct the signal all_reconstructed = np.zeros([len(vdistances), len(time)]) # Allocate Error array error = np.zeros([len(vdistances)]) # Loop to look for the best parameter for i, epsilon in enumerate(tqdm(vdistances)): # Compute thresholded weight matrix graph, W = compute_graph(distances, epsilon=epsilon, sigma=sigma) # Interpolate signal, iterating over time reconstructed = interpolate_channel(missing_idx=missing_idx, graph=graph, data=signal) all_reconstructed[i, :] = reconstructed[missing_idx, :] # Calculate error error[i] = np.linalg.norm(data[missing_idx, :] - all_reconstructed[i, :]) # Eliminate invalid distances valid_idx = ~np.isnan(error) error = error[valid_idx] vdistances = vdistances[valid_idx] all_reconstructed = all_reconstructed[valid_idx, :] # Find best reconstruction best_idx = np.argmin(np.abs(error)) best_epsilon = vdistances[np.argmin(np.abs(error))] # Save best result in the signal array signal[missing_idx, :] = all_reconstructed[best_idx, :] # Compute the graph with the best result graph, W = compute_graph(distances, epsilon=best_epsilon, sigma=sigma) results = _return_results(error, signal, vdistances, 'epsilon') return results
def _return_results(error, signal, vparameter, param_name): """Wrap results into a dictionary. Parameters ---------- error : ndarray Errors corresponding to each tried parameter. vparameter : ndarray Values of the parameter used in the fit function. signal : ndarray Reconstructed signal. Notes ----- In order to keep everything under the same structure this function should be used to return the results of any self.fit_* function. """ best_idx = np.argmin(np.abs(error)) best_param = vparameter[best_idx] results = { 'error': error, 'signal': signal, f'best_{param_name}': best_param, f'{param_name}': vparameter } return results def _vectorize_matrix(mat): """Vectorize a symmetric matrix using the lower triangle. Returns ------- mat : ndarray. lower triangle of mat """ tril_indices = np.tril_indices(len(mat), -1) vec = mat[tril_indices] return vec