Skip to content

DM-50087: Preload IERS data in Prompt Processing #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions bin.src/export_iers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env python
# This file is part of prompt_processing.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


"""Export an IERS cache to a local or remote location.
"""


import argparse
import logging
import sys
import tempfile

import astropy.utils.data
from astropy.utils import iers

import lsst.resources


# The IERS products to cache: IERS-A, IERS-B, and the leap-second table.
# These are the URLs astropy itself queries for time-conversion data.
IERS_URLS = [iers.IERS_A_URL, iers.IERS_B_URL, iers.IERS_LEAP_SECOND_URL]


def _make_parser():
parser = argparse.ArgumentParser()
parser.add_argument("destination", nargs="+",
help="The path(s) or URI(s) to put the exported IERS cache. Should be a zip file.")
return parser


def main():
    """Refresh the local IERS download cache and export it to every
    destination given on the command line.
    """
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    parsed = _make_parser().parse_args()
    # Pull the latest copy of each IERS product into the local download cache.
    for iers_url in IERS_URLS:
        astropy.utils.data.download_file(iers_url, cache="update")
    with tempfile.NamedTemporaryFile(suffix=".zip") as scratch:
        # Bundle the freshly updated cache entries into a zip file.
        astropy.utils.data.export_download_cache(scratch, IERS_URLS, overwrite=True)
        source = lsst.resources.ResourcePath(scratch.name, isTemporary=True)
        # Any errors past this point may invalidate the remote cache
        for target in parsed.destination:
            logging.info("Writing %s...", target)
            lsst.resources.ResourcePath(target).transfer_from(source, "copy", overwrite=True)


if __name__ == "__main__":
    main()
7 changes: 2 additions & 5 deletions config/astropy.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ allow_internet = False
## the IERS-A file was previously downloaded and cached). This parameter also
## controls whether internet resources will be queried to update the leap
## second table if the installed version is out of date. Default is True.
auto_download = False

## Maximum age (days) of predictive data before auto-downloading. See "Auto
## refresh behavior" in astropy.utils.iers documentation for details. Default
## is 30.
auto_max_age = 1000
# Defer to the cache (which is updated on pod start).
auto_download = True
5 changes: 5 additions & 0 deletions doc/playbook.rst
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ The bucket ``rubin-pp-dev-users`` holds:

* ``rubin-pp-dev-users/unobserved/`` contains raw files that the upload scripts can draw from to create incoming raws.

* ``rubin-pp-dev-users/apdb_config/`` contains the canonical configs identifying the development APDBs.

* ``rubin-pp-dev-users/iers-cache.zip`` contains an IERS data cache for direct download by Prompt Processing.
It can be updated manually by running ``export_iers.py``.

``rubin-pp-dev`` has notifications configured for new file arrival; these publish to the Kafka topic ``prompt-processing-dev``.
The notifications can be viewed at `Kafdrop <https://k8s.slac.stanford.edu/usdf-prompt-processing-dev/kafdrop>`_.

Expand Down
3 changes: 3 additions & 0 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import prometheus_client as prometheus
import redis

from shared.astropy import import_iers_cache
from shared.config import PipelinesConfig
from shared.logger import setup_usdf_logger
from shared.raw import (
Expand Down Expand Up @@ -456,6 +457,7 @@ def create_app():
)

# Check initialization and abort early
import_iers_cache()
_get_consumer()
_get_storage_client()
_get_read_butler()
Expand Down Expand Up @@ -504,6 +506,7 @@ def keda_start():
registry.init_tracker()

# Check initialization and abort early
import_iers_cache()
_get_consumer()
_get_storage_client()
_get_read_butler()
Expand Down
2 changes: 2 additions & 0 deletions python/initializer/write_init_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import lsst.daf.butler
import lsst.pipe.base

from shared.astropy import import_iers_cache
from shared.config import PipelinesConfig
from shared import connect_utils
from shared.logger import setup_usdf_logger
Expand Down Expand Up @@ -107,6 +108,7 @@ def main(args=None):
setup_usdf_logger(labels={"instrument": instrument_name},)
try:
repo = os.environ["CENTRAL_REPO"]
import_iers_cache() # Should not be needed, but best to be consistent
_log.info("Preparing init-outputs for %s in %s.", instrument_name, repo)

parsed = make_parser().parse_args(args)
Expand Down
60 changes: 60 additions & 0 deletions python/shared/astropy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# This file is part of prompt_processing.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["import_iers_cache"]


import logging
import os
import tempfile

import astropy.utils.data

import lsst.resources
import lsst.utils.timer


_log = logging.getLogger("lsst." + __name__)
# Force DEBUG so the timing messages emitted by import_iers_cache are always
# recorded, independent of the root logger's configured level.
_log.setLevel(logging.DEBUG)


