Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions benchmarks/data/object_collection/collection.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ all_indexes=objectId object_lc_objectId
hats_builder=hats-import v0.6.0
hats_creation_date=2025-06-25T15\:19UTC
hats_estsize=16574968
hats_release_date=2024-09-18
hats_version=v0.1
hats_release_date=2025-08-22
hats_version=v1.0
4 changes: 2 additions & 2 deletions benchmarks/data/object_collection/object_lc/hats.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ hats_skymap_alt_orders=2 4 6
hats_builder=hats-import v0.6.0
hats_creation_date=2025-06-25T15\:14UTC
hats_estsize=16126396
hats_release_date=2024-09-18
hats_version=v0.1
hats_release_date=2025-08-22
hats_version=v1.0
hats_max_rows=15000
hats_order=11
moc_sky_fraction=0.00053
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ moc_sky_fraction=0.00056
hats_builder=hats-import v0.6.0
hats_creation_date=2025-06-25T15\:19UTC
hats_estsize=1993575
hats_release_date=2024-09-18
hats_version=v0.1
hats_release_date=2025-08-22
hats_version=v1.0
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ hats_npix_suffix=.parquet
hats_builder=hats-import v0.6.0
hats_creation_date=2025-06-25T15\:19UTC
hats_estsize=27880
hats_release_date=2024-09-18
hats_version=v0.1
hats_release_date=2025-08-22
hats_version=v1.0
5 changes: 0 additions & 5 deletions src/lsdb/core/crossmatch/abstract_crossmatch_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import numpy.typing as npt
import pandas as pd
from hats.catalog import TableProperties
from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN

if TYPE_CHECKING:
from lsdb.catalog import Catalog
Expand Down Expand Up @@ -145,10 +144,6 @@ def validate(cls, left: Catalog, right: Catalog):
This must accept any additional arguments the `crossmatch` method accepts.
"""
# Check that we have the appropriate columns in our dataset.
if left._ddf.index.name != SPATIAL_INDEX_COLUMN:
raise ValueError(f"index of left table must be {SPATIAL_INDEX_COLUMN}")
if right._ddf.index.name != SPATIAL_INDEX_COLUMN:
raise ValueError(f"index of right table must be {SPATIAL_INDEX_COLUMN}")
column_names = left._ddf.columns
if left.hc_structure.catalog_info.ra_column not in column_names:
raise ValueError(f"left table must have column {left.hc_structure.catalog_info.ra_column}")
Expand Down
11 changes: 9 additions & 2 deletions src/lsdb/dask/concat_catalog_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,17 @@ def perform_concat(
"""
# Filter to aligned pixel when needed (handles order differences)
if left_pix is not None and aligned_pix.order > left_pix.order and left_df is not None:
left_df = filter_by_spatial_index_to_pixel(left_df, aligned_pix.order, aligned_pix.pixel)
left_df = filter_by_spatial_index_to_pixel(
left_df, aligned_pix.order, aligned_pix.pixel, spatial_index_order=left_catalog_info.healpix_order
)

if right_pix is not None and aligned_pix.order > right_pix.order and right_df is not None:
right_df = filter_by_spatial_index_to_pixel(right_df, aligned_pix.order, aligned_pix.pixel)
right_df = filter_by_spatial_index_to_pixel(
right_df,
aligned_pix.order,
aligned_pix.pixel,
spatial_index_order=right_catalog_info.healpix_order,
)

