From 176c861428f8cc09cddd918afb204945968a4b12 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 28 Mar 2026 01:22:37 +0000 Subject: [PATCH 1/2] refactor: Simplify @udf wrapper object --- bigframes/dataframe.py | 12 +- bigframes/functions/__init__.py | 7 +- bigframes/functions/_function_session.py | 48 +- bigframes/functions/function.py | 84 +- bigframes/functions/udf_def.py | 8 + bigframes/series.py | 16 +- .../large/functions/test_managed_function.py | 1660 ++++++++--------- 7 files changed, 792 insertions(+), 1043 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 1ac80a4e6a1..5fbe5be419e 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4662,7 +4662,13 @@ def _prepare_export( return array_value, id_overrides def map(self, func, na_action: Optional[str] = None) -> DataFrame: - if not isinstance(func, bigframes.functions.BigqueryCallableRoutine): + if not isinstance( + func, + ( + bigframes.functions.BigqueryCallableRoutine, + bigframes.functions.UdfRoutine, + ), + ): raise TypeError("the first argument must be callable") if na_action not in {None, "ignore"}: @@ -4690,14 +4696,14 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): func, ( bigframes.functions.BigqueryCallableRoutine, - bigframes.functions.BigqueryCallableRowRoutine, + bigframes.functions.UdfRoutine, ), ): raise ValueError( "For axis=1 a BigFrames BigQuery function must be used." ) - if func.is_row_processor: + if func.udf_def.signature.is_row_processor: # Early check whether the dataframe dtypes are currently supported # in the bigquery function # NOTE: Keep in sync with the value converters used in the gcf code diff --git a/bigframes/functions/__init__.py b/bigframes/functions/__init__.py index 5f87956a611..ff054812452 100644 --- a/bigframes/functions/__init__.py +++ b/bigframes/functions/__init__.py @@ -11,12 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from bigframes.functions.function import ( - BigqueryCallableRoutine, - BigqueryCallableRowRoutine, -) +from bigframes.functions.function import BigqueryCallableRoutine, UdfRoutine __all__ = [ "BigqueryCallableRoutine", - "BigqueryCallableRowRoutine", + "UdfRoutine", ] diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index fe7889e9556..bb1996ad1ea 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -631,25 +631,15 @@ def wrapper(func): if udf_sig.is_row_processor: msg = bfe.format_message("input_types=Series is in preview.") warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) - return decorator( - bq_functions.BigqueryCallableRowRoutine( - udf_definition, - session, - cloud_function_ref=bigframes_cloud_function, - local_func=func, - is_managed=False, - ) - ) - else: - return decorator( - bq_functions.BigqueryCallableRoutine( - udf_definition, - session, - cloud_function_ref=bigframes_cloud_function, - local_func=func, - is_managed=False, - ) + return decorator( + bq_functions.BigqueryCallableRoutine( + udf_definition, + session, + cloud_function_ref=bigframes_cloud_function, + local_func=func, + is_managed=False, ) + ) return wrapper @@ -835,8 +825,9 @@ def wrapper(func): bq_connection_manager, session=session, # type: ignore ) + code_def = udf_def.CodeDef.from_func(func) config = udf_def.ManagedFunctionConfig( - code=udf_def.CodeDef.from_func(func), + code=code_def, signature=udf_sig, max_batching_rows=max_batching_rows, container_cpu=container_cpu, @@ -863,26 +854,11 @@ def wrapper(func): if not name: self._update_temp_artifacts(full_rf_name, "") - decorator = functools.wraps(func) if udf_sig.is_row_processor: msg = bfe.format_message("input_types=Series is in preview.") warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) - assert session is not None # appease mypy - return decorator( - bq_functions.BigqueryCallableRowRoutine( - udf_definition, session, local_func=func, is_managed=True - ) - ) - else: - assert session is not None # appease mypy - return decorator( - bq_functions.BigqueryCallableRoutine( - udf_definition, - session, - local_func=func, - is_managed=True, - ) - ) + + return bq_functions.UdfRoutine(func=func, _udf_def=udf_definition) return wrapper diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index 18a000c722f..a1fbd2b3bfe 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -21,6 +21,9 @@ from bigframes.session import Session import bigframes.series +import dataclasses +import functools + import google.api_core.exceptions from google.cloud import bigquery @@ -90,13 +93,13 @@ def _try_import_routine( def _try_import_row_routine( routine: bigquery.Routine, session: bigframes.Session -) -> BigqueryCallableRowRoutine: +) -> BigqueryCallableRoutine: udf_def = _routine_as_udf_def(routine, is_row_processor=True) is_remote = ( hasattr(routine, "remote_function_options") and routine.remote_function_options ) - return BigqueryCallableRowRoutine(udf_def, session, is_managed=not is_remote) + return BigqueryCallableRoutine(udf_def, session, is_managed=not is_remote) def _routine_as_udf_def( @@ -117,7 +120,6 @@ def _routine_as_udf_def( ) -# TODO(b/399894805): Support managed function. def read_gbq_function( function_name: str, *, @@ -202,7 +204,7 @@ def bigframes_remote_function(self): @property def is_row_processor(self) -> bool: - return False + return self.udf_def.signature.is_row_processor @property def udf_def(self) -> udf_def.BigqueryUdf: @@ -225,75 +227,17 @@ def bigframes_bigquery_function_output_dtype(self): return self.udf_def.signature.output.emulating_type.bf_type -class BigqueryCallableRowRoutine: - """ - A reference to a routine in the context of a session. - - Can be used both directly as a callable, or as an input to dataframe ops that take a callable. - """ - - def __init__( - self, - udf_def: udf_def.BigqueryUdf, - session: bigframes.Session, - *, - local_func: Optional[Callable] = None, - cloud_function_ref: Optional[str] = None, - is_managed: bool = False, - ): - assert udf_def.signature.is_row_processor - self._udf_def = udf_def - self._session = session - self._local_fun = local_func - self._cloud_function = cloud_function_ref - self._is_managed = is_managed +@dataclasses.dataclass(frozen=True) +class UdfRoutine: + func: Callable + # Try not to depend on this, bq managed function creation will be deferred later + # And this ref will be replaced with requirements rather to support lazy creation + _udf_def: udf_def.BigqueryUdf + @functools.partial def __call__(self, *args, **kwargs): - if self._local_fun: - return self._local_fun(*args, **kwargs) - # avoid circular imports - from bigframes.core.compile.sqlglot import sql as sg_sql - import bigframes.session._io.bigquery as bf_io_bigquery - - args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args]) - sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})" - iter, job = bf_io_bigquery.start_query_with_client( - self._session.bqclient, - sql=sql, - query_with_job=True, - job_config=bigquery.QueryJobConfig(), - publisher=self._session._publisher, - ) # type: ignore - return list(iter.to_arrow().to_pydict().values())[0][0] - - @property - def bigframes_bigquery_function(self) -> str: - return str(self._udf_def.routine_ref) - - @property - def bigframes_remote_function(self): - return None if self._is_managed else str(self._udf_def.routine_ref) - - @property - def is_row_processor(self) -> bool: - return True + return self.func(*args, **kwargs) @property def udf_def(self) -> udf_def.BigqueryUdf: return self._udf_def - - @property - def bigframes_cloud_function(self) -> Optional[str]: - return self._cloud_function - - @property - def input_dtypes(self): - return tuple(arg.bf_type for arg in self.udf_def.signature.inputs) - - @property - def output_dtype(self): - return self.udf_def.signature.output.bf_type - - @property - def bigframes_bigquery_function_output_dtype(self): - return self.udf_def.signature.output.emulating_type.bf_type diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index 3ebf2eeb47a..3acb799f623 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -455,6 +455,14 @@ def stable_hash(self) -> bytes: return hash_val.digest() + def to_callable(self): + """ + Reconstructs the python callable from the pickled code. + + Assumption: package_requirements match local environment + """ + return cloudpickle.loads(self.pickled_code) + @dataclasses.dataclass(frozen=True) class ManagedFunctionConfig: diff --git a/bigframes/series.py b/bigframes/series.py index 7eb30beb826..454753a577c 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -2029,7 +2029,13 @@ def apply( " are supported." ) - if isinstance(func, bigframes.functions.BigqueryCallableRoutine): + if isinstance( + func, + ( + bigframes.functions.BigqueryCallableRoutine, + bigframes.functions.UdfRoutine, + ), + ): # We are working with bigquery function at this point if args: result_series = self._apply_nary_op( @@ -2090,7 +2096,13 @@ def combine( " are supported." ) - if isinstance(func, bigframes.functions.BigqueryCallableRoutine): + if isinstance( + func, + ( + bigframes.functions.BigqueryCallableRoutine, + bigframes.functions.UdfRoutine, + ), + ): result_series = self._apply_binary_op( other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def) ) diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index a74ff292732..1c79fac89fb 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -16,7 +16,6 @@ import google.api_core.exceptions import pandas -import pyarrow import pytest import test_utils.prefixer @@ -25,401 +24,332 @@ import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.pandas as bpd -from bigframes.testing.utils import cleanup_function_assets prefixer = test_utils.prefixer.Prefixer("bigframes", "") -def test_managed_function_array_output(session, scalars_dfs, dataset_id): - try: - with warnings.catch_warnings(record=True) as record: +@pytest.fixture +def function_id(dataset_id, session): + name = prefixer.create_prefix() + yield name + session.bqclient.delete_routine(f"{dataset_id}.{name}") - @session.udf( - dataset=dataset_id, - name=prefixer.create_prefix(), - ) - def featurize(x: int) -> list[float]: - return [float(i) for i in [x, x + 1, x + 2]] - # No following conflict warning when there is no redundant type hints. - input_type_warning = "Conflicting input types detected" - return_type_warning = "Conflicting return type detected" - assert not any(input_type_warning in str(warning.message) for warning in record) - assert not any( - return_type_warning in str(warning.message) for warning in record +def test_managed_function_array_output(session, scalars_dfs, dataset_id, function_id): + with warnings.catch_warnings(record=True) as record: + + @session.udf( + dataset=dataset_id, + name=function_id, ) + def featurize(x: int) -> list[float]: + return [float(i) for i in [x, x + 1, x + 2]] - scalars_df, scalars_pandas_df = scalars_dfs + # No following conflict warning when there is no redundant type hints. + input_type_warning = "Conflicting input types detected" + return_type_warning = "Conflicting return type detected" + assert not any(input_type_warning in str(warning.message) for warning in record) + assert not any(return_type_warning in str(warning.message) for warning in record) - bf_int64_col = scalars_df["int64_too"] - bf_result = bf_int64_col.apply(featurize).to_pandas() + scalars_df, scalars_pandas_df = scalars_dfs - pd_int64_col = scalars_pandas_df["int64_too"] - pd_result = pd_int64_col.apply(featurize) + bf_int64_col = scalars_df["int64_too"] + bf_result = bf_int64_col.apply(featurize).to_pandas() - # Ignore any dtype disparity. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result = pd_int64_col.apply(featurize) - # Make sure the read_gbq_function path works for this function. - featurize_ref = session.read_gbq_function(featurize.bigframes_bigquery_function) + # Ignore any dtype disparity. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - assert hasattr(featurize_ref, "bigframes_bigquery_function") - assert featurize_ref.bigframes_remote_function is None - assert ( - featurize_ref.bigframes_bigquery_function - == featurize.bigframes_bigquery_function - ) + # Make sure the read_gbq_function path works for this function. + featurize_ref = session.read_gbq_function(f"{dataset_id}.{function_id}") - # Test on the function from read_gbq_function. - got = featurize_ref(10) - assert got == [10.0, 11.0, 12.0] + # Test on the function from read_gbq_function. + got = featurize_ref(10) + assert got == [10.0, 11.0, 12.0] - bf_result_gbq = bf_int64_col.apply(featurize_ref).to_pandas() - pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) + bf_result_gbq = bf_int64_col.apply(featurize_ref).to_pandas() + pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(featurize, session.bqclient, ignore_failures=False) +def test_managed_function_series_apply(session, dataset_id, scalars_dfs, function_id): + # An explicit name with "def" in it is used to test the robustness of + # the user code extraction logic, which depends on that term. + assert "def" in function_id, "The substring 'def' was not found in 'function_id'" -def test_managed_function_series_apply(session, dataset_id, scalars_dfs): - try: - # An explicit name with "def" in it is used to test the robustness of - # the user code extraction logic, which depends on that term. - bq_name = f"{prefixer.create_prefix()}_def_to_test_code_extraction" - assert "def" in bq_name, "The substring 'def' was not found in 'bq_name'" + @session.udf(dataset=dataset_id, name=function_id) + def foo(x: int) -> bytes: + return bytes(abs(x)) - @session.udf(dataset=dataset_id, name=bq_name) - def foo(x: int) -> bytes: - return bytes(abs(x)) + # Function should still work normally. + assert foo(-2) == bytes(2) - # Function should still work normally. - assert foo(-2) == bytes(2) + scalars_df, scalars_pandas_df = scalars_dfs - assert hasattr(foo, "bigframes_bigquery_function") - assert hasattr(foo, "input_dtypes") - assert hasattr(foo, "output_dtype") - assert hasattr(foo, "bigframes_bigquery_function_output_dtype") + bf_result_col = scalars_df["int64_too"].apply(foo) + bf_result = ( + scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() + ) - scalars_df, scalars_pandas_df = scalars_dfs + pd_result_col = scalars_pandas_df["int64_too"].apply(foo) + pd_result = scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) - bf_result_col = scalars_df["int64_too"].apply(foo) - bf_result = ( - scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() - ) + pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) - pd_result_col = scalars_pandas_df["int64_too"].apply(foo) - pd_result = ( - scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) - ) - - pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) + # Make sure the read_gbq_function path works for this function. + foo_ref = session.read_gbq_function(f"{dataset_id}.{function_id}") - # Make sure the read_gbq_function path works for this function. - foo_ref = session.read_gbq_function( - function_name=foo.bigframes_bigquery_function, # type: ignore - ) - assert hasattr(foo_ref, "bigframes_bigquery_function") - assert foo_ref.bigframes_remote_function is None - assert foo.bigframes_bigquery_function == foo_ref.bigframes_bigquery_function # type: ignore - - bf_result_col_gbq = scalars_df["int64_too"].apply(foo_ref) - bf_result_gbq = ( - scalars_df["int64_too"] - .to_frame() - .assign(result=bf_result_col_gbq) - .to_pandas() - ) + bf_result_col_gbq = scalars_df["int64_too"].apply(foo_ref) + bf_result_gbq = ( + scalars_df["int64_too"].to_frame().assign(result=bf_result_col_gbq).to_pandas() + ) - pandas.testing.assert_frame_equal(bf_result_gbq, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(foo, session.bqclient, ignore_failures=False) + pandas.testing.assert_frame_equal(bf_result_gbq, pd_result, check_dtype=False) def test_managed_function_series_apply_array_output( session, dataset_id, scalars_dfs, + function_id, ): - try: - with pytest.warns(bfe.PreviewWarning, match="udf is in preview."): + with pytest.warns(bfe.PreviewWarning, match="udf is in preview."): - @session.udf(dataset=dataset_id, name=prefixer.create_prefix()) - def foo_list(x: int) -> list[float]: - return [float(abs(x)), float(abs(x) + 1)] + @session.udf(dataset=dataset_id, name=function_id) + def foo_list(x: int) -> list[float]: + return [float(abs(x)), float(abs(x) + 1)] - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_result_col = scalars_df["int64_too"].apply(foo_list) - bf_result = ( - scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_result_col = scalars_df["int64_too"].apply(foo_list) + bf_result = ( + scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_result_col = scalars_pandas_df["int64_too"].apply(foo_list) - pd_result = ( - scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) - ) + pd_result_col = scalars_pandas_df["int64_too"].apply(foo_list) + pd_result = scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) - # Ignore any dtype difference. - pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(foo_list, session.bqclient, ignore_failures=False) - - -def test_managed_function_series_combine(session, dataset_id, scalars_dfs): - try: - # This function is deliberately written to not work with NA input. - def add(x: int, y: int) -> int: - return x + y - - scalars_df, scalars_pandas_df = scalars_dfs - int_col_name_with_nulls = "int64_col" - int_col_name_no_nulls = "int64_too" - bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]] - pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]] - - # make sure there are NA values in the test column. - assert any([pandas.isna(val) for val in bf_df[int_col_name_with_nulls]]) - - add_managed_func = session.udf( - dataset=dataset_id, name=prefixer.create_prefix() - )(add) - - # with nulls in the series the managed function application would fail. - with pytest.raises( - google.api_core.exceptions.BadRequest, match="unsupported operand" - ): - bf_df[int_col_name_with_nulls].combine( - bf_df[int_col_name_no_nulls], add_managed_func - ).to_pandas() - - # after filtering out nulls the managed function application should work - # similar to pandas. - pd_filter = pd_df[int_col_name_with_nulls].notnull() - pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine( - pd_df[pd_filter][int_col_name_no_nulls], add - ) - bf_filter = bf_df[int_col_name_with_nulls].notnull() - bf_result = ( - bf_df[bf_filter][int_col_name_with_nulls] - .combine(bf_df[bf_filter][int_col_name_no_nulls], add_managed_func) - .to_pandas() - ) + # Ignore any dtype difference. + pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) - # ignore any dtype difference. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Make sure the read_gbq_function path works for this function. - add_managed_func_ref = session.read_gbq_function( - add_managed_func.bigframes_bigquery_function - ) - bf_result = ( - bf_df[bf_filter][int_col_name_with_nulls] - .combine(bf_df[bf_filter][int_col_name_no_nulls], add_managed_func_ref) - .to_pandas() - ) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - add_managed_func, session.bqclient, ignore_failures=False - ) +def test_managed_function_series_combine(session, dataset_id, scalars_dfs, function_id): + # This function is deliberately written to not work with NA input. + def add(x: int, y: int) -> int: + return x + y + scalars_df, scalars_pandas_df = scalars_dfs + int_col_name_with_nulls = "int64_col" + int_col_name_no_nulls = "int64_too" + bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]] + pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]] -def test_managed_function_series_combine_array_output(session, dataset_id, scalars_dfs): - try: - # The type hints in this function's signature has conflicts. The - # `input_types` and `output_type` arguments from udf decorator take - # precedence and will be used instead. - def add_list(x, y: bool) -> list[bool]: - return [x, y] - - scalars_df, scalars_pandas_df = scalars_dfs - int_col_name_with_nulls = "int64_col" - int_col_name_no_nulls = "int64_too" - bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]] - pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]] - - # Make sure there are NA values in the test column. - assert any([pandas.isna(val) for val in bf_df[int_col_name_with_nulls]]) - - with warnings.catch_warnings(record=True) as record: - add_list_managed_func = session.udf( - input_types=[int, int], - output_type=list[int], - dataset=dataset_id, - name=prefixer.create_prefix(), - )(add_list) - - input_type_warning = "Conflicting input types detected" - assert any(input_type_warning in str(warning.message) for warning in record) - return_type_warning = "Conflicting return type detected" - assert any(return_type_warning in str(warning.message) for warning in record) - - # After filtering out nulls the managed function application should work - # similar to pandas. - pd_filter = pd_df[int_col_name_with_nulls].notnull() - pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine( - pd_df[pd_filter][int_col_name_no_nulls], add_list - ) - bf_filter = bf_df[int_col_name_with_nulls].notnull() - bf_result = ( - bf_df[bf_filter][int_col_name_with_nulls] - .combine(bf_df[bf_filter][int_col_name_no_nulls], add_list_managed_func) - .to_pandas() - ) + # make sure there are NA values in the test column. + assert any([pandas.isna(val) for val in bf_df[int_col_name_with_nulls]]) - # Ignore any dtype difference. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + add_managed_func = session.udf(dataset=dataset_id, name=function_id)(add) - # Make sure the read_gbq_function path works for this function. - add_list_managed_func_ref = session.read_gbq_function( - function_name=add_list_managed_func.bigframes_bigquery_function, # type: ignore - ) + # with nulls in the series the managed function application would fail. + with pytest.raises( + google.api_core.exceptions.BadRequest, match="unsupported operand" + ): + bf_df[int_col_name_with_nulls].combine( + bf_df[int_col_name_no_nulls], add_managed_func + ).to_pandas() - assert hasattr(add_list_managed_func_ref, "bigframes_bigquery_function") - assert add_list_managed_func_ref.bigframes_remote_function is None - assert ( - add_list_managed_func_ref.bigframes_bigquery_function - == add_list_managed_func.bigframes_bigquery_function - ) + # after filtering out nulls the managed function application should work + # similar to pandas. + pd_filter = pd_df[int_col_name_with_nulls].notnull() + pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine( + pd_df[pd_filter][int_col_name_no_nulls], add + ) + bf_filter = bf_df[int_col_name_with_nulls].notnull() + bf_result = ( + bf_df[bf_filter][int_col_name_with_nulls] + .combine(bf_df[bf_filter][int_col_name_no_nulls], add_managed_func) + .to_pandas() + ) - # Test on the function from read_gbq_function. - got = add_list_managed_func_ref(10, 38) - assert got == [10, 38] + # ignore any dtype difference. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - bf_result_gbq = ( - bf_df[bf_filter][int_col_name_with_nulls] - .combine(bf_df[bf_filter][int_col_name_no_nulls], add_list_managed_func_ref) - .to_pandas() - ) + # Make sure the read_gbq_function path works for this function. + add_managed_func_ref = session.read_gbq_function(f"{dataset_id}.{function_id}") + bf_result = ( + bf_df[bf_filter][int_col_name_with_nulls] + .combine(bf_df[bf_filter][int_col_name_no_nulls], add_managed_func_ref) + .to_pandas() + ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - add_list_managed_func, session.bqclient, ignore_failures=False - ) +def test_managed_function_series_combine_array_output( + session, dataset_id, scalars_dfs, function_id +): + # The type hints in this function's signature has conflicts. The + # `input_types` and `output_type` arguments from udf decorator take + # precedence and will be used instead. + def add_list(x, y: bool) -> list[bool]: + return [x, y] -def test_managed_function_dataframe_map(session, dataset_id, scalars_dfs): - try: + scalars_df, scalars_pandas_df = scalars_dfs + int_col_name_with_nulls = "int64_col" + int_col_name_no_nulls = "int64_too" + bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]] + pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]] - def add_one(x): - return x + 1 + # Make sure there are NA values in the test column. + assert any([pandas.isna(val) for val in bf_df[int_col_name_with_nulls]]) - mf_add_one = session.udf( - input_types=[int], - output_type=int, + with warnings.catch_warnings(record=True) as record: + add_list_managed_func = session.udf( + input_types=[int, int], + output_type=list[int], dataset=dataset_id, - name=prefixer.create_prefix(), - )(add_one) + name=function_id, + )(add_list) + + input_type_warning = "Conflicting input types detected" + assert any(input_type_warning in str(warning.message) for warning in record) + return_type_warning = "Conflicting return type detected" + assert any(return_type_warning in str(warning.message) for warning in record) + + # After filtering out nulls the managed function application should work + # similar to pandas. + pd_filter = pd_df[int_col_name_with_nulls].notnull() + pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine( + pd_df[pd_filter][int_col_name_no_nulls], add_list + ) + bf_filter = bf_df[int_col_name_with_nulls].notnull() + bf_result = ( + bf_df[bf_filter][int_col_name_with_nulls] + .combine(bf_df[bf_filter][int_col_name_no_nulls], add_list_managed_func) + .to_pandas() + ) - scalars_df, scalars_pandas_df = scalars_dfs - int64_cols = ["int64_col", "int64_too"] + # Ignore any dtype difference. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - bf_int64_df = scalars_df[int64_cols] - bf_int64_df_filtered = bf_int64_df.dropna() - bf_result = bf_int64_df_filtered.map(mf_add_one).to_pandas() + # Make sure the read_gbq_function path works for this function. + add_list_managed_func_ref = session.read_gbq_function(f"{dataset_id}.{function_id}") - pd_int64_df = scalars_pandas_df[int64_cols] - pd_int64_df_filtered = pd_int64_df.dropna() - pd_result = pd_int64_df_filtered.map(add_one) - # TODO(shobs): Figure why pandas .map() changes the dtype, i.e. - # pd_int64_df_filtered.dtype is Int64Dtype() - # pd_int64_df_filtered.map(lambda x: x).dtype is int64. - # For this test let's force the pandas dtype to be same as input. - for col in pd_result: - pd_result[col] = pd_result[col].astype(pd_int64_df_filtered[col].dtype) + # Test on the function from read_gbq_function. + got = add_list_managed_func_ref(10, 38) + assert got == [10, 38] - pandas.testing.assert_frame_equal(bf_result, pd_result) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(mf_add_one, session.bqclient, ignore_failures=False) + bf_result_gbq = ( + bf_df[bf_filter][int_col_name_with_nulls] + .combine(bf_df[bf_filter][int_col_name_no_nulls], add_list_managed_func_ref) + .to_pandas() + ) + pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) -def test_managed_function_dataframe_map_array_output(session, scalars_dfs, dataset_id): - try: - def add_one_list(x): - return [x + 1] * 3 +def test_managed_function_dataframe_map(session, dataset_id, scalars_dfs, function_id): + def add_one(x): + return x + 1 - mf_add_one_list = session.udf( - input_types=[int], - output_type=list[int], - dataset=dataset_id, - name=prefixer.create_prefix(), - )(add_one_list) + mf_add_one = session.udf( + input_types=[int], + output_type=int, + dataset=dataset_id, + name=function_id, + )(add_one) - scalars_df, scalars_pandas_df = scalars_dfs - int64_cols = ["int64_col", "int64_too"] + scalars_df, scalars_pandas_df = scalars_dfs + int64_cols = ["int64_col", "int64_too"] - bf_int64_df = scalars_df[int64_cols] - bf_int64_df_filtered = bf_int64_df.dropna() - bf_result = bf_int64_df_filtered.map(mf_add_one_list).to_pandas() + bf_int64_df = scalars_df[int64_cols] + bf_int64_df_filtered = bf_int64_df.dropna() + bf_result = bf_int64_df_filtered.map(mf_add_one).to_pandas() - pd_int64_df = scalars_pandas_df[int64_cols] - pd_int64_df_filtered = pd_int64_df.dropna() - pd_result = pd_int64_df_filtered.map(add_one_list) + pd_int64_df = scalars_pandas_df[int64_cols] + pd_int64_df_filtered = pd_int64_df.dropna() + pd_result = pd_int64_df_filtered.map(add_one) + # TODO(shobs): Figure why pandas .map() changes the dtype, i.e. + # pd_int64_df_filtered.dtype is Int64Dtype() + # pd_int64_df_filtered.map(lambda x: x).dtype is int64. + # For this test let's force the pandas dtype to be same as input. + for col in pd_result: + pd_result[col] = pd_result[col].astype(pd_int64_df_filtered[col].dtype) - # Ignore any dtype difference. - pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) + pandas.testing.assert_frame_equal(bf_result, pd_result) - # Make sure the read_gbq_function path works for this function. - mf_add_one_list_ref = session.read_gbq_function( - function_name=mf_add_one_list.bigframes_bigquery_function, # type: ignore - ) - bf_result_gbq = bf_int64_df_filtered.map(mf_add_one_list_ref).to_pandas() - pandas.testing.assert_frame_equal(bf_result_gbq, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - mf_add_one_list, session.bqclient, ignore_failures=False - ) +def test_managed_function_dataframe_map_array_output( + session, scalars_dfs, dataset_id, function_id +): + def add_one_list(x): + return [x + 1] * 3 + + mf_add_one_list = session.udf( + input_types=[int], + output_type=list[int], + dataset=dataset_id, + name=function_id, + )(add_one_list) + scalars_df, scalars_pandas_df = scalars_dfs + int64_cols = ["int64_col", "int64_too"] -def test_managed_function_dataframe_apply_axis_1(session, dataset_id, scalars_dfs): - try: - scalars_df, scalars_pandas_df = scalars_dfs - series = scalars_df["int64_too"] - series_pandas = scalars_pandas_df["int64_too"] + bf_int64_df = scalars_df[int64_cols] + bf_int64_df_filtered = bf_int64_df.dropna() + bf_result = bf_int64_df_filtered.map(mf_add_one_list).to_pandas() - def add_ints(x, y): - return x + y + pd_int64_df = scalars_pandas_df[int64_cols] + pd_int64_df_filtered = pd_int64_df.dropna() + pd_result = pd_int64_df_filtered.map(add_one_list) - add_ints_mf = session.udf( - input_types=[int, int], - output_type=int, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(add_ints) - assert add_ints_mf.bigframes_bigquery_function # type: ignore - - with pytest.warns( - bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." - ): - bf_result = ( - bpd.DataFrame({"x": series, "y": series}) - .apply(add_ints_mf, axis=1) - .to_pandas() - ) - - pd_result = pandas.DataFrame({"x": series_pandas, "y": series_pandas}).apply( - lambda row: add_ints(row["x"], row["y"]), axis=1 - ) + # Ignore any dtype difference. + pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) - pandas.testing.assert_series_equal( - pd_result, bf_result, check_dtype=False, check_exact=True + # Make sure the read_gbq_function path works for this function. + mf_add_one_list_ref = session.read_gbq_function(f"{dataset_id}.{function_id}") + + bf_result_gbq = bf_int64_df_filtered.map(mf_add_one_list_ref).to_pandas() + pandas.testing.assert_frame_equal(bf_result_gbq, pd_result, check_dtype=False) + + +def test_managed_function_dataframe_apply_axis_1( + session, dataset_id, scalars_dfs, function_id +): + scalars_df, scalars_pandas_df = scalars_dfs + series = scalars_df["int64_too"] + series_pandas = scalars_pandas_df["int64_too"] + + def add_ints(x, y): + return x + y + + add_ints_mf = session.udf( + input_types=[int, int], + output_type=int, + dataset=dataset_id, + name=function_id, + )(add_ints) + + with pytest.warns( + bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." + ): + bf_result = ( + bpd.DataFrame({"x": series, "y": series}) + .apply(add_ints_mf, axis=1) + .to_pandas() ) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(add_ints_mf, session.bqclient, ignore_failures=False) + + pd_result = pandas.DataFrame({"x": series_pandas, "y": series_pandas}).apply( + lambda row: add_ints(row["x"], row["y"]), axis=1 + ) + + pandas.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_exact=True + ) -def test_managed_function_dataframe_apply_axis_1_array_output(session, dataset_id): +def test_managed_function_dataframe_apply_axis_1_array_output( + session, dataset_id, function_id +): bf_df = bigframes.dataframe.DataFrame( { "Id": [1, 2, 3], @@ -441,91 +371,69 @@ def test_managed_function_dataframe_apply_axis_1_array_output(session, dataset_i input_types=[int, float, str], output_type=list[str], dataset=dataset_id, - name=prefixer.create_prefix(), + name=function_id, ) def foo(x, y, z): return [str(x), str(y), z] - try: - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - assert getattr(foo, "output_dtype") == pandas.ArrowDtype( - pyarrow.list_( - bigframes.dtypes.bigframes_dtype_to_arrow_dtype( - bigframes.dtypes.STRING_DTYPE - ) - ) - ) - assert getattr(foo, "output_dtype") == getattr( - foo, "bigframes_bigquery_function_output_dtype" - ) + # Fails to apply on dataframe with incompatible number of columns. + with pytest.raises( + ValueError, + match="^Parameter count mismatch:.* expected 3 parameters but received 2 DataFrame columns.", + ): + bf_df[["Id", "Age"]].apply(foo, axis=1) - # Fails to apply on dataframe with incompatible number of columns. - with pytest.raises( - ValueError, - match="^Parameter count mismatch:.* expected 3 parameters but received 2 DataFrame columns.", - ): - bf_df[["Id", "Age"]].apply(foo, axis=1) - - with pytest.raises( - ValueError, - match="^Parameter count mismatch:.* expected 3 parameters but received 4 DataFrame columns.", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes. - with pytest.raises( - ValueError, - match="^Data type mismatch for DataFrame columns: Expected .* Received .*", - ): - bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns. - # and their datatypes. - with pytest.warns( - bigframes.exceptions.PreviewWarning, - match="axis=1 scenario is in preview.", - ): - bf_result = bf_df.apply(foo, axis=1).to_pandas() - - # Since this scenario is not pandas-like, let's handcraft the - # expected result. - expected_result = pandas.Series( - [ - ["1", "22.5", "alpha"], - ["2", "23.0", "beta"], - ["3", "23.5", "gamma"], - ] - ) + with pytest.raises( + ValueError, + match="^Parameter count mismatch:.* expected 3 parameters but received 4 DataFrame columns.", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) + # Fails to apply on dataframe with incompatible column datatypes. + with pytest.raises( + ValueError, + match="^Data type mismatch for DataFrame columns: Expected .* Received .*", + ): + bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - # Make sure the read_gbq_function path works for this function. - foo_ref = session.read_gbq_function(foo.bigframes_bigquery_function) + # Successfully applies to dataframe with matching number of columns. + # and their datatypes. + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="axis=1 scenario is in preview.", + ): + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result. + expected_result = pandas.Series( + [ + ["1", "22.5", "alpha"], + ["2", "23.0", "beta"], + ["3", "23.5", "gamma"], + ] + ) - assert hasattr(foo_ref, "bigframes_bigquery_function") - assert foo_ref.bigframes_remote_function is None - assert foo_ref.bigframes_bigquery_function == foo.bigframes_bigquery_function + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) - # Test on the function from read_gbq_function. - got = foo_ref(10, 38, "hello") - assert got == ["10", "38.0", "hello"] + # Make sure the read_gbq_function path works for this function. + foo_ref = session.read_gbq_function(f"{dataset_id}.{function_id}") - with pytest.warns( - bigframes.exceptions.PreviewWarning, - match="axis=1 scenario is in preview.", - ): - bf_result_gbq = bf_df.apply(foo_ref, axis=1).to_pandas() + # Test on the function from read_gbq_function. + got = foo_ref(10, 38, "hello") + assert got == ["10", "38.0", "hello"] - pandas.testing.assert_series_equal( - bf_result_gbq, expected_result, check_dtype=False, check_index_type=False - ) + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="axis=1 scenario is in preview.", + ): + bf_result_gbq = bf_df.apply(foo_ref, axis=1).to_pandas() - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(foo, session.bqclient, ignore_failures=False) + pandas.testing.assert_series_equal( + bf_result_gbq, expected_result, check_dtype=False, check_index_type=False + ) @pytest.mark.parametrize( @@ -536,95 +444,78 @@ def foo(x, y, z): ], ) def test_managed_function_with_connection( - session, scalars_dfs, dataset_id, request, connection_fixture + session, scalars_dfs, dataset_id, request, connection_fixture, function_id ): - try: - bigquery_connection = request.getfixturevalue(connection_fixture) + bigquery_connection = request.getfixturevalue(connection_fixture) - @session.udf( - bigquery_connection=bigquery_connection, - dataset=dataset_id, - name=prefixer.create_prefix(), - ) - def foo(x: int) -> int: - return x + 10 - - # Function should still work normally. - assert foo(-2) == 8 + @session.udf( + bigquery_connection=bigquery_connection, + dataset=dataset_id, + name=function_id, + ) + def foo(x: int) -> int: + return x + 10 - scalars_df, scalars_pandas_df = scalars_dfs + # Function should still work normally. + assert foo(-2) == 8 - bf_result_col = scalars_df["int64_too"].apply(foo) - bf_result = ( - scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() - ) + scalars_df, scalars_pandas_df = scalars_dfs - pd_result_col = scalars_pandas_df["int64_too"].apply(foo) - pd_result = ( - scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) - ) + bf_result_col = scalars_df["int64_too"].apply(foo) + bf_result = ( + scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() + ) - pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(foo, session.bqclient, ignore_failures=False) + pd_result_col = scalars_pandas_df["int64_too"].apply(foo) + pd_result = scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) + pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) -def test_managed_function_options(session, dataset_id, scalars_dfs): - try: - def multiply_five(x: int) -> int: - return x * 5 +def test_managed_function_options(session, dataset_id, scalars_dfs, function_id): + def multiply_five(x: int) -> int: + return x * 5 - mf_multiply_five = session.udf( - dataset=dataset_id, - name=prefixer.create_prefix(), - max_batching_rows=100, - container_cpu=2, - container_memory="2Gi", - )(multiply_five) + mf_multiply_five = session.udf( + dataset=dataset_id, + name=function_id, + max_batching_rows=100, + container_cpu=2, + container_memory="2Gi", + )(multiply_five) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_df = scalars_df["int64_col"] - bf_int64_df_filtered = bf_int64_df.dropna() - bf_result = bf_int64_df_filtered.apply(mf_multiply_five).to_pandas() + bf_int64_df = scalars_df["int64_col"] + bf_int64_df_filtered = bf_int64_df.dropna() + bf_result = bf_int64_df_filtered.apply(mf_multiply_five).to_pandas() - pd_int64_df = scalars_pandas_df["int64_col"] - pd_int64_df_filtered = pd_int64_df.dropna() - pd_result = pd_int64_df_filtered.apply(multiply_five) + pd_int64_df = scalars_pandas_df["int64_col"] + pd_int64_df_filtered = pd_int64_df.dropna() + pd_result = pd_int64_df_filtered.apply(multiply_five) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - # Make sure the read_gbq_function path works for this function. - multiply_five_ref = session.read_gbq_function( - function_name=mf_multiply_five.bigframes_bigquery_function, # type: ignore - ) - assert mf_multiply_five.bigframes_bigquery_function == multiply_five_ref.bigframes_bigquery_function # type: ignore + # Make sure the read_gbq_function path works for this function. + multiply_five_ref = session.read_gbq_function( + function_name=f"{dataset_id}.{function_id}" # type: ignore + ) - bf_result = bf_int64_df_filtered.apply(multiply_five_ref).to_pandas() - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + bf_result = bf_int64_df_filtered.apply(multiply_five_ref).to_pandas() + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - # Retrieve the routine and validate its runtime configuration. - routine = session.bqclient.get_routine( - mf_multiply_five.bigframes_bigquery_function - ) + # Retrieve the routine and validate its runtime configuration. + routine = session.bqclient.get_routine(f"{dataset_id}.{function_id}") - # TODO(jialuo): Use the newly exposed class properties instead of - # accessing the hidden _properties after resolve of this issue: - # https://github.com/googleapis/python-bigquery/issues/2240. - assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "100" - assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 2 - assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi" - - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - mf_multiply_five, session.bqclient, ignore_failures=False - ) + # TODO(jialuo): Use the newly exposed class properties instead of + # accessing the hidden _properties after resolve of this issue: + # https://github.com/googleapis/python-bigquery/issues/2240. + assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "100" + assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 2 + assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi" -def test_managed_function_options_errors(session, dataset_id): +def test_managed_function_options_errors(session, dataset_id, function_id): def foo(x: int) -> int: return 0 @@ -635,7 +526,7 @@ def foo(x: int) -> int: ): session.udf( dataset=dataset_id, - name=prefixer.create_prefix(), + name=function_id, max_batching_rows=100, container_cpu=2.5, container_memory="2Gi", @@ -648,7 +539,7 @@ def foo(x: int) -> int: ): session.udf( dataset=dataset_id, - name=prefixer.create_prefix(), + name=function_id, max_batching_rows=100, container_cpu=0.10, container_memory="512Mi", @@ -661,130 +552,117 @@ def foo(x: int) -> int: ): session.udf( dataset=dataset_id, - name=prefixer.create_prefix(), + name=function_id, max_batching_rows=100, container_cpu=2, container_memory="64Mi", )(foo) -def test_managed_function_df_apply_axis_1(session, dataset_id, scalars_dfs): +def test_managed_function_df_apply_axis_1( + session, dataset_id, scalars_dfs, function_id +): columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] scalars_df, scalars_pandas_df = scalars_dfs - try: - - def serialize_row(row): - # TODO(b/435021126): Remove explicit type conversion of the field - # "name" after the issue has been addressed. It is added only to - # accept partial pandas parity for the time being. - custom = { - "name": int(row.name), - "index": [idx for idx in row.index], - "values": [ - val.item() if hasattr(val, "item") else val for val in row.values - ], - } - return str( - { - "default": row.to_json(), - "split": row.to_json(orient="split"), - "records": row.to_json(orient="records"), - "index": row.to_json(orient="index"), - "table": row.to_json(orient="table"), - "custom": custom, - } - ) - - with pytest.raises( - TypeError, - match="Argument type hint must be Pandas Series, not BigFrames Series.", - ): - serialize_row_mf = session.udf( - input_types=bigframes.series.Series, - output_type=str, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(serialize_row) + def serialize_row(row): + # TODO(b/435021126): Remove explicit type conversion of the field + # "name" after the issue has been addressed. It is added only to + # accept partial pandas parity for the time being. + custom = { + "name": int(row.name), + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "table": row.to_json(orient="table"), + "custom": custom, + } + ) + + with pytest.raises( + TypeError, + match="Argument type hint must be Pandas Series, not BigFrames Series.", + ): serialize_row_mf = session.udf( - input_types=pandas.Series, + input_types=bigframes.series.Series, output_type=str, dataset=dataset_id, - name=prefixer.create_prefix(), + name=function_id, )(serialize_row) - assert getattr(serialize_row_mf, "is_row_processor") - - bf_result = scalars_df[columns].apply(serialize_row_mf, axis=1).to_pandas() - pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) + serialize_row_mf = session.udf( + input_types=pandas.Series, + output_type=str, + dataset=dataset_id, + name=function_id, + )(serialize_row) - # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' - # , ignore this mismatch by using check_dtype=False. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + bf_result = scalars_df[columns].apply(serialize_row_mf, axis=1).to_pandas() + pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) - # Let's make sure the read_gbq_function path works for this function. - serialize_row_reuse = session.read_gbq_function( - serialize_row_mf.bigframes_bigquery_function, is_row_processor=True - ) - bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas() - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the managed function. - cleanup_function_assets( - serialize_row_mf, session.bqclient, ignore_failures=False - ) + # Let's make sure the read_gbq_function path works for this function. + serialize_row_reuse = session.read_gbq_function( + f"{dataset_id}.{function_id}", is_row_processor=True + ) + bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) -def test_managed_function_df_apply_axis_1_aggregates(session, dataset_id, scalars_dfs): +def test_managed_function_df_apply_axis_1_aggregates( + session, dataset_id, scalars_dfs, function_id +): columns = ["int64_col", "int64_too", "float64_col"] scalars_df, scalars_pandas_df = scalars_dfs - try: - - def analyze(row): - # TODO(b/435021126): Remove explicit type conversion of the fields - # after the issue has been addressed. It is added only to accept - # partial pandas parity for the time being. - return str( - { - "dtype": row.dtype, - "count": int(row.count()), - "min": int(row.min()), - "max": int(row.max()), - "mean": float(row.mean()), - "std": float(row.std()), - "var": float(row.var()), - } - ) - - with pytest.warns( - bfe.FunctionPackageVersionWarning, - match=( - "numpy, pandas, and pyarrow versions in the function execution" - "\nenvironment may not precisely match your local environment." - ), - ): - analyze_mf = session.udf( - input_types=pandas.Series, - output_type=str, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(analyze) - - assert getattr(analyze_mf, "is_row_processor") + def analyze(row): + # TODO(b/435021126): Remove explicit type conversion of the fields + # after the issue has been addressed. It is added only to accept + # partial pandas parity for the time being. + return str( + { + "dtype": row.dtype, + "count": int(row.count()), + "min": int(row.min()), + "max": int(row.max()), + "mean": float(row.mean()), + "std": float(row.std()), + "var": float(row.var()), + } + ) - bf_result = scalars_df[columns].dropna().apply(analyze_mf, axis=1).to_pandas() - pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1) + with pytest.warns( + bfe.FunctionPackageVersionWarning, + match=( + "numpy, pandas, and pyarrow versions in the function execution" + "\nenvironment may not precisely match your local environment." + ), + ): + analyze_mf = session.udf( + input_types=pandas.Series, + output_type=str, + dataset=dataset_id, + name=function_id, + )(analyze) - # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' - # , ignore this mismatch by using check_dtype=False. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + bf_result = scalars_df[columns].dropna().apply(analyze_mf, axis=1).to_pandas() + pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1) - finally: - # clean up the gcp assets created for the managed function. - cleanup_function_assets(analyze_mf, session.bqclient, ignore_failures=False) + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.parametrize( @@ -854,58 +732,50 @@ def analyze(row): ), ], ) -def test_managed_function_df_apply_axis_1_complex(session, dataset_id, pd_df): +def test_managed_function_df_apply_axis_1_complex( + session, dataset_id, pd_df, function_id +): bf_df = session.read_pandas(pd_df) - try: - - def serialize_row(row): - # TODO(b/435021126): Remove explicit type conversion of the field - # "name" after the issue has been addressed. It is added only to - # accept partial pandas parity for the time being. - custom = { - "name": int(row.name), - "index": [idx for idx in row.index], - "values": [ - val.item() if hasattr(val, "item") else val for val in row.values - ], + def serialize_row(row): + # TODO(b/435021126): Remove explicit type conversion of the field + # "name" after the issue has been addressed. It is added only to + # accept partial pandas parity for the time being. + custom = { + "name": int(row.name), + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "custom": custom, } - return str( - { - "default": row.to_json(), - "split": row.to_json(orient="split"), - "records": row.to_json(orient="records"), - "index": row.to_json(orient="index"), - "custom": custom, - } - ) - - serialize_row_mf = session.udf( - input_types=pandas.Series, - output_type=str, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(serialize_row) - - assert getattr(serialize_row_mf, "is_row_processor") + ) - bf_result = bf_df.apply(serialize_row_mf, axis=1).to_pandas() - pd_result = pd_df.apply(serialize_row, axis=1) + serialize_row_mf = session.udf( + input_types=pandas.Series, + output_type=str, + dataset=dataset_id, + name=function_id, + )(serialize_row) - # ignore known dtype difference between pandas and bigframes. - pandas.testing.assert_series_equal( - pd_result, bf_result, check_dtype=False, check_index_type=False - ) + bf_result = bf_df.apply(serialize_row_mf, axis=1).to_pandas() + pd_result = pd_df.apply(serialize_row, axis=1) - finally: - # clean up the gcp assets created for the managed function. - cleanup_function_assets( - serialize_row_mf, session.bqclient, ignore_failures=False - ) + # ignore known dtype difference between pandas and bigframes. + pandas.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) @pytest.mark.skip(reason="Revert after this bug b/435018880 is fixed.") -def test_managed_function_df_apply_axis_1_na_nan_inf(dataset_id, session): +def test_managed_function_df_apply_axis_1_na_nan_inf(dataset_id, session, function_id): """This test is for special cases of float values, to make sure any (nan, inf, -inf) produced by user code is honored. """ @@ -929,395 +799,331 @@ def test_managed_function_df_apply_axis_1_na_nan_inf(dataset_id, session): pd_df = bf_df.to_pandas() - try: - - def float_parser(row: pandas.Series): - import numpy as mynp - import pandas as mypd - - if row["text"] == "pandas na": - return mypd.NA - if row["text"] == "numpy nan": - return mynp.nan - return float(row["text"]) - - float_parser_mf = session.udf( - input_types=pandas.Series, - output_type=float, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(float_parser) - - assert getattr(float_parser_mf, "is_row_processor") - - pd_result = pd_df.apply(float_parser, axis=1) - bf_result = bf_df.apply(float_parser_mf, axis=1).to_pandas() - - # bf_result.dtype is 'Float64' while pd_result.dtype is 'object' - # , ignore this mismatch by using check_dtype=False. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - - # Let's also assert that the data is consistent in this round trip - # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. their - # expected values in BQ. - bq_result = bf_df["num"].to_pandas() - bq_result.name = None - pandas.testing.assert_series_equal(bq_result, bf_result) - finally: - # clean up the gcp assets created for the managed function. - cleanup_function_assets( - float_parser_mf, session.bqclient, ignore_failures=False - ) - + def float_parser(row: pandas.Series): + import numpy as mynp + import pandas as mypd -def test_managed_function_df_apply_axis_1_args(session, dataset_id, scalars_dfs): - columns = ["int64_col", "int64_too"] - scalars_df, scalars_pandas_df = scalars_dfs + if row["text"] == "pandas na": + return mypd.NA + if row["text"] == "numpy nan": + return mynp.nan + return float(row["text"]) - try: + float_parser_mf = session.udf( + input_types=pandas.Series, + output_type=float, + dataset=dataset_id, + name=function_id, + )(float_parser) - def the_sum(s1, s2, x): - return s1 + s2 + x + pd_result = pd_df.apply(float_parser, axis=1) + bf_result = bf_df.apply(float_parser_mf, axis=1).to_pandas() - the_sum_mf = session.udf( - input_types=[int, int, int], - output_type=int, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(the_sum) - - args1 = (1,) - - # Fails to apply on dataframe with incompatible number of columns and args. - with pytest.raises( - ValueError, - match="^Parameter count mismatch:.* expected 3 parameters but received 4 values \\(3 DataFrame columns and 1 args\\)", - ): - scalars_df[columns + ["float64_col"]].apply(the_sum_mf, axis=1, args=args1) - - # Fails to apply on dataframe with incompatible column datatypes. - with pytest.raises( - ValueError, - match="^Data type mismatch for DataFrame columns: Expected .* Received .*", - ): - scalars_df[columns].assign( - int64_col=lambda df: df["int64_col"].astype("Float64") - ).apply(the_sum_mf, axis=1, args=args1) - - # Fails to apply on dataframe with incompatible args datatypes. - with pytest.raises( - ValueError, - match="^Data type mismatch for 'args' parameter: Expected .* Received .*", - ): - scalars_df[columns].apply(the_sum_mf, axis=1, args=(1.3,)) + # bf_result.dtype is 'Float64' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - bf_result = ( - scalars_df[columns] - .dropna() - .apply(the_sum_mf, axis=1, args=args1) - .to_pandas() - ) - pd_result = scalars_pandas_df[columns].dropna().apply(sum, axis=1, args=args1) + # Let's also assert that the data is consistent in this round trip + # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. their + # expected values in BQ. + bq_result = bf_df["num"].to_pandas() + bq_result.name = None + pandas.testing.assert_series_equal(bq_result, bf_result) - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the managed function. - cleanup_function_assets(the_sum_mf, session.bqclient, ignore_failures=False) +def test_managed_function_df_apply_axis_1_args( + session, dataset_id, scalars_dfs, function_id +): + columns = ["int64_col", "int64_too"] + scalars_df, scalars_pandas_df = scalars_dfs + def the_sum(s1, s2, x): + return s1 + s2 + x -def test_managed_function_df_apply_axis_1_series_args(session, dataset_id, scalars_dfs): - columns = ["int64_col", "float64_col"] - scalars_df, scalars_pandas_df = scalars_dfs + the_sum_mf = session.udf( + input_types=[int, int, int], + output_type=int, + dataset=dataset_id, + name=function_id, + )(the_sum) - try: + args1 = (1,) - def analyze(s: pandas.Series, x: bool, y: float) -> str: - value = f"value is {s['int64_col']} and {s['float64_col']}" - if x: - return f"{value}, x is True!" - if y > 0: - return f"{value}, x is False, y is positive!" - return f"{value}, x is False, y is non-positive!" + # Fails to apply on dataframe with incompatible number of columns and args. + with pytest.raises( + ValueError, + match="^Parameter count mismatch:.* expected 3 parameters but received 4 values \\(3 DataFrame columns and 1 args\\)", + ): + scalars_df[columns + ["float64_col"]].apply(the_sum_mf, axis=1, args=args1) - analyze_mf = session.udf( - dataset=dataset_id, - name=prefixer.create_prefix(), - )(analyze) + # Fails to apply on dataframe with incompatible column datatypes. + with pytest.raises( + ValueError, + match="^Data type mismatch for DataFrame columns: Expected .* Received .*", + ): + scalars_df[columns].assign( + int64_col=lambda df: df["int64_col"].astype("Float64") + ).apply(the_sum_mf, axis=1, args=args1) - args1 = (True, 10.0) - bf_result = ( - scalars_df[columns] - .dropna() - .apply(analyze_mf, axis=1, args=args1) - .to_pandas() - ) - pd_result = ( - scalars_pandas_df[columns].dropna().apply(analyze, axis=1, args=args1) - ) + # Fails to apply on dataframe with incompatible args datatypes. + with pytest.raises( + ValueError, + match="^Data type mismatch for 'args' parameter: Expected .* Received .*", + ): + scalars_df[columns].apply(the_sum_mf, axis=1, args=(1.3,)) - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + bf_result = ( + scalars_df[columns].dropna().apply(the_sum_mf, axis=1, args=args1).to_pandas() + ) + pd_result = scalars_pandas_df[columns].dropna().apply(sum, axis=1, args=args1) - args2 = (False, -10.0) - analyze_mf_ref = session.read_gbq_function( - analyze_mf.bigframes_bigquery_function, is_row_processor=True - ) - bf_result = ( - scalars_df[columns] - .dropna() - .apply(analyze_mf_ref, axis=1, args=args2) - .to_pandas() - ) - pd_result = ( - scalars_pandas_df[columns].dropna().apply(analyze, axis=1, args=args2) - ) + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the managed function. - cleanup_function_assets(analyze_mf, session.bqclient, ignore_failures=False) +def test_managed_function_df_apply_axis_1_series_args( + session, dataset_id, scalars_dfs, function_id +): + columns = ["int64_col", "float64_col"] + scalars_df, scalars_pandas_df = scalars_dfs + def analyze(s: pandas.Series, x: bool, y: float) -> str: + value = f"value is {s['int64_col']} and {s['float64_col']}" + if x: + return f"{value}, x is True!" + if y > 0: + return f"{value}, x is False, y is positive!" + return f"{value}, x is False, y is non-positive!" -def test_managed_function_df_where_mask(session, dataset_id, scalars_dfs): - try: - # The return type has to be bool type for callable where condition. - def is_sum_positive(a, b): - return a + b > 0 + analyze_mf = session.udf( + dataset=dataset_id, + name=function_id, + )(analyze) - is_sum_positive_mf = session.udf( - input_types=[int, int], - output_type=bool, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(is_sum_positive) + args1 = (True, 10.0) + bf_result = ( + scalars_df[columns].dropna().apply(analyze_mf, axis=1, args=args1).to_pandas() + ) + pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1, args=args1) - scalars_df, scalars_pandas_df = scalars_dfs - int64_cols = ["int64_col", "int64_too"] + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - bf_int64_df = scalars_df[int64_cols] - bf_int64_df_filtered = bf_int64_df.dropna() - pd_int64_df = scalars_pandas_df[int64_cols] - pd_int64_df_filtered = pd_int64_df.dropna() + args2 = (False, -10.0) + analyze_mf_ref = session.read_gbq_function( + f"{dataset_id}.{function_id}", is_row_processor=True + ) + bf_result = ( + scalars_df[columns] + .dropna() + .apply(analyze_mf_ref, axis=1, args=args2) + .to_pandas() + ) + pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1, args=args2) - # Test callable condition in dataframe.where method. - bf_result = bf_int64_df_filtered.where(is_sum_positive_mf).to_pandas() - # Pandas doesn't support such case, use following as workaround. - pd_result = pd_int64_df_filtered.where(pd_int64_df_filtered.sum(axis=1) > 0) + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Ignore any dtype difference. - pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) - # Make sure the read_gbq_function path works for dataframe.where method. - is_sum_positive_ref = session.read_gbq_function( - function_name=is_sum_positive_mf.bigframes_bigquery_function - ) +def test_managed_function_df_where_mask(session, dataset_id, scalars_dfs, function_id): + # The return type has to be bool type for callable where condition. + def is_sum_positive(a, b): + return a + b > 0 - bf_result_gbq = bf_int64_df_filtered.where( - is_sum_positive_ref, -bf_int64_df_filtered - ).to_pandas() - pd_result_gbq = pd_int64_df_filtered.where( - pd_int64_df_filtered.sum(axis=1) > 0, -pd_int64_df_filtered - ) + is_sum_positive_mf = session.udf( + input_types=[int, int], + output_type=bool, + dataset=dataset_id, + name=function_id, + )(is_sum_positive) - # Ignore any dtype difference. - pandas.testing.assert_frame_equal( - bf_result_gbq, pd_result_gbq, check_dtype=False - ) + scalars_df, scalars_pandas_df = scalars_dfs + int64_cols = ["int64_col", "int64_too"] - # Test callable condition in dataframe.mask method. - bf_result_gbq = bf_int64_df_filtered.mask( - is_sum_positive_ref, -bf_int64_df_filtered - ).to_pandas() - pd_result_gbq = pd_int64_df_filtered.mask( - pd_int64_df_filtered.sum(axis=1) > 0, -pd_int64_df_filtered - ) + bf_int64_df = scalars_df[int64_cols] + bf_int64_df_filtered = bf_int64_df.dropna() + pd_int64_df = scalars_pandas_df[int64_cols] + pd_int64_df_filtered = pd_int64_df.dropna() - # Ignore any dtype difference. - pandas.testing.assert_frame_equal( - bf_result_gbq, pd_result_gbq, check_dtype=False - ) + # Test callable condition in dataframe.where method. + bf_result = bf_int64_df_filtered.where(is_sum_positive_mf).to_pandas() + # Pandas doesn't support such case, use following as workaround. + pd_result = pd_int64_df_filtered.where(pd_int64_df_filtered.sum(axis=1) > 0) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - is_sum_positive_mf, session.bqclient, ignore_failures=False - ) + # Ignore any dtype difference. + pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) + # Make sure the read_gbq_function path works for dataframe.where method. + is_sum_positive_ref = session.read_gbq_function(f"{dataset_id}.{function_id}") -def test_managed_function_df_where_mask_series(session, dataset_id, scalars_dfs): - try: - # The return type has to be bool type for callable where condition. - def is_sum_positive_series(s): - return s["int64_col"] + s["int64_too"] > 0 + bf_result_gbq = bf_int64_df_filtered.where( + is_sum_positive_ref, -bf_int64_df_filtered + ).to_pandas() + pd_result_gbq = pd_int64_df_filtered.where( + pd_int64_df_filtered.sum(axis=1) > 0, -pd_int64_df_filtered + ) - is_sum_positive_series_mf = session.udf( - input_types=pandas.Series, - output_type=bool, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(is_sum_positive_series) + # Ignore any dtype difference. + pandas.testing.assert_frame_equal(bf_result_gbq, pd_result_gbq, check_dtype=False) - scalars_df, scalars_pandas_df = scalars_dfs - int64_cols = ["int64_col", "int64_too"] + # Test callable condition in dataframe.mask method. + bf_result_gbq = bf_int64_df_filtered.mask( + is_sum_positive_ref, -bf_int64_df_filtered + ).to_pandas() + pd_result_gbq = pd_int64_df_filtered.mask( + pd_int64_df_filtered.sum(axis=1) > 0, -pd_int64_df_filtered + ) - bf_int64_df = scalars_df[int64_cols] - bf_int64_df_filtered = bf_int64_df.dropna() - pd_int64_df = scalars_pandas_df[int64_cols] - pd_int64_df_filtered = pd_int64_df.dropna() + # Ignore any dtype difference. + pandas.testing.assert_frame_equal(bf_result_gbq, pd_result_gbq, check_dtype=False) - # Test callable condition in dataframe.where method. - bf_result = bf_int64_df_filtered.where(is_sum_positive_series_mf).to_pandas() - pd_result = pd_int64_df_filtered.where(is_sum_positive_series) - # Ignore any dtype difference. - pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) +def test_managed_function_df_where_mask_series( + session, dataset_id, scalars_dfs, function_id +): + # The return type has to be bool type for callable where condition. + def is_sum_positive_series(s): + return s["int64_col"] + s["int64_too"] > 0 - # Make sure the read_gbq_function path works for dataframe.where method. - is_sum_positive_series_ref = session.read_gbq_function( - function_name=is_sum_positive_series_mf.bigframes_bigquery_function, - is_row_processor=True, - ) + is_sum_positive_series_mf = session.udf( + input_types=pandas.Series, + output_type=bool, + dataset=dataset_id, + name=function_id, + )(is_sum_positive_series) - # This is for callable `other` arg in dataframe.where method. - def func_for_other(x): - return -x + scalars_df, scalars_pandas_df = scalars_dfs + int64_cols = ["int64_col", "int64_too"] - bf_result_gbq = bf_int64_df_filtered.where( - is_sum_positive_series_ref, func_for_other - ).to_pandas() - pd_result_gbq = pd_int64_df_filtered.where( - is_sum_positive_series, func_for_other - ) + bf_int64_df = scalars_df[int64_cols] + bf_int64_df_filtered = bf_int64_df.dropna() + pd_int64_df = scalars_pandas_df[int64_cols] + pd_int64_df_filtered = pd_int64_df.dropna() - # Ignore any dtype difference. - pandas.testing.assert_frame_equal( - bf_result_gbq, pd_result_gbq, check_dtype=False - ) + # Test callable condition in dataframe.where method. + bf_result = bf_int64_df_filtered.where(is_sum_positive_series_mf).to_pandas() + pd_result = pd_int64_df_filtered.where(is_sum_positive_series) - # Test callable condition in dataframe.mask method. - bf_result_gbq = bf_int64_df_filtered.mask( - is_sum_positive_series_ref, func_for_other - ).to_pandas() - pd_result_gbq = pd_int64_df_filtered.mask( - is_sum_positive_series, func_for_other - ) + # Ignore any dtype difference. + pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) - # Ignore any dtype difference. - pandas.testing.assert_frame_equal( - bf_result_gbq, pd_result_gbq, check_dtype=False - ) + # Make sure the read_gbq_function path works for dataframe.where method. + is_sum_positive_series_ref = session.read_gbq_function( + f"{dataset_id}.{function_id}", is_row_processor=True + ) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - is_sum_positive_series_mf, session.bqclient, ignore_failures=False - ) + # This is for callable `other` arg in dataframe.where method. + def func_for_other(x): + return -x + bf_result_gbq = bf_int64_df_filtered.where( + is_sum_positive_series_ref, func_for_other + ).to_pandas() + pd_result_gbq = pd_int64_df_filtered.where(is_sum_positive_series, func_for_other) -def test_managed_function_df_where_other_issue(session, dataset_id, scalars_df_index): - try: + # Ignore any dtype difference. + pandas.testing.assert_frame_equal(bf_result_gbq, pd_result_gbq, check_dtype=False) - def the_sum(s: pandas.Series) -> int: - return s["int64_col"] + s["int64_too"] + # Test callable condition in dataframe.mask method. + bf_result_gbq = bf_int64_df_filtered.mask( + is_sum_positive_series_ref, func_for_other + ).to_pandas() + pd_result_gbq = pd_int64_df_filtered.mask(is_sum_positive_series, func_for_other) - the_sum_mf = session.udf( - dataset=dataset_id, - name=prefixer.create_prefix(), - )(the_sum) + # Ignore any dtype difference. + pandas.testing.assert_frame_equal(bf_result_gbq, pd_result_gbq, check_dtype=False) - int64_cols = ["int64_col", "int64_too"] - bf_int64_df = scalars_df_index[int64_cols] - bf_int64_df_filtered = bf_int64_df.dropna() +def test_managed_function_df_where_other_issue( + session, dataset_id, scalars_df_index, function_id +): + def the_sum(s: pandas.Series) -> int: + return s["int64_col"] + s["int64_too"] - with pytest.raises( - ValueError, - match="Seires is not a supported replacement type!", - ): - # The execution of the callable other=the_sum_mf will return a - # Series, which is not a supported replacement type. - bf_int64_df_filtered.where(cond=bf_int64_df_filtered, other=the_sum_mf) + the_sum_mf = session.udf( + dataset=dataset_id, + name=function_id, + )(the_sum) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(the_sum_mf, session.bqclient, ignore_failures=False) + int64_cols = ["int64_col", "int64_too"] + bf_int64_df = scalars_df_index[int64_cols] + bf_int64_df_filtered = bf_int64_df.dropna() -def test_managed_function_series_where_mask_map(session, dataset_id, scalars_dfs): - try: - # The return type has to be bool type for callable where condition. - def _is_positive(s): - return s + 1000 > 0 + with pytest.raises( + ValueError, + match="Seires is not a supported replacement type!", + ): + # The execution of the callable other=the_sum_mf will return a + # Series, which is not a supported replacement type. + bf_int64_df_filtered.where(cond=bf_int64_df_filtered, other=the_sum_mf) - is_positive_mf = session.udf( - input_types=int, - output_type=bool, - dataset=dataset_id, - name=prefixer.create_prefix(), - )(_is_positive) - scalars, scalars_pandas = scalars_dfs +def test_managed_function_series_where_mask_map( + session, dataset_id, scalars_dfs, function_id +): + # The return type has to be bool type for callable where condition. + def _is_positive(s): + return s + 1000 > 0 - bf_int64 = scalars["int64_col"] - bf_int64_filtered = bf_int64.dropna() - pd_int64 = scalars_pandas["int64_col"] - pd_int64_filtered = pd_int64.dropna() + is_positive_mf = session.udf( + input_types=int, + output_type=bool, + dataset=dataset_id, + name=function_id, + )(_is_positive) - # Test series.where method: the cond is a callable (managed function) - # and the other is not a callable. - bf_result = bf_int64_filtered.where( - cond=is_positive_mf, other=-bf_int64_filtered - ).to_pandas() - pd_result = pd_int64_filtered.where(cond=_is_positive, other=-pd_int64_filtered) + scalars, scalars_pandas = scalars_dfs - # Ignore any dtype difference. - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + bf_int64 = scalars["int64_col"] + bf_int64_filtered = bf_int64.dropna() + pd_int64 = scalars_pandas["int64_col"] + pd_int64_filtered = pd_int64.dropna() - # Test series.mask method: the cond is a callable (managed function) - # and the other is not a callable. - bf_result = bf_int64_filtered.mask( - cond=is_positive_mf, other=-bf_int64_filtered - ).to_pandas() - pd_result = pd_int64_filtered.mask(cond=_is_positive, other=-pd_int64_filtered) + # Test series.where method: the cond is a callable (managed function) + # and the other is not a callable. + bf_result = bf_int64_filtered.where( + cond=is_positive_mf, other=-bf_int64_filtered + ).to_pandas() + pd_result = pd_int64_filtered.where(cond=_is_positive, other=-pd_int64_filtered) - # Ignore any dtype difference. - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + # Ignore any dtype difference. + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - # Test series.map method. - bf_result = bf_int64_filtered.map(is_positive_mf).to_pandas() - pd_result = pd_int64_filtered.map(_is_positive) + # Test series.mask method: the cond is a callable (managed function) + # and the other is not a callable. + bf_result = bf_int64_filtered.mask( + cond=is_positive_mf, other=-bf_int64_filtered + ).to_pandas() + pd_result = pd_int64_filtered.mask(cond=_is_positive, other=-pd_int64_filtered) - # Ignore any dtype difference. - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + # Ignore any dtype difference. + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(is_positive_mf, session.bqclient, ignore_failures=False) + # Test series.map method. + bf_result = bf_int64_filtered.map(is_positive_mf).to_pandas() + pd_result = pd_int64_filtered.map(_is_positive) + # Ignore any dtype difference. + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) -def test_managed_function_series_apply_args(session, dataset_id, scalars_dfs): - try: - with pytest.warns(bfe.PreviewWarning, match="udf is in preview."): - @session.udf(dataset=dataset_id, name=prefixer.create_prefix()) - def foo_list(x: int, y0: float, y1: bytes, y2: bool) -> list[str]: - return [str(x), str(y0), str(y1), str(y2)] +def test_managed_function_series_apply_args( + session, dataset_id, scalars_dfs, function_id +): + with pytest.warns(bfe.PreviewWarning, match="udf is in preview."): - scalars_df, scalars_pandas_df = scalars_dfs + @session.udf(dataset=dataset_id, name=function_id) + def foo_list(x: int, y0: float, y1: bytes, y2: bool) -> list[str]: + return [str(x), str(y0), str(y1), str(y2)] - bf_result = ( - scalars_df["int64_too"] - .apply(foo_list, args=(12.34, b"hello world", False)) - .to_pandas() - ) - pd_result = scalars_pandas_df["int64_too"].apply( - foo_list, args=(12.34, b"hello world", False) - ) + scalars_df, scalars_pandas_df = scalars_dfs - # Ignore any dtype difference. - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + bf_result = ( + scalars_df["int64_too"] + .apply(foo_list, args=(12.34, b"hello world", False)) + .to_pandas() + ) + pd_result = scalars_pandas_df["int64_too"].apply( + foo_list, args=(12.34, b"hello world", False) + ) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(foo_list, session.bqclient, ignore_failures=False) + # Ignore any dtype difference. + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) From ac0f9a1b5d4e2e98c7177f604340e6c33bf8fa0e Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 30 Mar 2026 22:05:46 +0000 Subject: [PATCH 2/2] fix unit tests --- tests/unit/functions/test_remote_function.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/unit/functions/test_remote_function.py b/tests/unit/functions/test_remote_function.py index bfb6192a2c4..ac6f04056d7 100644 --- a/tests/unit/functions/test_remote_function.py +++ b/tests/unit/functions/test_remote_function.py @@ -62,8 +62,7 @@ def my_remote_func(x: int) -> int: deployed = session.deploy_udf(my_remote_func) - # Test that the function would have been deployed somewhere. - assert deployed.bigframes_bigquery_function + assert deployed.udf_def is not None def test_deploy_udf_with_name(): @@ -74,5 +73,4 @@ def my_remote_func(x: int) -> int: deployed = session.deploy_udf(my_remote_func, name="my_custom_name") - # Test that the function would have been deployed somewhere. - assert "my_custom_name" in deployed.bigframes_bigquery_function + assert deployed.udf_def is not None