From 8b5bca7446999345d1c7c9aae497306647b3625a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 27 Mar 2026 19:36:55 +0000 Subject: [PATCH 1/3] feat: Support loading avro, orc data --- bigframes/pandas/__init__.py | 6 ++ bigframes/pandas/io/api.py | 34 ++++++++ bigframes/session/__init__.py | 81 +++++++++++++++++++ test_types.orc | Bin 0 -> 406 bytes tests/system/small/test_session.py | 120 +++++++++++++++++++++++++++++ 5 files changed, 241 insertions(+) create mode 100644 test_types.orc diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 4db900e776..9a2039d396 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -97,6 +97,7 @@ _read_gbq_colab, from_glob_path, read_arrow, + read_avro, read_csv, read_gbq, read_gbq_function, @@ -105,6 +106,7 @@ read_gbq_query, read_gbq_table, read_json, + read_orc, read_pandas, read_parquet, read_pickle, @@ -461,6 +463,8 @@ def reset_session(): read_pandas, read_parquet, read_pickle, + read_orc, + read_avro, remote_function, to_datetime, to_timedelta, @@ -496,6 +500,8 @@ def reset_session(): "read_pandas", "read_parquet", "read_pickle", + "read_orc", + "read_avro", "remote_function", "to_datetime", "to_timedelta", diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py index 7296cd2b7f..8946953e17 100644 --- a/bigframes/pandas/io/api.py +++ b/bigframes/pandas/io/api.py @@ -600,6 +600,40 @@ def read_parquet( read_parquet.__doc__ = inspect.getdoc(bigframes.session.Session.read_parquet) +def read_orc( + path: str | IO["bytes"], + *, + engine: str = "auto", + write_engine: constants.WriteEngineType = "default", +) -> bigframes.dataframe.DataFrame: + return global_session.with_default_session( + bigframes.session.Session.read_orc, + path, + engine=engine, + write_engine=write_engine, + ) + + +read_orc.__doc__ = inspect.getdoc(bigframes.session.Session.read_orc) + + +def read_avro( + path: str | IO["bytes"], + *, + engine: str = "bigquery", + write_engine: constants.WriteEngineType = "default", +) -> bigframes.dataframe.DataFrame: + return global_session.with_default_session( + bigframes.session.Session.read_avro, + path, + engine=engine, + write_engine=write_engine, + ) + + +read_avro.__doc__ = inspect.getdoc(bigframes.session.Session.read_avro) + + def read_gbq_function( function_name: str, is_row_processor: bool = False, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 710b3701fa..d3b778b802 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1369,6 +1369,87 @@ def read_parquet( ) return self._read_pandas(pandas_obj, write_engine=write_engine) + def read_orc( + self, + path: str | IO["bytes"], + *, + engine: str = "auto", + write_engine: constants.WriteEngineType = "default", + ) -> dataframe.DataFrame: + """Load an ORC file to a BigQuery DataFrames DataFrame. + + Args: + path (str or IO): + The path or buffer to the ORC file. Can be a local path or Google Cloud Storage URI. + engine (str, default "auto"): + The engine used to read the file. Supported values: `auto`, `bigquery`, `pyarrow`. + write_engine (str, default "default"): + The write engine used to persist the data to BigQuery if needed. + + Returns: + bigframes.dataframe.DataFrame: + A new DataFrame representing the data from the ORC file. 
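+
+        **Examples:**
+
+            Illustrative GCS path; wildcard paths require the BigQuery engine.
+
+            >>> import bigframes.pandas as bpd
+            >>> df = bpd.read_orc("gs://my-bucket/data/*.orc", engine="bigquery")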
+ """ + bigframes.session.validation.validate_engine_compatibility( + engine=engine, + write_engine=write_engine, + ) + if engine == "bigquery": + job_config = bigquery.LoadJobConfig() + job_config.source_format = bigquery.SourceFormat.ORC + job_config.labels = {"bigframes-api": "read_orc"} + table_id = self._loader.load_file(path, job_config=job_config) + return self._loader.read_gbq_table(table_id) + elif engine in ("auto", "pyarrow"): + if isinstance(path, str) and "*" in path: + raise ValueError( + "The provided path contains a wildcard character (*), which is not " + "supported by the current engine. To read files from wildcard paths, " + "please use the 'bigquery' engine by setting `engine='bigquery'` in " + "your configuration." + ) + + read_orc_kwargs: Dict[str, Any] = {} + if not pandas.__version__.startswith("1."): + read_orc_kwargs["dtype_backend"] = "pyarrow" + + pandas_obj = pandas.read_orc(path, **read_orc_kwargs) + return self._read_pandas(pandas_obj, write_engine=write_engine) + else: + raise ValueError( + f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery', 'pyarrow'." + ) + + def read_avro( + self, + path: str | IO["bytes"], + *, + engine: str = "default", + ) -> dataframe.DataFrame: + """Load an Avro file to a BigQuery DataFrames DataFrame. + + Args: + path (str or IO): + The path or buffer to the Avro file. Can be a local path or Google Cloud Storage URI. + engine (str, default "default"): + The engine used to read the file. Only `bigquery` is supported for Avro. + + Returns: + bigframes.dataframe.DataFrame: + A new DataFrame representing the data from the Avro file. + """ + if engine not in ("default", "bigquery"): + raise ValueError( + f"Unsupported engine: {repr(engine)}. Supported values: 'default', 'bigquery'." 
+ ) + + job_config = bigquery.LoadJobConfig() + job_config.use_avro_logical_types = True + job_config.source_format = bigquery.SourceFormat.AVRO + job_config.labels = {"bigframes-api": "read_avro"} + table_id = self._loader.load_file(path, job_config=job_config) + return self._loader.read_gbq_table(table_id) + def read_json( self, path_or_buf: str | IO["bytes"], diff --git a/test_types.orc b/test_types.orc new file mode 100644 index 0000000000000000000000000000000000000000..3da1defa5486a2b7f20ed45a56b2bceadd51bfcb GIT binary patch literal 406 zcmeYda^_+aV&Py6VBnJDVqpLRAu$d{F9C^$8}l!(O;mtT21Ll0F~~`tZ*X8>fHSz* zIM@UjBsjn{ql5yOW&+cU0*n%DKt3ywW(LxX0!&aolLRx+Jq!X2K#~bYN-?|phH#m2 zK^%(9fjGoDm<3icN^t=x4+$X!e+@<@Jq{irCPpSH{*uhx)Z&uF+=BSL#LTi(WP!x; z#G+IM1`P%S1|<#-PB=>l!ZKjA0z1YEho!j09VYmBFeb4Y=^5%7aLi-`22Vo+ixLw9 RkA{T-ThkY2X8#~(2>_KcT9*I- literal 0 HcmV?d00001 diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index e8e601cc76..0d4ccb908b 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -1931,6 +1931,126 @@ def test_read_parquet_gcs( bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out) +@pytest.mark.parametrize( + ("engine", "filename"), + ( + pytest.param( + "bigquery", + "000000000000.orc", + id="bigquery", + ), + pytest.param( + "auto", + "000000000000.orc", + id="auto", + ), + pytest.param( + "pyarrow", + "000000000000.orc", + id="pyarrow", + ), + pytest.param( + "bigquery", + "*.orc", + id="bigquery_wildcard", + ), + pytest.param( + "auto", + "*.orc", + id="auto_wildcard", + marks=pytest.mark.xfail( + raises=ValueError, + ), + ), + ), +) +def test_read_orc_gcs( + session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename +): + scalars_df, _ = scalars_dfs + write_path = gcs_folder + test_read_orc_gcs.__name__ + "000000000000.orc" + read_path = gcs_folder + test_read_orc_gcs.__name__ + filename + + df_in: bigframes.dataframe.DataFrame = scalars_df.copy() + df_in = df_in.drop( + columns=[ + "geography_col", + "time_col", + "datetime_col", + "duration_col", + "timestamp_col", + ] + ) + df_write = df_in.reset_index(drop=False) + df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}" + df_write.to_orc(write_path) + + df_out = ( + session.read_orc(read_path, engine=engine) + .set_index(df_write.index.name) + .sort_index() + .set_index(typing.cast(str, df_in.index.name)) + ) + + assert df_out.size != 0 + pd_df_in = df_in.to_pandas() + pd_df_out = df_out.to_pandas() + bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out) + + +@pytest.mark.parametrize( + ("engine", "filename"), + ( + pytest.param( + "bigquery", + "000000000000.avro", + id="bigquery", + ), + pytest.param( + "bigquery", + "*.avro", + id="bigquery_wildcard", + ), + ), +) +def test_read_avro_gcs( + session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename +): + scalars_df, _ = scalars_dfs + write_uri = gcs_folder + test_read_avro_gcs.__name__ + "*.avro" + read_uri = gcs_folder + test_read_avro_gcs.__name__ + filename + + df_in: bigframes.dataframe.DataFrame = scalars_df.copy() + # datetime round-trips back as str in avro + df_in = df_in.drop(columns=["geography_col", "duration_col", "datetime_col"]) + df_write = df_in.reset_index(drop=False) + index_name = f"ordering_id_{random.randrange(1_000_000)}" + df_write.index.name = index_name + + # Create a BigQuery table + table_id = df_write.to_gbq() + + # Extract to GCS as Avro + client = session.bqclient + extract_job_config = bigquery.ExtractJobConfig() + 
extract_job_config.destination_format = "AVRO" + extract_job_config.use_avro_logical_types = True + + client.extract_table(table_id, write_uri, job_config=extract_job_config).result() + + df_out = ( + session.read_avro(read_uri, engine=engine) + .set_index(index_name) + .sort_index() + .set_index(typing.cast(str, df_in.index.name)) + ) + + assert df_out.size != 0 + pd_df_in = df_in.to_pandas() + pd_df_out = df_out.to_pandas() + bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out) + + @pytest.mark.parametrize( "compression", [ From d989eb2f228d5ab93c3e830803e104f5d1a66fbd Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 30 Mar 2026 22:01:45 +0000 Subject: [PATCH 2/3] fix arg inconsistencies --- bigframes/pandas/io/api.py | 4 +--- bigframes/session/__init__.py | 8 ++++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py index 8946953e17..d0ad88ec4b 100644 --- a/bigframes/pandas/io/api.py +++ b/bigframes/pandas/io/api.py @@ -620,14 +620,12 @@ def read_orc( def read_avro( path: str | IO["bytes"], *, - engine: str = "bigquery", - write_engine: constants.WriteEngineType = "default", + engine: str = "auto", ) -> bigframes.dataframe.DataFrame: return global_session.with_default_session( bigframes.session.Session.read_avro, path, engine=engine, - write_engine=write_engine, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index d3b778b802..2fb0d6d8f7 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1424,23 +1424,23 @@ def read_avro( self, path: str | IO["bytes"], *, - engine: str = "default", + engine: str = "auto", ) -> dataframe.DataFrame: """Load an Avro file to a BigQuery DataFrames DataFrame. Args: path (str or IO): The path or buffer to the Avro file. Can be a local path or Google Cloud Storage URI. - engine (str, default "default"): + engine (str, default "auto"): The engine used to read the file. Only `bigquery` is supported for Avro. Returns: bigframes.dataframe.DataFrame: A new DataFrame representing the data from the Avro file. """ - if engine not in ("default", "bigquery"): + if engine not in ("auto", "bigquery"): raise ValueError( - f"Unsupported engine: {repr(engine)}. Supported values: 'default', 'bigquery'." + f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery'." ) job_config = bigquery.LoadJobConfig() From 9af47af6293e5a83c077d8f4e674f739db493f6f Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 31 Mar 2026 00:02:27 +0000 Subject: [PATCH 3/3] skip test for pandas<2 --- tests/system/small/test_session.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 0d4ccb908b..ff8f346fa3 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -1967,6 +1967,11 @@ def test_read_parquet_gcs( def test_read_orc_gcs( session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename ): + pytest.importorskip( + "pandas", + minversion="2.0.0", + reason="pandas<2 does not handle nullable int columns well", + ) scalars_df, _ = scalars_dfs write_path = gcs_folder + test_read_orc_gcs.__name__ + "000000000000.orc" read_path = gcs_folder + test_read_orc_gcs.__name__ + filename
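
---

A minimal usage sketch, assuming all three patches are applied; the GCS bucket
name below is hypothetical:

    import bigframes.pandas as bpd

    # ORC: the "auto"/"pyarrow" engines route through pandas.read_orc and
    # reject wildcard paths, so multi-file GCS globs need engine="bigquery".
    orc_df = bpd.read_orc("gs://my-bucket/exports/*.orc", engine="bigquery")

    # Avro: only the BigQuery load path exists; the default engine="auto"
    # resolves to it, and Avro logical types (DATE, TIMESTAMP) load as the
    # corresponding BigQuery types because use_avro_logical_types is set.
    avro_df = bpd.read_avro("gs://my-bucket/exports/*.avro")

    print(orc_df.dtypes)
    print(avro_df.dtypes)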