# Substitute None with meta to preserve schema
if left_df is None:
Expand Down
8 changes: 6 additions & 2 deletions src/lsdb/dask/crossmatch_catalog_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def perform_crossmatch(
the result.
"""
if right_pix.order > left_pix.order:
left_df = filter_by_spatial_index_to_pixel(left_df, right_pix.order, right_pix.pixel)
left_df = filter_by_spatial_index_to_pixel(
left_df, right_pix.order, right_pix.pixel, spatial_index_order=left_catalog_info.healpix_order
)

if len(left_df) == 0:
return meta_df
Expand Down Expand Up @@ -91,7 +93,9 @@ def perform_crossmatch_nested(
the result.
"""
if right_pix.order > left_pix.order:
left_df = filter_by_spatial_index_to_pixel(left_df, right_pix.order, right_pix.pixel)
left_df = filter_by_spatial_index_to_pixel(
left_df, right_pix.order, right_pix.pixel, spatial_index_order=left_catalog_info.healpix_order
)

if len(left_df) == 0:
return meta_df
Expand Down
23 changes: 15 additions & 8 deletions src/lsdb/dask/join_catalog_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import pandas as pd
from hats.catalog import TableProperties
from hats.pixel_math import HealpixPixel
from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN
from hats.pixel_tree import PixelAlignment
from nested_pandas.series.packer import pack_flat

Expand Down Expand Up @@ -88,15 +87,17 @@ def perform_join_on(
A dataframe with the result of merging the left and right partitions on the specified columns
"""
if right_pixel.order > left_pixel.order:
left = filter_by_spatial_index_to_pixel(left, right_pixel.order, right_pixel.pixel)
left = filter_by_spatial_index_to_pixel(
left, right_pixel.order, right_pixel.pixel, spatial_index_order=left_catalog_info.healpix_order
)

right_joined_df = concat_partition_and_margin(right, right_margin)

left, right_joined_df = rename_columns_with_suffixes(left, right_joined_df, suffixes)
merged = left.reset_index().merge(
right_joined_df, left_on=left_on + suffixes[0], right_on=right_on + suffixes[1]
)
merged.set_index(SPATIAL_INDEX_COLUMN, inplace=True)
merged.set_index(left_catalog_info.healpix_column, inplace=True)
return merged


Expand Down Expand Up @@ -136,14 +137,16 @@ def perform_join_nested(
A dataframe with the result of merging the left and right partitions on the specified columns
"""
if right_pixel.order > left_pixel.order:
left = filter_by_spatial_index_to_pixel(left, right_pixel.order, right_pixel.pixel)
left = filter_by_spatial_index_to_pixel(
left, right_pixel.order, right_pixel.pixel, spatial_index_order=left_catalog_info.healpix_order
)

right_joined_df = concat_partition_and_margin(right, right_margin)

right_joined_df = pack_flat(npd.NestedFrame(right_joined_df.set_index(right_on))).rename(right_name)

merged = left.reset_index().merge(right_joined_df, left_on=left_on, right_index=True)
merged.set_index(SPATIAL_INDEX_COLUMN, inplace=True)
merged.set_index(left_catalog_info.healpix_column, inplace=True)
return merged


Expand Down Expand Up @@ -187,7 +190,9 @@ def perform_join_through(
if assoc_catalog_info.primary_column is None or assoc_catalog_info.join_column is None:
raise ValueError("Invalid catalog_info")
if right_pixel.order > left_pixel.order:
left = filter_by_spatial_index_to_pixel(left, right_pixel.order, right_pixel.pixel)
left = filter_by_spatial_index_to_pixel(
left, right_pixel.order, right_pixel.pixel, spatial_index_order=left_catalog_info.healpix_order
)

right_joined_df = concat_partition_and_margin(right, right_margin)

Expand Down Expand Up @@ -225,7 +230,7 @@ def perform_join_through(
)
)

merged.set_index(SPATIAL_INDEX_COLUMN, inplace=True)
merged.set_index(left_catalog_info.healpix_column, inplace=True)
if len(join_columns_to_drop) > 0:
merged.drop(join_columns_to_drop, axis=1, inplace=True)
return merged
Expand Down Expand Up @@ -259,7 +264,9 @@ def perform_merge_asof(
`merge_asof`
"""
if right_pixel.order > left_pixel.order:
left = filter_by_spatial_index_to_pixel(left, right_pixel.order, right_pixel.pixel)
left = filter_by_spatial_index_to_pixel(
left, right_pixel.order, right_pixel.pixel, spatial_index_order=left_catalog_info.healpix_order
)

left, right = rename_columns_with_suffixes(left, right, suffixes)
left.sort_index(inplace=True)
Expand Down
26 changes: 19 additions & 7 deletions src/lsdb/dask/merge_catalog_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,13 @@ def perform_align_and_apply_func(num_partitions, func, *args, **kwargs):
)


def filter_by_spatial_index_to_pixel(dataframe: npd.NestedFrame, order: int, pixel: int) -> npd.NestedFrame:
def filter_by_spatial_index_to_pixel(
dataframe: npd.NestedFrame,
order: int,
pixel: int,
*,
spatial_index_order: int | None = SPATIAL_INDEX_ORDER,
) -> npd.NestedFrame:
"""Filters a catalog dataframe to the points within a specified HEALPix pixel using the spatial index

Args:
Expand All @@ -360,8 +366,11 @@ def filter_by_spatial_index_to_pixel(dataframe: npd.NestedFrame, order: int, pix
Returns:
The filtered dataframe with only the rows that are within the specified HEALPix pixel
"""
lower_bound = healpix_to_spatial_index(order, pixel)
upper_bound = healpix_to_spatial_index(order, pixel + 1)
if spatial_index_order is None:
# This is the default value, but needed for type-checking.
spatial_index_order = SPATIAL_INDEX_ORDER
lower_bound = healpix_to_spatial_index(order, pixel, spatial_index_order=spatial_index_order)
upper_bound = healpix_to_spatial_index(order, pixel + 1, spatial_index_order=spatial_index_order)
filtered_df = dataframe[(dataframe.index >= lower_bound) & (dataframe.index < upper_bound)]
return filtered_df

Expand All @@ -371,6 +380,7 @@ def filter_by_spatial_index_to_margin(
order: int,
pixel: int,
margin_radius: float,
spatial_index_order: int = SPATIAL_INDEX_ORDER,
) -> npd.NestedFrame:
"""
Filter rows to those that fall within the margin footprint of a
Expand All @@ -379,7 +389,7 @@ def filter_by_spatial_index_to_margin(
Args:
dataframe (nested_pandas.NestedFrame):
DataFrame to be filtered. Its index must be the spatial
index at SPATIAL_INDEX_ORDER (NESTED scheme).
index at spatial_index_order (NESTED scheme).
order (int): HEALPix order of the central pixel.
pixel (int): HEALPix pixel number (NESTED numbering) at `order`.
margin_radius (float):
Expand All @@ -403,7 +413,7 @@ def filter_by_spatial_index_to_margin(
then to a margin order via `hp.margin2order`.
2) Enumerate the margin pixels at margin order using
`get_margin`.
3) Map each rows index at SPATIAL_INDEX_ORDER down to
3) Map each row's index at spatial_index_order down to
margin order (via `get_lower_order_pixel`) and keep rows
whose mapped pixel is in the margin set.
"""
Expand All @@ -420,9 +430,9 @@ def filter_by_spatial_index_to_margin(
)

margin_pixels = get_margin(order, pixel, margin_order - order)
healpix_29 = dataframe.index.to_numpy()
spatial_index_values = dataframe.index.to_numpy()
margin_order_hp_pix = get_lower_order_pixel(
SPATIAL_INDEX_ORDER, healpix_29, SPATIAL_INDEX_ORDER - margin_order
spatial_index_order, spatial_index_values, spatial_index_order - margin_order
)
mask = np.isin(margin_order_hp_pix, margin_pixels)
filtered_df = dataframe[mask]
Expand Down Expand Up @@ -559,6 +569,8 @@ def generate_meta_df_for_joined_tables(
if index_type is None:
# pylint: disable=protected-access
index_type = catalogs[0]._ddf._meta.index.dtype
if catalogs[0].hc_structure.has_healpix_column():
index_name = catalogs[0].hc_structure.catalog_info.healpix_column # type: ignore[assignment]
index = pd.Index(pd.Series(dtype=index_type), name=index_name)
meta_df = npd.NestedFrame(pd.DataFrame(meta, index))
return meta_df
Expand Down
5 changes: 4 additions & 1 deletion src/lsdb/dask/merge_map_catalog_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ def perform_merge_map(
"""
if map_pixel.order > catalog_pixel.order:
catalog_partition = filter_by_spatial_index_to_pixel(
catalog_partition, map_pixel.order, map_pixel.pixel
catalog_partition,
map_pixel.order,
map_pixel.pixel,
spatial_index_order=catalog_structure.healpix_order,
)

catalog_partition.sort_index(inplace=True)
Expand Down
3 changes: 3 additions & 0 deletions src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from hats.pixel_math.healpix_pixel_function import get_pixel_argsort
from hats.pixel_math.spatial_index import (
SPATIAL_INDEX_COLUMN,
SPATIAL_INDEX_ORDER,
compute_spatial_index,
healpix_to_spatial_index,
)
Expand Down Expand Up @@ -182,6 +183,8 @@ def _create_catalog_info(
ra_column=ra_column,
dec_column=dec_column,
catalog_type=catalog_type,
healpix_column=SPATIAL_INDEX_COLUMN,
healpix_order=SPATIAL_INDEX_ORDER,
**kwargs,
)

Expand Down
42 changes: 22 additions & 20 deletions src/lsdb/loaders/hats/read_hats.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,27 +297,24 @@ def _load_map_catalog(hc_catalog, config):
def _load_dask_meta_schema(hc_catalog, config) -> npd.NestedFrame:
"""Loads the Dask meta DataFrame from the parquet _metadata file."""
columns = config.columns
if (
columns is not None
and hc_catalog.schema is not None
and SPATIAL_INDEX_COLUMN in hc_catalog.schema.names
and SPATIAL_INDEX_COLUMN not in columns
):
columns = columns + [SPATIAL_INDEX_COLUMN]
dask_meta_schema = from_pyarrow(hc_catalog.schema.empty_table())
if not hc_catalog.has_healpix_column():
if columns is not None:
dask_meta_schema = dask_meta_schema[columns]
return dask_meta_schema
healpix_column = hc_catalog.catalog_info.healpix_column
if columns is not None and healpix_column not in columns:
columns = columns + [healpix_column]
if columns is not None:
dask_meta_schema = dask_meta_schema[columns]
if (
dask_meta_schema.index.name != SPATIAL_INDEX_COLUMN
and SPATIAL_INDEX_COLUMN in dask_meta_schema.columns
):
dask_meta_schema = dask_meta_schema.set_index(SPATIAL_INDEX_COLUMN)
if dask_meta_schema.index.name != healpix_column and healpix_column in dask_meta_schema.columns:
dask_meta_schema = dask_meta_schema.set_index(healpix_column)
if (
config.columns is not None
and SPATIAL_INDEX_COLUMN in config.columns
and dask_meta_schema.index.name == SPATIAL_INDEX_COLUMN
and healpix_column in config.columns
and dask_meta_schema.index.name == healpix_column
):
config.columns.remove(SPATIAL_INDEX_COLUMN)
config.columns.remove(healpix_column)
return dask_meta_schema


Expand All @@ -327,6 +324,7 @@ def _load_dask_df_and_map(catalog: HCHealpixDataset, config) -> tuple[nd.NestedF
ordered_pixels = np.array(pixels)[get_pixel_argsort(pixels)]
divisions = get_pixels_divisions(ordered_pixels)
dask_meta_schema = _load_dask_meta_schema(catalog, config)
index_column = dask_meta_schema.index.name
query_url_params = None
if isinstance(get_upath(catalog.catalog_base_dir).fs, HTTPFileSystem):
query_url_params = config.make_query_url_params()
Expand All @@ -340,6 +338,7 @@ def _load_dask_df_and_map(catalog: HCHealpixDataset, config) -> tuple[nd.NestedF
columns=config.columns,
schema=catalog.schema,
filters=config.filters,
index_column=index_column,
**config.kwargs,
divisions=divisions,
meta=dask_meta_schema,
Expand All @@ -356,6 +355,7 @@ def read_pixel(
npix_suffix: str,
*,
query_url_params: dict | None = None,
index_column: str = SPATIAL_INDEX_COLUMN,
columns=None,
schema=None,
**kwargs,
Expand All @@ -369,6 +369,7 @@ def read_pixel(
hc.io.pixel_catalog_file(catalog_base_dir, pixel, query_url_params, npix_suffix=npix_suffix),
columns=columns,
schema=schema,
index_column=index_column,
**kwargs,
)

Expand All @@ -378,18 +379,19 @@ def _read_parquet_file(
*,
columns=None,
schema=None,
index_column=None,
**kwargs,
):
if (
columns is not None
and schema is not None
and SPATIAL_INDEX_COLUMN in schema.names
and SPATIAL_INDEX_COLUMN not in columns
and index_column in schema.names
and index_column not in columns
):
columns = columns + [SPATIAL_INDEX_COLUMN]
columns = columns + [index_column]
dataframe = file_io.read_parquet_file_to_pandas(path, columns=columns, schema=schema, **kwargs)

if dataframe.index.name != SPATIAL_INDEX_COLUMN and SPATIAL_INDEX_COLUMN in dataframe.columns:
dataframe = dataframe.set_index(SPATIAL_INDEX_COLUMN)
if dataframe.index.name != index_column and index_column in dataframe.columns:
dataframe = dataframe.set_index(index_column)

return dataframe
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ def small_sky_order1_source_dir(test_data_dir):
return test_data_dir / SMALL_SKY_ORDER1_SOURCE_COLLECTION_DIR_NAME / SMALL_SKY_ORDER1_SOURCE_NAME


@pytest.fixture
def small_sky_healpix13_dir(test_data_dir):
return test_data_dir / "small_sky_healpix13"


@pytest.fixture
def small_sky_order1_source_object_id_index_dir(test_data_dir):
return (
Expand Down
Loading
Loading