Skip to content
133 changes: 99 additions & 34 deletions phrosty/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,36 @@
import nvtx

import re
import pathlib
# Imports STANDARD
import argparse
import cupy as cp
from functools import partial
import logging
import matplotlib.pyplot as plt
from multiprocessing import Pool
from functools import partial

import numpy as np
import cupy as cp

import matplotlib.pyplot as plt
import nvtx
import pathlib
import re

# Imports ASTRO
from astropy.coordinates import SkyCoord
from astropy.io import fits
import astropy.table
from astropy.table import Table
import astropy.units as u
from astropy.wcs import WCS
from astropy.coordinates import SkyCoord
from astropy.wcs.utils import skycoord_to_pixel
import astropy.units as u

from sfft.SpaceSFFTCupyFlow import SpaceSFFT_CupyFlow
from galsim import roman

from snpit_utils.logger import SNLogger
from snpit_utils.config import Config
from snappl.psf import PSF
from snappl.image import OpenUniverse2024FITSImage
from phrosty.utils import read_truth_txt, get_exptime
# Imports INTERNAL
from phrosty.imagesubtraction import sky_subtract, stampmaker
from phrosty.photometry import ap_phot, psfmodel, psf_phot

from galsim import roman

from phrosty.utils import get_exptime, read_truth_txt
from sfft.SpaceSFFTCupyFlow import SpaceSFFT_CupyFlow
from snappl.image import OpenUniverse2024FITSImage
from snappl.psf import PSF
from snpit_utils.config import Config
from snpit_utils.logger import SNLogger

class PipelineImage:

Check failure on line 33 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (E302)

phrosty/pipeline.py:33:1: E302 Expected 2 blank lines, found 1
"""Holds a snappl.image.Image, with some other stuff the pipeline needs."""

def __init__( self, imagepath, pointing, sca ):
Expand All @@ -56,6 +53,11 @@
# self.psf is a object of a subclass of snappl.psf.PSF
self.config = Config.get()
self.temp_dir = pathlib.Path( self.config.value( 'photometry.phrosty.paths.temp_dir' ) )
self.keep_intermediate = self.config.value( 'photometry.phrosty.keep_intermediate' )
if not self.keep_intermediate:
self.save_dir = pathlib.Path( self.config.value( 'photometry.phrosty.paths.temp_dir' ) )
elif self.keep_intermediate:
self.save_dir = pathlib.Path( self.config.value( 'photometry.phrosty.paths.dia_out_dir' ) )

if self.config.value( 'photometry.phrosty.image_type' ) == 'ou2024fits':
self.image = OpenUniverse2024FITSImage( imagepath, None, sca )
Expand All @@ -65,14 +67,28 @@

self.pointing = pointing

# Intermediate files
if self.keep_intermediate:
self.skysub_path = None
self.detmask_path = None
self.input_sci_psf_path = None
self.input_templ_psf_path = None
self.aligned_sci_img_path = None
self.aligned_templ_img_path = None
self.aligned_sci_psf_path = None
self.aligned_templ_psf_path = None
self.crossconv_sci_path = None
self.crossconv_templ_path = None
self.decorr_kernel_path = None

# Always save and output these
self.decorr_psf_path = {}
self.decorr_zptimg_path = {}
self.decorr_diff_path = {}
self.zpt_stamp_path = {}
self.diff_stamp_path = {}

self.skysub_path = None
self.detmas_path = None
# Used internally, held in-memory
self.skyrms = None
self.psfobj = None
self.psf_data = None
Expand All @@ -95,9 +111,10 @@
if mp:
SNLogger.multiprocessing_replace()
SNLogger.debug( f"run_sky_subtract on {imname}" )
self.skysub_path = self.temp_dir / f"skysub_{imname}"
self.detmask_path = self.temp_dir / f"detmask_{imname}"
self.skyrms = sky_subtract( self.image.path, self.skysub_path, self.detmask_path, temp_dir=self.temp_dir,

self.skysub_path = self.save_dir / f"skysub_{imname}"
self.detmask_path = self.save_dir / f"detmask_{imname}"
self.skyrms = sky_subtract( self.image.path, self.skysub_path, self.detmask_path, temp_dir=self.save_dir,
force=self.config.value( 'photometry.phrosty.force_sky_subtract' ) )
SNLogger.debug( f"...done running sky subtraction on {self.image.name}" )
return ( self.skysub_path, self.detmask_path, self.skyrms )
Expand Down Expand Up @@ -131,6 +148,9 @@
# the psf... and it's different for each type of
# PSF. We need to fix that... somehow....

if self.keep_intermediate:
dump_file = True

if self.psfobj is None:
psftype = self.config.value( 'photometry.phrosty.psf.type' )
self.psfobj = PSF.get_psf_object( psftype, pointing=self.pointing, sca=self.image.sca )
Expand Down Expand Up @@ -200,6 +220,8 @@
if self.nuke_temp_dir:
SNLogger.warning( "nuke_temp_dir not implemented" )

self.keep_intermediate = self.config.value( 'photometry.phrosty.keep_intermediate' )


def sky_sub_all_images( self ):
# Currently, this writes out a bunch of FITS files. Further refactoring needed
Expand All @@ -224,7 +246,6 @@
img.save_sky_subtract_info( img.run_sky_subtract( mp=False ) )



def get_psfs( self ):
all_imgs = self.science_images.copy() # shallow copy
all_imgs.extend( self.template_images )
Expand Down Expand Up @@ -489,9 +510,9 @@

diffname = sci_image.decorr_diff_path[ templ_image.image.name ]
diff_stampname = stampmaker( self.ra, self.dec, np.array([100, 100]),
diffname,
savedir=self.dia_out_dir,
savename=f"stamp_{diffname.name}" )
diffname,
savedir=self.dia_out_dir,
savename=f"stamp_{diffname.name}" )

