Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/catalog_dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Dataframe Methods
.. autosummary::
:toctree: api/

Catalog.reduce
Catalog.map_rows
Catalog.map_partitions
Catalog.to_hats
Catalog.compute
Expand Down
478 changes: 246 additions & 232 deletions docs/tutorials/pre_executed/nestedframe.ipynb

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions docs/tutorials/pre_executed/scaling_workflows.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"id": "cef7da55",
"metadata": {},
"outputs": [],
Expand Down Expand Up @@ -436,10 +436,10 @@
" # Return the features as a dictionary\n",
" return dict(zip(extractor.names, features))\n",
"\n",
" features = pix_df.reduce(\n",
" features = pix_df.map_rows(\n",
" _extract_features,\n",
" \"lc.hmjd\",\n",
" \"lc.mag\",\n",
" columns=[\"lc.hmjd\", \"lc.mag\"],\n",
" row_container=\"args\", # Pass columns as individual arguments\n",
" )\n",
"\n",
" return features"
Expand Down
423 changes: 146 additions & 277 deletions docs/tutorials/pre_executed/timeseries.ipynb

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions docs/tutorials/pre_executed/using_rubin_data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2204,12 +2204,12 @@
"id": "19e229f3-844a-47d7-bdad-dfa0ce83f51d",
"metadata": {},
"source": [
"Simple aggregrations can be applied via the `reduce` function, where below we define a very simple mean magnitude function and pass it along to reduce, selecting the \"psfMag\" sub-column of \"diaObjectForcedSource\" to compute the mean of for each object."
"Simple aggregations can be applied via the `map_rows` function, where below we define a very simple mean magnitude function and pass it along to `map_rows`, selecting the \"psfMag\" sub-column of \"diaObjectForcedSource\" to compute the mean of for each object."
]
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": null,
"id": "abc7f804-1495-421f-9ec6-19333711c180",
"metadata": {
"execution": {
Expand Down Expand Up @@ -2304,14 +2304,14 @@
"import numpy as np\n",
"\n",
"\n",
"def mean_mag(mag):\n",
" return {\"mean_psfMag\": np.mean(mag)}\n",
"def mean_mag(row):\n",
" return {\"mean_psfMag\": np.mean(row[\"diaObjectForcedSource.psfMag\"])}\n",
"\n",
"\n",
"# meta defines the expected structure of the result\n",
"# append_columns adds the result as a column to the original catalog\n",
"oc_mean_mags_g = oc_long_lcs_g.reduce(\n",
" mean_mag, \"diaObjectForcedSource.psfMag\", meta={\"mean_psfMag\": np.float64}, append_columns=True\n",
"oc_mean_mags_g = oc_long_lcs_g.map_rows(\n",
" mean_mag, columns=[\"diaObjectForcedSource.psfMag\"], meta={\"mean_psfMag\": np.float64}, append_columns=True\n",
")\n",
"oc_mean_mags_g.head(10)[[\"mean_psfMag\"]]"
]
Expand Down
21 changes: 12 additions & 9 deletions docs/tutorials/pre_executed/ztf-alerts-sne.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1095,14 +1095,14 @@
"<img src=\"attachment:f353d676-708a-4aa9-94fe-7c25c516a871.png\" width=\"300\"/>\n",
"</div>\n",
"\n",
"Here we use the `.reduce()` method of the `Catalog` object to apply the feature extraction function to each row of the catalog.\n",
"Here we use the `.map_rows()` method of the `Catalog` object to apply the feature extraction function to each row of the catalog.\n",
"It is very similar to the `pandas.DataFrame.apply()` method but it works more efficiently and conveniently with nested data.\n",
"See [nested-pandas documentation](https://nested-pandas.readthedocs.io/en/latest/gettingstarted/quickstart.html#Reduce-Function) for more details."
"See [nested-pandas documentation](https://nested-pandas.readthedocs.io/en/latest/gettingstarted/quickstart.html#The-map_rows-Function) for more details."
]
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": null,
"id": "fcc42d39",
"metadata": {
"ExecuteTime": {
Expand Down Expand Up @@ -1325,13 +1325,16 @@
"\n",
"\n",
"# Extract features and filter objects by them\n",
"catalog_with_features = catalog.reduce(\n",
"catalog_with_features = catalog.map_rows(\n",
" extract_features, # function\n",
" \"lc.lc_fid\",\n",
" \"lc.lc_mjd\",\n",
" \"lc.lc_magpsf\",\n",
" \"lc.lc_sigmapsf\",\n",
" \"oid\", # columns to use\n",
" columns=[\n",
" \"lc.lc_fid\",\n",
" \"lc.lc_mjd\",\n",
" \"lc.lc_magpsf\",\n",
" \"lc.lc_sigmapsf\",\n",
" \"oid\",\n",
" ], # columns to use\n",
" row_container=\"args\", # Pass columns as individual arguments\n",
" meta=dict.fromkeys(feature_extractor.names, float), # Dask meta\n",
" append_columns=True, # Add the result feature columns to the catalog\n",
")\n",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies = [
"dask[complete]>=2025.3.0",
"deprecated",
"hats>=0.6.7,<0.7.0",
"nested-pandas>=0.4.7,<0.6.0",
"nested-pandas>=0.6.0,<0.7.0",

# NOTE: package PINNED at <21.0.0, see https://github.com/astronomy-commons/lsdb/issues/974
"pyarrow>=14.0.1,<21.0.0; platform_system == 'Windows'",
Expand Down
142 changes: 142 additions & 0 deletions src/lsdb/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import hats as hc
import nested_pandas as npd
import pandas as pd
from deprecated import deprecated # type: ignore
from hats.catalog.catalog_collection import CatalogCollection
from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset as HCHealpixDataset
from hats.catalog.index.index_catalog import IndexCatalog as HCIndexCatalog
Expand Down Expand Up @@ -990,6 +991,7 @@ def nest_lists(
)
return catalog

@deprecated(version="0.6.7", reason="`reduce` will be removed in the future, " "use `map_rows` instead.")
def reduce(self, func, *args, meta=None, append_columns=False, infer_nesting=True, **kwargs) -> Catalog:
"""
Takes a function and applies it to each top-level row of the Catalog.
Expand Down Expand Up @@ -1059,6 +1061,146 @@ def reduce(self, func, *args, meta=None, append_columns=False, infer_nesting=Tru
)
return catalog

def map_rows(
    self,
    func,
    columns=None,
    row_container="dict",
    output_names=None,
    infer_nesting=True,
    append_columns=False,
    meta=None,
    **kwargs,
) -> Catalog:
    """
    Takes a function and applies it to each top-level row of the Catalog.

    docstring copied from nested-pandas

    Nested columns are packaged alongside base columns and available for function use, where base columns
    are passed as scalars and nested columns are passed as numpy arrays. The way in which the row data is
    packaged is configurable (by default, a dictionary) and controlled by the `row_container` argument.

    Parameters
    ----------
    func : callable
        Function to apply to each top-level row of the catalog. The row data is passed to `func`
        either as a single dictionary or as positional arguments, depending on `row_container`.
        See the Notes for recommendations on writing func outputs.
    columns : None | str | list of str
        Specifies which columns to pass to the function in the row_container format.
        If None, all columns are passed. If list of str, those columns are passed.
        If str, a single column is passed or if the string is a nested column, then all nested sub-columns
        are passed (e.g. columns="nested" passes all columns of the nested dataframe "nested"). To pass
        individual nested sub-columns, use the hierarchical column name (e.g. columns=["nested.t",...]).
    row_container : 'dict' or 'args', default 'dict'
        Specifies how the row data will be packaged when passed as an input to the function.
        If 'dict', the function will be called as `func({"col1": value, ...}, **kwargs)`, so func should
        expect a single dictionary input with keys corresponding to column names.
        If 'args', the function will be called as `func(value, ..., **kwargs)`, so func should expect
        positional arguments corresponding to the columns specified in `columns`.
    output_names : None | str | list of str
        Specifies the names of the output columns in the resulting NestedFrame. If None, the function
        will return whatever names the user function returns. If specified will override any names
        returned by the user function provided the number of names matches the number of outputs. When not
        specified and the user function returns values without names (e.g. a list or tuple), the output
        columns will be enumerated (e.g. "0", "1", ...).
    infer_nesting : bool, default True
        If True, the function will pack output columns into nested
        structures based on column names adhering to a nested naming
        scheme. E.g. "nested.b" and "nested.c" will be packed into a column
        called "nested" with columns "b" and "c". If False, all outputs
        will be returned as base columns. Note that this will trigger off of names specified in
        `output_names` in addition to names returned by the user function.
    append_columns : bool, default False
        if True, the output columns should be appended to those in the original NestedFrame.
    meta : dataframe or series-like, optional
        The dask meta of the output. If append_columns is True, the meta should specify just the
        additional columns output by func.
    kwargs : keyword arguments, optional
        Keyword arguments to pass to the function.

    Returns
    -------
    `Catalog`
        `Catalog` with the results of the function applied to the columns of the frame.
        If this catalog has a margin cache, the same function is applied to the margin
        as well, so the returned catalog's margin stays consistent with its main partitions.

    Notes
    -----
    If concerned about performance, specify `columns` to only include the columns
    needed for the function, as this will avoid the overhead of packaging
    all columns for each row.

    By default, `map_rows` will produce a `NestedFrame` with enumerated
    column names for each returned value of the function. It's recommended
    to either specify `output_names` or have `func` return a dictionary
    where each key is an output column of the dataframe returned by
    `map_rows` (as shown above).

    Examples
    --------

    Writing a function that takes a row as a dictionary:

    >>> import numpy as np
    >>> import lsdb
    >>> import pandas as pd
    >>> catalog = lsdb.from_dataframe(pd.DataFrame({"ra":[0, 10], "dec":[5, 15],
    ... "mag":[21, 22], "mag_err":[.1, .2]}))

    >>> def my_sigma(row):
    ...     '''map_rows will return a NestedFrame with two columns'''
    ...     return row["mag"] + row["mag_err"], row["mag"] - row["mag_err"]
    >>> meta = {"plus_one": np.float64, "minus_one": np.float64}
    >>> catalog.map_rows(my_sigma,
    ...                  columns=["mag","mag_err"],
    ...                  output_names=["plus_one", "minus_one"],
    ...                  meta=meta).compute().reset_index()
               _healpix_29  plus_one  minus_one
    0  1372475556631677955      21.1       20.9
    1  1389879706834706546      22.2       21.8


    Writing the same function using positional arguments:

    >>> def my_sigma(col1, col2):
    ...     '''map_rows will return a NestedFrame with two columns'''
    ...     return col1 + col2, col1 - col2
    >>> meta = {"plus_one": np.float64, "minus_one": np.float64}
    >>> catalog.map_rows(my_sigma,
    ...                  columns=["mag","mag_err"],
    ...                  row_container='args', # send columns as positional args
    ...                  output_names=["plus_one", "minus_one"],
    ...                  meta=meta).compute().reset_index()
               _healpix_29  plus_one  minus_one
    0  1372475556631677955      21.1       20.9
    1  1389879706834706546      22.2       21.8

    See more examples in the nested-pandas documentation.
    """
    # Delegate the per-row mapping to the parent (nested-dask) implementation.
    catalog = super().map_rows(
        func,
        columns=columns,
        row_container=row_container,
        output_names=output_names,
        infer_nesting=infer_nesting,
        append_columns=append_columns,
        meta=meta,
        **kwargs,
    )
    # Apply the same transformation to the margin cache (when present) so that
    # margin partitions keep the same schema/columns as the main catalog.
    if self.margin is not None:
        catalog.margin = self.margin.map_rows(
            func,
            columns=columns,
            row_container=row_container,
            output_names=output_names,
            infer_nesting=infer_nesting,
            append_columns=append_columns,
            meta=meta,
            **kwargs,
        )
    return catalog

def to_hats(
self,
base_catalog_path: str | Path | UPath,
Expand Down
Loading
Loading