Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4662,7 +4662,13 @@ def _prepare_export(
return array_value, id_overrides

def map(self, func, na_action: Optional[str] = None) -> DataFrame:
if not isinstance(func, bigframes.functions.BigqueryCallableRoutine):
if not isinstance(
func,
(
bigframes.functions.BigqueryCallableRoutine,
bigframes.functions.UdfRoutine,
),
):
raise TypeError("the first argument must be callable")

if na_action not in {None, "ignore"}:
Expand Down Expand Up @@ -4690,14 +4696,14 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
func,
(
bigframes.functions.BigqueryCallableRoutine,
bigframes.functions.BigqueryCallableRowRoutine,
bigframes.functions.UdfRoutine,
),
):
raise ValueError(
"For axis=1 a BigFrames BigQuery function must be used."
)

if func.is_row_processor:
if func.udf_def.signature.is_row_processor:
# Early check whether the dataframe dtypes are currently supported
# in the bigquery function
# NOTE: Keep in sync with the value converters used in the gcf code
Expand Down
7 changes: 2 additions & 5 deletions bigframes/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from bigframes.functions.function import (
BigqueryCallableRoutine,
BigqueryCallableRowRoutine,
)
from bigframes.functions.function import BigqueryCallableRoutine, UdfRoutine

__all__ = [
"BigqueryCallableRoutine",
"BigqueryCallableRowRoutine",
"UdfRoutine",
]
48 changes: 12 additions & 36 deletions bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,25 +631,15 @@ def wrapper(func):
if udf_sig.is_row_processor:
msg = bfe.format_message("input_types=Series is in preview.")
warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
return decorator(
bq_functions.BigqueryCallableRowRoutine(
udf_definition,
session,
cloud_function_ref=bigframes_cloud_function,
local_func=func,
is_managed=False,
)
)
else:
return decorator(
bq_functions.BigqueryCallableRoutine(
udf_definition,
session,
cloud_function_ref=bigframes_cloud_function,
local_func=func,
is_managed=False,
)
return decorator(
bq_functions.BigqueryCallableRoutine(
udf_definition,
session,
cloud_function_ref=bigframes_cloud_function,
local_func=func,
is_managed=False,
)
)

return wrapper

Expand Down Expand Up @@ -835,8 +825,9 @@ def wrapper(func):
bq_connection_manager,
session=session, # type: ignore
)
code_def = udf_def.CodeDef.from_func(func)
config = udf_def.ManagedFunctionConfig(
code=udf_def.CodeDef.from_func(func),
code=code_def,
signature=udf_sig,
max_batching_rows=max_batching_rows,
container_cpu=container_cpu,
Expand All @@ -863,26 +854,11 @@ def wrapper(func):
if not name:
self._update_temp_artifacts(full_rf_name, "")

decorator = functools.wraps(func)
if udf_sig.is_row_processor:
msg = bfe.format_message("input_types=Series is in preview.")
warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
assert session is not None # appease mypy
return decorator(
bq_functions.BigqueryCallableRowRoutine(
udf_definition, session, local_func=func, is_managed=True
)
)
else:
assert session is not None # appease mypy
return decorator(
bq_functions.BigqueryCallableRoutine(
udf_definition,
session,
local_func=func,
is_managed=True,
)
)

return bq_functions.UdfRoutine(func=func, _udf_def=udf_definition)

return wrapper

Expand Down
84 changes: 14 additions & 70 deletions bigframes/functions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
from bigframes.session import Session
import bigframes.series

import dataclasses
import functools

import google.api_core.exceptions
from google.cloud import bigquery

Expand Down Expand Up @@ -90,13 +93,13 @@ def _try_import_routine(

def _try_import_row_routine(
routine: bigquery.Routine, session: bigframes.Session
) -> BigqueryCallableRowRoutine:
) -> BigqueryCallableRoutine:
udf_def = _routine_as_udf_def(routine, is_row_processor=True)

is_remote = (
hasattr(routine, "remote_function_options") and routine.remote_function_options
)
return BigqueryCallableRowRoutine(udf_def, session, is_managed=not is_remote)
return BigqueryCallableRoutine(udf_def, session, is_managed=not is_remote)


def _routine_as_udf_def(
Expand All @@ -117,7 +120,6 @@ def _routine_as_udf_def(
)


# TODO(b/399894805): Support managed function.
def read_gbq_function(
function_name: str,
*,
Expand Down Expand Up @@ -202,7 +204,7 @@ def bigframes_remote_function(self):

@property
def is_row_processor(self) -> bool:
return False
return self.udf_def.signature.is_row_processor

@property
def udf_def(self) -> udf_def.BigqueryUdf:
Expand All @@ -225,75 +227,17 @@ def bigframes_bigquery_function_output_dtype(self):
return self.udf_def.signature.output.emulating_type.bf_type


class BigqueryCallableRowRoutine:
"""
A reference to a routine in the context of a session.

Can be used both directly as a callable, or as an input to dataframe ops that take a callable.
"""

def __init__(
self,
udf_def: udf_def.BigqueryUdf,
session: bigframes.Session,
*,
local_func: Optional[Callable] = None,
cloud_function_ref: Optional[str] = None,
is_managed: bool = False,
):
assert udf_def.signature.is_row_processor
self._udf_def = udf_def
self._session = session
self._local_fun = local_func
self._cloud_function = cloud_function_ref
self._is_managed = is_managed
@dataclasses.dataclass(frozen=True)
class UdfRoutine:
func: Callable
# Try not to depend on this; BigQuery managed function creation will be deferred later,
# and this ref will be replaced with requirements instead, to support lazy creation.
_udf_def: udf_def.BigqueryUdf

@functools.partial
def __call__(self, *args, **kwargs):
if self._local_fun:
return self._local_fun(*args, **kwargs)
# avoid circular imports
from bigframes.core.compile.sqlglot import sql as sg_sql
import bigframes.session._io.bigquery as bf_io_bigquery

args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args])
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
iter, job = bf_io_bigquery.start_query_with_client(
self._session.bqclient,
sql=sql,
query_with_job=True,
job_config=bigquery.QueryJobConfig(),
publisher=self._session._publisher,
) # type: ignore
return list(iter.to_arrow().to_pydict().values())[0][0]

@property
def bigframes_bigquery_function(self) -> str:
return str(self._udf_def.routine_ref)

@property
def bigframes_remote_function(self):
return None if self._is_managed else str(self._udf_def.routine_ref)

@property
def is_row_processor(self) -> bool:
return True
return self.func(*args, **kwargs)

@property
def udf_def(self) -> udf_def.BigqueryUdf:
return self._udf_def

@property
def bigframes_cloud_function(self) -> Optional[str]:
return self._cloud_function

@property
def input_dtypes(self):
return tuple(arg.bf_type for arg in self.udf_def.signature.inputs)

@property
def output_dtype(self):
return self.udf_def.signature.output.bf_type

@property
def bigframes_bigquery_function_output_dtype(self):
return self.udf_def.signature.output.emulating_type.bf_type
8 changes: 8 additions & 0 deletions bigframes/functions/udf_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,14 @@ def stable_hash(self) -> bytes:

return hash_val.digest()

def to_callable(self):
"""
Reconstructs the python callable from the pickled code.

Assumption: package_requirements match local environment
"""
return cloudpickle.loads(self.pickled_code)


@dataclasses.dataclass(frozen=True)
class ManagedFunctionConfig:
Expand Down
16 changes: 14 additions & 2 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2029,7 +2029,13 @@ def apply(
" are supported."
)

if isinstance(func, bigframes.functions.BigqueryCallableRoutine):
if isinstance(
func,
(
bigframes.functions.BigqueryCallableRoutine,
bigframes.functions.UdfRoutine,
),
):
# We are working with bigquery function at this point
if args:
result_series = self._apply_nary_op(
Expand Down Expand Up @@ -2090,7 +2096,13 @@ def combine(
" are supported."
)

if isinstance(func, bigframes.functions.BigqueryCallableRoutine):
if isinstance(
func,
(
bigframes.functions.BigqueryCallableRoutine,
bigframes.functions.UdfRoutine,
),
):
result_series = self._apply_binary_op(
other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def)
)
Expand Down
Loading
Loading