SNLogger.info(f"Decorrelated stamp path: {pathlib.Path( diff_stampname )}")
SNLogger.info(f"Zpt image stamp path: {pathlib.Path( zpt_stampname )}")
Expand Down Expand Up @@ -614,12 +635,56 @@
fits_writer_pool.apply_async( self.write_fits_file,
( cp.asnumpy( decorimg ).T, hdr, savepath ), {},
error_callback=partial(log_fits_write_error, savepath) )
sci_image.decorr_psf_path[ templ_image.image.name ]= decorr_psf_path
sci_image.decorr_zptimg_path[ templ_image.image.name ]= decorr_zptimg_path
sci_image.decorr_diff_path[ templ_image.image.name ]= decorr_diff_path
sci_image.decorr_psf_path[ templ_image.image.name ] = decorr_psf_path
sci_image.decorr_zptimg_path[ templ_image.image.name ] = decorr_zptimg_path
sci_image.decorr_diff_path[ templ_image.image.name ] = decorr_diff_path

if self.keep_intermediate:
# Each key is the file prefix addition.
# Each list has [descriptive filetype, image file name, data, header].
# The 'convolved' images have that [:-8] in their image file name and combined
# names because LNA thinks it's important that we keep track of which images
# have been convolved with which. Because also then if you use the same template
# image more than once, it gets overwritten. Also, [:-8] and [:-3] assume that
# image.image.name ends in fits.gz.

Check failure on line 649 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:649:60: W291 Trailing whitespace

# TODO: Include score and variance images.

Check failure on line 651 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:651:67: W291 Trailing whitespace
write_filepaths = {'aligned': [['img',
templ_image.image.name,
cp.asnumpy(sfftifier.PixA_resamp_object_GPU),
sfftifier.hdr_target],
['psf',

Check failure on line 656 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:656:60: W291 Trailing whitespace
templ_image.image.name,

Check failure on line 657 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:657:80: W291 Trailing whitespace
cp.asnumpy(sfftifier.PSF_resamp_object_GPU),

Check failure on line 658 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:658:101: W291 Trailing whitespace
sfftifier.hdr_target],
['detmask',

Check failure on line 660 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:660:64: W291 Trailing whitespace
sci_image.image.name,
cp.asnumpy(sfftifier.PixA_resamp_object_DMASK_GPU),
sfftifier.hdr_target]
],
'convolved': [['img',
f'{sci_image.image.name[:-8]}_{templ_image.image.name[:-3]}',
cp.asnumpy(sfftifier.PixA_Ctarget_GPU),
sfftifier.hdr_target],
['img',
f'{templ_image.image.name[:-8]}_{sci_image.image.name[:-3]}',
cp.asnumpy(sfftifier.PixA_Cresamp_object_GPU),
sfftifier.hdr_target]

Check failure on line 672 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:672:78: W291 Trailing whitespace
],
'decorr': [['kernel',
sci_image.image.name,
cp.asnumpy(sfftifier.FKDECO_GPU),
sfftifier.hdr_target]
]

Check failure on line 678 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:678:54: W291 Trailing whitespace
}

Check failure on line 679 in phrosty/pipeline.py

View workflow job for this annotation

GitHub Actions / check

Ruff (W291)

phrosty/pipeline.py:679:42: W291 Trailing whitespace
# Write the aligned images
for key in write_filepaths.keys():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does waiting until here mean that all the steps have to have been successful to get any of the images to be written out?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only for the portions on GPU.

for (imgtype, name, data, header) in write_filepaths[key]:
savepath = self.dia_out_dir / f'{key}_{imgtype}_{name}'
self.write_fits_file( data, header, savepath=savepath)

SNLogger.info( f"DONE processing {sci_image.image.name} minus {templ_image.image.name}" )

SNLogger.info( "Waiting for FITS writer processes to finish" )
with nvtx.annotate( "fits_write_wait", color=0xff8888 ):
fits_writer_pool.close()
Expand Down
Loading