Enable CRIC service and wmcore cache on TaskWorker #9082
Conversation
Jenkins results:
which tests did you run?
fails when I test, see comment
At first sight, at least these two lines need changing: CRABServer/src/python/TaskWorker/Actions/DagmanCreator.py, lines 347 to 348 in 4456470.
I wonder if a safer and simpler alternative would be to expand the black/white list at the beginning of DagmanCreator and replace the value in the task dictionary. And by the way, there is no need to port the _checkASO thing here: banned destinations is always empty and we should simply remove all code about it. There is no need for CRIC in PreDag either; that is another useless change.
A more drastic alternative would be to add a new action in the Handler, where black/white lists are expanded, subtracted, and combined with the global blacklist, and a list of possible sites is written into the task dictionary. That list can then be used by all the following code, which will become simpler.
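For illustration, a minimal sketch of what such a Handler action could look like (SiteListExpander, resolveSites, allProcessingSites, globalBlacklist and the possible_processing_sites key are hypothetical names, not existing CRABServer code): expand the user lists once via CRIC, fold in the global blacklist, and write the result into the task dictionary so the following actions only read it.

from TaskWorker.Actions.TaskAction import TaskAction

class SiteListExpander(TaskAction):  # hypothetical name for the action suggested above
    def execute(self, *args, **kwargs):
        task = kwargs['task']
        # expand wildcard patterns such as 'T2_CH_*' once, via CRIC (hypothetical helper)
        whitelist = set(self.resolveSites(task['tm_site_whitelist']))
        blacklist = set(self.resolveSites(task['tm_site_blacklist']))
        # start from every usable processing site minus the global blacklist (hypothetical helpers)
        usable = self.allProcessingSites() - self.globalBlacklist()
        if whitelist:
            usable &= whitelist
        usable -= (blacklist - whitelist)
        # write the combined result back so downstream code becomes a simple lookup
        task['tm_site_whitelist'] = sorted(whitelist)
        task['tm_site_blacklist'] = sorted(blacklist)
        task['possible_processing_sites'] = sorted(usable)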
force-pushed from d47c875 to 8b3788e
Jenkins results:
… constructor signature.
force-pushed from 8b3788e to 750b0ca
Jenkins results:
Thanks @belforte! With your walkthrough/guidelines it is much clearer to me how to patch and debug things in CRAB.

In short: I have fixed this PR as you suggested and tested it both with manual submissions on test14 and with all the test cases in the pipeline here. So, could you please have a final review and give me a green or red light for the merge? Thanks!

In detail: you were right, this PR did not work at first. When things go wrong the task silently stays in SUBMITTED state forever, and at that point it was not apparent to me where to find more clues, since the job itself is not considered failed either.

After investigating deeper: my clumsiness indeed, thanks for pointing out what I had overlooked! We dump task info, including the site {white|black}list, into ClassAd strings, and those needed to be expanded as well. With that flaw the JDLs and DAGs of every job were left unexpanded, as you can see in the job artifacts on the schedd below. (I left out the details around the PreJob and adjustedout for readability.)

After patching that and fixing some JSON-serialization bugs, the task went through and completed as expected. The following evidence shows that it really works on the TaskWorker side and that the CRIC-related functionality is no longer provided by CRABServer. (P.S. the breakpoint was set before the site list expansion in the TaskWorker.)

As a result, the lists were properly expanded and the jobs ran fine as expected!

And here is the final status of the submitted test task, which looks identical to the one I got from test2.
@@ -344,8 +345,8 @@ def makeJobSubmit(self, task):
# note about Lists
# in the JDL everything is a string, we can't use the simple classAd[name]=somelist
# but need the ExprTree format (what classAd.lookup() would return)
jobSubmit['My.CRAB_SiteBlacklist'] = pythonListToClassAdExprTree(task['tm_site_blacklist'])
jobSubmit['My.CRAB_SiteWhitelist'] = pythonListToClassAdExprTree(task['tm_site_whitelist'])
jobSubmit['My.CRAB_SiteBlacklist'] = pythonListToClassAdExprTree(list(self._expandSites(set(task['tm_site_blacklist']))))
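For reference, a minimal sketch (not the actual CRABServer helper) of what a pythonListToClassAdExprTree-style function has to produce: since every value in the JDL is a string, an already-expanded Python list must be rendered in ClassAd list syntax.

def listToClassAdExprTree(pyList):
    # render a Python list of site names as a ClassAd list literal, e.g. {"T2_CH_CERN","T2_IT_Bari"}
    return '{' + ','.join(f'"{item}"' for item in pyList) + '}'

# the list must already be expanded, i.e. ['T2_CH_*'] -> ['T2_CH_CERN', 'T2_CH_CSCS', ...] before this point
print(listToClassAdExprTree(['T2_CH_CERN', 'T2_IT_Bari']))  # {"T2_CH_CERN","T2_IT_Bari"}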
You were right about what I had overlooked. Patched!
Regarding the simpler alternative and the more elegant action in the Handler: I was thinking of opening a new issue and PR for that clean-up/refactoring once these changes are bulletproof in canary. That would keep the clean-up separate from this issue, which is mostly about migrating functionality from CRABServer to the TaskWorker while making sure everything keeps working as usual.
Yes and no. I agree that "it works" usually goes before "it is beautiful", but having unneeded changes may obscure things, makes reviewing harder, and adds useless cleanup work later.
Understood! And it seems I was not thoughtful enough regarding test coverage.
Jenkins results:
…ail into SiteInfoResolver.
Jenkins results:
@belforte I am done with the SiteInfoResolver TaskAction and it is ready for review again. (Wishfully waiting for the green merge button... 🥲) I will skip refactoring the createSubDag function for now because, from my own attempt (I also consulted ChatGPT), it may not be worth the effort, as you can see in [1]. I will proceed to test more coverage as you mentioned, e.g. splitting=automatic, a bunch of * patterns, etc. Regarding the refactoring/clean-up: in my view it does reduce boilerplate and give a bit more clarity, but the refactored function would not be used anywhere else yet, so it may not be worth adding more complication to this PR (it increases the risk of losing robustness); it could be postponed to a new clean-up issue.

[1]
def resolve_available_sites(self, jobgroup, kwargs, global_blacklist, acceleratorsites, ignoreLocality, stage):
    jobs = jobgroup.getJobs()
    jgblocks = set()
    allblocks = set()
    for job in jobs:
        if stage == 'probe':
            random.shuffle(job['input_files'])
        for inputfile in job['input_files']:
            jgblocks.add(inputfile['block'])
            allblocks.add(inputfile['block'])
    self.logger.debug("Blocks: %s", list(jgblocks))
    if not jobs:
        locations = set()
    else:
        locations = set(jobs[0]['input_files'][0]['locations'])
    self.logger.debug("Locations: %s", list(locations))
    if not locations and not ignoreLocality:
        return jgblocks, 'no_locations', locations
    if ignoreLocality:
        availablesites = kwargs['task']['all_possible_processing_sites']
    else:
        availablesites = locations - global_blacklist
    self.logger.debug("Available sites: %s", list(availablesites))
    if kwargs['task']['tm_activity'] in self.config.TaskWorker.ActivitiesToRunEverywhere and kwargs['task']['tm_site_whitelist']:
        availablesites = set(kwargs['task']['tm_site_whitelist'])
    if not availablesites:
        return jgblocks, 'banned_locations', locations
    if kwargs['task']['tm_user_config']['requireaccelerator']:
        availablesites &= acceleratorsites
        if availablesites:
            msg = f"Site.requireAccelerator is True. CRAB will restrict sites to run the jobs to {list(availablesites)}."
            self.logger.warning(msg)
            self.uploadWarning(msg, kwargs['task']['user_proxy'], kwargs['task']['tm_taskname'])
        else:
            return jgblocks, 'banned_locations', locations
    available = set(availablesites)
    siteWhitelist = set(kwargs['task']['tm_site_whitelist'])
    siteBlacklist = set(kwargs['task']['tm_site_blacklist'])
    if siteWhitelist:
        available &= siteWhitelist
        if not available:
            self._warn_user_skipped_blocks('whitelist', jgblocks, availablesites, kwargs)
            return jgblocks, 'banned_locations', locations
    available -= (siteBlacklist - siteWhitelist)
    if not available:
        self._warn_user_skipped_blocks('blacklist', jgblocks, availablesites, kwargs)
        return jgblocks, 'banned_locations', locations
    return jgblocks, 'ok', locations, availablesites

def _warn_user_skipped_blocks(self, reason, jgblocks, availablesites, kwargs):
    trimmedList = sorted(list(jgblocks))[:3] + ['...']
    if reason == 'whitelist':
        msg = f"{len(jgblocks)} block(s) {trimmedList} present at {list(availablesites)} will be skipped because those sites are not in user white list"
    else:
        msg = f"{len(jgblocks)} block(s) {trimmedList} present at {list(availablesites)} will be skipped because those sites are in user black list"
    self.logger.warning(msg)
    self.uploadWarning(msg, kwargs['task']['user_proxy'], kwargs['task']['tm_taskname'])
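For illustration only, a call site for the helper above would look roughly like this (a sketch assuming the function and names above, not the existing createSubDag code); the variable-length return is part of what makes me think it is not worth it yet:

result = self.resolve_available_sites(jobgroup, kwargs, global_blacklist,
                                      acceleratorsites, ignoreLocality, stage)
if result[1] == 'ok':
    jgblocks, status, locations, availablesites = result
    # continue building the subdag with the resolved availablesites
else:
    jgblocks, status, locations = result
    # handle 'no_locations' / 'banned_locations' as createSubDag does today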
I am afraid that I do not understand your last comment. I do not see why we want ChatGPT involved, and I have no idea what that code snippet is; I failed to find it. That said, from a look at the changes I would say:
Overall I am confused, and I suspect that you are too.
Thank you Stefano for your review and patience, and apologies for confusing you. Please let me try to clarify my last comment.
In response to the reviews:
Summary of the Zoom chat about points 1, 2, 3, 4 in the comment above:
…mwm#9085)
* remove support for all input options except --jobId.
* preserve sanitize input option, replace opts with jobId, input_args.
* replace opts with descriptive naming.
* deprecate extra args from DagmanCreator for backward compatibility.
* jobId 'None' string could be take care elsewhere, delete dead code.
* fixes DagmanCreator to preserve input_args.
* fix settattr, replaced with more pythonic way, remove deprecated code.
* fixes the renamed input_params should be same type as opts, delete unused userFiles attr.

* change handling of in/out metadata. Fix dmwm#9094
* move update of publish flag to ASO/Rucio
* pylint

* deprecate numcores option for resubmit on CRABServer side
* remove numcores option for resubmit
* remove numcores from DagmanResubmitter
* remove numcores option from post
* drop tm_numcores from kwargs
* fix
* fix 2
* final fix

* retry_call
* use wrapper class Client
* rucio retries extended
* add comments

* import correctly
* fix
catalog, [3] utilize task kwargs.
Jenkins results:
This was migrated to #9132.
Enable the _expandSites, _checkSites and _checkASODestination functionality on the TaskWorker. Suppress the CRIC service debug log and also use WMCore's Service cache mechanism by enabling usestalecache. Regarding #6917.
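A minimal sketch of the intended setup (the configuration keys are assumptions based on other WMCore Services; check WMCore.Services.Service for the exact names and defaults):

import logging
from WMCore.Services.CRIC.CRIC import CRIC

# quiet down the very chatty CRIC DEBUG output
cricLogger = logging.getLogger('CRIC')
cricLogger.setLevel(logging.INFO)

# 'usestalecache' lets the Service fall back to an expired cache file when
# CRIC itself is unreachable; 'cacheduration' is the cache lifetime (assumed keys)
cric = CRIC(logger=cricLogger,
            configDict={'cacheduration': 1, 'usestalecache': True})

allSites = cric.getAllPSNs()  # e.g. when expanding wildcard site white/black lists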