diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index a76027fbd6..e161a086d4 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4419,7 +4419,7 @@ def to_dict( allow_large_results: Optional[bool] = None, **kwargs, ) -> dict | list[dict]: - return self.to_pandas(allow_large_results=allow_large_results).to_dict(orient, into, **kwargs) # type: ignore + return self.to_pandas(allow_large_results=allow_large_results).to_dict(orient=orient, into=into, **kwargs) # type: ignore def to_excel( self, diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py index cf4b4eb19c..483bc5e530 100644 --- a/bigframes/pandas/io/api.py +++ b/bigframes/pandas/io/api.py @@ -187,6 +187,7 @@ def read_gbq( # type: ignore[overload-overlap] use_cache: Optional[bool] = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., + allow_large_results: Optional[bool] = ..., ) -> bigframes.dataframe.DataFrame: ... @@ -203,6 +204,7 @@ def read_gbq( use_cache: Optional[bool] = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., + allow_large_results: Optional[bool] = ..., ) -> pandas.Series: ... @@ -218,6 +220,7 @@ def read_gbq( use_cache: Optional[bool] = None, col_order: Iterable[str] = (), dry_run: bool = False, + allow_large_results: Optional[bool] = None, ) -> bigframes.dataframe.DataFrame | pandas.Series: _set_default_session_location_if_possible(query_or_table) return global_session.with_default_session( @@ -231,6 +234,7 @@ def read_gbq( use_cache=use_cache, col_order=col_order, dry_run=dry_run, + allow_large_results=allow_large_results, ) @@ -400,6 +404,7 @@ def read_gbq_query( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., filters: vendored_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., + allow_large_results: Optional[bool] = ..., ) -> bigframes.dataframe.DataFrame: ... @@ -416,6 +421,7 @@ def read_gbq_query( col_order: Iterable[str] = ..., filters: vendored_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., + allow_large_results: Optional[bool] = ..., ) -> pandas.Series: ... @@ -431,6 +437,7 @@ def read_gbq_query( col_order: Iterable[str] = (), filters: vendored_pandas_gbq.FiltersType = (), dry_run: bool = False, + allow_large_results: Optional[bool] = None, ) -> bigframes.dataframe.DataFrame | pandas.Series: _set_default_session_location_if_possible(query) return global_session.with_default_session( @@ -444,6 +451,7 @@ def read_gbq_query( col_order=col_order, filters=filters, dry_run=dry_run, + allow_large_results=allow_large_results, ) @@ -617,7 +625,11 @@ def from_glob_path( def _get_bqclient() -> bigquery.Client: - clients_provider = bigframes.session.clients.ClientsProvider( + # Address circular imports in doctest due to bigframes/session/__init__.py + # containing a lot of logic and samples. + from bigframes.session import clients + + clients_provider = clients.ClientsProvider( project=config.options.bigquery.project, location=config.options.bigquery.location, use_regional_endpoints=config.options.bigquery.use_regional_endpoints, @@ -631,11 +643,15 @@ def _get_bqclient() -> bigquery.Client: def _dry_run(query, bqclient) -> bigquery.QueryJob: + # Address circular imports in doctest due to bigframes/session/__init__.py + # containing a lot of logic and samples. + from bigframes.session import metrics as bf_metrics + job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True)) # Fix for b/435183833. Log metrics even if a Session isn't available. - if bigframes.session.metrics.LOGGING_NAME_ENV_VAR in os.environ: - metrics = bigframes.session.metrics.ExecutionMetrics() + if bf_metrics.LOGGING_NAME_ENV_VAR in os.environ: + metrics = bf_metrics.ExecutionMetrics() metrics.count_job_stats(job) return job @@ -645,6 +661,10 @@ def _set_default_session_location_if_possible(query): def _set_default_session_location_if_possible_deferred_query(create_query): + # Address circular imports in doctest due to bigframes/session/__init__.py + # containing a lot of logic and samples. + from bigframes.session._io import bigquery + # Set the location as per the query if this is the first query the user is # running and: # (1) Default session has not started yet, and @@ -666,7 +686,7 @@ def _set_default_session_location_if_possible_deferred_query(create_query): query = create_query() bqclient = _get_bqclient() - if bigframes.session._io.bigquery.is_query(query): + if bigquery.is_query(query): # Intentionally run outside of the session so that we can detect the # location before creating the session. Since it's a dry_run, labels # aren't necessary. diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 10a112c779..e1307dc9fa 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -60,6 +60,7 @@ from bigframes import exceptions as bfe from bigframes import version +import bigframes._config import bigframes._config.bigquery_options as bigquery_options import bigframes.clients import bigframes.constants @@ -132,6 +133,10 @@ def __init__( context: Optional[bigquery_options.BigQueryOptions] = None, clients_provider: Optional[bigframes.session.clients.ClientsProvider] = None, ): + # Address circular imports in doctest due to bigframes/session/__init__.py + # containing a lot of logic and samples. + from bigframes.session import anonymous_dataset, clients, loader, metrics + _warn_if_bf_version_is_obsolete() if context is None: @@ -167,7 +172,7 @@ def __init__( if clients_provider: self._clients_provider = clients_provider else: - self._clients_provider = bigframes.session.clients.ClientsProvider( + self._clients_provider = clients.ClientsProvider( project=context.project, location=self._location, use_regional_endpoints=context.use_regional_endpoints, @@ -219,15 +224,13 @@ def __init__( else bigframes.enums.DefaultIndexKind.NULL ) - self._metrics = bigframes.session.metrics.ExecutionMetrics() + self._metrics = metrics.ExecutionMetrics() self._function_session = bff_session.FunctionSession() - self._anon_dataset_manager = ( - bigframes.session.anonymous_dataset.AnonymousDatasetManager( - self._clients_provider.bqclient, - location=self._location, - session_id=self._session_id, - kms_key=self._bq_kms_key_name, - ) + self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager( + self._clients_provider.bqclient, + location=self._location, + session_id=self._session_id, + kms_key=self._bq_kms_key_name, ) # Session temp tables don't support specifying kms key, so use anon dataset if kms key specified self._session_resource_manager = ( @@ -241,7 +244,7 @@ def __init__( self._temp_storage_manager = ( self._session_resource_manager or self._anon_dataset_manager ) - self._loader = bigframes.session.loader.GbqDataLoader( + self._loader = loader.GbqDataLoader( session=self, bqclient=self._clients_provider.bqclient, storage_manager=self._temp_storage_manager, @@ -395,6 +398,7 @@ def read_gbq( # type: ignore[overload-overlap] use_cache: Optional[bool] = ..., col_order: Iterable[str] = ..., dry_run: Literal[False] = ..., + allow_large_results: Optional[bool] = ..., ) -> dataframe.DataFrame: ... @@ -411,6 +415,7 @@ def read_gbq( use_cache: Optional[bool] = ..., col_order: Iterable[str] = ..., dry_run: Literal[True] = ..., + allow_large_results: Optional[bool] = ..., ) -> pandas.Series: ... @@ -425,8 +430,8 @@ def read_gbq( filters: third_party_pandas_gbq.FiltersType = (), use_cache: Optional[bool] = None, col_order: Iterable[str] = (), - dry_run: bool = False - # Add a verify index argument that fails if the index is not unique. + dry_run: bool = False, + allow_large_results: Optional[bool] = None, ) -> dataframe.DataFrame | pandas.Series: # TODO(b/281571214): Generate prompt to show the progress of read_gbq. if columns and col_order: @@ -436,6 +441,9 @@ def read_gbq( elif col_order: columns = col_order + if allow_large_results is None: + allow_large_results = bigframes._config.options._allow_large_results + if bf_io_bigquery.is_query(query_or_table): return self._loader.read_gbq_query( # type: ignore # for dry_run overload query_or_table, @@ -446,6 +454,7 @@ def read_gbq( use_cache=use_cache, filters=filters, dry_run=dry_run, + allow_large_results=allow_large_results, ) else: if configuration is not None: @@ -521,6 +530,8 @@ def _read_gbq_colab( if pyformat_args is None: pyformat_args = {} + allow_large_results = bigframes._config.options._allow_large_results + query = bigframes.core.pyformat.pyformat( query, pyformat_args=pyformat_args, @@ -533,10 +544,7 @@ def _read_gbq_colab( index_col=bigframes.enums.DefaultIndexKind.NULL, force_total_order=False, dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run), - # TODO(tswast): we may need to allow allow_large_results to be overwritten - # or possibly a general configuration object for an explicit - # destination table and write disposition. - allow_large_results=False, + allow_large_results=allow_large_results, ) @overload @@ -552,6 +560,7 @@ def read_gbq_query( # type: ignore[overload-overlap] col_order: Iterable[str] = ..., filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., + allow_large_results: Optional[bool] = ..., ) -> dataframe.DataFrame: ... @@ -568,6 +577,7 @@ def read_gbq_query( col_order: Iterable[str] = ..., filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., + allow_large_results: Optional[bool] = ..., ) -> pandas.Series: ... @@ -583,6 +593,7 @@ def read_gbq_query( col_order: Iterable[str] = (), filters: third_party_pandas_gbq.FiltersType = (), dry_run: bool = False, + allow_large_results: Optional[bool] = None, ) -> dataframe.DataFrame | pandas.Series: """Turn a SQL query into a DataFrame. @@ -632,9 +643,48 @@ def read_gbq_query( See also: :meth:`Session.read_gbq`. + Args: + query (str): + A SQL query to execute. + index_col (Iterable[str] or str, optional): + The column(s) to use as the index for the DataFrame. This can be + a single column name or a list of column names. If not provided, + a default index will be used. + columns (Iterable[str], optional): + The columns to read from the query result. If not + specified, all columns will be read. + configuration (dict, optional): + A dictionary of query job configuration options. See the + BigQuery REST API documentation for a list of available options: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query + max_results (int, optional): + The maximum number of rows to retrieve from the query + result. If not specified, all rows will be loaded. + use_cache (bool, optional): + Whether to use cached results for the query. Defaults to ``True``. + Setting this to ``False`` will force a re-execution of the query. + col_order (Iterable[str], optional): + The desired order of columns in the resulting DataFrame. This + parameter is deprecated and will be removed in a future version. + Use ``columns`` instead. + filters (list[tuple], optional): + A list of filters to apply to the data. Filters are specified + as a list of tuples, where each tuple contains a column name, + an operator (e.g., '==', '!='), and a value. + dry_run (bool, optional): + If ``True``, the function will not actually execute the query but + will instead return statistics about the query. Defaults to + ``False``. + allow_large_results (bool, optional): + Whether to allow large query results. If ``True``, the query + results can be larger than the maximum response size. + Defaults to ``bpd.options.compute.allow_large_results``. + Returns: - bigframes.pandas.DataFrame: - A DataFrame representing results of the query or table. + bigframes.pandas.DataFrame or pandas.Series: + A DataFrame representing the result of the query. If ``dry_run`` + is ``True``, a ``pandas.Series`` containing query statistics is + returned. Raises: ValueError: @@ -649,6 +699,9 @@ def read_gbq_query( elif col_order: columns = col_order + if allow_large_results is None: + allow_large_results = bigframes._config.options._allow_large_results + return self._loader.read_gbq_query( # type: ignore # for dry_run overload query=query, index_col=index_col, @@ -658,6 +711,7 @@ def read_gbq_query( use_cache=use_cache, filters=filters, dry_run=dry_run, + allow_large_results=allow_large_results, ) @overload @@ -715,9 +769,40 @@ def read_gbq_table( See also: :meth:`Session.read_gbq`. + Args: + table_id (str): + The identifier of the BigQuery table to read. + index_col (Iterable[str] or str, optional): + The column(s) to use as the index for the DataFrame. This can be + a single column name or a list of column names. If not provided, + a default index will be used. + columns (Iterable[str], optional): + The columns to read from the table. If not specified, all + columns will be read. + max_results (int, optional): + The maximum number of rows to retrieve from the table. If not + specified, all rows will be loaded. + filters (list[tuple], optional): + A list of filters to apply to the data. Filters are specified + as a list of tuples, where each tuple contains a column name, + an operator (e.g., '==', '!='), and a value. + use_cache (bool, optional): + Whether to use cached results for the query. Defaults to ``True``. + Setting this to ``False`` will force a re-execution of the query. + col_order (Iterable[str], optional): + The desired order of columns in the resulting DataFrame. This + parameter is deprecated and will be removed in a future version. + Use ``columns`` instead. + dry_run (bool, optional): + If ``True``, the function will not actually execute the query but + will instead return statistics about the table. Defaults to + ``False``. + Returns: - bigframes.pandas.DataFrame: - A DataFrame representing results of the query or table. + bigframes.pandas.DataFrame or pandas.Series: + A DataFrame representing the contents of the table. If + ``dry_run`` is ``True``, a ``pandas.Series`` containing table + statistics is returned. Raises: ValueError: diff --git a/bigframes/session/_io/bigquery/read_gbq_query.py b/bigframes/session/_io/bigquery/read_gbq_query.py index 70c83d7875..aed77615ce 100644 --- a/bigframes/session/_io/bigquery/read_gbq_query.py +++ b/bigframes/session/_io/bigquery/read_gbq_query.py @@ -16,7 +16,7 @@ from __future__ import annotations -from typing import Optional +from typing import cast, Iterable, Optional, Tuple from google.cloud import bigquery import google.cloud.bigquery.table @@ -28,6 +28,7 @@ import bigframes.core.blocks as blocks import bigframes.core.guid import bigframes.core.schema as schemata +import bigframes.enums import bigframes.session @@ -53,7 +54,11 @@ def create_dataframe_from_query_job_stats( def create_dataframe_from_row_iterator( - rows: google.cloud.bigquery.table.RowIterator, *, session: bigframes.session.Session + rows: google.cloud.bigquery.table.RowIterator, + *, + session: bigframes.session.Session, + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind, + columns: Iterable[str], ) -> dataframe.DataFrame: """Convert a RowIterator into a DataFrame wrapping a LocalNode. @@ -61,11 +66,27 @@ def create_dataframe_from_row_iterator( 'jobless' case where there's no destination table. """ pa_table = rows.to_arrow() + bq_schema = list(rows.schema) + is_default_index = not index_col or isinstance( + index_col, bigframes.enums.DefaultIndexKind + ) - # TODO(tswast): Use array_value.promote_offsets() instead once that node is - # supported by the local engine. - offsets_col = bigframes.core.guid.generate_guid() - pa_table = pyarrow_utils.append_offsets(pa_table, offsets_col=offsets_col) + if is_default_index: + # We get a sequential index for free, so use that if no index is specified. + # TODO(tswast): Use array_value.promote_offsets() instead once that node is + # supported by the local engine. + offsets_col = bigframes.core.guid.generate_guid() + pa_table = pyarrow_utils.append_offsets(pa_table, offsets_col=offsets_col) + bq_schema += [bigquery.SchemaField(offsets_col, "INTEGER")] + index_columns: Tuple[str, ...] = (offsets_col,) + index_labels: Tuple[Optional[str], ...] = (None,) + elif isinstance(index_col, str): + index_columns = (index_col,) + index_labels = (index_col,) + else: + index_col = cast(Iterable[str], index_col) + index_columns = tuple(index_col) + index_labels = cast(Tuple[Optional[str], ...], tuple(index_col)) # We use the ManagedArrowTable constructor directly, because the # results of to_arrow() should be the source of truth with regards @@ -74,17 +95,27 @@ def create_dataframe_from_row_iterator( # like the output of the BQ Storage Read API. mat = local_data.ManagedArrowTable( pa_table, - schemata.ArraySchema.from_bq_schema( - list(rows.schema) + [bigquery.SchemaField(offsets_col, "INTEGER")] - ), + schemata.ArraySchema.from_bq_schema(bq_schema), ) mat.validate() + column_labels = [ + field.name for field in rows.schema if field.name not in index_columns + ] + array_value = core.ArrayValue.from_managed(mat, session) block = blocks.Block( array_value, - (offsets_col,), - [field.name for field in rows.schema], - (None,), + index_columns=index_columns, + column_labels=column_labels, + index_labels=index_labels, ) - return dataframe.DataFrame(block) + df = dataframe.DataFrame(block) + + if columns: + df = df[list(columns)] + + if not is_default_index: + df = df.sort_index() + + return df diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 6500701324..49b1195235 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -721,6 +721,9 @@ def read_gbq_table( columns=columns, use_cache=use_cache, dry_run=dry_run, + # If max_results has been set, we almost certainly have < 10 GB + # of results. + allow_large_results=False, ) return df @@ -895,7 +898,7 @@ def read_gbq_query( # type: ignore[overload-overlap] filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[False] = ..., force_total_order: Optional[bool] = ..., - allow_large_results: bool = ..., + allow_large_results: bool, ) -> dataframe.DataFrame: ... @@ -912,7 +915,7 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = ..., dry_run: Literal[True] = ..., force_total_order: Optional[bool] = ..., - allow_large_results: bool = ..., + allow_large_results: bool, ) -> pandas.Series: ... @@ -928,7 +931,7 @@ def read_gbq_query( filters: third_party_pandas_gbq.FiltersType = (), dry_run: bool = False, force_total_order: Optional[bool] = None, - allow_large_results: bool = True, + allow_large_results: bool, ) -> dataframe.DataFrame | pandas.Series: configuration = _transform_read_gbq_configuration(configuration) @@ -953,6 +956,7 @@ def read_gbq_query( True if use_cache is None else use_cache ) + _check_duplicates("columns", columns) index_cols = _to_index_cols(index_col) _check_index_col_param(index_cols, columns) @@ -1040,10 +1044,19 @@ def read_gbq_query( # local node. Likely there are a wide range of sizes in which it # makes sense to download the results beyond the first page, even if # there is a job and destination table available. - if rows is not None and destination is None: + if ( + rows is not None + and destination is None + and ( + query_job_for_metrics is None + or query_job_for_metrics.statement_type == "SELECT" + ) + ): return bf_read_gbq_query.create_dataframe_from_row_iterator( rows, session=self._session, + index_col=index_col, + columns=columns, ) # If there was no destination table and we've made it this far, that diff --git a/tests/system/small/bigquery/test_vector_search.py b/tests/system/small/bigquery/test_vector_search.py index a282135fa6..3107795730 100644 --- a/tests/system/small/bigquery/test_vector_search.py +++ b/tests/system/small/bigquery/test_vector_search.py @@ -123,12 +123,17 @@ def test_vector_search_basic_params_with_df(): "embedding": [[1.0, 2.0], [3.0, 5.2]], } ) - vector_search_result = bbq.vector_search( - base_table="bigframes-dev.bigframes_tests_sys.base_table", - column_to_search="my_embedding", - query=search_query, - top_k=2, - ).to_pandas() # type:ignore + vector_search_result = ( + bbq.vector_search( + base_table="bigframes-dev.bigframes_tests_sys.base_table", + column_to_search="my_embedding", + query=search_query, + top_k=2, + ) + .sort_values("distance") + .sort_index() + .to_pandas() + ) # type:ignore expected = pd.DataFrame( { "query_id": ["cat", "dog", "dog", "cat"], @@ -157,80 +162,60 @@ def test_vector_search_basic_params_with_df(): ) -def test_vector_search_different_params_with_query(): - search_query = bpd.Series([[1.0, 2.0], [3.0, 5.2]]) - vector_search_result = bbq.vector_search( - base_table="bigframes-dev.bigframes_tests_sys.base_table", - column_to_search="my_embedding", - query=search_query, - distance_type="cosine", - top_k=2, - ).to_pandas() # type:ignore - expected = pd.DataFrame( +def test_vector_search_different_params_with_query(session): + base_df = bpd.DataFrame( { - "0": [ - np.array([1.0, 2.0]), - np.array([1.0, 2.0]), - np.array([3.0, 5.2]), - np.array([3.0, 5.2]), - ], - "id": [2, 1, 1, 2], + "id": [1, 2, 3, 4], "my_embedding": [ - np.array([2.0, 4.0]), - np.array([1.0, 2.0]), - np.array([1.0, 2.0]), - np.array([2.0, 4.0]), + np.array([0.0, 1.0]), + np.array([1.0, 0.0]), + np.array([0.0, -1.0]), + np.array([-1.0, 0.0]), ], - "distance": [0.0, 0.0, 0.001777, 0.001777], }, - index=pd.Index([0, 0, 1, 1], dtype="Int64"), - ) - pd.testing.assert_frame_equal( - vector_search_result, expected, check_dtype=False, rtol=0.1 - ) - - -def test_vector_search_df_with_query_column_to_search(): - search_query = bpd.DataFrame( - { - "query_id": ["dog", "cat"], - "embedding": [[1.0, 2.0], [3.0, 5.2]], - "another_embedding": [[1.0, 2.5], [3.3, 5.2]], - } - ) - vector_search_result = bbq.vector_search( - base_table="bigframes-dev.bigframes_tests_sys.base_table", - column_to_search="my_embedding", - query=search_query, - query_column_to_search="another_embedding", - top_k=2, - ).to_pandas() # type:ignore - expected = pd.DataFrame( - { - "query_id": ["dog", "dog", "cat", "cat"], - "embedding": [ - np.array([1.0, 2.0]), - np.array([1.0, 2.0]), - np.array([3.0, 5.2]), - np.array([3.0, 5.2]), - ], - "another_embedding": [ - np.array([1.0, 2.5]), - np.array([1.0, 2.5]), - np.array([3.3, 5.2]), - np.array([3.3, 5.2]), - ], - "id": [1, 4, 2, 5], - "my_embedding": [ - np.array([1.0, 2.0]), - np.array([1.0, 3.2]), - np.array([2.0, 4.0]), - np.array([5.0, 5.4]), - ], - "distance": [0.5, 0.7, 1.769181, 1.711724], - }, - index=pd.Index([0, 0, 1, 1], dtype="Int64"), - ) - pd.testing.assert_frame_equal( - vector_search_result, expected, check_dtype=False, rtol=0.1 + session=session, ) + base_table = base_df.to_gbq() + try: + search_query = bpd.Series([[0.75, 0.25], [-0.25, -0.75]], session=session) + vector_search_result = ( + bbq.vector_search( + base_table=base_table, + column_to_search="my_embedding", + query=search_query, + distance_type="cosine", + top_k=2, + ) + .sort_values("distance") + .sort_index() + .to_pandas() + ) # type:ignore + expected = pd.DataFrame( + { + "0": [ + [0.75, 0.25], + [0.75, 0.25], + [-0.25, -0.75], + [-0.25, -0.75], + ], + "id": [2, 1, 3, 4], + "my_embedding": [ + [1.0, 0.0], + [0.0, 1.0], + [0.0, -1.0], + [-1.0, 0.0], + ], + "distance": [ + 0.051317, + 0.683772, + 0.051317, + 0.683772, + ], + }, + index=pd.Index([0, 0, 1, 1], dtype="Int64"), + ) + pd.testing.assert_frame_equal( + vector_search_result, expected, check_dtype=False, rtol=0.1 + ) + finally: + session.bqclient.delete_table(base_table, not_found_ok=True) diff --git a/tests/system/small/ml/test_forecasting.py b/tests/system/small/ml/test_forecasting.py index d1b6b18fbe..134f82e96e 100644 --- a/tests/system/small/ml/test_forecasting.py +++ b/tests/system/small/ml/test_forecasting.py @@ -432,8 +432,10 @@ def test_arima_plus_detect_anomalies_params( }, ) pd.testing.assert_frame_equal( - anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]], - expected, + anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]] + .sort_values("anomaly_probability") + .reset_index(drop=True), + expected.sort_values("anomaly_probability").reset_index(drop=True), rtol=0.1, check_index_type=False, check_dtype=False, @@ -449,11 +451,16 @@ def test_arima_plus_score( id_col_name, ): if id_col_name: - result = time_series_arima_plus_model_w_id.score( - new_time_series_df_w_id[["parsed_date"]], - new_time_series_df_w_id[["total_visits"]], - new_time_series_df_w_id[["id"]], - ).to_pandas() + result = ( + time_series_arima_plus_model_w_id.score( + new_time_series_df_w_id[["parsed_date"]], + new_time_series_df_w_id[["total_visits"]], + new_time_series_df_w_id[["id"]], + ) + .to_pandas() + .sort_values("id") + .reset_index(drop=True) + ) else: result = time_series_arima_plus_model.score( new_time_series_df[["parsed_date"]], new_time_series_df[["total_visits"]] @@ -472,6 +479,8 @@ def test_arima_plus_score( ) expected["id"] = expected["id"].astype(str).str.replace(r"\.0$", "", regex=True) expected["id"] = expected["id"].astype("string[pyarrow]") + expected = expected.sort_values("id") + expected = expected.reset_index(drop=True) else: expected = pd.DataFrame( { @@ -488,6 +497,7 @@ def test_arima_plus_score( expected, rtol=0.1, check_index_type=False, + check_dtype=False, ) @@ -542,11 +552,16 @@ def test_arima_plus_score_series( id_col_name, ): if id_col_name: - result = time_series_arima_plus_model_w_id.score( - new_time_series_df_w_id["parsed_date"], - new_time_series_df_w_id["total_visits"], - new_time_series_df_w_id["id"], - ).to_pandas() + result = ( + time_series_arima_plus_model_w_id.score( + new_time_series_df_w_id["parsed_date"], + new_time_series_df_w_id["total_visits"], + new_time_series_df_w_id["id"], + ) + .to_pandas() + .sort_values("id") + .reset_index(drop=True) + ) else: result = time_series_arima_plus_model.score( new_time_series_df["parsed_date"], new_time_series_df["total_visits"] @@ -565,6 +580,8 @@ def test_arima_plus_score_series( ) expected["id"] = expected["id"].astype(str).str.replace(r"\.0$", "", regex=True) expected["id"] = expected["id"].astype("string[pyarrow]") + expected = expected.sort_values("id") + expected = expected.reset_index(drop=True) else: expected = pd.DataFrame( { @@ -581,6 +598,7 @@ def test_arima_plus_score_series( expected, rtol=0.1, check_index_type=False, + check_dtype=False, ) diff --git a/tests/system/small/ml/test_preprocessing.py b/tests/system/small/ml/test_preprocessing.py index 34be48be1e..65a851efc3 100644 --- a/tests/system/small/ml/test_preprocessing.py +++ b/tests/system/small/ml/test_preprocessing.py @@ -245,7 +245,7 @@ def test_max_abs_scaler_save_load(new_penguins_df, dataset_id): index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), ) - pd.testing.assert_frame_equal(result, expected, rtol=0.1) + pd.testing.assert_frame_equal(result.sort_index(), expected.sort_index(), rtol=0.1) def test_min_max_scaler_normalized_fit_transform(new_penguins_df): diff --git a/tests/system/small/session/test_read_gbq_query.py b/tests/system/small/session/test_read_gbq_query.py new file mode 100644 index 0000000000..c1408febca --- /dev/null +++ b/tests/system/small/session/test_read_gbq_query.py @@ -0,0 +1,113 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +import pytest + +import bigframes +import bigframes.core.nodes as nodes + + +def test_read_gbq_query_w_allow_large_results(session: bigframes.Session): + if not hasattr(session.bqclient, "default_job_creation_mode"): + pytest.skip("Jobless query only available on newer google-cloud-bigquery.") + + query = "SELECT 1" + + # Make sure we don't get a cached table. + configuration = {"query": {"useQueryCache": False}} + + # Very small results should wrap a local node. + df_false = session.read_gbq( + query, + configuration=configuration, + allow_large_results=False, + ) + assert df_false.shape == (1, 1) + roots_false = df_false._get_block().expr.node.roots + assert any(isinstance(node, nodes.ReadLocalNode) for node in roots_false) + assert not any(isinstance(node, nodes.ReadTableNode) for node in roots_false) + + # Large results allowed should wrap a table. + df_true = session.read_gbq( + query, + configuration=configuration, + allow_large_results=True, + ) + assert df_true.shape == (1, 1) + roots_true = df_true._get_block().expr.node.roots + assert any(isinstance(node, nodes.ReadTableNode) for node in roots_true) + + +def test_read_gbq_query_w_columns(session: bigframes.Session): + query = """ + SELECT 1 as int_col, + 'a' as str_col, + TIMESTAMP('2025-08-21 10:41:32.123456') as timestamp_col + """ + + result = session.read_gbq( + query, + columns=["timestamp_col", "int_col"], + ) + assert list(result.columns) == ["timestamp_col", "int_col"] + assert result.to_dict(orient="records") == [ + { + "timestamp_col": datetime.datetime( + 2025, 8, 21, 10, 41, 32, 123456, tzinfo=datetime.timezone.utc + ), + "int_col": 1, + } + ] + + +@pytest.mark.parametrize( + ("index_col", "expected_index_names"), + ( + pytest.param( + "my_custom_index", + ("my_custom_index",), + id="string", + ), + pytest.param( + ("my_custom_index",), + ("my_custom_index",), + id="iterable", + ), + pytest.param( + ("my_custom_index", "int_col"), + ("my_custom_index", "int_col"), + id="multiindex", + ), + ), +) +def test_read_gbq_query_w_index_col( + session: bigframes.Session, index_col, expected_index_names +): + query = """ + SELECT 1 as int_col, + 'a' as str_col, + 0 as my_custom_index, + TIMESTAMP('2025-08-21 10:41:32.123456') as timestamp_col + """ + + result = session.read_gbq( + query, + index_col=index_col, + ) + assert tuple(result.index.names) == expected_index_names + assert frozenset(result.columns) == frozenset( + {"int_col", "str_col", "my_custom_index", "timestamp_col"} + ) - frozenset(expected_index_names) diff --git a/tests/system/small/test_pandas_options.py b/tests/system/small/test_pandas_options.py index 1d360e0d4f..7a750ddfd3 100644 --- a/tests/system/small/test_pandas_options.py +++ b/tests/system/small/test_pandas_options.py @@ -280,6 +280,17 @@ def test_credentials_need_reauthentication( session = bpd.get_global_session() assert session.bqclient._http.credentials.valid + # We look at the thread-local session because of the + # reset_default_session_and_location fixture and that this test mutates + # state that might otherwise be used by tests running in parallel. + current_session = ( + bigframes.core.global_session._global_session_state.thread_local_session + ) + assert current_session is not None + + # Force a temp table to be created, so there is something to cleanup. + current_session._anon_dataset_manager.create_temp_table(schema=()) + with monkeypatch.context() as m: # Simulate expired credentials to trigger the credential refresh flow m.setattr( @@ -303,15 +314,6 @@ def test_credentials_need_reauthentication( with pytest.raises(google.auth.exceptions.RefreshError): bpd.read_gbq(test_query) - # Now verify that closing the session works We look at the - # thread-local session because of the - # reset_default_session_and_location fixture and that this test mutates - # state that might otherwise be used by tests running in parallel. - assert ( - bigframes.core.global_session._global_session_state.thread_local_session - is not None - ) - with warnings.catch_warnings(record=True) as warned: bpd.close_session() # CleanupFailedWarning: can't clean up diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index a04da64af0..40fcb150f6 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -619,7 +619,7 @@ def test_read_gbq_wildcard( pytest.param( {"query": {"useQueryCache": False, "maximumBytesBilled": "100"}}, marks=pytest.mark.xfail( - raises=google.api_core.exceptions.InternalServerError, + raises=google.api_core.exceptions.BadRequest, reason="Expected failure when the query exceeds the maximum bytes billed limit.", ), ), diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index 0825b78037..c4f6521642 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -103,7 +103,7 @@ def test_unordered_mode_read_gbq(unordered_session): } ) # Don't need ignore_order as there is only 1 row - assert_pandas_df_equal(df.to_pandas(), expected) + assert_pandas_df_equal(df.to_pandas(), expected, check_index_type=False) @pytest.mark.parametrize( diff --git a/tests/unit/session/test_read_gbq_query.py b/tests/unit/session/test_read_gbq_query.py index afd9922426..1f9d2fb945 100644 --- a/tests/unit/session/test_read_gbq_query.py +++ b/tests/unit/session/test_read_gbq_query.py @@ -25,7 +25,7 @@ def test_read_gbq_query_sets_destination_table(): # Use partial ordering mode to skip column uniqueness checks. session = mocks.create_bigquery_session(ordering_mode="partial") - _ = session.read_gbq_query("SELECT 'my-test-query';") + _ = session.read_gbq_query("SELECT 'my-test-query';", allow_large_results=True) queries = session._queries # type: ignore configs = session._job_configs # type: ignore diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index 3dae2b6bbe..0fdca4dde1 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -25,6 +25,7 @@ def read_gbq( filters: FiltersType = (), use_cache: Optional[bool] = None, col_order: Iterable[str] = (), + allow_large_results: Optional[bool] = None, ): """Loads a DataFrame from BigQuery. @@ -156,6 +157,11 @@ def read_gbq( `configuration` to avoid conflicts. col_order (Iterable[str]): Alias for columns, retained for backwards compatibility. + allow_large_results (bool, optional): + Whether to allow large query results. If ``True``, the query + results can be larger than the maximum response size. This + option is only applicable when ``query_or_table`` is a query. + Defaults to ``bpd.options.compute.allow_large_results``. Raises: bigframes.exceptions.DefaultIndexWarning: