From e93b502960d8613a8aefbc930e3a5201d584f62e Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Tue, 10 Jun 2025 20:41:38 -0700
Subject: [PATCH 1/2] feat: Make Spark Compute able to use other source

Signed-off-by: HaoXuAI
---
 .../feast/infra/compute_engines/spark/feature_builder.py | 2 +-
 sdk/python/feast/infra/compute_engines/spark/nodes.py    | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/compute_engines/spark/feature_builder.py b/sdk/python/feast/infra/compute_engines/spark/feature_builder.py
index df882bfc2c3..a3059105950 100644
--- a/sdk/python/feast/infra/compute_engines/spark/feature_builder.py
+++ b/sdk/python/feast/infra/compute_engines/spark/feature_builder.py
@@ -29,7 +29,7 @@ def build_source_node(self):
         source = self.feature_view.batch_source
         start_time = self.task.start_time
         end_time = self.task.end_time
-        node = SparkReadNode("source", source, start_time, end_time)
+        node = SparkReadNode("source", source, self.spark_session, start_time, end_time)
         self.nodes.append(node)
         return node
 
diff --git a/sdk/python/feast/infra/compute_engines/spark/nodes.py b/sdk/python/feast/infra/compute_engines/spark/nodes.py
index 8d00f124439..1ab454daa52 100644
--- a/sdk/python/feast/infra/compute_engines/spark/nodes.py
+++ b/sdk/python/feast/infra/compute_engines/spark/nodes.py
@@ -56,11 +56,13 @@ def __init__(
         self,
         name: str,
         source: DataSource,
+        spark_session: SparkSession,
         start_time: Optional[datetime] = None,
         end_time: Optional[datetime] = None,
     ):
         super().__init__(name)
         self.source = source
+        self.spark_session = spark_session
         self.start_time = start_time
         self.end_time = end_time
 
@@ -72,7 +74,10 @@ def execute(self, context: ExecutionContext) -> DAGValue:
             start_time=self.start_time,
             end_time=self.end_time,
         )
-        spark_df = cast(SparkRetrievalJob, retrieval_job).to_spark_df()
+        if isinstance(retrieval_job, SparkRetrievalJob):
+            spark_df = cast(SparkRetrievalJob, retrieval_job).to_spark_df()
+        else:
+            spark_df = self.spark_session.createDataFrame(retrieval_job.to_arrow())
 
         return DAGValue(
             data=spark_df,

From 0b89d509545644aba3c88b9b3609eecf2bc842de Mon Sep 17 00:00:00 2001
From: HaoXuAI
Date: Tue, 10 Jun 2025 20:44:29 -0700
Subject: [PATCH 2/2] feat: Make Spark Compute able to use other source

Signed-off-by: HaoXuAI
---
 sdk/python/feast/infra/offline_stores/duckdb.py      | 2 +-
 sdk/python/feast/infra/offline_stores/file_source.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py
index 3d736c76756..7bf96129d0b 100644
--- a/sdk/python/feast/infra/offline_stores/duckdb.py
+++ b/sdk/python/feast/infra/offline_stores/duckdb.py
@@ -96,7 +96,7 @@ def _write_data_source(
             prev_schema = (
                 DeltaTable(file_options.uri, storage_options=storage_options)
                 .schema()
-                .to_arrow()
+                .to_pyarrow()
             )
             table = table.cast(ibis.Schema.from_pyarrow(prev_schema))
             write_mode = "append"
diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py
index cba370abf4b..af33338265b 100644
--- a/sdk/python/feast/infra/offline_stores/file_source.py
+++ b/sdk/python/feast/infra/offline_stores/file_source.py
@@ -202,7 +202,7 @@ def get_table_column_names_and_types(
             schema = (
                 DeltaTable(self.path, storage_options=storage_options)
                 .schema()
-                .to_arrow()
+                .to_pyarrow()
             )
         else:
             raise Exception(f"Unknown FileFormat -> {self.file_format}")