From 2f37e0782f40f6e70b501a1390c712bf155eabd4 Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Wed, 20 Mar 2024 05:00:25 +0700 Subject: [PATCH 1/3] feat: Enable Arrow-based columnar data transfers Signed-off-by: tanlocnguyen --- docs/reference/offline-stores/spark.md | 2 ++ .../feast/templates/spark/feature_repo/feature_store.yaml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/reference/offline-stores/spark.md b/docs/reference/offline-stores/spark.md index 3cca2aab1af..2e2facba64a 100644 --- a/docs/reference/offline-stores/spark.md +++ b/docs/reference/offline-stores/spark.md @@ -30,6 +30,8 @@ offline_store: spark.sql.catalogImplementation: "hive" spark.sql.parser.quotedRegexColumnNames: "true" spark.sql.session.timeZone: "UTC" + spark.sql.execution.arrow.fallback.enabled: "true" + spark.sql.execution.arrow.pyspark.enabled: "true" online_store: path: data/online_store.db ``` diff --git a/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml b/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml index f72c7c65f4b..08383a29e13 100644 --- a/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml @@ -12,6 +12,8 @@ offline_store: spark.sql.catalogImplementation: "hive" spark.sql.parser.quotedRegexColumnNames: "true" spark.sql.session.timeZone: "UTC" + spark.sql.execution.arrow.fallback.enabled: "true" + spark.sql.execution.arrow.pyspark.enabled: "true" online_store: path: data/online_store.db entity_key_serialization_version: 2 From c1c99901148d789a6fa21b48edea36d26e24dfb8 Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Thu, 21 Mar 2024 22:19:27 +0700 Subject: [PATCH 2/3] fix: Add __eq__, __hash__ to SparkSource for comparison Signed-off-by: tanlocnguyen --- .../contrib/spark_offline_store/spark_source.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py 
b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 8cd392ce5df..d3568adac4f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -185,6 +185,21 @@ def get_table_query_string(self) -> str: return f"`{tmp_table_name}`" + def __eq__(self, other): + base_eq = super().__eq__(other) + if not base_eq: + return False + if self.table != other.table: + return False + if self.query != other.query: + return False + if self.path != other.path: + return False + return True + + def __hash__(self): + return super().__hash__() + class SparkOptions: allowed_formats = [format.value for format in SparkSourceFormat] From add7f3a472297c9b0e8794bbf5091b53dc9759e9 Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Fri, 22 Mar 2024 01:23:16 +0700 Subject: [PATCH 3/3] chore: simplify the logic Signed-off-by: tanlocnguyen --- .../contrib/spark_offline_store/spark_source.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index d3568adac4f..0809043a010 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -189,13 +189,11 @@ def __eq__(self, other): base_eq = super().__eq__(other) if not base_eq: return False - if self.table != other.table: - return False - if self.query != other.query: - return False - if self.path != other.path: - return False - return True + return ( + self.table == other.table + and self.query == other.query + and self.path == other.path + ) def __hash__(self): return super().__hash__()