14 changes: 14 additions & 0 deletions benchmarks/benchmarks.py
@@ -80,6 +80,20 @@ def time_open_many_columns_all():
return lsdb.open_catalog(BENCH_DATA_DIR / "object_collection", columns="all")


def time_lazy_crossmatch_many_columns_all_suffixes():
cat = lsdb.open_catalog(BENCH_DATA_DIR / "object_collection", columns="all")
return cat.crossmatch(
cat, require_right_margin=False, suffixes=("_left", "_right"), suffix_method="all_columns"
)


def time_lazy_crossmatch_many_columns_overlapping_suffixes():
cat = lsdb.open_catalog(BENCH_DATA_DIR / "object_collection", columns="all")
return cat.crossmatch(
cat, require_right_margin=False, suffixes=("_left", "_right"), suffix_method="overlapping_columns"
)


def time_open_many_columns_list():
return lsdb.open_catalog(
BENCH_DATA_DIR / "object_collection",
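These two benchmarks only build the lazy task graph (no `.compute()` is called), so they measure suffix resolution and schema construction over a wide catalog rather than the crossmatch itself. A rough sketch of timing the same thing interactively, assuming the benchmark object collection has been generated locally (the path below is an assumption):

```python
import time

import lsdb

# Assumption: the benchmark data directory exists locally; adjust the path as needed.
cat = lsdb.open_catalog("bench_data/object_collection", columns="all")

for method in ("all_columns", "overlapping_columns"):
    start = time.perf_counter()
    # Lazy call only: builds the Dask graph and the joined schema, nothing is computed.
    cat.crossmatch(
        cat,
        require_right_margin=False,
        suffixes=("_left", "_right"),
        suffix_method=method,
    )
    print(f"{method}: {time.perf_counter() - start:.3f} s to build the lazy graph")
```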
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -33,6 +33,7 @@ dependencies = [

# NOTE: package PINNED at <0.3.0, see https://github.com/astronomy-commons/lsdb/issues/1047
"universal-pathlib>=0.2.2,<0.3.0",
"tabulate>=0.7.0",
]

[project.urls]
@@ -52,6 +53,7 @@ dev = [
"pytest",
"pytest-cov", # Used to report total code coverage
"pytest-mock", # Used to mock objects in tests
"types-tabulate", # Type information for tabulate
]
full = [
"fsspec[full]", # complete file system specs.
102 changes: 94 additions & 8 deletions src/lsdb/catalog/catalog.py
@@ -32,7 +32,7 @@
join_catalog_data_through,
merge_asof_catalog_data,
)
from lsdb.dask.merge_catalog_functions import create_merged_catalog_info
from lsdb.dask.merge_catalog_functions import DEFAULT_SUFFIX_METHOD, create_merged_catalog_info
from lsdb.dask.merge_map_catalog_data import merge_map_catalog_data
from lsdb.io.schema import get_arrow_schema
from lsdb.loaders.hats.hats_loading_config import HatsLoadingConfig
@@ -161,6 +161,8 @@ def crossmatch(
) = BuiltInCrossmatchAlgorithm.KD_TREE,
output_catalog_name: str | None = None,
require_right_margin: bool = False,
suffix_method: str | None = None,
log_changes: bool = True,
**kwargs,
) -> Catalog:
"""Perform a cross-match between two catalogs
@@ -223,6 +225,13 @@
Default: {left_name}_x_{right_name}
require_right_margin (bool): If true, raises an error if the right margin is missing which could
lead to incomplete crossmatches. Default: False
suffix_method (str): Method to use to add suffixes to columns. Options are:
- "overlapping_columns": only add suffixes to columns that are present in both catalogs
- "all_columns": add suffixes to all columns from both catalogs
Default: "all_columns" Warning: This default will change to "overlapping_columns" in a future
release.
log_changes (bool): If True, logs an info message for each column that is being renamed.
This only applies when suffix_method is 'overlapping_columns'. Default: True

Returns:
A Catalog with the data from the left and right catalogs merged with one row for each
@@ -242,15 +251,34 @@
suffixes = (f"_{self.name}", f"_{other.name}")
if len(suffixes) != 2:
raise ValueError("`suffixes` must be a tuple with two strings")
if suffix_method is None:
suffix_method = DEFAULT_SUFFIX_METHOD
warnings.warn(
"The default suffix behavior will change from applying suffixes to all columns to only "
"applying suffixes to overlapping columns in a future release."
"To maintain the current behavior, explicitly set `suffix_method='all_columns'`. "
"To change to the new behavior, set `suffix_method='overlapping_columns'`.",
FutureWarning,
)
if other.margin is None and require_right_margin:
raise ValueError("Right catalog margin cache is required for cross-match.")
if output_catalog_name is None:
output_catalog_name = f"{self.name}_x_{other.name}"
ddf, ddf_map, alignment = crossmatch_catalog_data(
self, other, suffixes, algorithm=algorithm, **kwargs
self,
other,
suffixes,
algorithm=algorithm,
suffix_method=suffix_method,
log_changes=log_changes,
**kwargs,
)
new_catalog_info = create_merged_catalog_info(
self.hc_structure.catalog_info, other.hc_structure.catalog_info, output_catalog_name, suffixes
self,
other,
output_catalog_name,
suffixes,
suffix_method,
)
hc_catalog = self.hc_structure.__class__(
new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
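Since leaving `suffix_method` unset now triggers a `FutureWarning`, existing callers can pin the behavior explicitly. A minimal sketch, with placeholder catalog paths:

```python
import lsdb

# Placeholders: any two HATS catalogs would do here.
left = lsdb.open_catalog("path/to/left_catalog")
right = lsdb.open_catalog("path/to/right_catalog")

# Explicit "all_columns" keeps today's behavior (every column suffixed) and avoids the warning.
matched = left.crossmatch(right, suffixes=("_left", "_right"), suffix_method="all_columns")

# Opting in to the future default: only columns present in both catalogs are suffixed,
# and each rename is logged unless log_changes=False.
matched_new = left.crossmatch(
    right,
    suffixes=("_left", "_right"),
    suffix_method="overlapping_columns",
    log_changes=False,
)
```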
Expand Down Expand Up @@ -720,6 +748,8 @@ def merge_asof(
direction: str = "backward",
suffixes: tuple[str, str] | None = None,
output_catalog_name: str | None = None,
suffix_method: str | None = None,
log_changes: bool = True,
):
"""Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys

@@ -733,6 +763,14 @@
other (lsdb.Catalog): the right catalog to merge to
suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
direction (str): the direction to perform the merge_asof
output_catalog_name (str): The name of the resulting catalog to be stored in metadata
suffix_method (str): Method to use to add suffixes to columns. Options are:
- "overlapping_columns": only add suffixes to columns that are present in both catalogs
- "all_columns": add suffixes to all columns from both catalogs
Default: "all_columns" Warning: This default will change to "overlapping_columns" in a future
release.
log_changes (bool): If True, logs an info message for each column that is being renamed.
This only applies when suffix_method is 'overlapping_columns'. Default: True

Returns:
A new catalog with the columns from each of the input catalogs with their respective suffixes
Expand All @@ -744,7 +782,24 @@ def merge_asof(
if len(suffixes) != 2:
raise ValueError("`suffixes` must be a tuple with two strings")

ddf, ddf_map, alignment = merge_asof_catalog_data(self, other, suffixes=suffixes, direction=direction)
if suffix_method is None:
suffix_method = DEFAULT_SUFFIX_METHOD
warnings.warn(
"The default suffix behavior will change from applying suffixes to all columns to only "
"applying suffixes to overlapping columns in a future release."
"To maintain the current behavior, explicitly set `suffix_method='all_columns'`. "
"To change to the new behavior, set `suffix_method='overlapping_columns'`.",
FutureWarning,
)

ddf, ddf_map, alignment = merge_asof_catalog_data(
self,
other,
suffixes=suffixes,
direction=direction,
suffix_method=suffix_method,
log_changes=log_changes,
)

if output_catalog_name is None:
output_catalog_name = (
Expand All @@ -753,7 +808,11 @@ def merge_asof(
)

new_catalog_info = create_merged_catalog_info(
self.hc_structure.catalog_info, other.hc_structure.catalog_info, output_catalog_name, suffixes
self,
other,
output_catalog_name,
suffixes,
suffix_method,
)
hc_catalog = hc.catalog.Catalog(
new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
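`merge_asof` gains the same two keywords and the same deprecation path; a short sketch of pinning the behavior there as well (`left` and `right` are placeholder catalogs opened elsewhere):

```python
# Placeholders: `left` and `right` are lsdb.Catalog objects opened elsewhere.
merged = left.merge_asof(
    right,
    direction="backward",
    suffixes=("_a", "_b"),
    suffix_method="all_columns",  # explicit, so no FutureWarning is emitted
)
```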
@@ -768,6 +827,8 @@ def join(
through: AssociationCatalog | None = None,
suffixes: tuple[str, str] | None = None,
output_catalog_name: str | None = None,
suffix_method: str | None = None,
log_changes: bool = True,
) -> Catalog:
"""Perform a spatial join to another catalog

@@ -783,6 +844,13 @@
between pixels and individual rows.
suffixes (Tuple[str,str]): suffixes to apply to the columns of each table
output_catalog_name (str): The name of the resulting catalog to be stored in metadata
suffix_method (str): Method to use to add suffixes to columns. Options are:
- "overlapping_columns": only add suffixes to columns that are present in both catalogs
- "all_columns": add suffixes to all columns from both catalogs
Default: "all_columns" Warning: This default will change to "overlapping_columns" in a future
release.
log_changes (bool): If True, logs an info message for each column that is being renamed.
This only applies when suffix_method is 'overlapping_columns'. Default: True

Returns:
A new catalog with the columns from each of the input catalogs with their respective suffixes
@@ -794,24 +862,42 @@
if len(suffixes) != 2:
raise ValueError("`suffixes` must be a tuple with two strings")

if suffix_method is None:
suffix_method = DEFAULT_SUFFIX_METHOD
warnings.warn(
"The default suffix behavior will change from applying suffixes to all columns to only "
"applying suffixes to overlapping columns in a future release."
"To maintain the current behavior, explicitly set `suffix_method='all_columns'`. "
"To change to the new behavior, set `suffix_method='overlapping_columns'`.",
FutureWarning,
)

self._check_unloaded_columns([left_on, right_on])

if through is not None:
ddf, ddf_map, alignment = join_catalog_data_through(self, other, through, suffixes)
ddf, ddf_map, alignment = join_catalog_data_through(
self, other, through, suffixes, suffix_method=suffix_method, log_changes=log_changes
)
else:
if left_on is None or right_on is None:
raise ValueError("Either both of left_on and right_on, or through must be set")
if left_on not in self._ddf.columns:
raise ValueError("left_on must be a column in the left catalog")
if right_on not in other._ddf.columns:
raise ValueError("right_on must be a column in the right catalog")
ddf, ddf_map, alignment = join_catalog_data_on(self, other, left_on, right_on, suffixes)
ddf, ddf_map, alignment = join_catalog_data_on(
self, other, left_on, right_on, suffixes, suffix_method=suffix_method, log_changes=log_changes
)

if output_catalog_name is None:
output_catalog_name = self.hc_structure.catalog_info.catalog_name

new_catalog_info = create_merged_catalog_info(
self.hc_structure.catalog_info, other.hc_structure.catalog_info, output_catalog_name, suffixes
self,
other,
output_catalog_name,
suffixes,
suffix_method,
)
hc_catalog = hc.catalog.Catalog(
new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
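`join` forwards `suffix_method` and `log_changes` down to `join_catalog_data_on` / `join_catalog_data_through`. A minimal sketch of the key-based path; the join column names here are assumptions:

```python
# Assumption: "object_id" exists in the left catalog and "obj_id" in the right one.
joined = left.join(
    right,
    left_on="object_id",
    right_on="obj_id",
    suffixes=("_obj", "_src"),
    suffix_method="overlapping_columns",
    log_changes=False,  # silence the per-column rename log messages
)
```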
12 changes: 8 additions & 4 deletions src/lsdb/core/crossmatch/abstract_crossmatch_algorithm.py
@@ -9,6 +9,8 @@
import pandas as pd
from hats.catalog import TableProperties

from lsdb.dask.merge_catalog_functions import apply_suffixes

if TYPE_CHECKING:
from lsdb.catalog import Catalog

@@ -95,14 +97,14 @@ def __init__(
self.right_catalog_info = right_catalog_info
self.right_margin_catalog_info = right_margin_catalog_info

def crossmatch(self, suffixes, **kwargs) -> npd.NestedFrame:
def crossmatch(self, suffixes, suffix_method="all_columns", **kwargs) -> npd.NestedFrame:
"""Perform a crossmatch"""
l_inds, r_inds, extra_cols = self.perform_crossmatch(**kwargs)
if not len(l_inds) == len(r_inds) == len(extra_cols):
raise ValueError(
"Crossmatch algorithm must return left and right indices and extra columns with same length"
)
return self._create_crossmatch_df(l_inds, r_inds, extra_cols, suffixes)
return self._create_crossmatch_df(l_inds, r_inds, extra_cols, suffixes, suffix_method)

def crossmatch_nested(self, nested_column_name, **kwargs) -> npd.NestedFrame:
"""Perform a crossmatch"""
@@ -191,6 +193,7 @@ def _create_crossmatch_df(
right_idx: npt.NDArray[np.int64],
extra_cols: pd.DataFrame,
suffixes: tuple[str, str],
suffix_method="all_columns",
) -> npd.NestedFrame:
"""Creates a df containing the crossmatch result from matching indices and additional columns

Expand All @@ -204,8 +207,9 @@ def _create_crossmatch_df(
additional columns added
"""
# rename columns so no same names during merging
self._rename_columns_with_suffix(self.left, suffixes[0])
self._rename_columns_with_suffix(self.right, suffixes[1])
self.left, self.right = apply_suffixes(
self.left, self.right, suffixes, suffix_method, log_changes=False
)
# concat dataframes together
index_name = self.left.index.name if self.left.index.name is not None else "index"
left_join_part = self.left.iloc[left_idx].reset_index()
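The per-partition renaming is now delegated to `apply_suffixes`, which returns the two frames with columns already renamed. A rough sketch of the behavior custom crossmatch algorithms can expect, using toy pandas frames; the column names, and the assumption that plain DataFrames are accepted, are inferred from this diff rather than verified:

```python
import pandas as pd

from lsdb.dask.merge_catalog_functions import apply_suffixes

left = pd.DataFrame({"id": [1], "ra": [10.0], "mag_g": [20.1]})
right = pd.DataFrame({"id": [2], "ra": [10.1], "parallax": [0.3]})

# "all_columns": every column gets a suffix, e.g. id_l, ra_l, mag_g_l / id_r, ra_r, parallax_r.
l_all, r_all = apply_suffixes(left, right, ("_l", "_r"), "all_columns", log_changes=False)

# "overlapping_columns": only the shared names ("id", "ra") are renamed;
# "mag_g" and "parallax" keep their original names.
l_over, r_over = apply_suffixes(left, right, ("_l", "_r"), "overlapping_columns", log_changes=False)

print(list(l_all.columns), list(r_all.columns))
print(list(l_over.columns), list(r_over.columns))
```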
18 changes: 16 additions & 2 deletions src/lsdb/dask/crossmatch_catalog_data.py
@@ -40,6 +40,7 @@ def perform_crossmatch(
right_margin_catalog_info,
algorithm,
suffixes,
suffix_method,
meta_df,
**kwargs,
):
@@ -68,7 +69,7 @@ def perform_crossmatch(
left_catalog_info,
right_catalog_info,
right_margin_catalog_info,
).crossmatch(suffixes, **kwargs)
).crossmatch(suffixes, suffix_method=suffix_method, **kwargs)


# pylint: disable=too-many-arguments, unused-argument
@@ -123,6 +124,8 @@ def crossmatch_catalog_data(
algorithm: (
Type[AbstractCrossmatchAlgorithm] | BuiltInCrossmatchAlgorithm
) = BuiltInCrossmatchAlgorithm.KD_TREE,
suffix_method: str | None = None,
log_changes: bool = True,
**kwargs,
) -> tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]:
"""Cross-matches the data from two catalogs
Expand All @@ -135,6 +138,12 @@ def crossmatch_catalog_data(
algorithm (BuiltInCrossmatchAlgorithm | Callable): The algorithm to use to perform the
crossmatch. Can be specified using a string for a built-in algorithm, or a custom
method. For more details, see `crossmatch` method in the `Catalog` class.
suffix_method (str): Method to use to add suffixes to columns. Options are:
- "overlapping_columns": only add suffixes to columns that are present in both catalogs
- "all_columns": add suffixes to all columns from both catalogs
Default: "all_columns"
log_changes (bool): If True, logs an info message for each column that is being renamed.
This only applies when suffix_method is 'overlapping_columns'. Default: True
**kwargs: Additional arguments to pass to the cross-match algorithm

Returns:
Expand All @@ -161,7 +170,11 @@ def crossmatch_catalog_data(

# generate meta table structure for dask df
meta_df = generate_meta_df_for_joined_tables(
[left, right], suffixes, extra_columns=crossmatch_algorithm.extra_columns
(left, right),
suffixes,
suffix_method=suffix_method,
extra_columns=crossmatch_algorithm.extra_columns,
log_changes=log_changes,
)

# perform the crossmatch on each partition pairing using dask delayed for lazy computation
Expand All @@ -170,6 +183,7 @@ def crossmatch_catalog_data(
perform_crossmatch,
crossmatch_algorithm,
suffixes,
suffix_method,
meta_df,
**kwargs,
)