From adaeb6ff05719927d933766359d16f70883f7279 Mon Sep 17 00:00:00 2001 From: Nick Zeolla Date: Tue, 12 Sep 2023 17:04:07 +0100 Subject: [PATCH 1/2] Add bigquery table create disposition to offline store Signed-off-by: Nick Zeolla --- sdk/python/feast/infra/offline_stores/bigquery.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 5913b60f62f..240c40dd7e2 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -19,7 +19,7 @@ import pandas as pd import pyarrow import pyarrow.parquet -from pydantic import StrictStr, validator +from pydantic import StrictStr, validator, ConstrainedStr from pydantic.typing import Literal from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed @@ -71,6 +71,11 @@ def get_http_client_info(): return http_client_info.ClientInfo(user_agent=get_user_agent()) +class BigQueryTableCreateDisposition(ConstrainedStr): + """Custom constraint for table_create_disposition. To understand more, see: + https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition""" + values = {"CREATE_NEVER", "CREATE_IF_NEEDED"} + class BigQueryOfflineStoreConfig(FeastConfigBaseModel): """Offline store config for GCP BigQuery""" @@ -95,6 +100,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel): gcs_staging_location: Optional[str] = None """ (optional) GCS location used for offloading BigQuery results as parquet files.""" + table_create_disposition: Optional[BigQueryTableCreateDisposition] = None + """ (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED.""" + @validator("billing_project_id") def project_id_exists(cls, v, values, **kwargs): if v and not values["project_id"]: @@ -324,6 +332,7 @@ def write_logged_features( job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.PARQUET, schema=arrow_schema_to_bq_schema(source.get_schema(registry)), + create_disposition=config.offline_store.table_create_disposition, time_partitioning=bigquery.TimePartitioning( type_=bigquery.TimePartitioningType.DAY, field=source.get_log_timestamp_column(), @@ -384,6 +393,7 @@ def offline_write_batch( job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.PARQUET, schema=arrow_schema_to_bq_schema(pa_schema), + create_disposition=config.offline_store.table_create_disposition, write_disposition="WRITE_APPEND", # Default but included for clarity ) From c98c9b82463243851a84498236e7113eb672c48b Mon Sep 17 00:00:00 2001 From: Nick Zeolla Date: Tue, 12 Sep 2023 17:35:55 +0100 Subject: [PATCH 2/2] linting Signed-off-by: Nick Zeolla --- sdk/python/feast/infra/offline_stores/bigquery.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 240c40dd7e2..86c587c7fd2 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -19,7 +19,7 @@ import pandas as pd import pyarrow import pyarrow.parquet -from pydantic import StrictStr, validator, ConstrainedStr +from pydantic import ConstrainedStr, StrictStr, validator from pydantic.typing import Literal from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed @@ -71,9 +71,11 @@ def get_http_client_info(): return http_client_info.ClientInfo(user_agent=get_user_agent()) + class BigQueryTableCreateDisposition(ConstrainedStr): """Custom constraint for table_create_disposition. To understand more, see: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition""" + values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}