# Pod-local copy of the IERS cache zip. Its existence marks that a download
# has already happened in this pod (see import_iers_cache).
LOCAL_CACHE = os.path.join(tempfile.gettempdir(), "iers-cache.zip")


def import_iers_cache():
    """Download the IERS cache from a shared resource at USDF.

    The cache zip is fetched (once per pod) from the location named by the
    ``CENTRAL_IERS_CACHE`` environment variable, then imported into astropy's
    download cache. If no remote location is configured and no local copy
    exists, a warning is logged and nothing is imported.
    """
    if os.path.exists(LOCAL_CACHE):
        _log.info("IERS cache already exists in this pod, skipping fresh download. "
                  "This normally means that more than one worker has run in this pod.")
    else:
        remote_cache = os.environ.get("CENTRAL_IERS_CACHE")
        if not remote_cache:
            _log.warning("No IERS download has been configured. Time conversions may be inaccurate.")
            return
        with lsst.utils.timer.time_this(_log, msg="Download IERS", level=logging.DEBUG):
            lsst.resources.ResourcePath(LOCAL_CACHE).transfer_from(
                lsst.resources.ResourcePath(remote_cache), "copy")

    with lsst.utils.timer.time_this(_log, msg="Update IERS", level=logging.DEBUG):
        astropy.utils.data.import_download_cache(LOCAL_CACHE, update_cache=True)
    _log.info("IERS cache is up to date.")
63 changes: 0 additions & 63 deletions python/test_utils.py

This file was deleted.

99 changes: 99 additions & 0 deletions tests/test_astropy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# This file is part of prompt_processing.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import os
import tempfile
import unittest
import unittest.mock
import zipfile

import boto3

from lsst.resources import ResourcePath, s3utils

from shared.astropy import import_iers_cache, LOCAL_CACHE

try:
    from moto import mock_aws
except ImportError:
    from moto import mock_s3 as mock_aws # Backwards-compatible with moto 4


class IersCacheTest(unittest.TestCase):
    """Test that `shared.astropy.import_iers_cache` downloads and imports
    the shared IERS cache from (mock) S3 storage.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # One moto mock object for the whole class; it is entered (and
        # automatically exited) per-test in setUp via enterContext.
        cls.mock_aws = mock_aws()

    @classmethod
    def _make_iers_cache(cls, path):
        """Create an IERS cache to use for testing.

        Parameters
        ----------
        path : `lsst.resources.ResourcePath`
            The location at which to create the cache.
        """
        with tempfile.NamedTemporaryFile(mode="wb", suffix=".zip") as temp:
            with zipfile.ZipFile(temp, mode="w"):
                # Don't actually need any contents
                pass
            path.transfer_from(ResourcePath(temp.name), transfer="copy")

    def setUp(self):
        super().setUp()
        self.enterContext(s3utils.clean_test_environment_for_s3())
        # Local cache should not exist before tests
        try:
            os.remove(LOCAL_CACHE)
        except FileNotFoundError:
            pass

        mock_endpoint = "https://this.is.a.test"
        self.bucket = "test-bucket-test"
        path = ResourcePath("test/test-cache.zip", root="s3://" + self.bucket)

        # Order matters below: the endpoint and CENTRAL_IERS_CACHE variables
        # must be in place before the moto mock is entered and the bucket is
        # created, so that S3 access is routed to the mock endpoint.
        self.enterContext(unittest.mock.patch.dict(
            os.environ,
            {"MOTO_S3_CUSTOM_ENDPOINTS": mock_endpoint,
             "S3_ENDPOINT_URL": mock_endpoint,
             "CENTRAL_IERS_CACHE": path.geturl(),
             }))

        self.enterContext(self.mock_aws)
        s3 = boto3.resource("s3")
        s3.create_bucket(Bucket=self.bucket)

        self._make_iers_cache(path)

    def test_import_iers_cache(self):
        # First call in a clean pod: the remote zip is downloaded and then
        # imported into astropy's cache with update_cache=True.
        with unittest.mock.patch("astropy.utils.data.import_download_cache") as mock_import:
            import_iers_cache()
            mock_import.assert_called_once()
            self.assertEqual(mock_import.call_args.kwargs["update_cache"], True)

    def test_import_iers_cache_twice(self):
        # Second call in the same pod: the download is skipped (the local
        # zip already exists) but the import must still happen.
        with unittest.mock.patch("astropy.utils.data.import_download_cache") as mock_import:
            import_iers_cache()
            # Local cache should exist
            import_iers_cache()
            # Should import both times to ensure the latest version is used
            self.assertEqual(mock_import.call_count, 2)
            self.assertEqual(mock_import.call_args.kwargs["update_cache"], True)
Loading