diff --git a/pyomo/contrib/doe/doe.py b/pyomo/contrib/doe/doe.py index ab68ae19814..50c00508a6b 100644 --- a/pyomo/contrib/doe/doe.py +++ b/pyomo/contrib/doe/doe.py @@ -46,11 +46,14 @@ import pyomo.environ as pyo from pyomo.contrib.doe.utils import ( - check_FIM, + check_matrix, compute_FIM_metrics, _SMALL_TOLERANCE_DEFINITENESS, + snake_traversal_grid_sampling, + update_model_from_suffix, ) + from pyomo.opt import SolverStatus @@ -67,6 +70,11 @@ class FiniteDifferenceStep(Enum): backward = "backward" +class SensitivityInitialization(Enum): + snake_traversal = "snake_traversal" + nested_for_loop = "nested_for_loop" + + class DesignOfExperiments: def __init__( self, @@ -621,6 +629,9 @@ def _sequential_FIM(self, model=None): if self.scale_nominal_param_value: scale_factor *= v + # TODO: scale by response values (i.e., measurement values) + # if self.scale_response_values: + # scale_factor /= measurement_vals_np[:, col_1].mean() # Calculate the column of the sensitivity matrix self.seq_jac[:, i] = ( measurement_vals_np[:, col_1] - measurement_vals_np[:, col_2] @@ -1376,7 +1387,7 @@ def check_model_FIM(self, model=None, FIM=None): ) # Check FIM is positive definite and symmetric - check_FIM(FIM) + check_matrix(FIM) self.logger.info( "FIM provided matches expected dimensions from model and is approximately positive (semi) definite." @@ -1611,6 +1622,333 @@ def compute_FIM_full_factorial( return self.fim_factorial_results + def compute_FIM_factorial( + self, + model=None, + design_vals: list = None, + abs_change: list = None, + rel_change: list = None, + n_designs: int = 5, + method="sequential", + df_settings=(True, None, None, 500), + initialization_scheme="snake_traversal", + file_name: str = None, + ): + """Will run a simulation-based factorial exploration of the experimental input + space (i.e., a ``grid search`` or ``parameter sweep``) to understand how the + FIM metrics change as a function of the experimental design space. This method + can be used for both full factorial and fractional factorial designs. + + Parameters + ---------- + model : DoE model, optional + The model to perform the full factorial exploration on. Default: None + design_vals : list, optional + A list of design values to use for the full factorial exploration. If + provided, will use this value to generate the design values and ignore + `abs_change`, `rel_change`, and `n_designs`. Default: None + Default: None + abs_change : list, optional + Absolute change in the design variable values. Default: None. + If provided, will use this value to generate the design values. + If `abs_change` is provided, but `rel_change` is not provided, `rel_change` + will be set to zero. + Formula to calculate the design values: + change_in_value = lower_bound * rel_change + abs_change` + design_value += design_value + change_in_value + rel_change : list, optional + Relative change in the design variable values. Default: None. + If provided, will use this value to generate the design values. + If `rel_change` is provided, but `abs_change` is not provided, `abs_change` + will be set to zero. + n_designs : int, optional + Number of designs to generate for each design variable. Default: 5. + If `abs_change` and/or `rel_change` are provided, this value will be ignored. + method : str, optional + string to specify which method should be used. options are ``kaug`` and + ``sequential`. Default: "sequential" + df_settings : tuple, optional + A tuple containing the settings for set_option() method of the pandas + DataFrame. Default: (True, None, None, 500) + - first element: whether to return a pandas DataFrame (True/False) + - second element: number of max_columns for the DataFrame. Default: None, + i.e., no limit on the number of columns. + - third element: number of max_rows for the DataFrame. Default: None, + i.e., no limit on the number of rows. + - fourth element: display width for the DataFrame. Default: 500. + initialization_scheme : str, optional + Which scheme to use for initializing the design variables. + Options are ``"snake_traversal"`` and ``"nested_for_loop"``. + Default: "snake_traversal" + file_name : str, optional + if provided, will save the results to a json file. Default: None + + Returns + ------- + factorial_results: dict + A dictionary containing the results of the factorial design with the + following keys: + - keys of model's experiment_inputs + - "log10(D-opt)": list of D-optimality values + - "log10(A-opt)": list of A-optimality values + - "log10(E-opt)": list of E-optimality values + - "log10(ME-opt)": list of ME-optimality values + - "eigval_min": list of minimum eigenvalues + - "eigval_max": list of maximum eigenvalues + - "det_FIM": list of determinants of the FIM + - "trace_FIM": list of traces of the FIM + - "solve_time": list of solve times + - "total_points": total number of points in the factorial design + - "success_count": number of successful runs + - "failure_count": number of failed runs + - "FIM_all": list of all FIMs computed for each point in the factorial + """ + + # Start timer + sp_timer = TicTocTimer() + sp_timer.tic(msg=None) + self.logger.info("Beginning Factorial Design.") + + # Make new model for factorial design + self.factorial_model = self.experiment.get_labeled_model( + **self.get_labeled_model_args + ).clone() + model = self.factorial_model + + design_keys = [k for k in model.experiment_inputs.keys()] + + # check if design_values, abs_change and rel_change are provided and have the + # same length as design_keys + # Design values are of higher priority than abs_change and rel_change. + if design_vals is not None: + if len(design_vals) != len(design_keys): + raise ValueError( + "`design_values` must have the same length of " + f"`{len(design_keys)}` as `design_keys`." + ) + design_values = design_vals + + else: + if abs_change: + if len(abs_change) != len(design_keys): + raise ValueError( + "`abs_change` must have the same length of " + f"`{len(design_keys)}` as `design_keys`." + ) + + if rel_change: + if len(rel_change) != len(design_keys): + raise ValueError( + "`rel_change` must have the same length of " + f"`{len(design_keys)}` as `design_keys`." + ) + + # if either abs_change or rel_change is not provided, set it to list of + # zeros + if abs_change or rel_change: + if not abs_change: + abs_change = [0] * len(design_keys) + elif not rel_change: + rel_change = [0] * len(design_keys) + + design_values = [] + # loop over design keys and generate design values + for i, comp in enumerate(design_keys): + lb = comp.lb + ub = comp.ub + # Check if the component has finite lower and upper bounds + if lb is None or ub is None: + raise ValueError( + f"{comp.name} does not have a lower or upper bound." + ) + + if abs_change is None and rel_change is None: + des_val = np.linspace(lb, ub, n_designs) + + # if abs_change and/or rel_change is provided, generate design values + # using the formula: + # change_in_value = lower_bound * rel_change + abs_change + elif abs_change or rel_change: + des_val = [] + del_val = comp.lb * rel_change[i] + abs_change[i] + if del_val == 0: + raise ValueError( + f"Design variable {comp.name} has no change in value - " + "check abs_change and rel_change values." + ) + val = lb + while val <= ub: + des_val.append(val) + val += del_val + + else: + raise ValueError( + "Unexpected error in generating design values. Please check the" + " input parameters." + ) + + design_values.append(des_val) + + # generate the factorial points based on the initialization scheme + try: + scheme_enum = SensitivityInitialization(initialization_scheme) + except ValueError: + self.logger.warning( + f"Initialization scheme '{initialization_scheme}' is not recognized. " + "Using `snake_traversal` as the default initialization scheme." + ) + scheme_enum = SensitivityInitialization.snake_traversal + + if scheme_enum == SensitivityInitialization.snake_traversal: + factorial_points = snake_traversal_grid_sampling(*design_values) + elif scheme_enum == SensitivityInitialization.nested_for_loop: + factorial_points = product(*design_values) + + # TODO: Add more initialization schemes + + factorial_points_list = list(factorial_points) + + factorial_results = {k.name: [] for k in model.experiment_inputs.keys()} + factorial_results.update( + { + "log10(D-opt)": [], + "log10(A-opt)": [], + "log10(E-opt)": [], + "log10(ME-opt)": [], + "eigval_min": [], + "eigval_max": [], + "det_FIM": [], + "trace_FIM": [], + "solve_time": [], + } + ) + + success_count = 0 + failure_count = 0 + total_points = len(factorial_points_list) + + # save the FIM for each point in the factorial design + self.n_parameters = len(model.unknown_parameters) + FIM_all = np.zeros((total_points, self.n_parameters, self.n_parameters)) + + time_set = [] + curr_point = 1 # Initial current point + for design_point in factorial_points_list: + # kept (commented out) the following code to check later whether it will + # be considerably faster for complex models. + # In a simple model, this code took 15.9s to compute 125 points in JN. + # For the same condition, `update_model_from_suffix` took 16.5s in JN + # Fix design variables at fixed experimental design point + # for i in range(len(design_point)): + # design_keys[i].fix(design_point[i]) + + update_model_from_suffix(model, "experiment_inputs", design_point) + + # Timing and logging objects + self.logger.info(f"=======Iteration Number: {curr_point} =======") + iter_timer = TicTocTimer() + iter_timer.tic(msg=None) + + try: + curr_point = success_count + failure_count + 1 + self.logger.info(f"This is run {curr_point} out of {total_points}.") + self.compute_FIM(model=model, method=method) + success_count += 1 + # iteration time + iter_t = iter_timer.toc(msg=None) + time_set.append(iter_t) + + # More logging + self.logger.info( + f"The code has run for {round(sum(time_set), 2)} seconds." + ) + self.logger.info( + "Estimated remaining time: %s seconds", + round( + sum(time_set) / (curr_point) * (total_points - curr_point + 1), + 2, + ), + ) + except: + self.logger.warning( + ":::::::::::Warning: Cannot converge this run.::::::::::::" + ) + failure_count += 1 + self.logger.warning("failed count:", failure_count) + + self._computed_FIM = np.zeros(self.prior_FIM.shape) + + iter_t = iter_timer.toc(msg=None) + time_set.append(iter_t) + + FIM = self._computed_FIM + + # Save FIM for the current design point + FIM_all[curr_point - 1, :, :] = FIM + + # Compute and record metrics on FIM + det_FIM, trace_FIM, E_vals, E_vecs, D_opt, A_opt, E_opt, ME_opt = ( + compute_FIM_metrics(FIM) + ) + + for k in model.experiment_inputs.keys(): + factorial_results[k.name].append(pyo.value(k)) + + factorial_results["log10(D-opt)"].append(D_opt) + factorial_results["log10(A-opt)"].append(A_opt) + factorial_results["log10(E-opt)"].append(E_opt) + factorial_results["log10(ME-opt)"].append(ME_opt) + factorial_results["eigval_min"].append(np.min(E_vals)) + factorial_results["eigval_max"].append(np.max(E_vals)) + factorial_results["det_FIM"].append(det_FIM) + factorial_results["trace_FIM"].append(trace_FIM) + factorial_results["solve_time"].append(time_set[-1]) + + factorial_results.update( + { + "total_points": total_points, + "success_count": success_count, + "failure_count": failure_count, + "FIM_all": FIM_all.tolist(), # Save all FIMs + } + ) + self.factorial_results = factorial_results + + # Set pandas DataFrame options + if df_settings[0]: + with pd.option_context( + "display.max_columns", + df_settings[1], + "display.max_rows", + df_settings[2], + "display.width", + df_settings[3], + ): + exclude_keys = { + "total_points", + "success_count", + "failure_count", + "FIM_all", + } + dict_for_df = { + k: v for k, v in factorial_results.items() if k not in exclude_keys + } + res_df = pd.DataFrame(dict_for_df) + print("\n\n=========Factorial results===========") + print("Total points:", total_points) + print("Success count:", success_count) + print("Failure count:", failure_count) + print("\n") + print(res_df) + + # Save the results to a json file based on the file_name provided + if file_name is not None: + with open(file_name + ".json", "w") as f: + json.dump(self.factorial_results, f, indent=4) + self.logger.info(f"Results saved to {file_name}.json.") + + return self.factorial_results + # TODO: Overhaul plotting functions to not use strings # TODO: Make the plotting functionalities work for >2 design features def draw_factorial_figure( diff --git a/pyomo/contrib/doe/tests/test_doe_errors.py b/pyomo/contrib/doe/tests/test_doe_errors.py index ce77fb4f553..578c3c253c5 100644 --- a/pyomo/contrib/doe/tests/test_doe_errors.py +++ b/pyomo/contrib/doe/tests/test_doe_errors.py @@ -183,7 +183,7 @@ def test_reactor_check_bad_prior_negative_eigenvalue(self): with self.assertRaisesRegex( ValueError, - r"FIM provided is not positive definite. It has one or more negative eigenvalue\(s\) less than -{:.1e}".format( + r"Matrix provided is not positive definite. It has one or more negative eigenvalue\(s\) less than -{:.1e}".format( _SMALL_TOLERANCE_DEFINITENESS ), ): @@ -208,7 +208,7 @@ def test_reactor_check_bad_prior_not_symmetric(self): with self.assertRaisesRegex( ValueError, - "FIM provided is not symmetric using absolute tolerance {}".format( + "Matrix provided is not symmetric using absolute tolerance {}".format( _SMALL_TOLERANCE_SYMMETRY ), ): diff --git a/pyomo/contrib/doe/tests/test_utils.py b/pyomo/contrib/doe/tests/test_utils.py index 47df75c401c..20c63b1060d 100644 --- a/pyomo/contrib/doe/tests/test_utils.py +++ b/pyomo/contrib/doe/tests/test_utils.py @@ -8,13 +8,20 @@ # rights in this software. # This software is distributed under the 3-clause BSD License. # ___________________________________________________________________________ -from pyomo.common.dependencies import numpy as np, numpy_available +from pyomo.common.dependencies import ( + numpy as np, + numpy_available, + pandas as pd, + pandas_available, +) import pyomo.common.unittest as unittest from pyomo.contrib.doe.utils import ( - check_FIM, + check_matrix, compute_FIM_metrics, get_FIM_metrics, + snake_traversal_grid_sampling, + compute_correlation_matrix as compcorr, _SMALL_TOLERANCE_DEFINITENESS, _SMALL_TOLERANCE_SYMMETRY, _SMALL_TOLERANCE_IMG, @@ -22,45 +29,49 @@ @unittest.skipIf(not numpy_available, "Numpy is not available") +@unittest.skipIf(not pandas_available, "Pandas is not available") class TestUtilsFIM(unittest.TestCase): - """Test the check_FIM() from utils.py.""" + """Test the check_matrix() from utils.py.""" - def test_check_FIM_valid(self): + # TODO: add tests when `check_pos_def = False` is used in check_matrix() + def test_check_matrix_valid(self): """Test case where the FIM is valid (square, positive definite, symmetric).""" FIM = np.array([[4, 1], [1, 3]]) try: - check_FIM(FIM) + check_matrix(FIM) except ValueError as e: self.fail(f"Unexpected error: {e}") - def test_check_FIM_non_square(self): + def test_check_matrix_non_square(self): """Test case where the FIM is not square.""" FIM = np.array([[4, 1], [1, 3], [2, 1]]) - with self.assertRaisesRegex(ValueError, "FIM must be a square matrix"): - check_FIM(FIM) + with self.assertRaisesRegex( + ValueError, "argument mat must be a 2D square matrix" + ): + check_matrix(FIM) - def test_check_FIM_non_positive_definite(self): + def test_check_matrix_non_positive_definite(self): """Test case where the FIM is not positive definite.""" FIM = np.array([[1, 0], [0, -2]]) with self.assertRaisesRegex( ValueError, - "FIM provided is not positive definite. It has one or more negative " + "Matrix provided is not positive definite. It has one or more negative " + r"eigenvalue\(s\) less than -{:.1e}".format( _SMALL_TOLERANCE_DEFINITENESS ), ): - check_FIM(FIM) + check_matrix(FIM) - def test_check_FIM_non_symmetric(self): + def test_check_matrix_non_symmetric(self): """Test case where the FIM is not symmetric.""" FIM = np.array([[4, 1], [0, 3]]) with self.assertRaisesRegex( ValueError, - "FIM provided is not symmetric using absolute tolerance {}".format( + "Matrix provided is not symmetric using absolute tolerance {}".format( _SMALL_TOLERANCE_SYMMETRY ), ): - check_FIM(FIM) + check_matrix(FIM) """Test the compute_FIM_metrics() from utils.py.""" @@ -144,6 +155,142 @@ def test_get_FIM_metrics(self): fim_metrics["log10(Modified E-Optimality)"], ME_opt_expected ) + def test_snake_traversal_grid_sampling_errors(self): + # Test the error handling with lists + list_2d_bad = [[1, 2, 3], [4, 5, 6]] + with self.assertRaises(ValueError) as cm: + list(snake_traversal_grid_sampling(list_2d_bad)) + self.assertEqual( + str(cm.exception), "Argument at position 0 is not 1D. Got shape (2, 3)." + ) + + list_2d_wrong_shape_bad = [[1, 2, 3], [4, 5, 6, 7]] + with self.assertRaises(ValueError) as cm: + list(snake_traversal_grid_sampling(list_2d_wrong_shape_bad)) + self.assertEqual( + str(cm.exception), "Argument at position 0 is not 1D array-like." + ) + + # Test the error handling with tuples + tuple_2d_bad = ((1, 2, 3), (4, 5, 6)) + with self.assertRaises(ValueError) as cm: + list(snake_traversal_grid_sampling(tuple_2d_bad)) + self.assertEqual( + str(cm.exception), "Argument at position 0 is not 1D. Got shape (2, 3)." + ) + + tuple_2d_wrong_shape_bad = ((1, 2, 3), (4, 5, 6, 7)) + with self.assertRaises(ValueError) as cm: + list(snake_traversal_grid_sampling(tuple_2d_wrong_shape_bad)) + self.assertEqual( + str(cm.exception), "Argument at position 0 is not 1D array-like." + ) + + # Test the error handling with numpy arrays + array_2d_bad = np.array([[1, 2, 3], [4, 5, 6]]) + with self.assertRaises(ValueError) as cm: + list(snake_traversal_grid_sampling(array_2d_bad)) + self.assertEqual( + str(cm.exception), "Argument at position 0 is not 1D. Got shape (2, 3)." + ) + + def test_snake_traversal_grid_sampling_values(self): + # Test with lists + # Test with a single list + list1 = [1, 2, 3] + result_list1 = list(snake_traversal_grid_sampling(list1)) + expected_list1 = [(1,), (2,), (3,)] + self.assertEqual(result_list1, expected_list1) + + # Test with two lists + list2 = [4, 5, 6] + result_list2 = list(snake_traversal_grid_sampling(list1, list2)) + expected_list2 = [ + (1, 4), + (1, 5), + (1, 6), + (2, 6), + (2, 5), + (2, 4), + (3, 4), + (3, 5), + (3, 6), + ] + self.assertEqual(result_list2, expected_list2) + + # Test with three lists + list3 = [7, 8] + result_list3 = list(snake_traversal_grid_sampling(list1, list2, list3)) + expected_list3 = [ + (1, 4, 7), + (1, 4, 8), + (1, 5, 8), + (1, 5, 7), + (1, 6, 7), + (1, 6, 8), + (2, 6, 8), + (2, 6, 7), + (2, 5, 7), + (2, 5, 8), + (2, 4, 8), + (2, 4, 7), + (3, 4, 7), + (3, 4, 8), + (3, 5, 8), + (3, 5, 7), + (3, 6, 7), + (3, 6, 8), + ] + self.assertEqual(result_list3, expected_list3) + + # Test with tuples + tuple1 = (1, 2, 3) + result_tuple1 = list(snake_traversal_grid_sampling(tuple1)) + tuple2 = (4, 5, 6) + result_tuple2 = list(snake_traversal_grid_sampling(tuple1, tuple2)) + tuple3 = (7, 8) + result_tuple3 = list(snake_traversal_grid_sampling(tuple1, tuple2, tuple3)) + self.assertEqual(result_tuple1, expected_list1) + self.assertEqual(result_tuple2, expected_list2) + self.assertEqual(result_tuple3, expected_list3) + + # Test with numpy arrays + array1 = np.array([1, 2, 3]) + array2 = np.array([4, 5, 6]) + array3 = np.array([7, 8]) + result_array1 = list(snake_traversal_grid_sampling(array1)) + result_array2 = list(snake_traversal_grid_sampling(array1, array2)) + result_array3 = list(snake_traversal_grid_sampling(array1, array2, array3)) + self.assertEqual(result_array1, expected_list1) + self.assertEqual(result_array2, expected_list2) + self.assertEqual(result_array3, expected_list3) + + # Test with mixed types(List, Tuple, numpy array) + result_mixed = list(snake_traversal_grid_sampling(list1, tuple2, array3)) + self.assertEqual(result_mixed, expected_list3) + + # TODO: Add more tests as needed + def test_compute_correlation_matrix(self): + # Create a sample covariance matrix + covariance_matrix = np.array([[4, 2], [2, 3]]) + var_name = ["X1", "X2"] + + # Compute the correlation matrix + correlation_matrix = compcorr(covariance_matrix, var_name) + + # Expected correlation matrix + expected_correlation_matrix = pd.DataFrame( + [[1.0, 0.577], [0.577, 1.0]], index=var_name, columns=var_name + ) + + # Check if the computed correlation matrix matches the expected one + pd.testing.assert_frame_equal( + correlation_matrix, + expected_correlation_matrix, + check_exact=False, + atol=1e-6, + ) + if __name__ == "__main__": unittest.main() diff --git a/pyomo/contrib/doe/utils.py b/pyomo/contrib/doe/utils.py index 26b991a753a..20c93fc73d5 100644 --- a/pyomo/contrib/doe/utils.py +++ b/pyomo/contrib/doe/utils.py @@ -27,7 +27,7 @@ import pyomo.environ as pyo -from pyomo.common.dependencies import numpy as np, numpy_available +from pyomo.common.dependencies import numpy as np, numpy_available, pandas as pd from pyomo.core.base.param import ParamData from pyomo.core.base.var import VarData @@ -125,41 +125,83 @@ def rescale_FIM(FIM, param_vals): # return param_list -def check_FIM(FIM): +# Adding utility to update parameter values in a model based on the suffix +def update_model_from_suffix(suffix_obj: pyo.Suffix, values): """ - Checks that the FIM is square, positive definite, and symmetric. + Overwrite each variable/parameter referenced by ``suffix_obj`` with the + corresponding value in ``values``. Parameters ---------- - FIM: 2D numpy array representing the FIM + suffix_obj : pyomo.core.base.suffix.Suffix + The suffix whose *keys* are the components you want to update. + Call like ``update_from_suffix(model.unknown_parameters, vals)``. + values : iterable of numbers + New numerical values for the components referenced by the suffix. + Must be the same length as ``suffix_obj``. + """ + # Check that the length of values matches the suffix length + items = list(suffix_obj.items()) + if len(items) != len(values): + raise ValueError("values length does not match suffix length") + + # Iterate through the items in the suffix and update their values + # Note: items are tuples of (component, suffix_value) + for (comp, _), new_val in zip(items, values): + + # update the component value + # Check if the component is a VarData or ParamData + if isinstance(comp, (VarData, ParamData)): + comp.set_value(new_val) + else: + raise TypeError( + f"Unsupported component type {type(comp)}; expected VarData or ParamData." + ) + + +def check_matrix(mat, check_pos_def=True): + """ + Checks that the matrix is square, positive definite, and symmetric. + + Parameters + ---------- + mat: 2D numpy array representing the matrix + check_pos_def: bool, optional + If True, checks if the matrix is positive definite. + Default: True. Returns ------- None, but will raise error messages as needed """ - # Check that the FIM is a square matrix - if FIM.shape[0] != FIM.shape[1]: - raise ValueError("FIM must be a square matrix") + # Check that the matrix is a square matrix + if mat.ndim != 2 or mat.shape[0] != mat.shape[1]: + raise ValueError("argument mat must be a 2D square matrix") - # Compute the eigenvalues of the FIM - evals = np.linalg.eigvals(FIM) + if check_pos_def: + # Compute the eigenvalues of the matrix + evals = np.linalg.eigvals(mat) - # Check if the FIM is positive definite - if np.min(evals) < -_SMALL_TOLERANCE_DEFINITENESS: - raise ValueError( - "FIM provided is not positive definite. It has one or more negative " - + "eigenvalue(s) less than -{:.1e}".format(_SMALL_TOLERANCE_DEFINITENESS) - ) + # Check if the matrix is positive definite + if np.min(evals) < -_SMALL_TOLERANCE_DEFINITENESS: + raise ValueError( + "Matrix provided is not positive definite. It has one or more negative " + + "eigenvalue(s) less than -{:.1e}".format( + _SMALL_TOLERANCE_DEFINITENESS + ) + ) - # Check if the FIM is symmetric - if not np.allclose(FIM, FIM.T, atol=_SMALL_TOLERANCE_SYMMETRY): + # Check if the matrix is symmetric + if not np.allclose(mat, mat.T, atol=_SMALL_TOLERANCE_SYMMETRY): raise ValueError( - "FIM provided is not symmetric using absolute tolerance {}".format( + "Matrix provided is not symmetric using absolute tolerance {}".format( _SMALL_TOLERANCE_SYMMETRY ) ) +# TODO: probably can merge compute_FIM_metrics() and get_FIM_metrics() in a single + +# TODO: function with an argument (e.g., return_dict = True) to compute the metrics # Functions to compute FIM metrics def compute_FIM_metrics(FIM): """ @@ -191,7 +233,7 @@ def compute_FIM_metrics(FIM): """ # Check whether the FIM is square, positive definite, and symmetric - check_FIM(FIM) + check_matrix(FIM) # Compute FIM metrics det_FIM = np.linalg.det(FIM) @@ -266,3 +308,104 @@ def get_FIM_metrics(FIM): "log10(E-Optimality)": E_opt, "log10(Modified E-Optimality)": ME_opt, } + + +def snake_traversal_grid_sampling(*array_like_args): + """ + Generates a multi-dimensional pattern for an arbitrary number of 1D array-like arguments. + This pattern is useful for generating patterns for sensitivity analysis when we want + to change one variable at a time. This function uses recursion and acts as a generator. + + Parameters + ---------- + *array_like_args: A variable number of 1D array-like objects as arguments. + + Yields + ------ + A tuple representing points in the snake pattern. + """ + # throw an error if the array_like_args are not array-like or all 1D + for i, arg in enumerate(array_like_args): + try: + arr_np = np.asarray(arg) + except Exception: + raise ValueError(f"Argument at position {i} is not 1D array-like.") + + if arr_np.ndim != 1: + raise ValueError( + f"Argument at position {i} is not 1D. Got shape {arr_np.shape}." + ) + + # The main logic is in a nested recursive helper function. + def _generate_recursive(depth, index_sum): + # Base case: If we've processed all lists, we're at the end of a path. + if depth == len(array_like_args): + yield () + return + + current_list = array_like_args[depth] + + # Determine the iteration direction based on the sum of parent indices. + # This is the mathematical rule for the zigzag. + is_forward = index_sum % 2 == 0 + iterable = current_list if is_forward else reversed(current_list) + + # Enumerate to get the index `i` for the *next* recursive call's sum. + for i, value in enumerate(iterable): + # Recur for the next list, updating the index_sum. + for sub_pattern in _generate_recursive(depth + 1, index_sum + i): + # Prepend the current value to the results from deeper levels. + yield (value,) + sub_pattern + + # Start the recursion at the first list (depth 0) with an initial sum of 0. + yield from _generate_recursive(depth=0, index_sum=0) + + +def compute_correlation_matrix( + covariance_matrix, var_name: list = None, significant_digits=3 +): + """ + Computes the correlation matrix from a covariance matrix. + + Parameters + ---------- + covariance_matrix : numpy.ndarray + 2D array representing the covariance matrix. + var_name : list, optional + List of variable names corresponding to the rows/columns of the covariance matrix. + significant_digits : int, optional + Number of significant digits to round the correlation matrix to. Default: 3. + + Returns + ------- + pandas.DataFrame/numpy.ndarray + If `var_name` is provided, returns a pandas DataFrame with the correlation matrix + and the specified variable names as both index and columns. If `var_name` is not + provided, returns a numpy array representing the correlation matrix. + """ + # Check if covariance matrix is symmetric and square + check_matrix(covariance_matrix, check_pos_def=False) + + if var_name: + assert len(var_name) == covariance_matrix.shape[0], ( + "Length of var_name must match the number of rows/columns in the " + "covariance matrix." + ) + + if not np.all(np.isfinite(covariance_matrix)): + raise ValueError("Covariance matrix contains non-finite values.") + + std_dev = np.sqrt(np.diag(covariance_matrix)) + + std_dev_matrix = np.outer(std_dev, std_dev) + + correlation_matrix = covariance_matrix / std_dev_matrix + + # Set the index to the variable names if provided, + corr_df = ( + pd.DataFrame(correlation_matrix, index=var_name, columns=var_name) + if var_name + else correlation_matrix + ) + + return corr_df.round(significant_digits) if significant_digits else corr_df