diff --git a/pyomo/gdp/plugins/bigm_mixin.py b/pyomo/gdp/plugins/bigm_mixin.py index 4899551f918..db196d8b72f 100644 --- a/pyomo/gdp/plugins/bigm_mixin.py +++ b/pyomo/gdp/plugins/bigm_mixin.py @@ -9,12 +9,16 @@ # This software is distributed under the 3-clause BSD License. # ___________________________________________________________________________ +import logging + from pyomo.gdp import GDP_Error from pyomo.common.collections import ComponentSet from pyomo.contrib.fbbt.expression_bounds_walker import ExpressionBoundsVisitor import pyomo.contrib.fbbt.interval as interval from pyomo.core import Suffix +logger = logging.getLogger(__name__) + def _convert_M_to_tuple(M, constraint, disjunct=None): if not isinstance(M, (tuple, list)): @@ -23,7 +27,7 @@ def _convert_M_to_tuple(M, constraint, disjunct=None): else: try: M = (-M, M) - except: + except Exception: logger.error( "Error converting scalar M-value %s " "to (-M,M). Is %s not a numeric type?" % (M, type(M)) diff --git a/pyomo/gdp/plugins/multiple_bigm.py b/pyomo/gdp/plugins/multiple_bigm.py index b1e2581beca..81d91a467e0 100644 --- a/pyomo/gdp/plugins/multiple_bigm.py +++ b/pyomo/gdp/plugins/multiple_bigm.py @@ -11,32 +11,36 @@ import itertools import logging +import math +import multiprocessing +import os +import threading +import enum from pyomo.common.collections import ComponentMap, ComponentSet -from pyomo.common.config import ConfigDict, ConfigValue +from pyomo.common.config import ( + ConfigDict, + ConfigValue, + InEnum, + PositiveInt, + document_kwargs_from_configdict, +) from pyomo.common.gc_manager import PauseGC from pyomo.common.modeling import unique_component_name +from pyomo.common.dependencies import dill, dill_available from pyomo.core import ( - Any, - Binary, Block, - BooleanVar, - Connector, + ConcreteModel, Constraint, - Expression, - ExternalFunction, maximize, minimize, NonNegativeIntegers, Objective, - Param, - Set, - SetOf, SortComponents, Suffix, value, - Var, + Any, ) from pyomo.core.base import Reference, TransformationFactory import pyomo.core.expr as EXPR @@ -49,14 +53,15 @@ _warn_for_unused_bigM_args, ) from pyomo.gdp.plugins.gdp_to_mip_transformation import GDP_to_MIP_Transformation -from pyomo.gdp.transformed_disjunct import _TransformedDisjunct -from pyomo.gdp.util import get_gdp_tree, _to_dict -from pyomo.network import Port +from pyomo.gdp.util import _to_dict from pyomo.opt import SolverFactory, TerminationCondition +from pyomo.contrib.solver.common.base import SolverBase as NewSolverBase +from pyomo.contrib.solver.common.base import LegacySolverWrapper from pyomo.repn import generate_standard_repn from weakref import ref as weakref_ref + logger = logging.getLogger('pyomo.gdp.mbigm') _trusted_solvers = { @@ -71,15 +76,37 @@ 'highs', } +# Whether these are thread-local or at module scope makes no difference +# for 'spawn' or 'forkserver', but it matters if we run single-threaded, +# and possibly for edge cases of 'fork' (although it's not correct to +# use fork() here while having multiple threads anyway, making it moot +# in theory). +_thread_local = threading.local() +_thread_local.solver = None +_thread_local.model = None +_thread_local.config_use_primal_bound = None +_thread_local.in_progress = False + def Solver(val): if isinstance(val, str): return SolverFactory(val) if not hasattr(val, 'solve'): raise ValueError("Expected a string or solver object (with solve() method)") + if isinstance(val, NewSolverBase) and not isinstance(val, LegacySolverWrapper): + raise ValueError( + "Please pass an old-style solver object, using the " + "LegacySolverWrapper mechanism if necessary." + ) return val +class ProcessStartMethod(str, enum.Enum): + spawn = 'spawn' + fork = 'fork' + forkserver = 'forkserver' + + @TransformationFactory.register( 'gdp.mbigm', doc="Relax disjunctive model using big-M terms specific to each disjunct", @@ -217,6 +244,54 @@ class MultipleBigMTransformation(GDP_to_MIP_Transformation, _BigM_MixIn): """, ), ) + CONFIG.declare( + 'threads', + ConfigValue( + default=None, + domain=PositiveInt, + description="Number of worker processes to use when estimating M values", + doc=""" + If not specified, use up to the number of available cores, minus + one. If set to 1, do not spawn processes, and revert to + single-threaded operation. + """, + ), + ) + CONFIG.declare( + 'process_start_method', + ConfigValue( + default=None, + domain=InEnum(ProcessStartMethod), + description="Start method used for spawning processes during M calculation", + doc=""" + Options are the elements of the enum ProcessStartMethod, or equivalently the + strings 'fork', 'spawn', or 'forkserver', or None. See the Python + multiprocessing documentation for a full description of each of these. When + None is passed, we determine a reasonable default. On POSIX, the default is + 'fork', unless we detect that Python has multiple threads at the time the + process pool is created, in which case we instead use 'forkserver'. On + Windows, the default and only possible option is 'spawn'. Note that if + 'spawn' or 'forkserver' are selected, we depend on the `dill` module for + pickling, and model instances must be pickleable using `dill`. This option is + ignored if `threads` is set to 1. + """, + ), + ) + CONFIG.declare( + 'use_primal_bound', + ConfigValue( + default=False, + domain=bool, + description="When estimating M values, use the primal bound " + "instead of the dual bound.", + doc=""" + This is necessary when using a local solver such as ipopt, but be + aware that interior feasible points for this subproblem do not give + valid values for M. That is, in the presence of numerical error, + this option will lead to a slightly wrong reformulation. + """, + ), + ) transformation_name = 'mbigm' def __init__(self): @@ -225,7 +300,19 @@ def __init__(self): self._set_up_expr_bound_visitor() self.handlers[Suffix] = self._warn_for_active_suffix + @document_kwargs_from_configdict(CONFIG) + def apply_to(self, model, **kwds): + """ + Apply the transformation to the given model. + """ + return super().apply_to(model, **kwds) + def _apply_to(self, instance, **kwds): + # check for the rather implausible error case that + # solver.solve() is a metasolver that indirectly calls this + # transformation again + if _thread_local.in_progress: + raise GDP_Error("gdp.mbigm transformation cannot be called recursively") self.used_args = ComponentMap() with PauseGC(): try: @@ -236,8 +323,13 @@ def _apply_to(self, instance, **kwds): self._arg_list.clear() self._expr_bound_visitor.leaf_bounds.clear() self._expr_bound_visitor.use_fixed_var_values_as_bounds = False + _thread_local.model = None + _thread_local.solver = None + _thread_local.config_use_primal_bound = None + _thread_local.in_progress = False def _apply_to_impl(self, instance, **kwds): + _thread_local.in_progress = True self._process_arguments(instance, **kwds) if self._config.assume_fixed_vars_permanent: self._bound_visitor.use_fixed_var_values_as_bounds = True @@ -271,89 +363,260 @@ def _apply_to_impl(self, instance, **kwds): gdp_tree = self._get_gdp_tree_from_targets(instance, targets) preprocessed_targets = gdp_tree.reverse_topological_sort() - for t in preprocessed_targets: - if t.ctype is Disjunction: - self._transform_disjunctionData( - t, - t.index(), - parent_disjunct=gdp_tree.parent(t), - root_disjunct=gdp_tree.root_disjunct(t), - ) + arg_Ms = self._config.bigM if self._config.bigM is not None else {} + self._transform_disjunctionDatas( + instance, preprocessed_targets, arg_Ms, gdp_tree + ) # issue warnings about anything that was in the bigM args dict that we # didn't use _warn_for_unused_bigM_args(self._config.bigM, self.used_args, logger) - def _transform_disjunctionData(self, obj, index, parent_disjunct, root_disjunct): - if root_disjunct is not None: - # We do not support nested because, unlike in regular bigM, the - # constraints are not fully relaxed when the exactly-one constraint - # is not enforced. (For example, in this model: [1 <= x <= 3, [1 <= - # y <= 5] v [6 <= y <= 10]] v [5 <= x <= 10, 15 <= y <= 20]), we - # would need to put the relaxed inner-disjunction constraints on the - # parent Disjunct and process them again. This means the order in - # which we transformed Disjuncts would change the calculated M - # values. This is crazy, so we skip it. - raise GDP_Error( - "Found nested Disjunction '%s'. The multiple bigm " - "transformation does not support nested GDPs. " - "Please flatten the model before calling the " - "transformation" % obj.name - ) - - if not obj.xor: - # This transformation assumes it can relax constraints assuming that - # another Disjunct is chosen. If it could be possible to choose both - # then that logic might fail. - raise GDP_Error( - "Cannot do multiple big-M reformulation for " - "Disjunction '%s' with OR constraint. " - "Must be an XOR!" % obj.name - ) - - (transBlock, algebraic_constraint) = self._setup_transform_disjunctionData( - obj, root_disjunct - ) - - ## Here's the logic for the actual transformation + def _transform_disjunctionDatas( + self, instance, preprocessed_targets, arg_Ms, gdp_tree + ): + # We wish we could do this one Disjunction at a time, but we + # also want to calculate the Ms in parallel. So we first iterate + # the Disjunctions once to get a list of M calculation jobs, + # then we calculate the Ms in parallel, then we return to a + # single thread and iterate Disjunctions again to actually + # transform the constraints. + + # To-do list in form (constraint, other_disjunct, unsuccessful_message, is_upper) + jobs = [] + # map Disjunction -> set of its active Disjuncts + active_disjuncts = ComponentMap() + # set of Constraints processed during special handling of bound + # constraints: we will deactivate these, but not until we're + # done calculating Ms + transformed_constraints = set() + # Finished M values. If we are only doing the bound constraints, + # we will skip all the calculation steps and pass these through + # to transform_constraints. + Ms = {} + if self._config.only_mbigm_bound_constraints: + Ms = arg_Ms + # Disjunctions and their setup components. We will return to these after + # calculating Ms. + disjunction_setup = {} - arg_Ms = self._config.bigM if self._config.bigM is not None else {} + for t in preprocessed_targets: + if t.ctype is Disjunction: + if gdp_tree.root_disjunct(t) is not None: + # We do not support nested because, unlike in regular bigM, the + # constraints are not fully relaxed when the exactly-one constraint + # is not enforced. (For example, in this model: [1 <= x <= 3, [1 <= + # y <= 5] v [6 <= y <= 10]] v [5 <= x <= 10, 15 <= y <= 20]), we + # would need to put the relaxed inner-disjunction constraints on the + # parent Disjunct and process them again. This means the order in + # which we transformed Disjuncts would change the calculated M + # values. This is crazy, so we skip it. + raise GDP_Error( + "Found nested Disjunction '%s'. The multiple bigm " + "transformation does not support nested GDPs. " + "Please flatten the model before calling the " + "transformation" % t.name + ) + if not t.xor: + # This transformation assumes it can relax constraints assuming that + # another Disjunct is chosen. If it could be possible to choose both + # then that logic might fail. + raise GDP_Error( + "Cannot do multiple big-M reformulation for " + "Disjunction '%s' with OR constraint. " + "Must be an XOR!" % t.name + ) - # ESJ: I am relying on the fact that the ComponentSet is going to be - # ordered here, but using a set because I will remove infeasible - # Disjuncts from it if I encounter them calculating M's. - active_disjuncts = ComponentSet(disj for disj in obj.disjuncts if disj.active) - # First handle the bound constraints if we are dealing with them - # separately - transformed_constraints = set() - if self._config.reduce_bound_constraints: - transformed_constraints = self._transform_bound_constraints( - active_disjuncts, transBlock, arg_Ms + # start doing transformation + disjunction_setup[t] = (transBlock, algebraic_constraint) = ( + self._setup_transform_disjunctionData(t, gdp_tree.root_disjunct(t)) + ) + # Unlike python set(), ComponentSet keeps a stable + # ordering, so we use it for the sake of determinism. + active_disjuncts[t] = ComponentSet( + disj for disj in t.disjuncts if disj.active + ) + # this method returns the constraints transformed on this disjunct; + # update because we are saving these from all disjuncts for later + if self._config.reduce_bound_constraints: + transformed_constraints.update( + self._transform_bound_constraints( + active_disjuncts[t], transBlock, arg_Ms + ) + ) + # Get the jobs to calculate missing M values for this Disjunction. We + # skip this if we are only doing bound constraints, in which case Ms was + # already set. + if not self._config.only_mbigm_bound_constraints: + self._setup_jobs_for_disjunction( + t, + active_disjuncts, + transformed_constraints, + arg_Ms, + Ms, + jobs, + transBlock, + ) + # (Now exiting the DisjunctionDatas loop) + if jobs: + jobs_by_name = [ + ( + constraint.getname(fully_qualified=True), + other_disjunct.getname(fully_qualified=True), + unsuccessful_solve_msg, + is_upper, + ) + for ( + constraint, + other_disjunct, + unsuccessful_solve_msg, + is_upper, + ) in jobs + ] + threads = ( + self._config.threads + if self._config.threads is not None + # It would be better to use len(os.sched_getaffinity(0)), + # but it is not available on all platforms. + else os.cpu_count() - 1 ) - - Ms = arg_Ms - if not self._config.only_mbigm_bound_constraints: - Ms = transBlock.calculated_missing_m_values = ( - self._calculate_missing_M_values( - active_disjuncts, arg_Ms, transBlock, transformed_constraints + if threads > 1: + with self._setup_pool(threads, instance, len(jobs)) as pool: + results = pool.starmap(func=_calc_M, iterable=jobs_by_name) + pool.close() + pool.join() + else: + _thread_local.model = instance + _thread_local.solver = self._config.solver + _thread_local.config_use_primal_bound = self._config.use_primal_bound + logger.info(f"Running {len(jobs)} jobs single-threaded.") + results = itertools.starmap(_calc_M, jobs_by_name) + deactivated = set() + for (constraint, other_disjunct, _, is_upper), ( + M, + disjunct_infeasible, + ) in zip(jobs, results): + if disjunct_infeasible: + if other_disjunct not in deactivated: + # If we made it here without an exception, the solver is on the + # trusted solvers list + logger.debug( + "Disjunct '%s' is infeasible, deactivating." + % other_disjunct.name + ) + other_disjunct.deactivate() + active_disjuncts[gdp_tree.parent(other_disjunct)].remove( + other_disjunct + ) + deactivated.add(other_disjunct) + # Note that we can't just transform immediately because we might be + # waiting on the other one of upper_M or lower_M. + if is_upper: + Ms[constraint, other_disjunct] = ( + Ms[constraint, other_disjunct][0], + M, + ) + else: + Ms[constraint, other_disjunct] = ( + M, + Ms[constraint, other_disjunct][1], + ) + transBlock._mbm_values[constraint, other_disjunct] = Ms[ + constraint, other_disjunct + ] + + for con in transformed_constraints: + con.deactivate() + + # Iterate the Disjunctions again and actually transform them + for disjunction, ( + transBlock, + algebraic_constraint, + ) in disjunction_setup.items(): + or_expr = 0 + for disjunct in active_disjuncts[disjunction]: + or_expr += disjunct.indicator_var.get_associated_binary() + self._transform_disjunct( + disjunct, transBlock, active_disjuncts[disjunction], Ms ) + algebraic_constraint.add(disjunction.index(), or_expr == 1) + # map the DisjunctionData to its XOR constraint to mark it as + # transformed + disjunction._algebraic_constraint = weakref_ref( + algebraic_constraint[disjunction.index()] ) + disjunction.deactivate() + + def _setup_jobs_for_disjunction( + self, + disjunction, + active_disjuncts, + transformed_constraints, + arg_Ms, + Ms, + jobs, + transBlock, + ): + """ + Do the inner work setting up or skipping M calculation jobs for a + disjunction. Mutate the parameters Ms, jobs, transBlock._mbm_values, + and self.used_args. + + Args: + self: self.used_args map from (constraint, disjunct) to M tuple + updated with the keys used from arg_Ms + disjunction: disjunction to set up the jobs for + active_disjuncts: map from disjunctions to ComponentSets of active + disjuncts + arg_Ms: user-provided map from (constraint, disjunct) to M value + or tuple + Ms: working map from (constraint, disjunct) to M tuples to update + jobs: working list of (constraint, other_disjunct, + unsuccessful_solve_msg, is_upper) job tuples to update + transBlock: working transformation block. Update the + transBlock._mbigm_values map from (constraint, disjunct) to + M tuples + """ + for disjunct, other_disjunct in itertools.product( + active_disjuncts[disjunction], active_disjuncts[disjunction] + ): + if disjunct is other_disjunct: + continue - # Now we can deactivate the constraints we deferred, so that we don't - # re-transform them - for cons in transformed_constraints: - cons.deactivate() + for constraint in disjunct.component_data_objects( + Constraint, + active=True, + descend_into=Block, + sort=SortComponents.deterministic, + ): + if constraint in transformed_constraints: + continue + # First check args + if (constraint, other_disjunct) in arg_Ms: + (lower_M, upper_M) = _convert_M_to_tuple( + arg_Ms[constraint, other_disjunct], constraint, other_disjunct + ) + self.used_args[constraint, other_disjunct] = (lower_M, upper_M) + else: + (lower_M, upper_M) = (None, None) + unsuccessful_solve_msg = ( + "Unsuccessful solve to calculate M value to " + "relax constraint '%s' on Disjunct '%s' when " + "Disjunct '%s' is selected." + % (constraint.name, disjunct.name, other_disjunct.name) + ) - or_expr = 0 - for disjunct in active_disjuncts: - or_expr += disjunct.indicator_var.get_associated_binary() - self._transform_disjunct(disjunct, transBlock, active_disjuncts, Ms) - algebraic_constraint.add(index, or_expr == 1) - # map the DisjunctionData to its XOR constraint to mark it as - # transformed - obj._algebraic_constraint = weakref_ref(algebraic_constraint[index]) + if constraint.lower is not None and lower_M is None: + jobs.append( + (constraint, other_disjunct, unsuccessful_solve_msg, False) + ) + if constraint.upper is not None and upper_M is None: + jobs.append( + (constraint, other_disjunct, unsuccessful_solve_msg, True) + ) - obj.deactivate() + Ms[constraint, other_disjunct] = (lower_M, upper_M) + transBlock._mbm_values[constraint, other_disjunct] = (lower_M, upper_M) def _transform_disjunct(self, obj, transBlock, active_disjuncts, Ms): # We've already filtered out deactivated disjuncts, so we know obj is @@ -560,6 +823,53 @@ def _transform_bound_constraints(self, active_disjuncts, transBlock, Ms): return transformed_constraints + def _setup_pool(self, threads, instance, num_jobs): + method = ( + self._config.process_start_method + if self._config.process_start_method is not None + else ( + ProcessStartMethod.spawn + if os.name == 'nt' + else ( + ProcessStartMethod.forkserver + if len(threading.enumerate()) > 1 + else ProcessStartMethod.fork + ) + ) + ) + logger.info( + f"Running {num_jobs} jobs on {threads} worker " + f"processes with process start method {method}." + ) + if ( + method == ProcessStartMethod.spawn + or method == ProcessStartMethod.forkserver + ): + if not dill_available: + raise GDP_Error( + "Dill is required when spawning processes using " + "methods 'spawn' or 'forkserver', but it could " + "not be imported." + ) + pool = multiprocessing.get_context(method.value).Pool( + processes=threads, + initializer=_setup_spawn, + initargs=( + dill.dumps(instance), + dill.dumps(self._config.solver.__class__), + dill.dumps(self._config.solver.options), + self._config.use_primal_bound, + ), + ) + elif method == ProcessStartMethod.fork: + _thread_local.model = instance + _thread_local.solver = self._config.solver + _thread_local.config_use_primal_bound = self._config.use_primal_bound + pool = multiprocessing.get_context('fork').Pool( + processes=threads, initializer=_setup_fork, initargs=() + ) + return pool + def _add_transformation_block(self, to_block): transBlock, new_block = super()._add_transformation_block(to_block) @@ -571,149 +881,6 @@ def _add_transformation_block(self, to_block): ) return transBlock, new_block - def _get_all_var_objects(self, active_disjuncts): - # This is actually a general utility for getting all Vars that appear in - # active Disjuncts in a Disjunction. - seen = set() - for disj in active_disjuncts: - for constraint in disj.component_data_objects( - Constraint, - active=True, - sort=SortComponents.deterministic, - descend_into=Block, - ): - for var in EXPR.identify_variables(constraint.expr, include_fixed=True): - if id(var) not in seen: - seen.add(id(var)) - yield var - - def _calculate_missing_M_values( - self, active_disjuncts, arg_Ms, transBlock, transformed_constraints - ): - scratch_blocks = {} - all_vars = list(self._get_all_var_objects(active_disjuncts)) - for disjunct, other_disjunct in itertools.product( - active_disjuncts, active_disjuncts - ): - if disjunct is other_disjunct: - continue - elif id(other_disjunct) in scratch_blocks: - scratch = scratch_blocks[id(other_disjunct)] - else: - scratch = scratch_blocks[id(other_disjunct)] = Block() - other_disjunct.add_component( - unique_component_name(other_disjunct, "scratch"), scratch - ) - scratch.obj = Objective(expr=0) # placeholder, but I want to - # take the name before I add a - # bunch of random reference - # objects. - - # If the writers don't assume Vars are declared on the Block - # being solved, we won't need this! - for v in all_vars: - ref = Reference(v) - scratch.add_component(unique_component_name(scratch, v.name), ref) - - for constraint in disjunct.component_data_objects( - Constraint, - active=True, - descend_into=Block, - sort=SortComponents.deterministic, - ): - if constraint in transformed_constraints: - continue - # First check args - if (constraint, other_disjunct) in arg_Ms: - (lower_M, upper_M) = _convert_M_to_tuple( - arg_Ms[constraint, other_disjunct], constraint, other_disjunct - ) - self.used_args[constraint, other_disjunct] = (lower_M, upper_M) - else: - (lower_M, upper_M) = (None, None) - unsuccessful_solve_msg = ( - "Unsuccessful solve to calculate M value to " - "relax constraint '%s' on Disjunct '%s' when " - "Disjunct '%s' is selected." - % (constraint.name, disjunct.name, other_disjunct.name) - ) - if constraint.lower is not None and lower_M is None: - # last resort: calculate - if lower_M is None: - scratch.obj.expr = constraint.body - constraint.lower - scratch.obj.sense = minimize - lower_M = self._solve_disjunct_for_M( - other_disjunct, - scratch, - unsuccessful_solve_msg, - active_disjuncts, - ) - if constraint.upper is not None and upper_M is None: - # last resort: calculate - if upper_M is None: - scratch.obj.expr = constraint.body - constraint.upper - scratch.obj.sense = maximize - upper_M = self._solve_disjunct_for_M( - other_disjunct, - scratch, - unsuccessful_solve_msg, - active_disjuncts, - ) - arg_Ms[constraint, other_disjunct] = (lower_M, upper_M) - transBlock._mbm_values[constraint, other_disjunct] = (lower_M, upper_M) - - # clean up the scratch blocks - for blk in scratch_blocks.values(): - blk.parent_block().del_component(blk) - - return arg_Ms - - def _solve_disjunct_for_M( - self, other_disjunct, scratch_block, unsuccessful_solve_msg, active_disjuncts - ): - if not other_disjunct.active: - # If a Disjunct is infeasible, we will discover that and deactivate - # it when we are calculating the M values. We remove that disjunct - # from active_disjuncts inside of the loop in - # _calculate_missing_M_values. So that means that we might have - # deactivated Disjuncts here that we should skip over. - return 0 - - solver = self._config.solver - - results = solver.solve(other_disjunct, load_solutions=False) - if results.solver.termination_condition is TerminationCondition.infeasible: - # [2/18/24]: TODO: After the solver rewrite is complete, we will not - # need this check since we can actually determine from the - # termination condition whether or not the solver proved - # infeasibility or just terminated at local infeasiblity. For now, - # while this is not complete, it catches most of the solvers we - # trust, and, unless someone is so pathological as to *rename* an - # untrusted solver using a trusted solver name, it will never do the - # *wrong* thing. - if any(s in solver.name for s in _trusted_solvers): - logger.debug( - "Disjunct '%s' is infeasible, deactivating." % other_disjunct.name - ) - other_disjunct.deactivate() - active_disjuncts.remove(other_disjunct) - M = 0 - else: - # This is a solver that might report - # 'infeasible' for local infeasibility, so we - # can't deactivate with confidence. To be - # conservative, we'll just complain about - # it. Post-solver-rewrite we will want to change - # this so that we check for 'proven_infeasible' - # and then we can abandon this hack - raise GDP_Error(unsuccessful_solve_msg) - elif results.solver.termination_condition is not TerminationCondition.optimal: - raise GDP_Error(unsuccessful_solve_msg) - else: - other_disjunct.solutions.load_from(results) - M = value(scratch_block.obj.expr) - return M - def _warn_for_active_suffix(self, suffix, disjunct, active_disjuncts, Ms): if suffix.local_name == 'BigM': logger.debug( @@ -775,3 +942,132 @@ def get_all_M_values(self, model): all_ms.update(transBlock._mbm_values) return all_ms + + +# Things we call in subprocesses. These can't be member functions, or +# else we'd have to pickle `self`, which is problematic. +def _setup_spawn(model, solver_class, solver_options, use_primal_bound): + # When using 'spawn' or 'forkserver', Python starts in a new + # environment and executes only this file, so we need to manually + # ensure necessary plugins are registered (even if the main process + # has already registered them). + import pyomo.environ + + _thread_local.model = dill.loads(model) + _thread_local.solver = dill.loads(solver_class)(options=dill.loads(solver_options)) + _thread_local.config_use_primal_bound = use_primal_bound + + +def _setup_fork(): + # model and config_use_primal_bound were already properly set, but + # remake the solver instead of using the passed argument. All these + # processes are copies of the calling thread so the thread-local + # still works. + _thread_local.solver = _thread_local.solver.__class__( + options=_thread_local.solver.options + ) + + +def _calc_M(constraint_name, other_disjunct_name, unsuccessful_message, is_upper): + solver = _thread_local.solver + model = _thread_local.model + scratch = _get_scratch_block( + model.find_component(constraint_name), + model.find_component(other_disjunct_name), + is_upper, + ) + results = solver.solve(scratch, tee=False, load_solutions=False, keepfiles=False) + termination_condition = results.solver.termination_condition + if is_upper: + incumbent = results.problem.lower_bound + bound = results.problem.upper_bound + else: + incumbent = results.problem.upper_bound + bound = results.problem.lower_bound + if termination_condition is TerminationCondition.infeasible: + # [2/18/24]: TODO: After the solver rewrite is complete, we will not + # need this check since we can actually determine from the + # termination condition whether or not the solver proved + # infeasibility or just terminated at local infeasiblity. For now, + # while this is not complete, it catches most of the solvers we + # trust, and, unless someone is so pathological as to *rename* an + # untrusted solver using a trusted solver name, it will never do the + # *wrong* thing. + if any(s in solver.name for s in _trusted_solvers): + return (0, True) + else: + # This is a solver that might report + # 'infeasible' for local infeasibility, so we + # can't deactivate with confidence. To be + # conservative, we'll just complain about + # it. Post-solver-rewrite we will want to change + # this so that we check for 'proven_infeasible' + # and then we can abandon this hack + raise GDP_Error(unsuccessful_message) + elif termination_condition is not TerminationCondition.optimal: + raise GDP_Error(unsuccessful_message) + else: + # NOTE: This transformation can be made faster by allowing the + # solver a gap. As long as we have a bound, it's still valid + # (though not as tight). + # + # We should use the dual bound here. The primal bound is + # mathematically incorrect in the presence of numerical error, + # but it's the best a local solver like ipopt can do, so we + # allow it to be used by setting an option. + if not _thread_local.config_use_primal_bound: + M = bound + if not math.isfinite(M): + raise GDP_Error( + "Subproblem solved to optimality, but no finite dual bound was " + "obtained. If you would like to instead use the best obtained " + "solution, set the option use_primal_bound=True. This is " + "necessary when using a local solver such as ipopt, but be " + "aware that interior feasible points for this subproblem do " + "not give valid values for M in general." + ) + else: + M = incumbent + if not math.isfinite(M): + # Solved to optimality, but we did not find an incumbent + # objective value. Try again by actually loading the + # solution and evaluating the objective expression. + try: + scratch.solutions.load_from(results) + M = value(scratch.obj) + if not math.isfinite(M): + raise ValueError() + except Exception: + raise GDP_Error( + "`use_primal_bound` is enabled, but could not find a finite " + "objective value after optimal solve." + ) + return (M, False) + + +def _get_scratch_block(constraint, other_disjunct, is_upper): + scratch = ConcreteModel() + if is_upper: + scratch.obj = Objective(expr=constraint.body - constraint.upper, sense=maximize) + else: + scratch.obj = Objective(expr=constraint.body - constraint.lower, sense=minimize) + # Create a Block component (via Reference) that actually points to a + # DisjunctData, as we want the writer to write it as if it were an + # ordinary Block rather than getting any special + # handling. DisjunctData inherits BlockData, so this should be + # valid. + scratch.disjunct_ref = Reference(other_disjunct, ctype=Block) + + # Add references to every Var that appears in an active constraint. + # TODO: If the writers don't assume Vars are declared on the Block + # being solved, we won't need this! + seen = set() + for constraint in scratch.component_data_objects( + Constraint, active=True, sort=SortComponents.deterministic, descend_into=Block + ): + for var in EXPR.identify_variables(constraint.expr, include_fixed=True): + if id(var) not in seen: + seen.add(id(var)) + ref = Reference(var) + scratch.add_component(unique_component_name(scratch, var.name), ref) + return scratch diff --git a/pyomo/gdp/tests/test_mbigm.py b/pyomo/gdp/tests/test_mbigm.py index aaa4750681c..37e11251978 100644 --- a/pyomo/gdp/tests/test_mbigm.py +++ b/pyomo/gdp/tests/test_mbigm.py @@ -13,6 +13,7 @@ import logging from os.path import join, normpath import pickle +import os from pyomo.common.fileutils import import_file, PYOMO_ROOT_DIR from pyomo.common.log import LoggingIntercept @@ -44,6 +45,7 @@ from pyomo.gdp.tests.models import make_indexed_equality_model from pyomo.repn import generate_standard_repn + gurobi_available = ( SolverFactory('gurobi').available(exception_flag=False) and SolverFactory('gurobi').license_is_valid() @@ -334,7 +336,7 @@ def check_linear_func_constraints(self, m, mbm, Ms=None): @unittest.skipUnless(gurobi_available, "Gurobi is not available") def test_calculated_Ms_correct(self): # Calculating all the Ms is expensive, so we just do it in this one test - # and then specify them for the others + # and then specify them for most of the others m = self.make_model() mbm = TransformationFactory('gdp.mbigm') mbm.apply_to(m, reduce_bound_constraints=False) @@ -906,6 +908,82 @@ def test_only_multiple_bigm_bound_constraints_arg_Ms(self): m, mbm, {m.d1: (-1050, 1050), m.d2: (-2000, 1200), m.d3: (-4000, 4000)} ) + # A set of tests identical to test_calculated_Ms_correct, except + # that we use each possible process spawning method for + # multiprocessing + @unittest.skipUnless(gurobi_available, "Gurobi is not available") + def test_calculated_Ms_spawn(self): + m = self.make_model() + mbm = TransformationFactory('gdp.mbigm') + mbm.apply_to( + m, reduce_bound_constraints=False, threads=3, process_start_method='spawn' + ) + + self.check_all_untightened_bounds_constraints(m, mbm) + self.check_linear_func_constraints(m, mbm) + + self.assertStructuredAlmostEqual(mbm.get_all_M_values(m), self.get_Ms(m)) + + @unittest.skipUnless(gurobi_available, "Gurobi is not available") + def test_calculated_Ms_singlethreaded(self): + m = self.make_model() + mbm = TransformationFactory('gdp.mbigm') + mbm.apply_to(m, reduce_bound_constraints=False, threads=1) + + self.check_all_untightened_bounds_constraints(m, mbm) + self.check_linear_func_constraints(m, mbm) + + self.assertStructuredAlmostEqual(mbm.get_all_M_values(m), self.get_Ms(m)) + + @unittest.skipUnless(gurobi_available, "Gurobi is not available") + @unittest.skipIf(os.name == 'nt', "'forkserver' is not available on Windows") + def test_calculated_Ms_forkserver(self): + m = self.make_model() + mbm = TransformationFactory('gdp.mbigm') + mbm.apply_to( + m, + reduce_bound_constraints=False, + threads=3, + process_start_method='forkserver', + ) + + self.check_all_untightened_bounds_constraints(m, mbm) + self.check_linear_func_constraints(m, mbm) + + self.assertStructuredAlmostEqual(mbm.get_all_M_values(m), self.get_Ms(m)) + + @unittest.skipUnless(gurobi_available, "Gurobi is not available") + @unittest.skipIf(os.name == 'nt', "'fork' is not available on Windows") + def test_calculated_Ms_fork(self): + m = self.make_model() + mbm = TransformationFactory('gdp.mbigm') + mbm.apply_to( + m, reduce_bound_constraints=False, threads=3, process_start_method='fork' + ) + + self.check_all_untightened_bounds_constraints(m, mbm) + self.check_linear_func_constraints(m, mbm) + + self.assertStructuredAlmostEqual(mbm.get_all_M_values(m), self.get_Ms(m)) + + # Make sure we don't choke on a LegacySolverWrapper + @unittest.skipUnless(gurobi_available, "Gurobi is not available") + def test_calculated_Ms_legacy_solver_wrapper(self): + m = self.make_model() + mbm = TransformationFactory('gdp.mbigm') + mbm.apply_to( + m, + reduce_bound_constraints=False, + threads=3, + process_start_method='spawn', + solver=SolverFactory('gurobi_direct_v2'), + ) + + self.check_all_untightened_bounds_constraints(m, mbm) + self.check_linear_func_constraints(m, mbm) + + self.assertStructuredAlmostEqual(mbm.get_all_M_values(m), self.get_Ms(m)) + @unittest.skipUnless(gurobi_available, "Gurobi is not available") class NestedDisjunctsInFlatGDP(unittest.TestCase): @@ -1054,15 +1132,21 @@ def test_calculate_Ms_infeasible_Disjunct(self): ) def test_calculate_Ms_infeasible_Disjunct_local_solver(self): m = self.make_infeasible_disjunct_model() + # When multiple exceptions are raised during a + # multiprocessing.Pool.map call, it is indeterminate which + # exception will be raised to the caller. with self.assertRaisesRegex( GDP_Error, r"Unsuccessful solve to calculate M value to " - r"relax constraint 'disjunction_disjuncts\[1\].constraint\[1\]' " - r"on Disjunct 'disjunction_disjuncts\[1\]' when " - r"Disjunct 'disjunction_disjuncts\[0\]' is selected.", + r"relax constraint 'disjunction_disjuncts\[\d+\].constraint\[\d+\]' " + r"on Disjunct 'disjunction_disjuncts\[\d+\]' when " + r"Disjunct 'disjunction_disjuncts\[\d+\]' is selected.", ): TransformationFactory('gdp.mbigm').apply_to( - m, solver=SolverFactory('ipopt'), reduce_bound_constraints=False + m, + solver=SolverFactory('ipopt'), + reduce_bound_constraints=False, + use_primal_bound=True, ) @unittest.skipUnless(gurobi_available, "Gurobi is not available")