diff --git a/bin.src/export_iers.py b/bin.src/export_iers.py
new file mode 100755
index 00000000..7fe82538
--- /dev/null
+++ b/bin.src/export_iers.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+# This file is part of prompt_processing.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+
+"""Export an IERS cache to a local or remote location.
+"""
+
+
+import argparse
+import logging
+import sys
+import tempfile
+
+import astropy.utils.data
+from astropy.utils import iers
+
+import lsst.resources
+
+
+IERS_URLS = [iers.IERS_A_URL, iers.IERS_B_URL, iers.IERS_LEAP_SECOND_URL]
+
+
+def _make_parser():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("destination", nargs="+",
+                        help="The path(s) or URI(s) at which to put the exported IERS cache; "
+                             "each should be a zip file.")
+    return parser
+
+
+def main():
+    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
+
+    args = _make_parser().parse_args()
+    # Update the download cache before exporting it.
+    for url in IERS_URLS:
+        astropy.utils.data.download_file(url, cache="update")
+    with tempfile.NamedTemporaryFile(suffix=".zip") as local_cache:
+        astropy.utils.data.export_download_cache(local_cache, IERS_URLS, overwrite=True)
+        src = lsst.resources.ResourcePath(local_cache.name, isTemporary=True)
+        # Any errors past this point may invalidate the remote cache.
+        for d in args.destination:
+            logging.info("Writing %s...", d)
+            dest = lsst.resources.ResourcePath(d)
+            dest.transfer_from(src, "copy", overwrite=True)
+
+
+if __name__ == "__main__":
+    main()
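For reviewers unfamiliar with the astropy cache API used above, a minimal sketch of the same export round trip; the output path is illustrative and not part of this change::

    import astropy.utils.data
    from astropy.utils import iers

    urls = [iers.IERS_A_URL, iers.IERS_B_URL, iers.IERS_LEAP_SECOND_URL]
    for url in urls:
        astropy.utils.data.download_file(url, cache="update")  # refresh the local cache
    # Bundle the cached files into a zip that other machines can import.
    astropy.utils.data.export_download_cache("/tmp/iers-cache.zip", urls, overwrite=True)
    # A consumer restores it with import_download_cache (see shared/astropy.py below).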
diff --git a/config/astropy.cfg b/config/astropy.cfg
index 5d521f74..297098bb 100644
--- a/config/astropy.cfg
+++ b/config/astropy.cfg
@@ -10,9 +10,6 @@
 allow_internet = False
 ## the IERS-A file was previously downloaded and cached). This parameter also
 ## controls whether internet resources will be queried to update the leap
 ## second table if the installed version is out of date. Default is True.
-auto_download = False
-## Maximum age (days) of predictive data before auto-downloading. See "Auto
-## refresh behavior" in astropy.utils.iers documentation for details. Default
-## is 30.
-auto_max_age = 1000
+# Defer to the cache (which is updated on pod start).
+auto_download = True
diff --git a/doc/playbook.rst b/doc/playbook.rst
index 3b6679da..ed7883ae 100644
--- a/doc/playbook.rst
+++ b/doc/playbook.rst
@@ -232,6 +232,11 @@ The bucket ``rubin-pp-dev-users`` holds:
 * ``rubin-pp-dev-users/unobserved/`` contains raw files that the upload scripts
   can draw from to create incoming raws.
 
+* ``rubin-pp-dev-users/apdb_config/`` contains the canonical configs identifying the development APDBs.
+
+* ``rubin-pp-dev-users/iers-cache.zip`` contains an IERS data cache for direct download by Prompt Processing.
+  It can be updated manually by running ``export_iers.py``.
+
 ``rubin-pp-dev`` has notifications configured for new file arrival; these publish
 to the Kafka topic ``prompt-processing-dev``.
 The notifications can be viewed at `Kafdrop `_.
diff --git a/python/activator/activator.py b/python/activator/activator.py
index a351b5b0..8d787b2c 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -44,6 +44,7 @@
 import prometheus_client as prometheus
 import redis
 
+from shared.astropy import import_iers_cache
 from shared.config import PipelinesConfig
 from shared.logger import setup_usdf_logger
 from shared.raw import (
@@ -456,6 +457,7 @@ def create_app():
     )
 
     # Check initialization and abort early
+    import_iers_cache()
     _get_consumer()
     _get_storage_client()
     _get_read_butler()
@@ -504,6 +506,7 @@ def keda_start():
     registry.init_tracker()
 
     # Check initialization and abort early
+    import_iers_cache()
     _get_consumer()
     _get_storage_client()
     _get_read_butler()
diff --git a/python/initializer/write_init_outputs.py b/python/initializer/write_init_outputs.py
index 3eb0af27..f9ad7a00 100755
--- a/python/initializer/write_init_outputs.py
+++ b/python/initializer/write_init_outputs.py
@@ -36,6 +36,7 @@
 import lsst.daf.butler
 import lsst.pipe.base
 
+from shared.astropy import import_iers_cache
 from shared.config import PipelinesConfig
 from shared import connect_utils
 from shared.logger import setup_usdf_logger
@@ -107,6 +108,7 @@ def main(args=None):
     setup_usdf_logger(labels={"instrument": instrument_name},)
     try:
         repo = os.environ["CENTRAL_REPO"]
+        import_iers_cache()  # Should not be needed, but best to be consistent
        _log.info("Preparing init-outputs for %s in %s.", instrument_name, repo)
 
         parsed = make_parser().parse_args(args)
diff --git a/python/shared/astropy.py b/python/shared/astropy.py
new file mode 100644
index 00000000..75fe6bb4
--- /dev/null
+++ b/python/shared/astropy.py
@@ -0,0 +1,60 @@
+# This file is part of prompt_processing.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+__all__ = ["import_iers_cache"]
+
+
+import logging
+import os
+import tempfile
+
+import astropy.utils.data
+
+import lsst.resources
+import lsst.utils.timer
+
+
+_log = logging.getLogger("lsst." + __name__)
+_log.setLevel(logging.DEBUG)
+
+
+LOCAL_CACHE = os.path.join(tempfile.gettempdir(), "iers-cache.zip")
+
+
+def import_iers_cache():
+    """Download the IERS cache from a shared resource at USDF.
+    """
+    remote_cache = os.environ.get("CENTRAL_IERS_CACHE")
+    if not os.path.exists(LOCAL_CACHE):
+        if not remote_cache:
+            _log.warning("No IERS download has been configured. "
+                         "Time conversions may be inaccurate.")
+            return
+        with lsst.utils.timer.time_this(_log, msg="Download IERS", level=logging.DEBUG):
+            src = lsst.resources.ResourcePath(remote_cache)
+            dest = lsst.resources.ResourcePath(LOCAL_CACHE)
+            dest.transfer_from(src, "copy")
+    else:
+        _log.info("IERS cache already exists in this pod, skipping fresh download. "
+                  "This normally means that more than one worker has run in this pod.")
+
+    with lsst.utils.timer.time_this(_log, msg="Update IERS", level=logging.DEBUG):
+        astropy.utils.data.import_download_cache(LOCAL_CACHE, update_cache=True)
+    _log.info("IERS cache is up to date.")
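A minimal sketch of the intended effect of the new helper, assuming ``CENTRAL_IERS_CACHE`` points at a valid cache zip (illustrative, not part of the patch): once the cache has been imported, UT1 conversions succeed even with ``allow_internet = False``::

    import astropy.time
    from shared.astropy import import_iers_cache

    import_iers_cache()      # populates the astropy download cache from the shared zip
    now = astropy.time.Time.now()
    print(now.ut1.isot)      # uses cached IERS-A data; no network access needed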
diff --git a/python/test_utils.py b/python/test_utils.py
deleted file mode 100644
index 020e4d33..00000000
--- a/python/test_utils.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# This file is part of prompt_processing.
-#
-# Developed for the LSST Data Management System.
-# This product includes software developed by the LSST Project
-# (https://www.lsst.org).
-# See the COPYRIGHT file at the top-level directory of this distribution
-# for details of code ownership.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-__all__ = ["MockTestCase"]
-
-
-import unittest
-
-
-class MockTestCase(unittest.TestCase):
-    """TestCase specialization with better mock support.
-    """
-
-    @classmethod
-    def setupclass_patcher(cls, patcher):
-        """Apply a patch to all unit tests.
-
-        This method is intended to be run from within `setUpClass`. It avoids
-        having to add an argument to every single test, but it cannot access
-        the Mock from within the test.
-
-        Parameters
-        ----------
-        patcher : `unittest.mock` patcher object
-            An object returned from `patch`, `patch.object`, or a similar callable.
-        """
-        patcher.start()
-        cls.addClassCleanup(patcher.stop)
-
-    def setup_patcher(self, patcher):
-        """Apply a patch to all unit tests.
-
-        This method is intended to be run from within `setUp`. It avoids
-        having to add an argument to every single test, but it cannot access
-        the Mock from within the test.
-
-        Parameters
-        ----------
-        patcher : `unittest.mock` patcher object
-            An object returned from `patch`, `patch.object`, or a similar callable.
-        """
-        patcher.start()
-        self.addCleanup(patcher.stop)
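The deletion above is possible because ``unittest.TestCase.enterContext`` (Python 3.11+) covers the same use case natively. A minimal sketch of the replacement pattern, with an illustrative patch target::

    import os
    import unittest
    import unittest.mock

    class ExampleTest(unittest.TestCase):
        def setUp(self):
            super().setUp()
            # enterContext() starts the patcher and registers its stop as a
            # cleanup, replacing MockTestCase.setup_patcher().
            self.enterContext(unittest.mock.patch.dict(os.environ, {"EXAMPLE": "1"}))

For class-level patches, ``enterClassContext`` (also 3.11+) replaces ``setupclass_patcher`` the same way.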
diff --git a/tests/test_astropy.py b/tests/test_astropy.py
new file mode 100644
index 00000000..ee900e66
--- /dev/null
+++ b/tests/test_astropy.py
@@ -0,0 +1,100 @@
+# This file is part of prompt_processing.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import os
+import tempfile
+import unittest
+import unittest.mock
+import zipfile
+
+import boto3
+
+from lsst.resources import ResourcePath, s3utils
+
+from shared.astropy import import_iers_cache, LOCAL_CACHE
+
+try:
+    from moto import mock_aws
+except ImportError:
+    from moto import mock_s3 as mock_aws  # Backwards-compatible with moto 4
+
+
+class IersCacheTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.mock_aws = mock_aws()
+
+    @classmethod
+    def _make_iers_cache(cls, path):
+        """Create an IERS cache to use for testing.
+
+        Parameters
+        ----------
+        path : `lsst.resources.ResourcePath`
+            The location at which to create the cache.
+        """
+        with tempfile.NamedTemporaryFile(mode="wb", suffix=".zip") as temp:
+            with zipfile.ZipFile(temp, mode="w"):
+                # Don't actually need any contents
+                pass
+            path.transfer_from(ResourcePath(temp.name), transfer="copy")
+
+    def setUp(self):
+        super().setUp()
+        self.enterContext(s3utils.clean_test_environment_for_s3())
+        # Local cache should not exist before tests
+        try:
+            os.remove(LOCAL_CACHE)
+        except FileNotFoundError:
+            pass
+
+        mock_endpoint = "https://this.is.a.test"
+        self.bucket = "test-bucket-test"
+        path = ResourcePath("test/test-cache.zip", root="s3://" + self.bucket)
+
+        self.enterContext(unittest.mock.patch.dict(
+            os.environ,
+            {"MOTO_S3_CUSTOM_ENDPOINTS": mock_endpoint,
+             "S3_ENDPOINT_URL": mock_endpoint,
+             "CENTRAL_IERS_CACHE": path.geturl(),
+             }))
+
+        self.enterContext(self.mock_aws)
+        s3 = boto3.resource("s3")
+        s3.create_bucket(Bucket=self.bucket)
+
+        self._make_iers_cache(path)
+
+    def test_import_iers_cache(self):
+        with unittest.mock.patch("astropy.utils.data.import_download_cache") as mock_import:
+            import_iers_cache()
+        mock_import.assert_called_once()
+        self.assertEqual(mock_import.call_args.kwargs["update_cache"], True)
+
+    def test_import_iers_cache_twice(self):
+        with unittest.mock.patch("astropy.utils.data.import_download_cache") as mock_import:
+            import_iers_cache()
+            # Local cache should exist
+            import_iers_cache()
+        # Should import both times to ensure the latest version is used
+        self.assertEqual(mock_import.call_count, 2)
+        self.assertEqual(mock_import.call_args.kwargs["update_cache"], True)
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 45a957b7..df4187f4 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -58,7 +58,6 @@
 from shared.config import PipelinesConfig
 from shared.run_utils import get_output_run
 from shared.visit import FannedOutVisit
-from test_utils import MockTestCase
 
 # The short name of the instrument used in the test repo.
 instname = "LSSTComCamSim"
@@ -220,7 +219,7 @@ def wrapper(self, pipeline_file, include_optional=True):
 }
 
 
-class MiddlewareInterfaceTest(MockTestCase):
+class MiddlewareInterfaceTest(unittest.TestCase):
     """Test the MiddlewareInterface class with faked data.
""" def setUp(self): @@ -243,21 +242,21 @@ def setUp(self): self.addCleanup(config_file.close) config.save(config_file.name) - self.setup_patcher(unittest.mock.patch.dict(os.environ, - {"CONFIG_APDB": config_file.name, - # Disable external queries - "MP_SKY_URL": "" - })) - self.setup_patcher(unittest.mock.patch("astropy.time.Time.now", return_value=sim_date)) - self.setup_patcher(unittest.mock.patch("shared.run_utils.get_deployment", - return_value=sim_deployment)) + self.enterContext(unittest.mock.patch.dict(os.environ, + {"CONFIG_APDB": config_file.name, + # Disable external queries + "MP_SKY_URL": "" + })) + self.enterContext(unittest.mock.patch("astropy.time.Time.now", return_value=sim_date)) + self.enterContext(unittest.mock.patch("shared.run_utils.get_deployment", + return_value=sim_deployment)) self.deploy_id = sim_deployment # Use new_callable instead of side_effect to make sure the right thing is patched - self.setup_patcher(unittest.mock.patch.object(MiddlewareInterface, "_get_pipeline_input_types", - new_callable=_filter_dataset_types, - func=MiddlewareInterface._get_pipeline_input_types, - types=preprocessing_types, - )) + self.enterContext(unittest.mock.patch.object(MiddlewareInterface, "_get_pipeline_input_types", + new_callable=_filter_dataset_types, + func=MiddlewareInterface._get_pipeline_input_types, + types=preprocessing_types, + )) # coordinates from OR4 visit 7024061700046 ra = 215.82729413263485 @@ -1237,7 +1236,7 @@ def test_generic_query_nodim(self): self.assertEqual(result, set()) -class MiddlewareInterfaceWriteableTest(MockTestCase): +class MiddlewareInterfaceWriteableTest(unittest.TestCase): """Test the MiddlewareInterface class with faked data. This class creates a fresh test repository for writing to. This means test @@ -1300,21 +1299,21 @@ def setUp(self): self.addCleanup(config_file.close) config.save(config_file.name) - self.setup_patcher(unittest.mock.patch.dict(os.environ, - {"CONFIG_APDB": config_file.name, - # Disable external queries - "MP_SKY_URL": "" - })) - self.setup_patcher(unittest.mock.patch("astropy.time.Time.now", return_value=sim_date)) - self.setup_patcher(unittest.mock.patch("shared.run_utils.get_deployment", - return_value=sim_deployment)) + self.enterContext(unittest.mock.patch.dict(os.environ, + {"CONFIG_APDB": config_file.name, + # Disable external queries + "MP_SKY_URL": "" + })) + self.enterContext(unittest.mock.patch("astropy.time.Time.now", return_value=sim_date)) + self.enterContext(unittest.mock.patch("shared.run_utils.get_deployment", + return_value=sim_deployment)) self.deploy_id = sim_deployment # Use new_callable instead of side_effect to make sure the right thing is patched - self.setup_patcher(unittest.mock.patch.object(MiddlewareInterface, "_get_pipeline_input_types", - new_callable=_filter_dataset_types, - func=MiddlewareInterface._get_pipeline_input_types, - types=preprocessing_types, - )) + self.enterContext(unittest.mock.patch.object(MiddlewareInterface, "_get_pipeline_input_types", + new_callable=_filter_dataset_types, + func=MiddlewareInterface._get_pipeline_input_types, + types=preprocessing_types, + )) # coordinates from OR4 visit 7024061700046 ra = 215.82729413263485 diff --git a/tests/test_raw.py b/tests/test_raw.py index c1db244d..d7ca0938 100644 --- a/tests/test_raw.py +++ b/tests/test_raw.py @@ -39,7 +39,6 @@ get_raw_path, ) from shared.visit import FannedOutVisit -from test_utils import MockTestCase try: import boto3 @@ -63,7 +62,7 @@ def setUp(self): super().setUp() 
self.enterContext(s3utils.clean_test_environment_for_s3()) - self.setup_patcher(self.mock_aws) + self.enterContext(self.mock_aws) s3 = boto3.resource("s3") self.bucket = "test-bucket-test" s3.create_bucket(Bucket=self.bucket) @@ -96,7 +95,7 @@ def setUp(self): def test_snap(self): path = get_raw_path(self.instrument, self.detector, self.group, self.snap, self.exposure, self.filter) - assert is_path_consistent(path, self.visit) + self.assertTrue(is_path_consistent(path, self.visit)) @unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!") @@ -119,8 +118,8 @@ def test_snap_matching(self): warnings.filterwarnings("ignore", "S3 does not support flushing objects", UserWarning) with json_path.open("w") as f: json.dump(dict(GROUPID=self.group, CURINDEX=self.snap + 1), f) - assert is_path_consistent(path, self.visit) - assert get_group_id_from_oid(path) == self.group + self.assertTrue(is_path_consistent(path, self.visit)) + self.assertEqual(get_group_id_from_oid(path), self.group) def test_writeread(self): """Test that raw module can parse the paths it creates. @@ -210,7 +209,7 @@ def test_check_for_snap_error(self): self.assertTrue(any(error_msg in line for line in recorder.output)) -class LatissTest(LsstBase, MockTestCase): +class LatissTest(LsstBase, unittest.TestCase): def setUp(self): self.instrument = "LATISS" self.detector = 0 @@ -228,7 +227,7 @@ def test_get_raw_path(self): ) -class LsstComCamTest(LsstBase, MockTestCase): +class LsstComCamTest(LsstBase, unittest.TestCase): def setUp(self): self.instrument = "LSSTComCam" self.detector = 4 @@ -246,7 +245,7 @@ def test_get_raw_path(self): ) -class LsstCamTest(LsstBase, MockTestCase): +class LsstCamTest(LsstBase, unittest.TestCase): def setUp(self): self.instrument = "LSSTCam" self.detector = 42 @@ -264,7 +263,7 @@ def test_get_raw_path(self): ) -class LsstCamImSimTest(LsstBase, MockTestCase): +class LsstCamImSimTest(LsstBase, unittest.TestCase): def setUp(self): self.instrument = "LSSTCam-imSim" self.group = "2022-03-21T00:00:00.000088" @@ -294,7 +293,7 @@ def test_writeread(self): @unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!") -class HscTest(RawBase, MockTestCase): +class HscTest(RawBase, unittest.TestCase): def setUp(self): self.instrument = "HSC" self.group = "2022032100001" diff --git a/tests/test_tester_utils.py b/tests/test_tester_utils.py index 0da07963..5453a5fd 100644 --- a/tests/test_tester_utils.py +++ b/tests/test_tester_utils.py @@ -35,7 +35,6 @@ decode_group, increment_group, ) -from test_utils import MockTestCase try: import boto3 @@ -49,7 +48,7 @@ @unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!") -class TesterUtilsTest(MockTestCase): +class TesterUtilsTest(unittest.TestCase): """Test components in tester. 
""" bucket_name = "testBucketName" @@ -61,7 +60,7 @@ def setUpClass(cls): def setUp(self): self.enterContext(lsst.resources.s3utils.clean_test_environment_for_s3()) - self.setup_patcher(self.mock_aws) + self.enterContext(self.mock_aws) s3 = boto3.resource("s3") s3.create_bucket(Bucket=self.bucket_name) diff --git a/tests/test_write_init_outputs.py b/tests/test_write_init_outputs.py index ca0ae0ad..b5a09e93 100644 --- a/tests/test_write_init_outputs.py +++ b/tests/test_write_init_outputs.py @@ -195,7 +195,8 @@ def _make_init_outputs(butler, instrument, apdb, deploy_id, pipeline): unittest.mock.patch("initializer.write_init_outputs._make_init_outputs", side_effect=_make_init_outputs) as mock_make, \ unittest.mock.patch("initializer.write_init_outputs._get_current_day_obs", - return_value=""): + return_value=""), \ + unittest.mock.patch("initializer.write_init_outputs.import_iers_cache"): main(["--deploy-id", self.deploy_id]) # The preload collection is not associated with a pipeline