diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..0036caa8 Binary files /dev/null and b/.DS_Store differ diff --git a/openep/__init__.py b/openep/__init__.py index e775d84c..e2d4fd82 100644 --- a/openep/__init__.py +++ b/openep/__init__.py @@ -18,7 +18,7 @@ __all__ = ['case', 'mesh', 'draw'] -from .io.readers import load_openep_mat, load_opencarp, load_circle_cvi, load_vtk +from .io.readers import load_openep_mat, load_opencarp, load_circle_cvi, load_vtk, load_igb from .io.writers import export_openCARP, export_openep_mat, export_vtk from .converters.pyvista_converters import from_pyvista, to_pyvista from . import case, mesh, draw diff --git a/openep/analysis/__init__.py b/openep/analysis/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/openep/analysis/_conduction_velocity.py b/openep/analysis/_conduction_velocity.py new file mode 100644 index 00000000..e251b0b5 --- /dev/null +++ b/openep/analysis/_conduction_velocity.py @@ -0,0 +1,358 @@ +# OpenEP +# Copyright (c) 2021 OpenEP Collaborators +# +# This file is part of OpenEP. +# +# OpenEP is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# OpenEP is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program (LICENSE.txt). If not, see + +"""Module containing conduction velocity (CV) calculation methods and CV divergence method""" + +import math +from vedo import * +import pyvista as pv +from scipy.optimize import minimize +from sklearn.neighbors import KDTree + +from ..case.case_routines import interpolate_general_cloud_points_onto_surface + + +__all__ = ['preprocess_lat_egm', 'plane_fitting', 'divergence', + 'triangulation', 'radial_basis_function'] + + +def preprocess_lat_egm( + case, + include=None, + lat_threshold=-10000 +): + """ + Preprocess case data for conduction velocity calculation by removing local activation times + and corresponding bipolar electrogram points based on threshold and include. + + Args: + case (Case): The case data containing electrogram and annotation information. + + include (np.ndarray, optional): A boolean array specifying which data points to include + in the preprocessing for bipolar EGM data and LAT. + Defaults to "includes" specified in the imported data. + + lat_threshold (int, optional): Threshold for local activation times. Points with local + activation times less than this value will be removed. + Defaults to -1000. + If None, no threshold is applied. + + Returns: + tuple: A tuple containing two numpy arrays: + - local_activation_time (np.ndarray): Array of cleaned local activation times. + - bipolar_egm_pts (np.ndarray): Array of bipolar electrogram points corresponding + to the cleaned local activation times. + + """ + bipolar_egm_pts = case.electric.bipolar_egm.points[include] + local_activation_time = (case.electric.annotations.local_activation_time[include] + - case.electric.annotations.reference_activation_time[include]) + + if lat_threshold: + bipolar_egm_pts = np.delete(bipolar_egm_pts, np.where(local_activation_time == lat_threshold), 0) + local_activation_time = np.delete(local_activation_time, np.where(local_activation_time == lat_threshold)) + + return local_activation_time, bipolar_egm_pts + + +def plane_fitting( + bipolar_egm_pts, + local_activation_time, + leaf_size=5, + min_n_nearest_neighbours=5, + tree_query_radius=10 +): + """ + Fit a plane to bipolar electrogram points and calculate conduction velocity values and centroids. + + Args: + bipolar_egm_pts (array): + Array of bipolar electrogram points. + + local_activation_time (array): + Array of local activation times. + + leaf_size (int, optional): + Leaf size for KDTree. Defaults to 5. + + min_n_nearest_neighbours (int, optional): + Minimum number of nearest neighbours. Defaults to 5. + + tree_query_radius (int, optional): + Radius for tree query. Defaults to 10. + + Returns: + tuple: Array of conduction velocities and array of triangle center points. + + Example 1: + import openep + + case = openep.load_openep_mat("path/to/openep.mat") + mesh = case.create_mesh() + case.analyse.conduction_velocity.calculate(method='plane_fitting') + + Example 2: + >> import numpy as np + >> bipolar_egm_pts = np.array([[0, 0, 0], [1, 0, 0], [0, 1, 0], ...]) + >> local_activation_times = np.array([0.1, 0.2, 0.15, ...]) + >> cv_values, cv_centroids = plane_fitting(bipolar_egm_pts, local_activation_times) + + """ + cv_values = np.full(bipolar_egm_pts.shape[0], np.nan) + cv_centroids = np.empty((bipolar_egm_pts.shape)) + ones_arr = np.ones(10) + + tree = KDTree(bipolar_egm_pts, leaf_size=leaf_size) + all_nn_indices = tree.query_radius(bipolar_egm_pts, r=tree_query_radius) + all_nns = [[bipolar_egm_pts[idx] for idx in nn_indices] for nn_indices in all_nn_indices] + + for egm_i in range(bipolar_egm_pts.shape[0]): + + if len(all_nns[egm_i]) >= min_n_nearest_neighbours: + x1, x2, x3 = np.transpose(np.vstack((bipolar_egm_pts[egm_i], all_nns[egm_i]))) + m = np.append(local_activation_time[egm_i], local_activation_time[all_nn_indices[egm_i]]) + + res = minimize(_obj_func, ones_arr, args=(x1, x2, x3, m)) + a, b, c, d, e, f, g, h, q, l = res.x + + x, y, z = bipolar_egm_pts[egm_i] + + tx = 2 * a * x + d * y + e * z + g + ty = 2 * b * y + d * x + f * z + h + tz = 2 * c * z + e * x + f * y + q + + vx = tx / (tx ** 2 + ty ** 2 + tz ** 2) + vy = ty / (tx ** 2 + ty ** 2 + tz ** 2) + vz = tz / (tx ** 2 + ty ** 2 + tz ** 2) + + cv_values[egm_i] = np.sqrt(vx ** 2 + vy ** 2 + vz ** 2) + cv_centroids[egm_i] = [x, y, z] + else: + cv_centroids[egm_i] = bipolar_egm_pts[egm_i] + + return cv_values, cv_centroids + + +def triangulation( + bipolar_egm_pts, + local_activation_time, + min_theta=30, + electrode_distance_range=(1.5, 10), + min_lat_difference=2, + del_alpha=5 +): + """ + Perform triangulation on electrogram data to calculate conduction velocities. + + Args: + bipolar_egm_pts (array): + Array of bipolar electrogram points of size Nx3 + + local_activation_time (array): + Local activation times corresponding to bipolar_egm_pts. + + min_theta (float): + Minimum angle threshold for triangle cell acceptance (degrees). + + electrode_distance_range (tuple): + Range of acceptable electrode distances. + + min_lat_difference (float): + Minimum local activation time difference. + + del_alpha (float): + Alpha value for Delaunay triangulation. + + Returns: + tuple: Array of conduction velocities and array of triangle center points. + + Example: + import openep + + case = openep.load_openep_mat("path/to/openep.mat") + mesh = case.create_mesh() + case.analyse.conduction_velocity.calculate(method='triangulation') + + Reference: + This method is based on the paper: + Age-Related Changes in Human Left and Right Atrial Conduction by PIPIN KOJODJOJO M.R.C.P. + """ + + bi_egm_point_cloud = pv.PolyData(bipolar_egm_pts) + bi_egm_surface = bi_egm_point_cloud.delaunay_3d(alpha=del_alpha) + + # Only extract triangle cell types from mesh, see pyvista.CellType.TRIANGLE (indexed at 5 in cells_dict) + bi_egm_cells = bi_egm_surface.cells_dict[5] + + cv_values = np.full(bi_egm_cells.shape[0], np.nan) + cv_centroids = np.empty(bi_egm_cells.shape) + + for cell_i in range(bi_egm_cells.shape[0]): + + # Find vtx id of cell and sort by LAT value + vtx_id = bi_egm_cells[cell_i].astype(int) + lat = local_activation_time[vtx_id] + lat_sorted_id = np.argsort(lat) + + # Assign point O, A and B to the cell vtx + lat_O, lat_A, lat_B = lat[lat_sorted_id] + point_O, point_A, point_B = bipolar_egm_pts[vtx_id[lat_sorted_id]] + + # Calculate distance between OA, OB and AB + dist_OA = np.linalg.norm(np.subtract(point_O, point_A)) + dist_OB = np.linalg.norm(np.subtract(point_O, point_B)) + dist_AB = np.linalg.norm(np.subtract(point_A, point_B)) + + # Calculate difference in LAT between OA and OB + tOA = lat_A - lat_O + tOB = lat_B - lat_O + + # Calculate angles subtended by OA and OB + theta = np.arccos((dist_OA**2 + dist_OB**2 - dist_AB**2) / (2 * dist_OA * dist_OB)) + + # Conditions to check if triangle is acceptable + if (math.degrees(theta) >= min_theta + and electrode_distance_range[0] <= dist_OA <= electrode_distance_range[1] + and electrode_distance_range[0] <= dist_OB <= electrode_distance_range[1] + and tOA >= min_lat_difference + and tOB >= min_lat_difference + ): + # Calculate angle subtended by OA and the direction of wavefront propagation + alpha = np.arctan((tOB * dist_OA - tOA * dist_OB * np.cos(theta)) / (tOA * dist_OB * np.sin(theta))) + cv_temp = (dist_OA / tOA) * np.cos(alpha) + cv_values[cell_i] = cv_temp + else: + cv_values[cell_i] = np.nan + + centroid_i = np.mean([point_O, point_A, point_B], axis=0) + cv_centroids[cell_i] = centroid_i + + # return np.array(cv_values), np.array(cv_centroids) + return cv_values, cv_centroids + + +def _obj_func(coef, x1, x2, x3, m): + """ + Objective function for plane fitting method. + + This function is used to fit a surface to the activations in the plane fitting method. + It calculates the norm of the difference between the measured values 'm' and the + values predicted by the quadratic surface defined by the coefficients. + + Parameters: + coef (array): Coefficients of the quadratic surface (a, b, c, d, e, f, g, h, q, l). + x1 (array): X-coordinates of the data points. + x2 (array): Y-coordinates of the data points. + x3 (array): Z-coordinates of the data points. + m (array): Measured values at the data points. + + Returns: + float: The norm of the difference between measured and predicted values. + """ + a, b, c, d, e, f, g, h, q, l = coef + predicted = (a * x1 ** 2 + + b * x2 ** 2 + + c * x3 ** 2 + + d * x1 * x2 + + e * x1 * x3 + + f * x2 * x3 + + g * x1 + + h * x2 + + q * x3 + + l) + return np.linalg.norm(m - predicted) + + +def radial_basis_function( + bipolar_egm_pts, + local_activation_time +): + raise NotImplementedError("Method: radial_basis_function has not been implemented yet for estimating CV!") + + +def divergence( + case, + bipolar_egm_pts, + local_activation_time, + collision_threshold=-1, + focal_threshold=1, + output_binary_field=False, + interpolation_kws=None, +): + """ + Calculate divergence of the conduction velocity (CV) field to identify regions of collision and + focal activation within cardiac tissue based on local activation times (LATs) and electrogram points. + + Args: + case (Case): The case object containing cardiac geometry and electrogram data. + + bipolar_egm_pts (np.ndarray): An array of coordinates for bipolar electrogram points. + + local_activation_time (np.ndarray): An array of local activation times corresponding to + bipolar electrogram points. + + collision_threshold (float, optional): Divergence value below which regions are considered + as collision fronts. Defaults to -1. + + focal_threshold (float, optional): Divergence value above which regions are considered as + focal activation fronts. Defaults to 1. + + output_binary_field (bool, optional): If True, the output divergence field is binary, with 1 + indicating regions meeting the collision or focal threshold, + and 0 otherwise. If False, the continuous divergence field + is returned. Defaults to False. + + interpolation_kws (dict, optional): Keyword arguments for interpolation function. + > interpolate_general_cloud_points_onto_surface(**interpolation_kws) + Defaults to None. + + Returns: + tuple: A tuple containing two elements: + - norm_cv_direction (np.ndarray): The normalized direction of conduction velocity + across the cardiac tissue surface. + - divergence (np.ndarray): The divergence of the activation direction field. This + can either be a continuous field or a binary field indicating focal and collision + regions based on the 'output_binary_field' parameter. + """ + + temp_mesh = case.create_mesh() + basic_mesh = pv.PolyData(temp_mesh.points, temp_mesh.faces) + interpolation_kws = dict() if interpolation_kws is None else interpolation_kws + + interpolated_scalar = interpolate_general_cloud_points_onto_surface( + case=case, + cloud_values=local_activation_time, + cloud_points=bipolar_egm_pts, + **interpolation_kws + ) + + basic_mesh['LAT_scalar'] = interpolated_scalar + derivative = basic_mesh.compute_derivative(scalars='LAT_scalar') + + cv_direction = derivative['gradient'] / np.sum(derivative['gradient'] ** 2, axis=1)[:, np.newaxis] + magnitude = np.sqrt(np.sum(cv_direction ** 2, axis=1)) + norm_cv_direction = cv_direction / magnitude[:, np.newaxis] + + basic_mesh['activation_direction'] = cv_direction + div = basic_mesh.compute_derivative(scalars='activation_direction', divergence=True) + divergence = div['divergence'] + + if output_binary_field: + divergence = np.where((divergence < collision_threshold) | (divergence > focal_threshold), 1, 0) + + return norm_cv_direction, divergence diff --git a/openep/analysis/analyse.py b/openep/analysis/analyse.py new file mode 100644 index 00000000..84c2326d --- /dev/null +++ b/openep/analysis/analyse.py @@ -0,0 +1,227 @@ +# OpenEP +# Copyright (c) 2021 OpenEP Collaborators +# +# This file is part of OpenEP. +# +# OpenEP is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# OpenEP is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program (LICENSE.txt). If not, see + +"""Module containing analysis classes""" +from ._conduction_velocity import * +from ..case.case_routines import interpolate_general_cloud_points_onto_surface + + +class Analyse: + """ + Analyse class for managing analysis of case data. + + Args: + case (Case): The case data to be analysed. + + Attributes: + conduction_velocity (ConductionVelocity): An instance of ConductionVelocity for conducting + velocity analysis on the case. + + divergence (Divergence): An instance of Divergence for conducting divergence analysis on the case. + + """ + def __init__(self, case): + self.conduction_velocity = ConductionVelocity(case) + self.divergence = Divergence(case) + + +class ConductionVelocity: + """ + Class for calculating and analyzing conduction velocity. + + This class provides methods to calculate conduction velocity from case data and optionally + interpolate these values onto a mesh. It handles different methods of calculation and manages + the resulting data. + + Attributes: + _case (Case): Internal reference to the case data. + values (np.ndarray): The calculated conduction velocity values. + centers (np.ndarray): The centers corresponding to the conduction velocity values. + + Parameters: + case (Case): The case data from which conduction velocity is to be calculated. + """ + def __init__(self, case): + self._case = case + self.values = None + self.centers = None + + def calculate_cv( + self, + method='triangulation', + include=None, + apply_scalar_field=True, + interpolation_kws=None, + **method_kwargs + ): + """ + Calculate the conduction velocity and interpolate points onto a mesh. + + This method calculates conduction velocities using one of the specified methods and + optionally interpolates the calculated values onto a mesh, storing the results in the + 'conduction_velocity' field of the case object. + + Args: + method (str): The method to use for calculating conduction velocity. Options are: + - 'triangulation' + - 'plane_fitting' + - 'rbf' + Defaults to 'triangulation'. + + include (np.ndarray, optional): A boolean array specifying which data points to include + in the preprocessing for bipolar EGM data and LAT. + Defaults to "includes" specified in the imported data. + + apply_scalar_field (bool, optional): If True, applies the calculated conduction velocity + values as a scalar field on the case object's mesh. + Defaults to True. + + interpolation_kws (dict, optional): Keyword arguments for interpolation function. + > interpolate_general_cloud_points_onto_surface(**interpolation_kws) + Defaults to None. + + **method_kwargs: Additional keyword arguments specific to the chosen calculation method. + + Returns: + tuple: A tuple containing two elements: + - values (np.ndarray): An array of calculated conduction velocity values. + - centers (np.ndarray): An array of centers corresponding to the calculated values. + + Raises: + ValueError: If the specified method is not among the available options. + + Example: + >> cv = ConductionVelocity(case) + >> values, centers = cv.calculate(method='plane_fitting', apply_scalar_field=True) + """ + + supported_cv_methods = { + "triangulation": triangulation, + "plane_fitting": plane_fitting, + "rbf": radial_basis_function + } + + if method.lower() not in supported_cv_methods: + raise ValueError(f"`method` must be one of {supported_cv_methods.keys()}.") + + if include is None and self._case.electric.include is None: + raise TypeError(f"include object is of 'NoneType'") + else: + include = self._case.electric.include.astype(bool) if include is None else include + + interpolation_kws = dict() if interpolation_kws is None else interpolation_kws + + bipolar_egm_pts = self._case.electric.bipolar_egm.points[include] + lat = (self._case.electric.annotations.local_activation_time[include] + - self._case.electric.annotations.reference_activation_time[include]) + + cv_method = supported_cv_methods[method] + self.values, self.centers = cv_method(bipolar_egm_pts, lat, **method_kwargs) + + if apply_scalar_field: + self._case.fields.conduction_velocity = interpolate_general_cloud_points_onto_surface( + case=self._case, + cloud_values=self.values, + cloud_points=self.centers, + **interpolation_kws + ) + + return self.values, self.centers + + +class Divergence: + """ + Class for calculating and analyzing divergence of conduction velocity. + + This class provides methods to calculate the divergence of conduction velocity from case data. + It can output binary fields indicating regions of divergence and manage the resulting data. + + Attributes: + _case (Case): Internal reference to the case data. + direction (np.ndarray): The calculated divergence direction vectors. + values (np.ndarray): The calculated divergence values. + + Args: + case (Case): The case data from which divergence is to be calculated. + """ + def __init__(self, case): + self._case = case + self.direction = None + self.values = None + + def calculate_divergence( + self, + include=None, + output_binary_field=False, + apply_scalar_field=True, + interpolation_kws=None, + ): + """ + Calculate the divergence of conduction velocity and optionally apply the result as a scalar field. + + Parameters: + include (np.ndarray, optional): A boolean array specifying which data points to include in the + calculation. If None, the 'include' array from the case's electric + data is used. Defaults to None. + + output_binary_field (bool, optional): If True, the output will be a binary field where 1 indicates + regions of divergence and 0 otherwise. + Defaults to False. + + apply_scalar_field (bool, optional): If True, the calculated divergence values (or binary field, if + 'output_binary_field' is True) are applied as a scalar field to + the case object. + Defaults to True. + + interpolation_kws (dict, optional): Keyword arguments for interpolation function. + > interpolate_general_cloud_points_onto_surface(**interpolation_kws) + Defaults to None. + + Returns: + tuple of (np.ndarray, np.ndarray): A tuple containing the divergence direction and divergence values. + The direction is a 2D array of shape (N, 3), where N is the number + of points, and each row represents the (x, y, z) components of the + divergence direction. The values are a 1D array of divergence values. + + Example: + case = load_case_data('path/to/case_data') + cv = ConductionVelocity(case) + direction, values = cv.calculate_divergence(output_binary_field=True, apply_scalar_field=True) + """ + + if include is None and self._case.electric.include is None: + raise TypeError(f"include object is of 'NoneType'") + else: + include = self._case.electric.include.astype(bool) if include is None else include + + bipolar_egm_pts = self._case.electric.bipolar_egm.points[include] + lat = (self._case.electric.annotations.local_activation_time[include] + - self._case.electric.annotations.reference_activation_time[include]) + + self.direction, self.values = divergence( + case=self._case, + bipolar_egm_pts=bipolar_egm_pts, + local_activation_time=lat, + output_binary_field=output_binary_field, + interpolation_kws=interpolation_kws, + ) + + if apply_scalar_field: + self._case.fields.cv_divergence = self.values + + return self.direction, self.values diff --git a/openep/case/__init__.py b/openep/case/__init__.py index 61a81044..e6e4d3b5 100644 --- a/openep/case/__init__.py +++ b/openep/case/__init__.py @@ -9,5 +9,6 @@ Interpolator, interpolate_activation_time_onto_surface, interpolate_voltage_onto_surface, + interpolate_general_cloud_points_onto_surface, bipolar_from_unipolar_surface_points, ) diff --git a/openep/case/case_routines.py b/openep/case/case_routines.py index d3bbf431..69249c0d 100644 --- a/openep/case/case_routines.py +++ b/openep/case/case_routines.py @@ -76,6 +76,7 @@ class and associated keyword arguments to the `method` and `method_kws` 'interpolate_activation_time_onto_surface', 'interpolate_voltage_onto_surface', 'bipolar_from_unipolar_surface_points', + 'interpolate_general_cloud_points_onto_surface' ] @@ -454,6 +455,59 @@ def __repr__(self): return f"Interpolator: method={self.method}, kws={self.method_kws}" +def interpolate_general_cloud_points_onto_surface( + case, + cloud_values, + cloud_points, + method=scipy.interpolate.RBFInterpolator, + method_kws=None, + max_distance=None, + include=None, +): + """Interpolate general point data onto the points of a mesh. + + Args: + case (openep.case.Case): case from which the cloud values and points will be interpolated + cloud_values (ndarray): Array of values to be interpolated, corresponding to cloud_points. + cloud_points (ndarray): Array of points where cloud_values are defined. + method (callable): method to use for interpolation. The default is + scipy.interpolate.RBFInterpolator. + method_kws (dict): dictionary of keyword arguments to pass to `method` + when creating the interpolator. + max_distance (float, optional): If provided, any points on the surface of the mesh + further than this distance to all mapping coordinates will have their + interpolated activation times set NaN. The default it None, in which case + the distance from surface points to mapping points is not considered. + include (np.ndarray, optional): Flag for which mapping points to include when creating + the interpolator. If None, `case.electric.include` will be used. + + Returns: + interpolated_lat (ndarray): local activation times interpolated onto the surface of the mesh, + one value per point on the mesh. + """ + + surface_points = case.points + + include = [not np.isnan(x) for x in cloud_values] if include is None else include + cloud_points = cloud_points[include] + cloud_values = cloud_values[include] + + interpolator = Interpolator( + cloud_points, + cloud_values, + method=method, + method_kws=method_kws, + ) + + interpolated = interpolator(surface_points, max_distance=max_distance) + + # Any points that are not part of the mesh faces should have its value set to NaN + n_surface_points = surface_points.shape[0] + not_on_surface = ~np.in1d(np.arange(n_surface_points), case.indices) + interpolated[not_on_surface] = np.NaN + + return interpolated + def interpolate_activation_time_onto_surface( case, method=scipy.interpolate.RBFInterpolator, diff --git a/openep/converters/pyvista_converters.py b/openep/converters/pyvista_converters.py index 0c602607..92a6c893 100644 --- a/openep/converters/pyvista_converters.py +++ b/openep/converters/pyvista_converters.py @@ -93,5 +93,7 @@ def to_pyvista( mesh.point_data["LAT"] = case.fields.local_activation_time mesh.point_data["Impedance"] = case.fields.impedance mesh.point_data["Force"] = case.fields.force + mesh.point_data["Conduction velocity"] = case.fields.conduction_velocity + mesh.point_data["Divergence"] = case.fields.cv_divergence return mesh diff --git a/openep/data_structures/ablation.py b/openep/data_structures/ablation.py index 8f285968..d610c18a 100644 --- a/openep/data_structures/ablation.py +++ b/openep/data_structures/ablation.py @@ -114,7 +114,13 @@ def extract_ablation_data(ablation_data): as well as the force applied. """ - if isinstance(ablation_data, np.ndarray) or ablation_data['originaldata']['ablparams']['time'].size == 0: + if isinstance(ablation_data, np.ndarray): + return Ablation() + + try: + if not ablation_data['originaldata']['ablparams']['time'].size: + return Ablation() + except KeyError as e: return Ablation() times = ablation_data['originaldata']['ablparams']['time'].astype(float) diff --git a/openep/data_structures/arrows.py b/openep/data_structures/arrows.py new file mode 100644 index 00000000..10ca39f3 --- /dev/null +++ b/openep/data_structures/arrows.py @@ -0,0 +1,62 @@ +from attr import attrs +import numpy as np + +__all__ = [] + + +@attrs(auto_attribs=True, auto_detect=True) +class Arrows: + """ + Class for storing information about arrows and lines on surface + + Args: + fibres (np.ndarray): array of shape N_cells x 3 + divergence (np.ndarray): array of shape N_cells x 3 + linear_connections (np.ndarray): array of shape M x 3 (represents the linear connections between endo and epi) + linear_connection_regions (np.ndarray): array of shape N_cells + """ + + # TODO: move divergence arrows into Arrows class + # TODO: remove longitudinal and transversal arrows from Fields class + fibres: np.ndarray = None + divergence: np.ndarray = None + linear_connections: np.ndarray = None + linear_connection_regions: np.ndarray = None + + def __repr__(self): + return f"arrows: {tuple(self.__dict__.keys())}" + + def __getitem__(self, arrow): + try: + return self.__dict__[arrow] + except KeyError: + raise ValueError(f"There is no arrow '{arrow}'.") + + def __setitem__(self, arrow, value): + if arrow not in self.__dict__.keys(): + raise ValueError(f"'{arrow}' is not a valid arrow name.") + self.__dict__[arrow] = value + + def __iter__(self): + return iter(self.__dict__.keys()) + + def __contains__(self, arrow): + return arrow in self.__dict__.keys() + + @property + def linear_connection_regions_names(self): + if self.linear_connection_regions is None: + return [] + regions = np.unique(self.linear_connection_regions).astype(str) + return regions.tolist() + + def copy(self): + """Create a deep copy of Arrows""" + + arrows = Arrows() + for arrow in self: + if self[arrow] is None: + continue + arrows[arrow] = np.array(self[arrow]) + + return arrows diff --git a/openep/data_structures/case.py b/openep/data_structures/case.py index f4ced3b6..50394065 100644 --- a/openep/data_structures/case.py +++ b/openep/data_structures/case.py @@ -83,8 +83,10 @@ import pyvista from .surface import Fields +from .arrows import Arrows from .electric import Electric, Electrogram, Annotations, ElectricSurface from .ablation import Ablation +from ..analysis.analyse import Analyse from ..case.case_routines import ( bipolar_from_unipolar_surface_points, calculate_distance, @@ -121,6 +123,7 @@ def __init__( electric: Electric, ablation: Optional[Ablation] = None, notes: Optional[List] = None, + arrows: Optional[Arrows] = None, ): self.name = name @@ -130,6 +133,8 @@ def __init__( self.ablation = ablation self.electric = electric self.notes = notes + self.arrows = Arrows() if arrows is None else arrows + self.analyse = Analyse(case=self) def __repr__(self): return f"{self.name}( nodes: {self.points.shape} indices: {self.indices.shape} {self.fields} )" diff --git a/openep/data_structures/surface.py b/openep/data_structures/surface.py index 543e4a35..16711a31 100644 --- a/openep/data_structures/surface.py +++ b/openep/data_structures/surface.py @@ -20,6 +20,7 @@ from attr import attrs import numpy as np +import warnings __all__ = [] @@ -35,7 +36,7 @@ class Fields: local_activation_time (np.ndarray): array of shape N_points impedance (np.ndarray): array of shape N_points force (np.ndarray): array of shape N_points - region (np.ndarray): array of shape N_cells + cell_region (np.ndarray): array of shape N_cells longitudinal_fibres (np.ndarray): array of shape N_cells x 3 transverse_fibres (np.ndarray): array of shape N_cells x 3 pacing_site (np.ndarray): array of shape N_points @@ -51,6 +52,9 @@ class Fields: longitudinal_fibres: np.ndarray = None transverse_fibres: np.ndarray = None pacing_site: np.ndarray = None + conduction_velocity: np.ndarray = None + cv_divergence: np.ndarray = None + histogram: np.ndarray = None def __repr__(self): return f"fields: {tuple(self.__dict__.keys())}" @@ -139,31 +143,28 @@ def extract_surface_data(surface_data): unipolar_voltage = None impedance = None force = None + elif surface_data['uni_imp_frc'].size == 2: + unipolar_voltage, impedance = surface_data['uni_imp_frc'].T.astype(float) + force = None + warnings.warn("Force data was not detected in surface_data, force=None", UserWarning) else: unipolar_voltage, impedance, force = surface_data['uni_imp_frc'].T.astype(float) - if all(np.isnan(unipolar_voltage)): - unipolar_voltage = None - if all(np.isnan(impedance)): - impedance = None - if all(np.isnan(force)): - force = None - try: - thickness = surface_data['thickness'].astype(float) - except KeyError as e: - thickness = None + if isinstance(force, np.ndarray) and all(np.isnan(unipolar_voltage)): + unipolar_voltage = None + if isinstance(force, np.ndarray) and all(np.isnan(impedance)): + impedance = None + if isinstance(force, np.ndarray) and all(np.isnan(force)): + force = None - if isinstance(thickness, np.ndarray) and thickness.size == 0: - thickness = None + thickness = surface_data.get('thickness', None) + if isinstance(thickness, np.ndarray): + thickness = None if thickness.size == 0 else thickness.astype(float) # This is defined on a per-cell bases - try: - cell_region = surface_data['cell_region'].astype(int) - except KeyError as e: - cell_region = None - - if isinstance(cell_region, np.ndarray) and cell_region.size == 0: - cell_region = None + cell_region = surface_data.get('cell_region', None) + if isinstance(cell_region, np.ndarray): + cell_region = None if cell_region.size == 0 else cell_region.astype(int) # Fibre orientation are vectors defined on a per-cell basis try: @@ -183,13 +184,27 @@ def extract_surface_data(surface_data): transverse_fibres = None # Pacing site point ids (-1 is not pacing site) - try: - pacing_site = surface_data['pacing_site'].astype(int) - except KeyError as e: - pacing_site = None + pacing_site = surface_data.get('pacing_site', None) + if isinstance(pacing_site, np.ndarray): + pacing_site = None if pacing_site.size == 0 else pacing_site.astype(int) + + if surface_data.get('signalMaps'): + try: + conduction_velocity = surface_data['signalMaps']['conduction_velocity_field'].get('value', None) + if isinstance(conduction_velocity, np.ndarray): + conduction_velocity = None if conduction_velocity.size == 0 else conduction_velocity.astype(float) + except KeyError: + conduction_velocity = None - if isinstance(pacing_site, np.ndarray) and pacing_site.size == 0: - pacing_site = None + try: + cv_divergence = surface_data['signalMaps']['divergence_field'].get('value', None) + if isinstance(cv_divergence, np.ndarray): + cv_divergence = None if cv_divergence.size == 0 else cv_divergence.astype(float) + except KeyError: + cv_divergence = None + else: + conduction_velocity = None + cv_divergence = None fields = Fields( bipolar_voltage=bipolar_voltage, @@ -202,6 +217,8 @@ def extract_surface_data(surface_data): longitudinal_fibres=longitudinal_fibres, transverse_fibres=transverse_fibres, pacing_site=pacing_site, + conduction_velocity=conduction_velocity, + cv_divergence=cv_divergence, ) return points, indices, fields diff --git a/openep/draw/draw_routines.py b/openep/draw/draw_routines.py index e586be3c..8a56c9c0 100644 --- a/openep/draw/draw_routines.py +++ b/openep/draw/draw_routines.py @@ -63,9 +63,10 @@ def draw_free_boundaries( width: int = 5, plotter: pyvista.Plotter = None, names: List[str] = None, + combine: bool = False ): """ - Draw the freeboundaries of a mesh. + Draw the free boundaries of a mesh. Args: free_boundaries (FreeBoundary): `FreeBoundary` object. Can be generated using @@ -76,14 +77,15 @@ def draw_free_boundaries( If None, a new plotting object will be created. names (List(str)): List of names to associated with the actors. Default is None, in which case actors will be called 'free_boundary_n', where n is the index of the boundary. + combine (bool): Combines all free boundaries into one Actor (faster load time). Returns: plotter (pyvista.Plotter): Plotting object with the free boundaries added. - """ - + combined_lines = pyvista.PolyData() if combine else None plotter = pyvista.Plotter() if plotter is None else plotter colours = [colour] * free_boundaries.n_boundaries if isinstance(colour, str) else colour + if names is None: names = [f"free_boundary_{boundary_index:d}" for boundary_index in range(free_boundaries.n_boundaries)] @@ -91,12 +93,31 @@ def draw_free_boundaries( points = free_boundaries.points[boundary[:, 0]] points = np.vstack([points, points[:1]]) # we need to close the loop - plotter.add_lines( - points, - color=colours[boundary_index], - width=width, - name=names[boundary_index], - ) + + # store the lines to be added in later + if combine: + lines = pyvista.lines_from_points(points) + combined_lines = combined_lines.merge(lines) + + # add each line one-by-one + else: + plotter.add_lines( + points, + color=colours[boundary_index], + width=width, + name=names[boundary_index], + connected=True + ) + + if combine: + # add the combined lines as a single Actor manually (modified code of add_lines) + actor = pyvista.Actor(mapper=pyvista.DataSetMapper(combined_lines)) + actor.prop.line_width = width + actor.prop.show_edges = True + actor.prop.edge_color = colour + actor.prop.color = colour + actor.prop.lighting = False + plotter.add_actor(actor, reset_camera=False, name=names[0], pickable=False) return plotter diff --git a/openep/io/matlab.py b/openep/io/matlab.py index afe9a7a6..22be3156 100644 --- a/openep/io/matlab.py +++ b/openep/io/matlab.py @@ -51,6 +51,7 @@ def _visit_mat_v73_(file_pointer): 'userdata/electric/electrodeNames_uni', 'userdata/electric/names', 'userdata/electric/tags', + 'userdata/electric/ecgNames' } strings_to_decode = { diff --git a/openep/io/readers.py b/openep/io/readers.py index eba6199c..4b9077e7 100644 --- a/openep/io/readers.py +++ b/openep/io/readers.py @@ -53,6 +53,7 @@ """ import os +import re import scipy.io import numpy as np @@ -63,9 +64,10 @@ from ..data_structures.surface import extract_surface_data, Fields from ..data_structures.electric import extract_electric_data, Electric from ..data_structures.ablation import extract_ablation_data, Ablation +from ..data_structures.arrows import Arrows from ..data_structures.case import Case -__all__ = ["load_openep_mat", "_load_mat", "load_opencarp", "load_circle_cvi", "load_vtk"] +__all__ = ["load_openep_mat", "_load_mat", "load_opencarp", "load_circle_cvi", "load_vtk", "load_igb"] def _check_mat_version_73(filename): @@ -158,28 +160,43 @@ def load_opencarp( points_data = np.loadtxt(points, skiprows=1) points_data *= scale_points - indices_data = np.loadtxt(indices, skiprows=1, usecols=[1, 2, 3], dtype=int) - cell_region = np.loadtxt(indices, skiprows=1, usecols=4, dtype=int) - - longitudinal_fibres = None - transverse_fibres = None - if fibres is not None: - fibres_data = np.loadtxt(fibres, skiprows=1, dtype=float) - longitudinal_fibres = fibres_data[:, :3] - if fibres_data.shape[1] == 6: - transverse_fibres = fibres_data[:, 3:] - - # Create empty data structures for pass to Case + fibres_data = None if fibres is None else np.loadtxt(fibres) + + indices_data, cell_region_data = [], [] + linear_connection_data, linear_connection_regions = [], [] + + with open(indices) as elem_file: + data = elem_file.readlines() + for elem in data: + parts = elem.strip().split() + if parts[0] == 'Tr': + indices_data.append(list(map(int, parts[1:4]))) + cell_region_data.append(int(parts[4])) + elif parts[0] == 'Ln': + linear_connection_data.append(list(map(int, parts[1:3]))) + linear_connection_regions.append(int(parts[3])) + + indices_data = np.array(indices_data) + cell_region = np.array(cell_region_data) + linear_connection_data = np.array(linear_connection_data) + linear_connection_regions = np.array(linear_connection_regions) + + arrows = Arrows( + fibres=fibres_data, + linear_connections=linear_connection_data, + linear_connection_regions=linear_connection_regions + ) + fields = Fields( cell_region=cell_region, - longitudinal_fibres=longitudinal_fibres, - transverse_fibres=transverse_fibres, + longitudinal_fibres=None, + transverse_fibres=None, ) electric = Electric() ablation = Ablation() notes = np.asarray([], dtype=object) - return Case(name, points_data, indices_data, fields, electric, ablation, notes) + return Case(name, points_data, indices_data, fields, electric, ablation, notes, arrows) def load_vtk(filename, name=None): @@ -281,3 +298,43 @@ def load_circle_cvi(filename, dicoms_directory, extract_epi=True, extract_endo=T return epi_mesh elif extract_endo: return endo_mesh + + +def load_igb(igb_filepath): + """ + Reads an .igb file, returning the data and header information. + + Args: + igb_filepath (str): Path to the .igb file. + + Returns: + tuple: + - numpy.ndarray: 2D array of the file's data. + - dict: Contents of the header including 't' value (time steps) and other parameters. + """ + with open(igb_filepath, 'rb') as file: + header = file.read(1024).decode('utf-8') + header = header.replace('\r', ' ').replace('\n', ' ').replace('\0', ' ') + hdr_content = {} + + # Parse the header to dict format + for part in header.split(): + key, value = part.split(':') + if key in ['x', 'y', 'z', 't', 'bin', 'num', 'lut', 'comp']: + hdr_content[key] = int(value) + elif key in ['facteur','zero','epais'] or key.startswith('org_') or key.startswith('dim_') or key.startswith('inc_'): + hdr_content[key] = float(value) + else: + hdr_content[key] = value + + # Process file data + words = header.split() + word = [int(re.split(r"(\d+)", w)[1]) for w in words[:4]] + nnode = word[0] * word[1] * word[2] + size = os.path.getsize(igb_filepath) // 4 // nnode + + file.seek(1024) + data = np.fromfile(file, dtype=np.float32, count=size * nnode) + data = data.reshape((size, nnode)).transpose() + + return data, hdr_content diff --git a/openep/io/writers.py b/openep/io/writers.py index 06490d58..1f3c14c5 100644 --- a/openep/io/writers.py +++ b/openep/io/writers.py @@ -51,9 +51,10 @@ """ -import pathlib +import pandas as pd import numpy as np import scipy.io +import pathlib from openep.data_structures.ablation import Ablation from openep.data_structures.case import Case @@ -64,6 +65,8 @@ "export_openCARP", "export_openep_mat", "export_vtk", + "export_vtx", + "export_csv" ] @@ -72,6 +75,7 @@ def export_openCARP( prefix: str, scale_points: float = 1, export_transverse_fibres: bool = True, + export_pacing_site: bool = True, ): """Export mesh data from an OpenEP data to openCARP format. @@ -104,59 +108,58 @@ def export_openCARP( comments='', ) - # Save elements info + # Total number of lines n_triangles = case.indices.shape[0] + n_lin_conns = 0 if case.arrows.linear_connections is None else case.arrows.linear_connections.shape[0] + n_lines = n_triangles + n_lin_conns + + # Save triangulation elements info cell_type = np.full(n_triangles, fill_value="Tr") cell_region = case.fields.cell_region if case.fields.cell_region is not None else np.zeros(n_triangles, dtype=int) - elements = np.concatenate( - [ - cell_type[:, np.newaxis], - case.indices, - cell_region[:, np.newaxis], - ], - axis=1, - dtype=object, - ) + combined_cell_data = [cell_type[:, np.newaxis], case.indices, cell_region[:, np.newaxis]] + combined_cell_data = np.concatenate(combined_cell_data, axis=1, dtype=object) np.savetxt( output_path.with_suffix(".elem"), - elements, + combined_cell_data, fmt="%s %d %d %d %d", - header=str(n_triangles), + header=str(n_lines), comments='', ) - # Save fibres - n_fibre_vectors = 2 if export_transverse_fibres else 1 - fibres = np.zeros((n_triangles, n_fibre_vectors * 3), dtype=float) + # Save linear connections info + if case.arrows.linear_connections is not None: + conn_type = np.full(n_lin_conns, fill_value="Ln") + ln_region = case.arrows.linear_connection_regions if case.arrows.linear_connection_regions is not None else np.zeros(n_lin_conns, dtype=int) - if case.fields.longitudinal_fibres is not None: - fibres[:, :3] = case.fields.longitudinal_fibres - else: - fibres[:, 0] = 1 + combined_ln_conn_data = [conn_type[:, np.newaxis], case.arrows.linear_connections, ln_region[:, np.newaxis]] + combined_ln_conn_data = np.concatenate(combined_ln_conn_data, axis=1, dtype=object) - if export_transverse_fibres: - if case.fields.transverse_fibres is not None: - fibres[:, 3:] = case.fields.transverse_fibres - else: - fibres[:, 3] = 1 + # append to the elem file + with open(output_path.with_suffix(".elem"), 'a') as elem_file: + np.savetxt( + elem_file, + combined_ln_conn_data, + fmt="%s %d %d %d", + ) - np.savetxt( - output_path.with_suffix('.lon'), - fibres, - fmt="%.6f", - header=str(n_fibre_vectors), - comments='', - ) + # Save fibres + if case.arrows.fibres is not None: + np.savetxt( + output_path.with_suffix('.lon'), + case.arrows.fibres, + fmt="%.6f", + comments='', + ) # Saving pacing sites if they exist - if case.fields.pacing_site is None: + if case.fields.pacing_site is None or not export_pacing_site: return # Ignore all -1 values, as these points do not belong to any pacing site for site_index in np.unique(case.fields.pacing_site)[1:]: - + pacing_site_points = np.nonzero(case.fields.pacing_site == site_index)[0] n_points = pacing_site_points.size @@ -169,10 +172,61 @@ def export_openCARP( ) +def export_vtx( + case: Case, + prefix: str, + pacing_site_internal_name: str +): + """ + Export vertex indices for a specified pacing site to a .vtx file. + + Args: + - case (Case): The case object containing the pacing site and landmark information. + - prefix (str): The file path prefix for saving the output .vtx file. + - pacing_site_internal_name (str): The tag of the pacing site to export. + + Returns: + - pathlib.Path: The path to the created .vtx file, or None if the pacing site is not found. + + Raises: + - IndexError: If the specified pacing site name is not found within the case landmarks. + """ + # TODO: Need to improve indexing method to find pacing site from landmarks, + # currently finds pacing sites using tag string. + + output_path = pathlib.Path(prefix).resolve() + + if case.fields.pacing_site is None: + return + + # This extracts the pacing sites from all landmarks by checking if the tag is of the form "Pacing site 1" + landmarks = case.electric.landmark_points + try: + landmark_index = np.where(landmarks.internal_names == pacing_site_internal_name)[0][0] + except IndexError: + raise IndexError(f"Pacing site: Expecting {landmarks.internal_names}, received \"{pacing_site_internal_name}\".") + + landmark_name = landmarks.names[landmark_index] + site_index = int(landmark_name.replace('Pacing site ', '')) + + pacing_site_points = np.nonzero(case.fields.pacing_site == site_index)[0] + n_points = pacing_site_points.size + + output_path_with_suffix = output_path.with_name(f'{output_path.name}_{pacing_site_internal_name}.vtx') + + np.savetxt( + output_path_with_suffix, + pacing_site_points, + header=f'{n_points}\nintra', + comments='', + fmt='%d', + ) + return output_path_with_suffix + + def export_openep_mat( case: Case, filename: str, - separate_regions: bool = False, ): """Export data in OpenEP format. @@ -189,10 +243,24 @@ def export_openep_mat( indices=case.indices, fields=case.fields, ) + userdata['surface'] = _add_surface_maps( + surface_data=userdata['surface'], + cv_field=case.fields.conduction_velocity, + divergence_field=case.fields.cv_divergence + ) userdata['electric'] = _extract_electric_data(electric=case.electric) - userdata['rf'] = _export_ablation_data(ablation=case.ablation) + if case.analyse.conduction_velocity.values is not None: + userdata['electric'] = _add_electric_signal_props( + electric_data=userdata['electric'], + conduction_velocity=case.analyse.conduction_velocity, + divergence=case.analyse.divergence + ) + + if case.ablation is not None: + userdata['rf'] = _export_ablation_data(ablation=case.ablation) + scipy.io.savemat( file_name=filename, mdict={'userdata': userdata}, @@ -202,6 +270,112 @@ def export_openep_mat( ) +def _add_surface_maps(surface_data, **kwargs): + cv_field = kwargs.get('cv_field') + div_field = kwargs.get('divergence_field') + + if not surface_data.get('signalMaps'): + surface_data['signalMaps'] = {} + + # TODO: connect propSetting from user setting in UI + if cv_field is not None: + surface_data['signalMaps']['conduction_velocity_field'] = { + 'name': 'Conduction Velocity Field', + 'value': cv_field, + 'propSettings': {}, + } + + if div_field is not None: + surface_data['signalMaps']['divergence_field'] = { + 'name': 'Divergence Field', + 'value': div_field, + 'propSettings': {}, + } + + return surface_data + + +def export_csv( + system, + filename: str, + selections: dict, +): + + """Export data in CSV format. + + Saves selected field data. + + Args: + system (model.system_manager.System): System with dataset to be exported + filename (str): name of file to be written + selections (dict): the data field name and flag whether to export or not + """ + case = system.case + mesh = system.mesh + + point_region = _convert_cell_to_point( + cell_data=case.fields.cell_region, + mesh=mesh + ) + + available_exports = { + 'Bipolar voltage': case.fields.bipolar_voltage, + 'Unipolar voltage': case.fields.unipolar_voltage, + 'LAT': case.fields.local_activation_time, + 'Force': case.fields.force, + 'Impedance': case.fields.impedance, + 'Thickness': case.fields.thickness, + 'Cell region': point_region, + 'Pacing site': case.fields.pacing_site, + 'Conduction velocity': case.fields.conduction_velocity, + 'Divergence': case.fields.cv_divergence, + 'Histogram': case.fields.histogram, + } + + df = pd.DataFrame() + + for field_name, checked in selections.items(): + if checked: + field_data = available_exports.get(field_name) + if field_data is not None: + df[field_name] = pd.Series(field_data) + else: + df[field_name] = pd.NA + + df.to_csv(filename, index=False, encoding='utf-8') + + +def _add_electric_signal_props(electric_data, **kwargs): + conduction_velocity = kwargs.get('conduction_velocity') + divergence = kwargs.get('divergence') + + if not electric_data.get('annotations'): + electric_data['annotations'] = {} + + if not electric_data['annotations'].get('signalProps'): + electric_data['annotations']['signalProps'] = {} + + signal_props = electric_data['annotations']['signalProps'] + + if conduction_velocity: + signal_props['conduction_velocity'] = { + 'name' : 'Conduction Velocity Values', + 'value': conduction_velocity.values, + } + signal_props['cvX'] = { + 'name' : 'Conduction Velocity Coordinates', + 'value': conduction_velocity.centers, + } + + if divergence: + signal_props['divergence'] = { + 'name' : 'Divergence Values', + 'value': divergence.values, + } + + return electric_data + + def export_vtk( case: Case, filename: str, @@ -409,4 +583,19 @@ def _export_ablation_data(ablation: Ablation): ablation_data['originaldata']['force']['lateralangle'] = ablation.force.lateral_angle if ablation.force.lateral_angle is not None else empty_float_array ablation_data['originaldata']['force']['position'] = ablation.force.points if ablation.force.points is not None else empty_float_array - return ablation_data \ No newline at end of file + return ablation_data + + +def _convert_cell_to_point(cell_data, mesh): + """Convert cell data to point data by applying the cell value to its vertices""" + point_data = np.zeros(mesh.n_points, dtype=int) + for cell_i in range(mesh.n_cells): + + cell_value = cell_data[cell_i] + point_ids = mesh.get_cell(cell_i).point_ids + + # Add cell value to point ids + for pid in point_ids: + point_data[pid] = cell_value + + return point_data