diff --git a/joshua/joshua_agent.py b/joshua/joshua_agent.py index 0426cc0..c61e1fe 100644 --- a/joshua/joshua_agent.py +++ b/joshua/joshua_agent.py @@ -110,25 +110,6 @@ def stopAgent(): return stop_agent -def trim_jobqueue(cutoff_date, remove_jobs=True): - global job_queue - jobs_pass = 0 - jobs_fail = 0 - cutoff_string = joshua_model.format_datetime(cutoff_date) - - for job_record in list(job_queue.queue): - (result, job_date) = fdb.tuple.unpack(job_record) - if job_date <= cutoff_string: - if remove_jobs: - old_record = job_queue.get_nowait() - elif result == 0: - jobs_pass += 1 - else: - jobs_fail += 1 - - return (jobs_pass + jobs_fail, jobs_pass, jobs_fail) - - def log(outputText, newline=True): return ( print(outputText, file=getFileHandle()) @@ -320,7 +301,8 @@ def remove_old_artifacts(path, age=24 * 60 * 60): # Returns whether the artifacts should be saved based on run state. def should_save(retcode, save_on="FAILURE"): - return save_on == "ALWAYS" or save_on == "FAILURE" and retcode != 0 + # do not save when cancelled (retcode == -1) + return save_on == "ALWAYS" or save_on == "FAILURE" and retcode != 0 and retcode != -1 # Removes artifacts from the current run, saving them if necessary. @@ -411,7 +393,6 @@ def run_ensemble( :param sanity: :return: 0 for success """ - global jobs_pass, jobs_fail if not work_dir: raise JoshuaError( "Unable to run function since work_dir is not defined. Exiting. (CWD=" @@ -556,6 +537,12 @@ def run_ensemble( cleanup(ensemble, where, seed, retcode, save_on, work_dir=work_dir) + if retcode == -1: + # no results to record when cancelled + self._retcode = retcode + joshua_model.cancel_agent_cleanup(ensemble) + return + try: joshua_model.insert_results( ensemble, @@ -582,17 +569,6 @@ def run_ensemble( duration, ) - # Add the result to the job queue - job_queue.put(fdb.tuple.pack((retcode, done_timestamp))) - - # Update the job counts - job_mutex.acquire() - if retcode == 0: - jobs_pass += 1 - else: - jobs_fail += 1 - job_mutex.release() - self._retcode = retcode @@ -773,14 +749,9 @@ def agent( ensembles_can_run = list( filter(joshua_model.should_run_ensemble, ensembles) ) - if not ensembles_can_run: - # All the ensembles have enough runs started for now. Don't - # time the agent out, just wait until there are no - # ensembles or the other agents might have died. - time.sleep(1) - continue - else: - # No ensembles at all. Consider timing this agent out. + + if not ensembles or (ensembles and not ensembles_can_run): + # No ensembles to run. Consider timing this agent out. try: watch.wait_for_any(watch, sanity_watch, TimeoutFuture(1.0)) except Exception as e: diff --git a/joshua/joshua_model.py b/joshua/joshua_model.py index a3d12db..1a0513b 100644 --- a/joshua/joshua_model.py +++ b/joshua/joshua_model.py @@ -50,6 +50,7 @@ FDBError = fdb.FDBError ONE = b"\x01" + b"\x00" * 7 +NEGATIVE_ONE = struct.pack(' None: tr.add(dir_all_ensembles[ensemble_id]["count"][counter], ONE) +def _decrement(tr: fdb.Transaction, ensemble_id: str, counter: str) -> None: + tr.add(dir_all_ensembles[ensemble_id]["count"][counter], NEGATIVE_ONE) + + def _add(tr: fdb.Transaction, ensemble_id: str, counter: str, value: int) -> None: byte_val = struct.pack(" int: - value = tr.snapshot.get(dir_all_ensembles[ensemble_id]["count"][counter]) +def _get_counter_impl(tr: fdb.Transaction, ensemble_id: str, counter: str, snapshot: bool) -> int: + if snapshot: + value = tr.snapshot.get(dir_all_ensembles[ensemble_id]["count"][counter]) + else: + value = tr.get(dir_all_ensembles[ensemble_id]["count"][counter]) if value == None: return 0 else: return struct.unpack(" int: + return _get_counter_impl(tr, ensemble_id, counter, True) + +def _get_counter(tr: fdb.Transaction, ensemble_id: str, counter: str) -> int: + return _get_counter_impl(tr, ensemble_id, counter, False) def _get_seeds_and_heartbeats( ensemble_id: str, tr: fdb.Transaction @@ -768,9 +781,9 @@ def _insert_results( results = dir_ensemble_results_pass if max_runs > 0: - # This is a snapshot read so that two insertions don't conflict. - ended = _get_snap_counter(tr, ensemble_id, "ended") - if ended >= max_runs: + started = _get_counter(tr, ensemble_id, "started") + ended = _get_counter(tr, ensemble_id, "ended") + if ended >= max(max_runs, started): _stop_ensemble(tr, ensemble_id, sanity) if duration: @@ -978,3 +991,14 @@ def get_agent_failures(tr, time_start=None, time_end=None): failures.append((info, msg)) return failures + +@transactional +def cancel_agent_cleanup(tr, ensemble_id): + """ + Clean-up method for when an agent takes a job but gets cancelled + """ + + # TODO(qhoang) let's try this but there must be a better way + # When an agent is cancelled, it has already incremented the __started__ counter + # but will never get to increment the __ended__ counter + _decrement(tr, ensemble_id, "started") \ No newline at end of file diff --git a/test_joshua_model.py b/test_joshua_model.py index 13a1bc7..12f352a 100644 --- a/test_joshua_model.py +++ b/test_joshua_model.py @@ -168,12 +168,15 @@ def test_dead_agent(tmp_path, empty_ensemble): # simulate another agent dying after starting a test assert joshua_model.try_starting_test(ensemble_id, 12345) + # agent needs to wait for >10 seconds in order to detect the dead agent. + # in the real deployment, agent should exit early + # and scaler should spin up another one if necessary. agent = threading.Thread( target=joshua_agent.agent, args=(), kwargs={ "work_dir": tmp_path, - "agent_idle_timeout": 1, + "agent_idle_timeout": 15, }, ) agent.setDaemon(True)