Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/lsdb/loaders/hats/hats_loading_config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -28,6 +29,10 @@ class HatsLoadingConfig:
error_empty_filter: bool = True
"""If loading raises an error for an empty filter result. Defaults to True."""

enable_fsspec_optimization: bool | None = None
"""Whether to enable fsspec optimization for remote file systems. If None,
will check LSDB_ENABLE_FSSPEC_OPTIMIZATION environment variable."""

kwargs: dict = field(default_factory=dict)
"""Extra kwargs for the pandas parquet file reader"""

Expand All @@ -39,6 +44,14 @@ def __post_init__(self):
f"Invalid keyword argument '{nonused_kwarg}' found. Did you mean 'margin_cache'?"
)

# Handle environment variable for fsspec optimization if not explicitly set
if self.enable_fsspec_optimization is None:
env_value = os.getenv("LSDB_ENABLE_FSSPEC_OPTIMIZATION")
if env_value is not None:
self.enable_fsspec_optimization = env_value.lower() in ("true", "1", "yes", "on")
else:
self.enable_fsspec_optimization = False

def make_query_url_params(self) -> dict:
"""
Generates a dictionary of URL parameters with `columns` and `filters` attributes.
Expand Down Expand Up @@ -72,6 +85,17 @@ def make_query_url_params(self) -> dict:
return url_params

def get_read_kwargs(self):
"""Clumps existing kwargs and `dtype_backend`, if specified."""
"""Clumps existing kwargs and applies fsspec optimization if enabled."""
kwargs = dict(self.kwargs)

# Apply fsspec optimization if enabled
if self.enable_fsspec_optimization:
# Set up open_file_options with precache_options if not already present
if "open_file_options" not in kwargs:
kwargs["open_file_options"] = {}

# Only add precache_options if not already set by the user
if "precache_options" not in kwargs["open_file_options"]:
kwargs["open_file_options"]["precache_options"] = {"method": "parquet"}

return kwargs
15 changes: 14 additions & 1 deletion src/lsdb/loaders/hats/read_hats.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ def read_hats(
columns: list[str] | str | None = None,
margin_cache: str | Path | UPath | None = None,
error_empty_filter: bool = True,
enable_fsspec_optimization: bool | None = None,
**kwargs,
) -> Dataset:
"""Load catalog from a HATS path. See open_catalog()."""
return open_catalog(path, search_filter, columns, margin_cache, error_empty_filter, **kwargs)
return open_catalog(
path, search_filter, columns, margin_cache, error_empty_filter, enable_fsspec_optimization, **kwargs
)


def open_catalog(
Expand All @@ -50,6 +53,7 @@ def open_catalog(
columns: list[str] | str | None = None,
margin_cache: str | Path | UPath | None = None,
error_empty_filter: bool = True,
enable_fsspec_optimization: bool | None = None,
**kwargs,
) -> Dataset:
"""Open a catalog from a HATS path.
Expand Down Expand Up @@ -100,6 +104,11 @@ def open_catalog(
columns (list[str] | str): Default `None`. The set of columns to filter the catalog on. If None,
the catalog's default columns will be loaded. To load all catalog columns, use `columns="all"`.
margin_cache (path-like): Default `None`. The margin for the main catalog, provided as a path.
error_empty_filter (bool): Default `True`. If loading raises an error for an empty filter result.
enable_fsspec_optimization (bool | None): Default `None`. Whether to enable fsspec optimization
for remote file systems (e.g. S3, GCS). If None, will check LSDB_ENABLE_FSSPEC_OPTIMIZATION
environment variable. The optimization uses precaching to improve performance when reading
Parquet files from remote storage.
dtype_backend (str): Backend data type to apply to the catalog.
Defaults to "pyarrow". If None, no type conversion is performed.
**kwargs: Arguments to pass to the pandas parquet file reader
Expand All @@ -122,6 +131,7 @@ def open_catalog(
columns=columns,
margin_cache=margin_cache,
error_empty_filter=error_empty_filter,
enable_fsspec_optimization=enable_fsspec_optimization,
**kwargs,
)
catalog.hc_collection = hc_catalog # type: ignore[attr-defined]
Expand All @@ -132,6 +142,7 @@ def open_catalog(
columns=columns,
margin_cache=margin_cache,
error_empty_filter=error_empty_filter,
enable_fsspec_optimization=enable_fsspec_optimization,
**kwargs,
)
return catalog
Expand Down Expand Up @@ -164,6 +175,7 @@ def _load_catalog(
columns: list[str] | str | None = None,
margin_cache: str | Path | UPath | None = None,
error_empty_filter: bool = True,
enable_fsspec_optimization: bool | None = None,
**kwargs,
) -> Dataset:
if columns is None and hc_catalog.catalog_info.default_columns is not None:
Expand All @@ -188,6 +200,7 @@ def _load_catalog(
columns=columns,
margin_cache=margin_cache,
error_empty_filter=error_empty_filter,
enable_fsspec_optimization=enable_fsspec_optimization,
kwargs=kwargs,
)

Expand Down
118 changes: 118 additions & 0 deletions tests/lsdb/loaders/hats/test_fsspec_optimization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Tests for fsspec optimization functionality."""

import os
from unittest.mock import patch

import pytest

from lsdb.loaders.hats.hats_loading_config import HatsLoadingConfig


class TestFsspecOptimization:
"""Test fsspec optimization parameter handling."""

def test_explicit_enable(self):
"""Test explicit enable of fsspec optimization."""
config = HatsLoadingConfig(enable_fsspec_optimization=True)
kwargs = config.get_read_kwargs()

assert "open_file_options" in kwargs
assert "precache_options" in kwargs["open_file_options"]
assert kwargs["open_file_options"]["precache_options"]["method"] == "parquet"

def test_explicit_disable(self):
"""Test explicit disable of fsspec optimization."""
config = HatsLoadingConfig(enable_fsspec_optimization=False)
kwargs = config.get_read_kwargs()

# Should not add open_file_options if not already present
assert "open_file_options" not in kwargs or "precache_options" not in kwargs.get(
"open_file_options", {}
)

def test_environment_variable_true(self):
"""Test environment variable set to true."""
with patch.dict(os.environ, {"LSDB_ENABLE_FSSPEC_OPTIMIZATION": "true"}):
config = HatsLoadingConfig()
assert config.enable_fsspec_optimization is True

kwargs = config.get_read_kwargs()
assert "open_file_options" in kwargs
assert kwargs["open_file_options"]["precache_options"]["method"] == "parquet"

def test_environment_variable_false(self):
"""Test environment variable set to false."""
with patch.dict(os.environ, {"LSDB_ENABLE_FSSPEC_OPTIMIZATION": "false"}):
config = HatsLoadingConfig()
assert config.enable_fsspec_optimization is False

def test_environment_variable_numeric(self):
"""Test environment variable with numeric values."""
with patch.dict(os.environ, {"LSDB_ENABLE_FSSPEC_OPTIMIZATION": "1"}):
config = HatsLoadingConfig()
assert config.enable_fsspec_optimization is True

with patch.dict(os.environ, {"LSDB_ENABLE_FSSPEC_OPTIMIZATION": "0"}):
config = HatsLoadingConfig()
assert config.enable_fsspec_optimization is False

def test_environment_variable_case_insensitive(self):
"""Test environment variable is case insensitive."""
with patch.dict(os.environ, {"LSDB_ENABLE_FSSPEC_OPTIMIZATION": "TRUE"}):
config = HatsLoadingConfig()
assert config.enable_fsspec_optimization is True

with patch.dict(os.environ, {"LSDB_ENABLE_FSSPEC_OPTIMIZATION": "FALSE"}):
config = HatsLoadingConfig()
assert config.enable_fsspec_optimization is False

def test_environment_variable_other_values(self):
"""Test environment variable with other true/false values."""
for true_value in ["yes", "on", "YES", "ON"]:
with patch.dict(os.environ, {"LSDB_ENABLE_FSSPEC_OPTIMIZATION": true_value}):
config = HatsLoadingConfig()
assert config.enable_fsspec_optimization is True

def test_no_environment_variable(self):
"""Test default behavior when no environment variable is set."""
with patch.dict(os.environ, {}, clear=True):
config = HatsLoadingConfig()
assert config.enable_fsspec_optimization is False

def test_explicit_overrides_environment(self):
"""Test that explicit parameter overrides environment variable."""
with patch.dict(os.environ, {"LSDB_ENABLE_FSSPEC_OPTIMIZATION": "true"}):
config = HatsLoadingConfig(enable_fsspec_optimization=False)
assert config.enable_fsspec_optimization is False

def test_preserve_user_precache_options(self):
"""Test that user-provided precache_options are not overridden."""
user_kwargs = {"open_file_options": {"precache_options": {"method": "custom", "size": 1024}}}
config = HatsLoadingConfig(enable_fsspec_optimization=True, kwargs=user_kwargs)
kwargs = config.get_read_kwargs()

# Should not override user's precache_options
assert kwargs["open_file_options"]["precache_options"]["method"] == "custom"
assert kwargs["open_file_options"]["precache_options"]["size"] == 1024

def test_preserve_other_open_file_options(self):
"""Test that other open_file_options are preserved when adding precache_options."""
user_kwargs = {"open_file_options": {"block_size": 1024, "cache_type": "bytes"}}
config = HatsLoadingConfig(enable_fsspec_optimization=True, kwargs=user_kwargs)
kwargs = config.get_read_kwargs()

# Should add precache_options but preserve other options
assert kwargs["open_file_options"]["precache_options"]["method"] == "parquet"
assert kwargs["open_file_options"]["block_size"] == 1024
assert kwargs["open_file_options"]["cache_type"] == "bytes"

def test_preserve_other_kwargs(self):
"""Test that other kwargs are preserved."""
user_kwargs = {"filters": [("column", "==", "value")], "columns": ["col1", "col2"]}
config = HatsLoadingConfig(enable_fsspec_optimization=True, kwargs=user_kwargs)
kwargs = config.get_read_kwargs()

# Should preserve other kwargs
assert kwargs["filters"] == [("column", "==", "value")]
assert kwargs["columns"] == ["col1", "col2"]
assert kwargs["open_file_options"]["precache_options"]["method"] == "parquet"
13 changes: 13 additions & 0 deletions tests/lsdb/loaders/hats/test_read_hats.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,19 @@ def test_read_hats_margin_catalog_subset(
helpers.assert_divisions_are_correct(margin)


def test_read_hats_with_fsspec_optimization(small_sky_order1_dir, helpers):
"""Test that fsspec optimization parameter is accepted and works."""
# Test explicit enable
catalog = lsdb.open_catalog(small_sky_order1_dir, enable_fsspec_optimization=True)
assert isinstance(catalog, lsdb.Catalog)
helpers.assert_divisions_are_correct(catalog)

# Test explicit disable
catalog = lsdb.open_catalog(small_sky_order1_dir, enable_fsspec_optimization=False)
assert isinstance(catalog, lsdb.Catalog)
helpers.assert_divisions_are_correct(catalog)


def test_read_hats_margin_catalog_subset_is_empty(small_sky_order1_source_margin_dir):
search_filter = ConeSearch(ra=100, dec=80, radius_arcsec=1)
with pytest.raises(ValueError, match="no coverage"):
Expand Down