From ab7d4fba2c65b455927274e233c6f305e09cef4b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 8 Dec 2024 22:35:19 -0500 Subject: [PATCH 01/33] merged rebase Signed-off-by: Francisco Javier Arceo --- setup.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/setup.py b/setup.py index 59b881c9715..d1f9e14d519 100644 --- a/setup.py +++ b/setup.py @@ -154,7 +154,11 @@ FAISS_REQUIRED = ["faiss-cpu>=1.7.0,<2"] QDRANT_REQUIRED = ["qdrant-client>=1.12.0"] +<<<<<<< HEAD GO_REQUIRED = ["cffi~=1.15.0"] +======= +MILVUS_REQUIRED = ["pymilvus"] +>>>>>>> 321538df2 (feat: Adding support for Milvus as a Vector database) MILVUS_REQUIRED = ["pymilvus"] From a6c900572f78885c63101cfdef42a1f992b78b3b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 10 Nov 2024 23:19:11 -0500 Subject: [PATCH 02/33] adding configuration Signed-off-by: Francisco Javier Arceo --- Makefile | 19 ++++ .../online_store/test_online_retrieval.py | 103 ++++++++++++++++++ 2 files changed, 122 insertions(+) diff --git a/Makefile b/Makefile index de2ee568b68..ad35e549a3b 100644 --- a/Makefile +++ b/Makefile @@ -349,6 +349,25 @@ test-python-universal-cassandra-no-cloud-providers: not test_snowflake" \ sdk/python/tests + test-python-universal-milvus-online: + PYTHONPATH='.' \ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.milvus_repo_configuration \ + PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.milvus\ + python -m pytest -n 8 --integration \ + -k "not test_universal_cli and \ + not test_go_feature_server and \ + not test_feature_logging and \ + not test_reorder_columns and \ + not test_logged_features_validation and \ + not test_lambda_materialization_consistency and \ + not test_offline_write and \ + not test_push_features_to_offline_store and \ + not gcs_registry and \ + not s3_registry and \ + not test_universal_types and \ + not test_snowflake" \ + sdk/python/tests + test-python-universal-singlestore-online: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.singlestore_repo_configuration \ diff --git a/sdk/python/tests/unit/online_store/test_online_retrieval.py b/sdk/python/tests/unit/online_store/test_online_retrieval.py index 83184643f35..ffa8aee20a3 100644 --- a/sdk/python/tests/unit/online_store/test_online_retrieval.py +++ b/sdk/python/tests/unit/online_store/test_online_retrieval.py @@ -11,12 +11,14 @@ from pandas.testing import assert_frame_equal from feast import FeatureStore, RepoConfig +from feast.infra.online_stores.contrib.milvus import MilvusOnlineStoreConfig from feast.errors import FeatureViewNotFoundException from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RegistryConfig from feast.utils import _utc_now +from feast.infra.provider import Provider from tests.integration.feature_repos.universal.feature_views import TAGS from tests.utils.cli_repo_creator import CliRunner, get_example_repo @@ -561,3 +563,104 @@ def test_sqlite_vec_import() -> None: """).fetchall() result = [(rowid, round(distance, 2)) for rowid, distance in result] assert result == [(2, 2.39), (1, 2.39)] + +def test_milvus_get_online_documents() -> None: + """ + Test retrieving documents from the online store in local mode. + """ +def test_milvus_get_online_documents() -> None: + """ + Test retrieving documents from the online store in local mode using Milvus. + """ + n = 10 # number of samples - note: we'll actually double it + vector_length = 8 + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_1.py"), "file" + ) as store: + # Configure the online store to use Milvus + new_config = RepoConfig( + project=store.config.project, + registry=store.config.registry, + provider=store.config.provider, + online_store=MilvusOnlineStoreConfig( + type="milvus", + host="localhost", + port=19530, + index_type="IVF_FLAT", + metric_type="L2", + embedding_dim=vector_length, + vector_enabled=True, + ), + entity_key_serialization_version=store.config.entity_key_serialization_version, + ) + store = FeatureStore(config=new_config, repo_path=store.repo_path) + # Apply the new configuration + store.apply([]) + + # Write some data to the feature view + document_embeddings_fv = store.get_feature_view(name="document_embeddings") + + provider: Provider = store._get_provider() + + item_keys = [ + EntityKeyProto( + join_keys=["item_id"], entity_values=[ValueProto.Value(int64_val=i)] + ) + for i in range(n) + ] + data = [] + for item_key in item_keys: + embedding_vector = np.random.random(vector_length).tolist() + data.append( + ( + item_key, + { + "Embeddings": ValueProto.Value( + float_list_val=FloatListProto(val=embedding_vector) + ) + }, + _utc_now(), + _utc_now(), + ) + ) + + provider.online_write_batch( + config=store.config, + table=document_embeddings_fv, + data=data, + progress=None, + ) + + documents_df = pd.DataFrame( + { + "item_id": [i for i in range(n)], + "Embeddings": [ + np.random.random(vector_length).tolist() for _ in range(n) + ], + "event_timestamp": [_utc_now() for _ in range(n)], + } + ) + + store.write_to_online_store( + feature_view_name="document_embeddings", + df=documents_df, + ) + + # For Milvus, get the collection and check the number of entities + collection = provider._online_store._get_collection( + store.config, document_embeddings_fv + ) + record_count = collection.num_entities + assert record_count == len(data) + documents_df.shape[0] + + query_embedding = np.random.random(vector_length).tolist() + + # Retrieve online documents using Milvus + result = store.retrieve_online_documents( + feature="document_embeddings:Embeddings", query=query_embedding, top_k=3 + ).to_dict() + + assert "Embeddings" in result + assert "distance" in result + assert len(result["distance"]) == 3 From b61b965648af2b975c1076ef14be385310c79121 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 11 Nov 2024 22:01:43 -0500 Subject: [PATCH 03/33] updated Signed-off-by: Francisco Javier Arceo --- Makefile | 2 +- .../milvus_online_store/milvus.py | 366 ++++++++++++++++++ .../online_store/test_online_retrieval.py | 9 +- 3 files changed, 370 insertions(+), 7 deletions(-) create mode 100644 sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py diff --git a/Makefile b/Makefile index ad35e549a3b..84d1a552349 100644 --- a/Makefile +++ b/Makefile @@ -351,7 +351,7 @@ test-python-universal-cassandra-no-cloud-providers: test-python-universal-milvus-online: PYTHONPATH='.' \ - FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.milvus_repo_configuration \ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.milvus_online_store.milvus_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.milvus\ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py new file mode 100644 index 00000000000..757b6469a02 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -0,0 +1,366 @@ +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple + +from pydantic import StrictStr +from pymilvus import ( + Collection, + CollectionSchema, + DataType, + FieldSchema, + connections, +) + +from feast import Entity +from feast.feature_view import FeatureView +from feast.infra.infra_object import InfraObject +from feast.infra.key_encoding_utils import ( + deserialize_entity_key, + serialize_entity_key, +) +from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.online_stores.vector_store import VectorStoreConfig +from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.utils import ( + _build_retrieve_online_document_record, + to_naive_utc, +) + + +class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig): + """Online store config for Milvus vector database""" + + type: Literal["milvus", "feast.infra.online_stores.milvus.MilvusOnlineStore"] = ( + "milvus" + ) + """Online store type selector""" + + host: StrictStr = "localhost" + """Hostname for Milvus server""" + + port: int = 19530 + """Port for Milvus server""" + + index_type: str = "IVF_FLAT" + """Index type for Milvus collection""" + + metric_type: str = "L2" + """Distance metric type""" + + embedding_dim: int = 128 + """Dimension of the embedding vectors""" + + vector_enabled: bool = True + """Flag to enable vector search""" + + +class MilvusOnlineStore(OnlineStore): + """ + Milvus implementation of the online store interface. + + Attributes: + _collections: Dictionary to cache Milvus collections. + """ + + _collections: Dict[str, Collection] = {} + + def _connect(self, config: RepoConfig): + connections.connect( + alias="default", + host=config.online_store.host, + port=str(config.online_store.port), + ) + + def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: + collection_name = _table_id(config.project, table) + if collection_name not in self._collections: + self._connect(config) + + fields = [ + FieldSchema( + name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True + ), + FieldSchema(name="entity_key", dtype=DataType.VARCHAR, max_length=512), + FieldSchema( + name="feature_name", dtype=DataType.VARCHAR, max_length=256 + ), + FieldSchema(name="value", dtype=DataType.BINARY_VECTOR, dim=8 * 1024), + FieldSchema( + name="vector_value", + dtype=DataType.FLOAT_VECTOR, + dim=config.online_store.embedding_dim, + ), + FieldSchema(name="event_ts", dtype=DataType.INT64), + FieldSchema(name="created_ts", dtype=DataType.INT64), + ] + schema = CollectionSchema( + fields=fields, description="Feast feature view data" + ) + collection = Collection( + name=collection_name, schema=schema, using="default" + ) + if not collection.has_index(): + index_params = { + "index_type": config.online_store.index_type, + "metric_type": config.online_store.metric_type, + "params": {"nlist": 128}, + } + collection.create_index( + field_name="vector_value", index_params=index_params + ) + collection.load() + self._collections[collection_name] = collection + return self._collections[collection_name] + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[ + EntityKeyProto, + Dict[str, ValueProto], + datetime, + Optional[datetime], + ] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + collection = self._get_collection(config, table) + entity_keys = [] + feature_names = [] + values = [] + vector_values = [] + event_tss = [] + created_tss = [] + + for entity_key, values_dict, timestamp, created_ts in data: + entity_key_str = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + timestamp_int = int(to_naive_utc(timestamp).timestamp() * 1e6) + created_ts_int = ( + int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0 + ) + for feature_name, val in values_dict.items(): + entity_keys.append(entity_key_str) + feature_names.append(feature_name) + values.append(val.SerializeToString()) + if config.online_store.vector_enabled: + vector_values.append(val.float_list_val.val) + else: + vector_values.append([0.0] * config.online_store.embedding_dim) + event_tss.append(timestamp_int) + created_tss.append(created_ts_int) + if progress: + progress(1) + + if entity_keys: + insert_data = { + "entity_key": entity_keys, + "feature_name": feature_names, + "value": values, + "vector_value": vector_values, + "event_ts": event_tss, + "created_ts": created_tss, + } + collection.insert(insert_data) + collection.flush() + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + collection = self._get_collection(config, table) + results = [] + + for entity_key in entity_keys: + entity_key_str = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + expr = f"entity_key == '{entity_key_str}'" + if requested_features: + features_str = ", ".join([f"'{f}'" for f in requested_features]) + expr += f" && feature_name in [{features_str}]" + + res = collection.query( + expr, + output_fields=["feature_name", "value", "event_ts"], + consistency_level="Strong", + ) + + res_dict = {} + res_ts = None + for r in res: + feature_name = r["feature_name"] + val_bin = r["value"] + val = ValueProto() + val.ParseFromString(val_bin) + res_dict[feature_name] = val + res_ts = datetime.fromtimestamp(r["event_ts"] / 1e6) + if not res_dict: + results.append((None, None)) + else: + results.append((res_ts, res_dict)) + return results + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + self._connect(config) + for table in tables_to_keep: + self._get_collection(config, table) + for table in tables_to_delete: + collection_name = _table_id(config.project, table) + collection = Collection(name=collection_name) + if collection.exists(): + collection.drop() + self._collections.pop(collection_name, None) + + def plan( + self, config: RepoConfig, desired_registry_proto: RegistryProto + ) -> List[InfraObject]: + project = config.project + + infra_objects: List[InfraObject] = [ + MilvusTable( + host=config.online_store.host, + port=config.online_store.port, + name=_table_id(project, FeatureView.from_proto(view)), + ) + for view in [ + *desired_registry_proto.feature_views, + *desired_registry_proto.stream_feature_views, + ] + ] + return infra_objects + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + self._connect(config) + for table in tables: + collection_name = _table_id(config.project, table) + collection = Collection(name=collection_name) + if collection.exists(): + collection.drop() + self._collections.pop(collection_name, None) + + def retrieve_online_documents( + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + distance_metric: Optional[str] = None, + ) -> List[ + Tuple[ + Optional[datetime], + Optional[EntityKeyProto], + Optional[ValueProto], + Optional[ValueProto], + Optional[ValueProto], + ] + ]: + collection = self._get_collection(config, table) + if not config.online_store.vector_enabled: + raise ValueError("Vector search is not enabled in the online store config") + + search_params = { + "metric_type": distance_metric or config.online_store.metric_type, + "params": {"nprobe": 10}, + } + expr = f"feature_name == '{requested_feature}'" + + results = collection.search( + data=[embedding], + anns_field="vector_value", + param=search_params, + limit=top_k, + expr=expr, + output_fields=["entity_key", "value", "event_ts"], + consistency_level="Strong", + ) + + result_list = [] + for hits in results: + for hit in hits: + entity_key_str = hit.entity.get("entity_key") + val_bin = hit.entity.get("value") + val = ValueProto() + val.ParseFromString(val_bin) + distance = hit.distance + event_ts = datetime.fromtimestamp(hit.entity.get("event_ts") / 1e6) + entity_key = deserialize_entity_key( + bytes.fromhex(entity_key_str), + config.entity_key_serialization_version, + ) + result_list.append( + _build_retrieve_online_document_record( + entity_key, + val.SerializeToString(), + embedding, + distance, + event_ts, + config.entity_key_serialization_version, + ) + ) + return result_list + + +def _table_id(project: str, table: FeatureView) -> str: + return f"{project}_{table.name}" + + +class MilvusTable(InfraObject): + """ + A Milvus collection managed by Feast. + + Attributes: + host: The host of the Milvus server. + port: The port of the Milvus server. + name: The name of the collection. + """ + + host: str + port: int + + def __init__(self, host: str, port: int, name: str): + super().__init__(name) + self.host = host + self.port = port + self._connect() + + def _connect(self): + connections.connect(alias="default", host=self.host, port=str(self.port)) + + def to_infra_object_proto(self) -> InfraObjectProto: + # Implement serialization if needed + pass + + def update(self): + # Implement update logic if needed + pass + + def teardown(self): + collection = Collection(name=self.name) + if collection.exists(): + collection.drop() diff --git a/sdk/python/tests/unit/online_store/test_online_retrieval.py b/sdk/python/tests/unit/online_store/test_online_retrieval.py index ffa8aee20a3..602715e5df8 100644 --- a/sdk/python/tests/unit/online_store/test_online_retrieval.py +++ b/sdk/python/tests/unit/online_store/test_online_retrieval.py @@ -11,14 +11,14 @@ from pandas.testing import assert_frame_equal from feast import FeatureStore, RepoConfig -from feast.infra.online_stores.contrib.milvus import MilvusOnlineStoreConfig from feast.errors import FeatureViewNotFoundException +from feast.infra.online_stores.milvus_online_store.milvus import MilvusOnlineStoreConfig +from feast.infra.provider import Provider from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RegistryConfig from feast.utils import _utc_now -from feast.infra.provider import Provider from tests.integration.feature_repos.universal.feature_views import TAGS from tests.utils.cli_repo_creator import CliRunner, get_example_repo @@ -564,10 +564,7 @@ def test_sqlite_vec_import() -> None: result = [(rowid, round(distance, 2)) for rowid, distance in result] assert result == [(2, 2.39), (1, 2.39)] -def test_milvus_get_online_documents() -> None: - """ - Test retrieving documents from the online store in local mode. - """ + def test_milvus_get_online_documents() -> None: """ Test retrieving documents from the online store in local mode using Milvus. From 88bca02ef18292136223c8433f2850859d0388c5 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 14 Nov 2024 22:46:46 -0500 Subject: [PATCH 04/33] changed things and linted Signed-off-by: Francisco Javier Arceo --- .../integration/feature_repos/universal/online_store/milvus.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py b/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py index 8ffee04c12f..3b7eb25d925 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py @@ -1,3 +1,4 @@ +import time from typing import Any, Dict from testcontainers.milvus import MilvusContainer From 81bd84a2903148fd7fe3093a7f6ab4afce1fabcb Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 15 Nov 2024 13:22:40 -0500 Subject: [PATCH 05/33] adding updated builds Signed-off-by: Francisco Javier Arceo --- .../integration/feature_repos/universal/online_store/milvus.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py b/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py index 3b7eb25d925..8ffee04c12f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py @@ -1,4 +1,3 @@ -import time from typing import Any, Dict from testcontainers.milvus import MilvusContainer From 083fe670f20c98de59b80df181efd18d66afd118 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 19 Nov 2024 21:55:29 -0500 Subject: [PATCH 06/33] adding repo config Signed-off-by: Francisco Javier Arceo --- .../adding-support-for-a-new-online-store.md | 2 +- .../milvus_online_store/milvus.py | 37 +++++++------------ sdk/python/feast/repo_config.py | 1 + 3 files changed, 15 insertions(+), 25 deletions(-) diff --git a/docs/how-to-guides/customizing-feast/adding-support-for-a-new-online-store.md b/docs/how-to-guides/customizing-feast/adding-support-for-a-new-online-store.md index 5e26f133cef..ee75aa6b74f 100644 --- a/docs/how-to-guides/customizing-feast/adding-support-for-a-new-online-store.md +++ b/docs/how-to-guides/customizing-feast/adding-support-for-a-new-online-store.md @@ -25,7 +25,7 @@ OnlineStore class names must end with the OnlineStore suffix! ### Contrib online stores -New online stores go in `sdk/python/feast/infra/online_stores/contrib/`. +New online stores go in `sdk/python/feast/infra/online_stores/`. #### What is a contrib plugin? diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 757b6469a02..cab30d27164 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -31,30 +31,19 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig): - """Online store config for Milvus vector database""" - - type: Literal["milvus", "feast.infra.online_stores.milvus.MilvusOnlineStore"] = ( - "milvus" - ) - """Online store type selector""" - - host: StrictStr = "localhost" - """Hostname for Milvus server""" - - port: int = 19530 - """Port for Milvus server""" - - index_type: str = "IVF_FLAT" - """Index type for Milvus collection""" - - metric_type: str = "L2" - """Distance metric type""" + """ + Configuration for the Milvus online store. + NOTE: The class *must* end with the `OnlineStoreConfig` suffix. + """ - embedding_dim: int = 128 - """Dimension of the embedding vectors""" + type: Literal["milvus"] = "milvus" - vector_enabled: bool = True - """Flag to enable vector search""" + host: Optional[StrictStr] = "localhost" + port: Optional[int] = 19530 + index_type: Optional[str] = "IVF_FLAT" + metric_type: Optional[str] = "L2" + embedding_dim: Optional[int] = 128 + vector_enabled: Optional[bool] = True class MilvusOnlineStore(OnlineStore): @@ -69,7 +58,7 @@ class MilvusOnlineStore(OnlineStore): def _connect(self, config: RepoConfig): connections.connect( - alias="default", + alias="feast", host=config.online_store.host, port=str(config.online_store.port), ) @@ -350,7 +339,7 @@ def __init__(self, host: str, port: int, name: str): self._connect() def _connect(self): - connections.connect(alias="default", host=self.host, port=str(self.port)) + return connections.connect(alias="default", host=self.host, port=str(self.port)) def to_infra_object_proto(self) -> InfraObjectProto: # Implement serialization if needed diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index fe34a12adf8..2b8d5174e1f 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -81,6 +81,7 @@ "singlestore": "feast.infra.online_stores.singlestore_online_store.singlestore.SingleStoreOnlineStore", "qdrant": "feast.infra.online_stores.cqdrant.QdrantOnlineStore", "couchbase": "feast.infra.online_stores.couchbase_online_store.couchbase.CouchbaseOnlineStore", + "milvus": "feast.infra.online_stores.milvus_online_store.milvus.MilvusOnlineStore", **LEGACY_ONLINE_STORE_CLASS_FOR_TYPE, } From fcf32d9161f59911893cac8017000300c34ea475 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 25 Nov 2024 13:26:32 -0500 Subject: [PATCH 07/33] updated Signed-off-by: Francisco Javier Arceo --- Makefile | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/Makefile b/Makefile index 84d1a552349..8eccc91a6e1 100644 --- a/Makefile +++ b/Makefile @@ -268,7 +268,7 @@ test-python-universal-postgres-online: not test_snowflake" \ sdk/python/tests - test-python-universal-mysql-online: +test-python-universal-mysql-online: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.mysql_online_store.mysql_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.mysql \ @@ -292,7 +292,11 @@ test-python-universal-cassandra: FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.cassandra_online_store.cassandra_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.cassandra \ python -m pytest -x --integration \ - sdk/python/tests + sdk/python/tests/integration/offline_store/test_feature_logging.py \ + --ignore=sdk/python/tests/integration/offline_store/test_validation.py \ + -k "not test_snowflake and \ + not test_spark_materialization_consistency and \ + not test_universal_materialization" test-python-universal-hazelcast: PYTHONPATH='.' \ @@ -330,7 +334,7 @@ test-python-universal-cassandra-no-cloud-providers: not test_snowflake" \ sdk/python/tests - test-python-universal-elasticsearch-online: +test-python-universal-elasticsearch-online: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.elasticsearch_online_store.elasticsearch_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.elasticsearch \ @@ -349,23 +353,12 @@ test-python-universal-cassandra-no-cloud-providers: not test_snowflake" \ sdk/python/tests - test-python-universal-milvus-online: +test-python-universal-milvus-online: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.milvus_online_store.milvus_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.milvus\ python -m pytest -n 8 --integration \ - -k "not test_universal_cli and \ - not test_go_feature_server and \ - not test_feature_logging and \ - not test_reorder_columns and \ - not test_logged_features_validation and \ - not test_lambda_materialization_consistency and \ - not test_offline_write and \ - not test_push_features_to_offline_store and \ - not gcs_registry and \ - not s3_registry and \ - not test_universal_types and \ - not test_snowflake" \ + -k "test_retrieve_online_documents and not test_validation" \ sdk/python/tests test-python-universal-singlestore-online: From 662119a610c1c7d00d963beaaa47ef38db352381 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 27 Nov 2024 08:41:28 -0500 Subject: [PATCH 08/33] renaming test and adding milvus to integration test Signed-off-by: Francisco Javier Arceo --- Makefile | 2 +- .../tests/integration/online_store/test_universal_online.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 8eccc91a6e1..02f39cf9521 100644 --- a/Makefile +++ b/Makefile @@ -358,7 +358,7 @@ test-python-universal-milvus-online: FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.milvus_online_store.milvus_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.milvus\ python -m pytest -n 8 --integration \ - -k "test_retrieve_online_documents and not test_validation" \ + -k "test_retrieve_online_documents and not test_dqm" \ sdk/python/tests test-python-universal-singlestore-online: diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 4074dcb194e..7c7c5b31aee 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -857,7 +857,7 @@ def assert_feature_service_entity_mapping_correctness( @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["pgvector", "elasticsearch", "qdrant"]) +@pytest.mark.universal_online_stores(only=["pgvector", "elasticsearch", "qdrant", "milvus"]) def test_retrieve_online_documents(vectordb_environment, fake_document_data): fs = vectordb_environment.feature_store df, data_source = fake_document_data From 4a7edd8e00938f9c69e00659f900b3bca43ed86b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 27 Nov 2024 16:42:41 -0500 Subject: [PATCH 09/33] not quite working but have milvus talking Signed-off-by: Francisco Javier Arceo --- .../online_stores/milvus_online_store/milvus.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index cab30d27164..1d1d6922a14 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -54,14 +54,22 @@ class MilvusOnlineStore(OnlineStore): _collections: Dictionary to cache Milvus collections. """ + _conn: Optional[connections] = None _collections: Dict[str, Collection] = {} - def _connect(self, config: RepoConfig): - connections.connect( + def _connect(self, config: RepoConfig) -> connections: + if not self._conn: + self._conn = connections.connect( + alias="default", + host=config.online_store.host, + port=str(config.online_store.port), + ) + self._conn = connections.connect( alias="feast", host=config.online_store.host, port=str(config.online_store.port), ) + return self._conn def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: collection_name = _table_id(config.project, table) From 4c16c0b72aa64dd03543a407aeac166d0910252c Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 30 Nov 2024 23:29:54 -0500 Subject: [PATCH 10/33] updated tests Signed-off-by: Francisco Javier Arceo --- Makefile | 6 +++--- sdk/python/tests/conftest.py | 12 ++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 02f39cf9521..84bc594a6ad 100644 --- a/Makefile +++ b/Makefile @@ -356,10 +356,10 @@ test-python-universal-elasticsearch-online: test-python-universal-milvus-online: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.milvus_online_store.milvus_repo_configuration \ - PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.milvus\ + PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.milvus \ python -m pytest -n 8 --integration \ - -k "test_retrieve_online_documents and not test_dqm" \ - sdk/python/tests + -k "test_retrieve_online_documents" \ + sdk/python/tests --ignore=sdk/python/tests/integration/offline_store/test_dqm_validation.py test-python-universal-singlestore-online: PYTHONPATH='.' \ diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 6e5f1e14870..4a1a727339d 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -191,6 +191,7 @@ def environment(request, worker_id): request.param, worker_id=worker_id, fixture_request=request, + entity_key_serialization_version=3, ) e.setup() @@ -204,9 +205,20 @@ def environment(request, worker_id): e.teardown() +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.milvus import MilvusOnlineStoreCreator + @pytest.fixture def vectordb_environment(request, worker_id): + milvus_config = IntegrationTestRepoConfig( + provider="milvus", + online_store_creator=MilvusOnlineStoreCreator, + offline_store_creator='', + batch_engine={}) e = construct_test_environment( + milvus_config, request.param, worker_id=worker_id, fixture_request=request, From fabac181b32c4bf5b2fb6a27695a9310f2f30dd3 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 2 Dec 2024 12:55:09 -0500 Subject: [PATCH 11/33] apply() method now works Signed-off-by: Francisco Javier Arceo --- .../infra/online_stores/milvus_online_store/milvus.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 1d1d6922a14..9ee4b31982b 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -77,14 +77,11 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: self._connect(config) fields = [ - FieldSchema( - name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True - ), - FieldSchema(name="entity_key", dtype=DataType.VARCHAR, max_length=512), + FieldSchema(name="entity_key", dtype=DataType.VARCHAR, max_length=512, is_primary=True), FieldSchema( name="feature_name", dtype=DataType.VARCHAR, max_length=256 ), - FieldSchema(name="value", dtype=DataType.BINARY_VECTOR, dim=8 * 1024), + # FieldSchema(name="value", dtype=DataType.BINARY_VECTOR, dim=8 * 1024), FieldSchema( name="vector_value", dtype=DataType.FLOAT_VECTOR, @@ -256,7 +253,7 @@ def teardown( for table in tables: collection_name = _table_id(config.project, table) collection = Collection(name=collection_name) - if collection.exists(): + if collection: collection.drop() self._collections.pop(collection_name, None) From 2ee74c5e4c8ffdcefda464802475ad39021549a8 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 8 Dec 2024 22:43:34 -0500 Subject: [PATCH 12/33] updated setup Signed-off-by: Francisco Javier Arceo --- setup.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/setup.py b/setup.py index d1f9e14d519..7284782e10d 100644 --- a/setup.py +++ b/setup.py @@ -154,11 +154,9 @@ FAISS_REQUIRED = ["faiss-cpu>=1.7.0,<2"] QDRANT_REQUIRED = ["qdrant-client>=1.12.0"] -<<<<<<< HEAD GO_REQUIRED = ["cffi~=1.15.0"] -======= + MILVUS_REQUIRED = ["pymilvus"] ->>>>>>> 321538df2 (feat: Adding support for Milvus as a Vector database) MILVUS_REQUIRED = ["pymilvus"] From 1859e7690b76d8252a1a932c0182a4a22cf22f12 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 13 Dec 2024 16:35:42 -0500 Subject: [PATCH 13/33] checking in progresss...getting there Signed-off-by: Francisco Javier Arceo --- .../milvus_online_store/milvus.py | 122 +++++++++++------- sdk/python/feast/type_map.py | 73 +++++------ sdk/python/tests/conftest.py | 13 +- .../online_store/test_universal_online.py | 37 ++++++ 4 files changed, 155 insertions(+), 90 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 9ee4b31982b..076d0ca54c7 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -11,6 +11,7 @@ ) from feast import Entity +from feast.type_map import PROTO_VALUE_TO_VALUE_TYPE_MAP from feast.feature_view import FeatureView from feast.infra.infra_object import InfraObject from feast.infra.key_encoding_utils import ( @@ -28,7 +29,37 @@ _build_retrieve_online_document_record, to_naive_utc, ) - +from feast.types import VALUE_TYPES_TO_FEAST_TYPES, PrimitiveFeastType, Array, ValueType + +PROTO_TO_MILVUS_TYPE_MAPPING = { + PROTO_VALUE_TO_VALUE_TYPE_MAP['bytes_val']: DataType.STRING, + PROTO_VALUE_TO_VALUE_TYPE_MAP['bool_val']: DataType.BOOL, + PROTO_VALUE_TO_VALUE_TYPE_MAP['string_val']: DataType.STRING, + PROTO_VALUE_TO_VALUE_TYPE_MAP['float_val']: DataType.FLOAT, + PROTO_VALUE_TO_VALUE_TYPE_MAP['double_val']: DataType.DOUBLE, + PROTO_VALUE_TO_VALUE_TYPE_MAP['int32_val']: DataType.INT32, + PROTO_VALUE_TO_VALUE_TYPE_MAP['int64_val']: DataType.INT64, + PROTO_VALUE_TO_VALUE_TYPE_MAP['float_list_val']: DataType.FLOAT_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP['int32_list_val']: DataType.FLOAT_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP['int64_list_val']: DataType.FLOAT_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP['double_list_val']: DataType.FLOAT_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP['bool_list_val']: DataType.BINARY_VECTOR, +} + +FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING = {} + +for value_type, feast_type in VALUE_TYPES_TO_FEAST_TYPES.items(): + if isinstance(feast_type, PrimitiveFeastType): + milvus_type = PROTO_TO_MILVUS_TYPE_MAPPING.get(value_type) + if milvus_type: + FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = milvus_type + elif isinstance(feast_type, Array): + base_type = feast_type.base_type + base_value_type = base_type.to_value_type() + if base_value_type in [ValueType.INT32, ValueType.INT64, ValueType.FLOAT, ValueType.DOUBLE]: + FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.FLOAT_VECTOR + elif base_value_type == ValueType.BOOL: + FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.BINARY_VECTOR class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig): """ @@ -60,7 +91,7 @@ class MilvusOnlineStore(OnlineStore): def _connect(self, config: RepoConfig) -> connections: if not self._conn: self._conn = connections.connect( - alias="default", + alias="feast", host=config.online_store.host, port=str(config.online_store.port), ) @@ -76,35 +107,36 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: if collection_name not in self._collections: self._connect(config) + # Create a composite key by combining entity fields + composite_key_name = '_'.join([field.name for field in table.entity_columns]) + "_pk" + fields = [ - FieldSchema(name="entity_key", dtype=DataType.VARCHAR, max_length=512, is_primary=True), - FieldSchema( - name="feature_name", dtype=DataType.VARCHAR, max_length=256 - ), - # FieldSchema(name="value", dtype=DataType.BINARY_VECTOR, dim=8 * 1024), - FieldSchema( - name="vector_value", - dtype=DataType.FLOAT_VECTOR, - dim=config.online_store.embedding_dim, - ), + FieldSchema(name=composite_key_name, dtype=DataType.VARCHAR, max_length=512, is_primary=True), FieldSchema(name="event_ts", dtype=DataType.INT64), FieldSchema(name="created_ts", dtype=DataType.INT64), ] - schema = CollectionSchema( - fields=fields, description="Feast feature view data" - ) - collection = Collection( - name=collection_name, schema=schema, using="default" - ) + fields_to_exclude = [field.name for field in table.entity_columns] + ['event_ts', 'created_ts'] + fields_to_add = [f for f in table.schema if f.name not in fields_to_exclude] + for field in fields_to_add: + dtype = FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING.get(field.dtype) + if dtype: + if dtype == DataType.FLOAT_VECTOR: + fields.append(FieldSchema(name=field.name, dtype=dtype, dim=config.online_store.embedding_dim)) + + else: + fields.append(FieldSchema(name=field.name, dtype=dtype)) + + schema = CollectionSchema(fields=fields, description="Feast feature view data") + collection = Collection(name=collection_name, schema=schema, using="feast") if not collection.has_index(): index_params = { "index_type": config.online_store.index_type, "metric_type": config.online_store.metric_type, "params": {"nlist": 128}, } - collection.create_index( - field_name="vector_value", index_params=index_params - ) + for vector_field in schema.fields: + if vector_field.dtype in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]: + collection.create_index(field_name=vector_field.name, index_params=index_params) collection.load() self._collections[collection_name] = collection return self._collections[collection_name] @@ -124,13 +156,9 @@ def online_write_batch( progress: Optional[Callable[[int], Any]], ) -> None: collection = self._get_collection(config, table) - entity_keys = [] - feature_names = [] - values = [] - vector_values = [] - event_tss = [] - created_tss = [] + numeric_vector_list_types = [k for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys() if k is not None and 'list' in k and 'string' not in k] + entity_batch_to_insert = [] for entity_key, values_dict, timestamp, created_ts in data: entity_key_str = serialize_entity_key( entity_key, @@ -140,30 +168,28 @@ def online_write_batch( created_ts_int = ( int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0 ) - for feature_name, val in values_dict.items(): - entity_keys.append(entity_key_str) - feature_names.append(feature_name) - values.append(val.SerializeToString()) - if config.online_store.vector_enabled: - vector_values.append(val.float_list_val.val) - else: - vector_values.append([0.0] * config.online_store.embedding_dim) - event_tss.append(timestamp_int) - created_tss.append(created_ts_int) + for feature_name in values_dict: + for vector_list_type_name in numeric_vector_list_types: + vector_list = getattr(values_dict[feature_name], vector_list_type_name, None) + if vector_list: + vector_values = getattr(values_dict[feature_name], vector_list_type_name).val + if vector_values != []: + # Note here we are over-writing the feature and collapsing the list into a single value + values_dict[feature_name] = vector_values + + single_entity_record = { + "entity_key": entity_key_str, + "event_ts": timestamp_int, + "created_ts": created_ts_int, + } + single_entity_record.update(values_dict) + entity_batch_to_insert.append(single_entity_record) + if progress: progress(1) - if entity_keys: - insert_data = { - "entity_key": entity_keys, - "feature_name": feature_names, - "value": values, - "vector_value": vector_values, - "event_ts": event_tss, - "created_ts": created_tss, - } - collection.insert(insert_data) - collection.flush() + collection.insert(entity_batch_to_insert) + collection.flush() def online_read( self, diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 8a88c24ffc1..1c9d99d19f2 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -119,10 +119,10 @@ def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: def python_type_to_feast_value_type( - name: str, - value: Optional[Any] = None, - recurse: bool = True, - type_name: Optional[str] = None, + name: str, + value: Optional[Any] = None, + recurse: bool = True, + type_name: Optional[str] = None, ) -> ValueType: """ Finds the equivalent Feast Value Type for a Python value. Both native @@ -197,8 +197,8 @@ def python_type_to_feast_value_type( ) # Validate whether the type stays consistent if ( - common_item_value_type - and not common_item_value_type == current_item_value_type + common_item_value_type + and not common_item_value_type == current_item_value_type ): raise ValueError( f"List value type for field {name} is inconsistent. " @@ -217,7 +217,7 @@ def python_type_to_feast_value_type( def python_values_to_feast_value_type( - name: str, values: Any, recurse: bool = True + name: str, values: Any, recurse: bool = True ) -> ValueType: inferred_dtype = ValueType.UNKNOWN for row in values: @@ -229,8 +229,8 @@ def python_values_to_feast_value_type( inferred_dtype = current_dtype else: if current_dtype != inferred_dtype and current_dtype not in ( - ValueType.UNKNOWN, - ValueType.NULL, + ValueType.UNKNOWN, + ValueType.NULL, ): raise TypeError( f"Input entity {name} has mixed types, {current_dtype} and {inferred_dtype}. That is not allowed. " @@ -318,7 +318,7 @@ def _type_err(item, dtype): def _python_datetime_to_int_timestamp( - values: Sequence[Any], + values: Sequence[Any], ) -> Sequence[Union[int, np.int_]]: # Fast path for Numpy array. if isinstance(values, np.ndarray) and isinstance(values.dtype, np.datetime64): @@ -342,7 +342,7 @@ def _python_datetime_to_int_timestamp( def _python_value_to_proto_value( - feast_value_type: ValueType, values: List[Any] + feast_value_type: ValueType, values: List[Any] ) -> List[ProtoValue]: """ Converts a Python (native, pandas) value to a Feast Proto Value based @@ -387,7 +387,7 @@ def _python_value_to_proto_value( raise _type_err(sample, valid_types[0]) if sample is not None and not all( - type(item) in valid_types for item in sample + type(item) in valid_types for item in sample ): # to_numpy() in utils._convert_arrow_to_proto() upcasts values of type Array of INT32 or INT64 with NULL values to Float64 automatically. for item in sample: @@ -459,11 +459,11 @@ def _python_value_to_proto_value( # So, if value is 0, type validation must pass if scalar_types are either int or float. allowed_types = {np.int64, int, np.float64, float} assert ( - type(sample) in allowed_types + type(sample) in allowed_types ), f"Type `{type(sample)}` not in {allowed_types}" else: assert ( - type(sample) in valid_scalar_types + type(sample) in valid_scalar_types ), f"Type `{type(sample)}` not in {valid_scalar_types}" if feast_value_type == ValueType.BOOL: # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_. @@ -496,7 +496,7 @@ def _python_value_to_proto_value( def python_values_to_proto_values( - values: List[Any], feature_type: ValueType = ValueType.UNKNOWN + values: List[Any], feature_type: ValueType = ValueType.UNKNOWN ) -> List[ProtoValue]: value_type = feature_type sample = next(filter(_non_empty_value, values), None) # first not empty value @@ -523,6 +523,25 @@ def python_values_to_proto_values( return proto_values +PROTO_VALUE_TO_VALUE_TYPE_MAP: Dict[str, ValueType] = { + "int32_val": ValueType.INT32, + "int64_val": ValueType.INT64, + "double_val": ValueType.DOUBLE, + "float_val": ValueType.FLOAT, + "string_val": ValueType.STRING, + "bytes_val": ValueType.BYTES, + "bool_val": ValueType.BOOL, + "int32_list_val": ValueType.INT32_LIST, + "int64_list_val": ValueType.INT64_LIST, + "double_list_val": ValueType.DOUBLE_LIST, + "float_list_val": ValueType.FLOAT_LIST, + "string_list_val": ValueType.STRING_LIST, + "bytes_list_val": ValueType.BYTES_LIST, + "bool_list_val": ValueType.BOOL_LIST, + None: ValueType.NULL, +} + + def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType: """ Returns Feast ValueType given Feast ValueType string. @@ -534,25 +553,7 @@ def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType: A variant of ValueType. """ proto_str = proto_value.WhichOneof("val") - type_map = { - "int32_val": ValueType.INT32, - "int64_val": ValueType.INT64, - "double_val": ValueType.DOUBLE, - "float_val": ValueType.FLOAT, - "string_val": ValueType.STRING, - "bytes_val": ValueType.BYTES, - "bool_val": ValueType.BOOL, - "int32_list_val": ValueType.INT32_LIST, - "int64_list_val": ValueType.INT64_LIST, - "double_list_val": ValueType.DOUBLE_LIST, - "float_list_val": ValueType.FLOAT_LIST, - "string_list_val": ValueType.STRING_LIST, - "bytes_list_val": ValueType.BYTES_LIST, - "bool_list_val": ValueType.BOOL_LIST, - None: ValueType.NULL, - } - - return type_map[proto_str] + return PROTO_VALUE_TO_VALUE_TYPE_MAP[proto_str] def pa_to_feast_value_type(pa_type_as_str: str) -> ValueType: @@ -793,7 +794,7 @@ def _non_empty_value(value: Any) -> bool: String is special case: "" - empty string is considered non empty """ return value is not None and ( - not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str) + not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str) ) @@ -929,7 +930,7 @@ def pg_type_to_feast_value_type(type_str: str) -> ValueType: def feast_value_type_to_pa( - feast_type: ValueType, timestamp_unit: str = "us" + feast_type: ValueType, timestamp_unit: str = "us" ) -> "pyarrow.DataType": import pyarrow diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 4a1a727339d..e6e079b6923 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -209,19 +209,20 @@ def environment(request, worker_id): IntegrationTestRepoConfig, ) from tests.integration.feature_repos.universal.online_store.milvus import MilvusOnlineStoreCreator +from tests.integration.feature_repos.universal.data_sources.file import FileDataSourceCreator + @pytest.fixture def vectordb_environment(request, worker_id): milvus_config = IntegrationTestRepoConfig( - provider="milvus", + provider="local", online_store_creator=MilvusOnlineStoreCreator, - offline_store_creator='', - batch_engine={}) + offline_store_creator=FileDataSourceCreator, + ) e = construct_test_environment( milvus_config, - request.param, - worker_id=worker_id, fixture_request=request, + worker_id=worker_id, entity_key_serialization_version=3, ) @@ -233,7 +234,7 @@ def vectordb_environment(request, worker_id): else: yield e - e.teardown() + # e.teardown() _config_cache: Any = {} diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 7c7c5b31aee..daaa4c7f3e6 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -891,3 +891,40 @@ def test_retrieve_online_documents(vectordb_environment, fake_document_data): top_k=2, distance_metric="wrong", ).to_dict() + +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["milvus"]) +def test_retrieve_online_documents2(vectordb_environment, fake_document_data): + fs = vectordb_environment.feature_store + df, data_source = fake_document_data + item_embeddings_feature_view = create_item_embeddings_feature_view(data_source) + fs.apply([item_embeddings_feature_view, item()]) + print(df.head().T) + fs.write_to_online_store("item_embeddings", df) + # + # documents = fs.retrieve_online_documents( + # feature="item_embeddings:embedding_float", + # query=[1.0, 2.0], + # top_k=2, + # distance_metric="L2", + # ).to_dict() + # assert len(documents["embedding_float"]) == 2 + # + # # assert returned the entity_id + # assert len(documents["item_id"]) == 2 + # + # documents = fs.retrieve_online_documents( + # feature="item_embeddings:embedding_float", + # query=[1.0, 2.0], + # top_k=2, + # distance_metric="L1", + # ).to_dict() + # assert len(documents["embedding_float"]) == 2 + # + # with pytest.raises(ValueError): + # fs.retrieve_online_documents( + # feature="item_embeddings:embedding_float", + # query=[1.0, 2.0], + # top_k=2, + # distance_metric="wrong", + # ).to_dict() From 35557ce7d141080c7e6b841a5a3ff67e49c50914 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 14 Dec 2024 06:59:44 -0500 Subject: [PATCH 14/33] making some progress Signed-off-by: Francisco Javier Arceo --- .../online_stores/milvus_online_store/milvus.py | 13 ++++++++----- sdk/python/tests/conftest.py | 6 ++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 076d0ca54c7..86f759ed849 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -11,7 +11,6 @@ ) from feast import Entity -from feast.type_map import PROTO_VALUE_TO_VALUE_TYPE_MAP from feast.feature_view import FeatureView from feast.infra.infra_object import InfraObject from feast.infra.key_encoding_utils import ( @@ -25,11 +24,12 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.type_map import PROTO_VALUE_TO_VALUE_TYPE_MAP +from feast.types import VALUE_TYPES_TO_FEAST_TYPES, Array, PrimitiveFeastType, ValueType from feast.utils import ( _build_retrieve_online_document_record, to_naive_utc, ) -from feast.types import VALUE_TYPES_TO_FEAST_TYPES, PrimitiveFeastType, Array, ValueType PROTO_TO_MILVUS_TYPE_MAPPING = { PROTO_VALUE_TO_VALUE_TYPE_MAP['bytes_val']: DataType.STRING, @@ -75,6 +75,7 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig): metric_type: Optional[str] = "L2" embedding_dim: Optional[int] = 128 vector_enabled: Optional[bool] = True + nlist: Optional[int] = 128 class MilvusOnlineStore(OnlineStore): @@ -132,7 +133,7 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: index_params = { "index_type": config.online_store.index_type, "metric_type": config.online_store.metric_type, - "params": {"nlist": 128}, + "params": {"nlist": config.online_store.nlist}, } for vector_field in schema.fields: if vector_field.dtype in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]: @@ -160,10 +161,12 @@ def online_write_batch( entity_batch_to_insert = [] for entity_key, values_dict, timestamp, created_ts in data: + # need to construct the composite primary key also need to handle the fact that entities are a list entity_key_str = serialize_entity_key( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() + composite_key_name = '_'.join([str(value) for value in entity_key.join_keys]) + "_pk" timestamp_int = int(to_naive_utc(timestamp).timestamp() * 1e6) created_ts_int = ( int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0 @@ -173,12 +176,12 @@ def online_write_batch( vector_list = getattr(values_dict[feature_name], vector_list_type_name, None) if vector_list: vector_values = getattr(values_dict[feature_name], vector_list_type_name).val - if vector_values != []: + if vector_values != []: # Note here we are over-writing the feature and collapsing the list into a single value values_dict[feature_name] = vector_values single_entity_record = { - "entity_key": entity_key_str, + composite_key_name: entity_key_str, "event_ts": timestamp_int, "created_ts": created_ts_int, } diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index e6e079b6923..0eac1a39d1b 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -205,11 +205,9 @@ def environment(request, worker_id): e.teardown() -from tests.integration.feature_repos.integration_test_repo_config import ( - IntegrationTestRepoConfig, +from tests.integration.feature_repos.universal.online_store.milvus import ( + MilvusOnlineStoreCreator, ) -from tests.integration.feature_repos.universal.online_store.milvus import MilvusOnlineStoreCreator -from tests.integration.feature_repos.universal.data_sources.file import FileDataSourceCreator @pytest.fixture From c54309a0779439ee4f6149a30cb17aac123002dc Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 14 Dec 2024 21:26:56 -0500 Subject: [PATCH 15/33] partially running Signed-off-by: Francisco Javier Arceo --- .../feast/infra/online_stores/milvus_online_store/milvus.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 86f759ed849..06a398c888c 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -96,11 +96,6 @@ def _connect(self, config: RepoConfig) -> connections: host=config.online_store.host, port=str(config.online_store.port), ) - self._conn = connections.connect( - alias="feast", - host=config.online_store.host, - port=str(config.online_store.port), - ) return self._conn def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: From 0094e271ff32af84b30344efb53aef75b35e0c0c Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 15 Dec 2024 21:53:40 -0500 Subject: [PATCH 16/33] checking in progress...finding issues still Signed-off-by: Francisco Javier Arceo --- .../milvus_online_store/milvus.py | 21 ++++--------------- sdk/python/tests/conftest.py | 3 ++- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 06a398c888c..2a42f460e86 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -249,23 +249,10 @@ def update( collection.drop() self._collections.pop(collection_name, None) - def plan( - self, config: RepoConfig, desired_registry_proto: RegistryProto - ) -> List[InfraObject]: - project = config.project - - infra_objects: List[InfraObject] = [ - MilvusTable( - host=config.online_store.host, - port=config.online_store.port, - name=_table_id(project, FeatureView.from_proto(view)), - ) - for view in [ - *desired_registry_proto.feature_views, - *desired_registry_proto.stream_feature_views, - ] - ] - return infra_objects + # def plan( + # self, config: RepoConfig, desired_registry_proto: RegistryProto + # ) -> List[InfraObject]: + # raise NotImplementedError def teardown( self, diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 0eac1a39d1b..f60f741d973 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -214,6 +214,7 @@ def environment(request, worker_id): def vectordb_environment(request, worker_id): milvus_config = IntegrationTestRepoConfig( provider="local", + online_store="milvus", online_store_creator=MilvusOnlineStoreCreator, offline_store_creator=FileDataSourceCreator, ) @@ -232,7 +233,7 @@ def vectordb_environment(request, worker_id): else: yield e - # e.teardown() + e.teardown() _config_cache: Any = {} From 25a065e94d8fff8cee72bcf5fdd02be2b7ba33aa Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 16 Dec 2024 14:15:00 -0500 Subject: [PATCH 17/33] have the apply working Signed-off-by: Francisco Javier Arceo --- .../milvus_online_store/milvus.py | 104 ++++++++++++------ sdk/python/feast/type_map.py | 34 +++--- sdk/python/tests/conftest.py | 8 +- .../online_store/test_universal_online.py | 6 +- 4 files changed, 95 insertions(+), 57 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 2a42f460e86..9a41f216a27 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -20,7 +20,6 @@ from feast.infra.online_stores.online_store import OnlineStore from feast.infra.online_stores.vector_store import VectorStoreConfig from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto -from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -32,18 +31,18 @@ ) PROTO_TO_MILVUS_TYPE_MAPPING = { - PROTO_VALUE_TO_VALUE_TYPE_MAP['bytes_val']: DataType.STRING, - PROTO_VALUE_TO_VALUE_TYPE_MAP['bool_val']: DataType.BOOL, - PROTO_VALUE_TO_VALUE_TYPE_MAP['string_val']: DataType.STRING, - PROTO_VALUE_TO_VALUE_TYPE_MAP['float_val']: DataType.FLOAT, - PROTO_VALUE_TO_VALUE_TYPE_MAP['double_val']: DataType.DOUBLE, - PROTO_VALUE_TO_VALUE_TYPE_MAP['int32_val']: DataType.INT32, - PROTO_VALUE_TO_VALUE_TYPE_MAP['int64_val']: DataType.INT64, - PROTO_VALUE_TO_VALUE_TYPE_MAP['float_list_val']: DataType.FLOAT_VECTOR, - PROTO_VALUE_TO_VALUE_TYPE_MAP['int32_list_val']: DataType.FLOAT_VECTOR, - PROTO_VALUE_TO_VALUE_TYPE_MAP['int64_list_val']: DataType.FLOAT_VECTOR, - PROTO_VALUE_TO_VALUE_TYPE_MAP['double_list_val']: DataType.FLOAT_VECTOR, - PROTO_VALUE_TO_VALUE_TYPE_MAP['bool_list_val']: DataType.BINARY_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP["bytes_val"]: DataType.STRING, + PROTO_VALUE_TO_VALUE_TYPE_MAP["bool_val"]: DataType.BOOL, + PROTO_VALUE_TO_VALUE_TYPE_MAP["string_val"]: DataType.STRING, + PROTO_VALUE_TO_VALUE_TYPE_MAP["float_val"]: DataType.FLOAT, + PROTO_VALUE_TO_VALUE_TYPE_MAP["double_val"]: DataType.DOUBLE, + PROTO_VALUE_TO_VALUE_TYPE_MAP["int32_val"]: DataType.INT32, + PROTO_VALUE_TO_VALUE_TYPE_MAP["int64_val"]: DataType.INT64, + PROTO_VALUE_TO_VALUE_TYPE_MAP["float_list_val"]: DataType.FLOAT_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP["int32_list_val"]: DataType.FLOAT_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP["int64_list_val"]: DataType.FLOAT_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP["double_list_val"]: DataType.FLOAT_VECTOR, + PROTO_VALUE_TO_VALUE_TYPE_MAP["bool_list_val"]: DataType.BINARY_VECTOR, } FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING = {} @@ -56,11 +55,17 @@ elif isinstance(feast_type, Array): base_type = feast_type.base_type base_value_type = base_type.to_value_type() - if base_value_type in [ValueType.INT32, ValueType.INT64, ValueType.FLOAT, ValueType.DOUBLE]: + if base_value_type in [ + ValueType.INT32, + ValueType.INT64, + ValueType.FLOAT, + ValueType.DOUBLE, + ]: FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.FLOAT_VECTOR elif base_value_type == ValueType.BOOL: FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.BINARY_VECTOR + class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig): """ Configuration for the Milvus online store. @@ -91,11 +96,12 @@ class MilvusOnlineStore(OnlineStore): def _connect(self, config: RepoConfig) -> connections: if not self._conn: - self._conn = connections.connect( - alias="feast", - host=config.online_store.host, - port=str(config.online_store.port), - ) + if not connections.has_connection("feast"): + self._conn = connections.connect( + alias="feast", + host=config.online_store.host, + port=str(config.online_store.port), + ) return self._conn def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: @@ -104,25 +110,43 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: self._connect(config) # Create a composite key by combining entity fields - composite_key_name = '_'.join([field.name for field in table.entity_columns]) + "_pk" + composite_key_name = ( + "_".join([field.name for field in table.entity_columns]) + "_pk" + ) fields = [ - FieldSchema(name=composite_key_name, dtype=DataType.VARCHAR, max_length=512, is_primary=True), + FieldSchema( + name=composite_key_name, + dtype=DataType.VARCHAR, + max_length=512, + is_primary=True, + ), FieldSchema(name="event_ts", dtype=DataType.INT64), FieldSchema(name="created_ts", dtype=DataType.INT64), ] - fields_to_exclude = [field.name for field in table.entity_columns] + ['event_ts', 'created_ts'] + fields_to_exclude = [field.name for field in table.entity_columns] + [ + "event_ts", + "created_ts", + ] fields_to_add = [f for f in table.schema if f.name not in fields_to_exclude] for field in fields_to_add: dtype = FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING.get(field.dtype) if dtype: if dtype == DataType.FLOAT_VECTOR: - fields.append(FieldSchema(name=field.name, dtype=dtype, dim=config.online_store.embedding_dim)) + fields.append( + FieldSchema( + name=field.name, + dtype=dtype, + dim=config.online_store.embedding_dim, + ) + ) else: fields.append(FieldSchema(name=field.name, dtype=dtype)) - schema = CollectionSchema(fields=fields, description="Feast feature view data") + schema = CollectionSchema( + fields=fields, description="Feast feature view data" + ) collection = Collection(name=collection_name, schema=schema, using="feast") if not collection.has_index(): index_params = { @@ -131,8 +155,13 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: "params": {"nlist": config.online_store.nlist}, } for vector_field in schema.fields: - if vector_field.dtype in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]: - collection.create_index(field_name=vector_field.name, index_params=index_params) + if vector_field.dtype in [ + DataType.FLOAT_VECTOR, + DataType.BINARY_VECTOR, + ]: + collection.create_index( + field_name=vector_field.name, index_params=index_params + ) collection.load() self._collections[collection_name] = collection return self._collections[collection_name] @@ -152,7 +181,11 @@ def online_write_batch( progress: Optional[Callable[[int], Any]], ) -> None: collection = self._get_collection(config, table) - numeric_vector_list_types = [k for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys() if k is not None and 'list' in k and 'string' not in k] + numeric_vector_list_types = [ + k + for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys() + if k is not None and "list" in k and "string" not in k + ] entity_batch_to_insert = [] for entity_key, values_dict, timestamp, created_ts in data: @@ -161,16 +194,22 @@ def online_write_batch( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() - composite_key_name = '_'.join([str(value) for value in entity_key.join_keys]) + "_pk" + composite_key_name = ( + "_".join([str(value) for value in entity_key.join_keys]) + "_pk" + ) timestamp_int = int(to_naive_utc(timestamp).timestamp() * 1e6) created_ts_int = ( int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0 ) for feature_name in values_dict: for vector_list_type_name in numeric_vector_list_types: - vector_list = getattr(values_dict[feature_name], vector_list_type_name, None) + vector_list = getattr( + values_dict[feature_name], vector_list_type_name, None + ) if vector_list: - vector_values = getattr(values_dict[feature_name], vector_list_type_name).val + vector_values = getattr( + values_dict[feature_name], vector_list_type_name + ).val if vector_values != []: # Note here we are over-writing the feature and collapsing the list into a single value values_dict[feature_name] = vector_values @@ -262,11 +301,10 @@ def teardown( ): self._connect(config) for table in tables: - collection_name = _table_id(config.project, table) - collection = Collection(name=collection_name) + collection = self._get_collection(config, table) if collection: collection.drop() - self._collections.pop(collection_name, None) + self._collections.pop(collection.name, None) def retrieve_online_documents( self, diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 1c9d99d19f2..b41d0c98422 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -119,10 +119,10 @@ def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: def python_type_to_feast_value_type( - name: str, - value: Optional[Any] = None, - recurse: bool = True, - type_name: Optional[str] = None, + name: str, + value: Optional[Any] = None, + recurse: bool = True, + type_name: Optional[str] = None, ) -> ValueType: """ Finds the equivalent Feast Value Type for a Python value. Both native @@ -197,8 +197,8 @@ def python_type_to_feast_value_type( ) # Validate whether the type stays consistent if ( - common_item_value_type - and not common_item_value_type == current_item_value_type + common_item_value_type + and not common_item_value_type == current_item_value_type ): raise ValueError( f"List value type for field {name} is inconsistent. " @@ -217,7 +217,7 @@ def python_type_to_feast_value_type( def python_values_to_feast_value_type( - name: str, values: Any, recurse: bool = True + name: str, values: Any, recurse: bool = True ) -> ValueType: inferred_dtype = ValueType.UNKNOWN for row in values: @@ -229,8 +229,8 @@ def python_values_to_feast_value_type( inferred_dtype = current_dtype else: if current_dtype != inferred_dtype and current_dtype not in ( - ValueType.UNKNOWN, - ValueType.NULL, + ValueType.UNKNOWN, + ValueType.NULL, ): raise TypeError( f"Input entity {name} has mixed types, {current_dtype} and {inferred_dtype}. That is not allowed. " @@ -318,7 +318,7 @@ def _type_err(item, dtype): def _python_datetime_to_int_timestamp( - values: Sequence[Any], + values: Sequence[Any], ) -> Sequence[Union[int, np.int_]]: # Fast path for Numpy array. if isinstance(values, np.ndarray) and isinstance(values.dtype, np.datetime64): @@ -342,7 +342,7 @@ def _python_datetime_to_int_timestamp( def _python_value_to_proto_value( - feast_value_type: ValueType, values: List[Any] + feast_value_type: ValueType, values: List[Any] ) -> List[ProtoValue]: """ Converts a Python (native, pandas) value to a Feast Proto Value based @@ -387,7 +387,7 @@ def _python_value_to_proto_value( raise _type_err(sample, valid_types[0]) if sample is not None and not all( - type(item) in valid_types for item in sample + type(item) in valid_types for item in sample ): # to_numpy() in utils._convert_arrow_to_proto() upcasts values of type Array of INT32 or INT64 with NULL values to Float64 automatically. for item in sample: @@ -459,11 +459,11 @@ def _python_value_to_proto_value( # So, if value is 0, type validation must pass if scalar_types are either int or float. allowed_types = {np.int64, int, np.float64, float} assert ( - type(sample) in allowed_types + type(sample) in allowed_types ), f"Type `{type(sample)}` not in {allowed_types}" else: assert ( - type(sample) in valid_scalar_types + type(sample) in valid_scalar_types ), f"Type `{type(sample)}` not in {valid_scalar_types}" if feast_value_type == ValueType.BOOL: # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_. @@ -496,7 +496,7 @@ def _python_value_to_proto_value( def python_values_to_proto_values( - values: List[Any], feature_type: ValueType = ValueType.UNKNOWN + values: List[Any], feature_type: ValueType = ValueType.UNKNOWN ) -> List[ProtoValue]: value_type = feature_type sample = next(filter(_non_empty_value, values), None) # first not empty value @@ -794,7 +794,7 @@ def _non_empty_value(value: Any) -> bool: String is special case: "" - empty string is considered non empty """ return value is not None and ( - not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str) + not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str) ) @@ -930,7 +930,7 @@ def pg_type_to_feast_value_type(type_str: str) -> ValueType: def feast_value_type_to_pa( - feast_type: ValueType, timestamp_unit: str = "us" + feast_type: ValueType, timestamp_unit: str = "us" ) -> "pyarrow.DataType": import pyarrow diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index f60f741d973..cc95d03a2c8 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -56,6 +56,9 @@ driver, location, ) +from tests.integration.feature_repos.universal.online_store.milvus import ( + MilvusOnlineStoreCreator, +) from tests.utils.auth_permissions_util import default_store from tests.utils.http_server import check_port_open, free_port # noqa: E402 from tests.utils.ssl_certifcates_util import ( @@ -205,11 +208,6 @@ def environment(request, worker_id): e.teardown() -from tests.integration.feature_repos.universal.online_store.milvus import ( - MilvusOnlineStoreCreator, -) - - @pytest.fixture def vectordb_environment(request, worker_id): milvus_config = IntegrationTestRepoConfig( diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index daaa4c7f3e6..c6bbb177b94 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -857,7 +857,9 @@ def assert_feature_service_entity_mapping_correctness( @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["pgvector", "elasticsearch", "qdrant", "milvus"]) +@pytest.mark.universal_online_stores( + only=["pgvector", "elasticsearch", "qdrant", "milvus"] +) def test_retrieve_online_documents(vectordb_environment, fake_document_data): fs = vectordb_environment.feature_store df, data_source = fake_document_data @@ -892,6 +894,7 @@ def test_retrieve_online_documents(vectordb_environment, fake_document_data): distance_metric="wrong", ).to_dict() + @pytest.mark.integration @pytest.mark.universal_online_stores(only=["milvus"]) def test_retrieve_online_documents2(vectordb_environment, fake_document_data): @@ -899,7 +902,6 @@ def test_retrieve_online_documents2(vectordb_environment, fake_document_data): df, data_source = fake_document_data item_embeddings_feature_view = create_item_embeddings_feature_view(data_source) fs.apply([item_embeddings_feature_view, item()]) - print(df.head().T) fs.write_to_online_store("item_embeddings", df) # # documents = fs.retrieve_online_documents( From f1a92e00653690e14970b37a5e86feccf6936b56 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 16 Dec 2024 15:44:35 -0500 Subject: [PATCH 18/33] adjusting some type issues Signed-off-by: Francisco Javier Arceo --- .../milvus_online_store/milvus.py | 20 +++++++++---------- sdk/python/feast/type_map.py | 3 ++- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 9a41f216a27..4c66238d600 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union from pydantic import StrictStr from pymilvus import ( @@ -24,13 +24,13 @@ from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.type_map import PROTO_VALUE_TO_VALUE_TYPE_MAP -from feast.types import VALUE_TYPES_TO_FEAST_TYPES, Array, PrimitiveFeastType, ValueType +from feast.types import VALUE_TYPES_TO_FEAST_TYPES, Array, PrimitiveFeastType, ComplexFeastType, ValueType from feast.utils import ( _build_retrieve_online_document_record, to_naive_utc, ) -PROTO_TO_MILVUS_TYPE_MAPPING = { +PROTO_TO_MILVUS_TYPE_MAPPING: Dict[ValueType, DataType] = { PROTO_VALUE_TO_VALUE_TYPE_MAP["bytes_val"]: DataType.STRING, PROTO_VALUE_TO_VALUE_TYPE_MAP["bool_val"]: DataType.BOOL, PROTO_VALUE_TO_VALUE_TYPE_MAP["string_val"]: DataType.STRING, @@ -45,7 +45,7 @@ PROTO_VALUE_TO_VALUE_TYPE_MAP["bool_list_val"]: DataType.BINARY_VECTOR, } -FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING = {} +FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING: Dict[Union[PrimitiveFeastType, Array, ComplexFeastType], DataType] = {} for value_type, feast_type in VALUE_TYPES_TO_FEAST_TYPES.items(): if isinstance(feast_type, PrimitiveFeastType): @@ -288,10 +288,10 @@ def update( collection.drop() self._collections.pop(collection_name, None) - # def plan( - # self, config: RepoConfig, desired_registry_proto: RegistryProto - # ) -> List[InfraObject]: - # raise NotImplementedError + def plan( + self, config: RepoConfig, desired_registry_proto: RegistryProto + ) -> List[InfraObject]: + raise NotImplementedError def teardown( self, @@ -397,11 +397,11 @@ def _connect(self): def to_infra_object_proto(self) -> InfraObjectProto: # Implement serialization if needed - pass + raise NotImplementedError def update(self): # Implement update logic if needed - pass + raise NotImplementedError def teardown(self): collection = Collection(name=self.name) diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index b41d0c98422..000e9cdae4e 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -538,7 +538,6 @@ def python_values_to_proto_values( "string_list_val": ValueType.STRING_LIST, "bytes_list_val": ValueType.BYTES_LIST, "bool_list_val": ValueType.BOOL_LIST, - None: ValueType.NULL, } @@ -553,6 +552,8 @@ def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType: A variant of ValueType. """ proto_str = proto_value.WhichOneof("val") + if proto_str is None: + return ValueType.UNKNOWN return PROTO_VALUE_TO_VALUE_TYPE_MAP[proto_str] From 3282a05e2eab8c519f0b9df5cab87cb4e18b7985 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 16 Dec 2024 16:16:31 -0500 Subject: [PATCH 19/33] updated and removed test Signed-off-by: Francisco Javier Arceo --- .../milvus_online_store/milvus.py | 1 + .../online_store/test_online_retrieval.py | 97 ------------------- 2 files changed, 1 insertion(+), 97 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 4c66238d600..4765d04bb86 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -20,6 +20,7 @@ from feast.infra.online_stores.online_store import OnlineStore from feast.infra.online_stores.vector_store import VectorStoreConfig from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig diff --git a/sdk/python/tests/unit/online_store/test_online_retrieval.py b/sdk/python/tests/unit/online_store/test_online_retrieval.py index 602715e5df8..f9abd3b1509 100644 --- a/sdk/python/tests/unit/online_store/test_online_retrieval.py +++ b/sdk/python/tests/unit/online_store/test_online_retrieval.py @@ -564,100 +564,3 @@ def test_sqlite_vec_import() -> None: result = [(rowid, round(distance, 2)) for rowid, distance in result] assert result == [(2, 2.39), (1, 2.39)] - -def test_milvus_get_online_documents() -> None: - """ - Test retrieving documents from the online store in local mode using Milvus. - """ - n = 10 # number of samples - note: we'll actually double it - vector_length = 8 - runner = CliRunner() - with runner.local_repo( - get_example_repo("example_feature_repo_1.py"), "file" - ) as store: - # Configure the online store to use Milvus - new_config = RepoConfig( - project=store.config.project, - registry=store.config.registry, - provider=store.config.provider, - online_store=MilvusOnlineStoreConfig( - type="milvus", - host="localhost", - port=19530, - index_type="IVF_FLAT", - metric_type="L2", - embedding_dim=vector_length, - vector_enabled=True, - ), - entity_key_serialization_version=store.config.entity_key_serialization_version, - ) - store = FeatureStore(config=new_config, repo_path=store.repo_path) - # Apply the new configuration - store.apply([]) - - # Write some data to the feature view - document_embeddings_fv = store.get_feature_view(name="document_embeddings") - - provider: Provider = store._get_provider() - - item_keys = [ - EntityKeyProto( - join_keys=["item_id"], entity_values=[ValueProto.Value(int64_val=i)] - ) - for i in range(n) - ] - data = [] - for item_key in item_keys: - embedding_vector = np.random.random(vector_length).tolist() - data.append( - ( - item_key, - { - "Embeddings": ValueProto.Value( - float_list_val=FloatListProto(val=embedding_vector) - ) - }, - _utc_now(), - _utc_now(), - ) - ) - - provider.online_write_batch( - config=store.config, - table=document_embeddings_fv, - data=data, - progress=None, - ) - - documents_df = pd.DataFrame( - { - "item_id": [i for i in range(n)], - "Embeddings": [ - np.random.random(vector_length).tolist() for _ in range(n) - ], - "event_timestamp": [_utc_now() for _ in range(n)], - } - ) - - store.write_to_online_store( - feature_view_name="document_embeddings", - df=documents_df, - ) - - # For Milvus, get the collection and check the number of entities - collection = provider._online_store._get_collection( - store.config, document_embeddings_fv - ) - record_count = collection.num_entities - assert record_count == len(data) + documents_df.shape[0] - - query_embedding = np.random.random(vector_length).tolist() - - # Retrieve online documents using Milvus - result = store.retrieve_online_documents( - feature="document_embeddings:Embeddings", query=query_embedding, top_k=3 - ).to_dict() - - assert "Embeddings" in result - assert "distance" in result - assert len(result["distance"]) == 3 From 2dee46298ae2e2ba12b34d31f96cdc44c33f754e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 16 Dec 2024 23:14:41 -0500 Subject: [PATCH 20/33] have things behaving with the enviornment arg Signed-off-by: Francisco Javier Arceo --- Makefile | 2 +- .../online_stores/milvus_online_store/milvus.py | 1 - sdk/python/tests/conftest.py | 13 +++++++------ .../online_store/test_universal_online.py | 7 ++++--- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 84bc594a6ad..ef1386fd1a5 100644 --- a/Makefile +++ b/Makefile @@ -358,7 +358,7 @@ test-python-universal-milvus-online: FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.milvus_online_store.milvus_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.milvus \ python -m pytest -n 8 --integration \ - -k "test_retrieve_online_documents" \ + -k "test_retrieve_online_documents2" \ sdk/python/tests --ignore=sdk/python/tests/integration/offline_store/test_dqm_validation.py test-python-universal-singlestore-online: diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 4765d04bb86..ac323b0e3e5 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -187,7 +187,6 @@ def online_write_batch( for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys() if k is not None and "list" in k and "string" not in k ] - entity_batch_to_insert = [] for entity_key, values_dict, timestamp, created_ts in data: # need to construct the composite primary key also need to handle the fact that entities are a list diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index cc95d03a2c8..0163e7575b7 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -208,16 +208,18 @@ def environment(request, worker_id): e.teardown() + @pytest.fixture def vectordb_environment(request, worker_id): - milvus_config = IntegrationTestRepoConfig( + db_config = IntegrationTestRepoConfig( provider="local", - online_store="milvus", - online_store_creator=MilvusOnlineStoreCreator, - offline_store_creator=FileDataSourceCreator, + # online_store="milvus", + # online_store_creator=MilvusOnlineStoreCreator, + # offline_store_creator=FileDataSourceCreator, ) + print(request) e = construct_test_environment( - milvus_config, + db_config, fixture_request=request, worker_id=worker_id, entity_key_serialization_version=3, @@ -233,7 +235,6 @@ def vectordb_environment(request, worker_id): e.teardown() - _config_cache: Any = {} diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index c6bbb177b94..6a6796e3535 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -858,7 +858,7 @@ def assert_feature_service_entity_mapping_correctness( @pytest.mark.integration @pytest.mark.universal_online_stores( - only=["pgvector", "elasticsearch", "qdrant", "milvus"] + only=["pgvector", "elasticsearch", "qdrant"] ) def test_retrieve_online_documents(vectordb_environment, fake_document_data): fs = vectordb_environment.feature_store @@ -897,8 +897,9 @@ def test_retrieve_online_documents(vectordb_environment, fake_document_data): @pytest.mark.integration @pytest.mark.universal_online_stores(only=["milvus"]) -def test_retrieve_online_documents2(vectordb_environment, fake_document_data): - fs = vectordb_environment.feature_store +def test_retrieve_online_documents2(environment, fake_document_data): + print(environment.online_store) + fs = environment.feature_store df, data_source = fake_document_data item_embeddings_feature_view = create_item_embeddings_feature_view(data_source) fs.apply([item_embeddings_feature_view, item()]) From be8c5bc04513c5446624c584baabd1f90b853c8c Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 17 Dec 2024 16:28:41 -0500 Subject: [PATCH 21/33] updated milvus Signed-off-by: Francisco Javier Arceo --- .../milvus_online_store/milvus.py | 13 +++++++++++-- .../universal/online_store/milvus.py | 1 + .../online_store/test_universal_online.py | 19 ++++++++----------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index ac323b0e3e5..5ac750f2bd3 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -25,7 +25,13 @@ from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.type_map import PROTO_VALUE_TO_VALUE_TYPE_MAP -from feast.types import VALUE_TYPES_TO_FEAST_TYPES, Array, PrimitiveFeastType, ComplexFeastType, ValueType +from feast.types import ( + VALUE_TYPES_TO_FEAST_TYPES, + Array, + ComplexFeastType, + PrimitiveFeastType, + ValueType, +) from feast.utils import ( _build_retrieve_online_document_record, to_naive_utc, @@ -46,7 +52,9 @@ PROTO_VALUE_TO_VALUE_TYPE_MAP["bool_list_val"]: DataType.BINARY_VECTOR, } -FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING: Dict[Union[PrimitiveFeastType, Array, ComplexFeastType], DataType] = {} +FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING: Dict[ + Union[PrimitiveFeastType, Array, ComplexFeastType], DataType +] = {} for value_type, feast_type in VALUE_TYPES_TO_FEAST_TYPES.items(): if isinstance(feast_type, PrimitiveFeastType): @@ -267,6 +275,7 @@ def online_read( results.append((None, None)) else: results.append((res_ts, res_dict)) + return results def update( diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py b/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py index 8ffee04c12f..7de60db2daf 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py @@ -32,4 +32,5 @@ def create_online_store(self) -> Dict[str, Any]: } def teardown(self): + # assert 1 == 4 self.container.stop() diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 6a6796e3535..b6c8756144e 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -857,9 +857,7 @@ def assert_feature_service_entity_mapping_correctness( @pytest.mark.integration -@pytest.mark.universal_online_stores( - only=["pgvector", "elasticsearch", "qdrant"] -) +@pytest.mark.universal_online_stores(only=["pgvector", "elasticsearch", "qdrant"]) def test_retrieve_online_documents(vectordb_environment, fake_document_data): fs = vectordb_environment.feature_store df, data_source = fake_document_data @@ -904,14 +902,13 @@ def test_retrieve_online_documents2(environment, fake_document_data): item_embeddings_feature_view = create_item_embeddings_feature_view(data_source) fs.apply([item_embeddings_feature_view, item()]) fs.write_to_online_store("item_embeddings", df) - # - # documents = fs.retrieve_online_documents( - # feature="item_embeddings:embedding_float", - # query=[1.0, 2.0], - # top_k=2, - # distance_metric="L2", - # ).to_dict() - # assert len(documents["embedding_float"]) == 2 + documents = fs.retrieve_online_documents( + feature="item_embeddings:embedding_float", + query=[1.0, 2.0], + top_k=2, + distance_metric="L2", + ).to_dict() + assert len(documents["embedding_float"]) == 2 # # # assert returned the entity_id # assert len(documents["item_id"]) == 2 From dbc11e29d98f863e9ce15aec94778a92a8424031 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Dec 2024 17:07:02 -0500 Subject: [PATCH 22/33] almost have retrieval working, having to make a lot of changes to online retrieval. long term this can all go in the FeatureView class and in get_online_features Signed-off-by: Francisco Javier Arceo --- .../milvus_online_store/milvus.py | 81 ++++++++----------- sdk/python/tests/conftest.py | 5 +- sdk/python/tests/foo_provider.py | 1 + .../online_store/test_universal_online.py | 7 +- .../online_store/test_online_retrieval.py | 3 - 5 files changed, 40 insertions(+), 57 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 5ac750f2bd3..e7ae51979eb 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -38,9 +38,9 @@ ) PROTO_TO_MILVUS_TYPE_MAPPING: Dict[ValueType, DataType] = { - PROTO_VALUE_TO_VALUE_TYPE_MAP["bytes_val"]: DataType.STRING, + PROTO_VALUE_TO_VALUE_TYPE_MAP["bytes_val"]: DataType.VARCHAR, PROTO_VALUE_TO_VALUE_TYPE_MAP["bool_val"]: DataType.BOOL, - PROTO_VALUE_TO_VALUE_TYPE_MAP["string_val"]: DataType.STRING, + PROTO_VALUE_TO_VALUE_TYPE_MAP["string_val"]: DataType.VARCHAR, PROTO_VALUE_TO_VALUE_TYPE_MAP["float_val"]: DataType.FLOAT, PROTO_VALUE_TO_VALUE_TYPE_MAP["double_val"]: DataType.DOUBLE, PROTO_VALUE_TO_VALUE_TYPE_MAP["int32_val"]: DataType.INT32, @@ -71,6 +71,8 @@ ValueType.DOUBLE, ]: FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.FLOAT_VECTOR + elif base_value_type == ValueType.STRING: + FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.VARCHAR elif base_value_type == ValueType.BOOL: FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.BINARY_VECTOR @@ -149,7 +151,14 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: dim=config.online_store.embedding_dim, ) ) - + elif dtype == DataType.VARCHAR: + fields.append( + FieldSchema( + name=field.name, + dtype=dtype, + max_length=512, + ) + ) else: fields.append(FieldSchema(name=field.name, dtype=dtype)) @@ -210,17 +219,14 @@ def online_write_batch( int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0 ) for feature_name in values_dict: - for vector_list_type_name in numeric_vector_list_types: - vector_list = getattr( - values_dict[feature_name], vector_list_type_name, None - ) - if vector_list: - vector_values = getattr( - values_dict[feature_name], vector_list_type_name - ).val - if vector_values != []: - # Note here we are over-writing the feature and collapsing the list into a single value - values_dict[feature_name] = vector_values + feature_values = values_dict[feature_name] + for proto_val_type in PROTO_VALUE_TO_VALUE_TYPE_MAP: + if feature_values.HasField(proto_val_type): + if proto_val_type in numeric_vector_list_types: + vector_values = getattr(feature_values, proto_val_type).val + else: + vector_values = getattr(feature_values, proto_val_type) + values_dict[feature_name] = vector_values single_entity_record = { composite_key_name: entity_key_str, @@ -243,40 +249,7 @@ def online_read( entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - collection = self._get_collection(config, table) - results = [] - - for entity_key in entity_keys: - entity_key_str = serialize_entity_key( - entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ).hex() - expr = f"entity_key == '{entity_key_str}'" - if requested_features: - features_str = ", ".join([f"'{f}'" for f in requested_features]) - expr += f" && feature_name in [{features_str}]" - - res = collection.query( - expr, - output_fields=["feature_name", "value", "event_ts"], - consistency_level="Strong", - ) - - res_dict = {} - res_ts = None - for r in res: - feature_name = r["feature_name"] - val_bin = r["value"] - val = ValueProto() - val.ParseFromString(val_bin) - res_dict[feature_name] = val - res_ts = datetime.fromtimestamp(r["event_ts"] / 1e6) - if not res_dict: - results.append((None, None)) - else: - results.append((res_ts, res_dict)) - - return results + raise NotImplementedError def update( self, @@ -320,6 +293,7 @@ def retrieve_online_documents( config: RepoConfig, table: FeatureView, requested_feature: str, + requested_features: List[str], embedding: List[float], top_k: int, distance_metric: Optional[str] = None, @@ -342,13 +316,22 @@ def retrieve_online_documents( } expr = f"feature_name == '{requested_feature}'" + composite_key_name = ( + "_".join([str(value) for value in table.entity_columns]) + "_pk" + ) + if requested_features: + features_str = ", ".join([f"'{f}'" for f in requested_features]) + expr += f" && feature_name in [{features_str}]" + results = collection.search( data=[embedding], anns_field="vector_value", param=search_params, limit=top_k, expr=expr, - output_fields=["entity_key", "value", "event_ts"], + output_fields=[composite_key_name] + + requested_features + + ["created_ts", "event_ts"], consistency_level="Strong", ) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 0163e7575b7..c7116d4e313 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -56,9 +56,6 @@ driver, location, ) -from tests.integration.feature_repos.universal.online_store.milvus import ( - MilvusOnlineStoreCreator, -) from tests.utils.auth_permissions_util import default_store from tests.utils.http_server import check_port_open, free_port # noqa: E402 from tests.utils.ssl_certifcates_util import ( @@ -208,7 +205,6 @@ def environment(request, worker_id): e.teardown() - @pytest.fixture def vectordb_environment(request, worker_id): db_config = IntegrationTestRepoConfig( @@ -235,6 +231,7 @@ def vectordb_environment(request, worker_id): e.teardown() + _config_cache: Any = {} diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 570a6d4f8d5..3d1f9219991 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -150,6 +150,7 @@ def retrieve_online_documents( config: RepoConfig, table: FeatureView, requested_feature: str, + requested_features: Optional[List[str]], query: List[float], top_k: int, distance_metric: Optional[str] = None, diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index b6c8756144e..5de41062f1f 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -903,7 +903,12 @@ def test_retrieve_online_documents2(environment, fake_document_data): fs.apply([item_embeddings_feature_view, item()]) fs.write_to_online_store("item_embeddings", df) documents = fs.retrieve_online_documents( - feature="item_embeddings:embedding_float", + feature=None, + features=[ + "item_embeddings:embedding_float", + "item_embeddings:item_id", + "item_embeddings:string_feature", + ], query=[1.0, 2.0], top_k=2, distance_metric="L2", diff --git a/sdk/python/tests/unit/online_store/test_online_retrieval.py b/sdk/python/tests/unit/online_store/test_online_retrieval.py index f9abd3b1509..83184643f35 100644 --- a/sdk/python/tests/unit/online_store/test_online_retrieval.py +++ b/sdk/python/tests/unit/online_store/test_online_retrieval.py @@ -12,8 +12,6 @@ from feast import FeatureStore, RepoConfig from feast.errors import FeatureViewNotFoundException -from feast.infra.online_stores.milvus_online_store.milvus import MilvusOnlineStoreConfig -from feast.infra.provider import Provider from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -563,4 +561,3 @@ def test_sqlite_vec_import() -> None: """).fetchall() result = [(rowid, round(distance, 2)) for rowid, distance in result] assert result == [(2, 2.39), (1, 2.39)] - From 37d93f1264aa0a692a11ff5c5696265793efd9e7 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Dec 2024 22:20:00 -0500 Subject: [PATCH 23/33] almost have deserialization from the search results done Signed-off-by: Francisco Javier Arceo --- .../milvus_online_store/milvus.py | 78 +++++++++++++------ 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index e7ae51979eb..ab41072c842 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -135,7 +135,7 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection: FieldSchema(name="event_ts", dtype=DataType.INT64), FieldSchema(name="created_ts", dtype=DataType.INT64), ] - fields_to_exclude = [field.name for field in table.entity_columns] + [ + fields_to_exclude = [ "event_ts", "created_ts", ] @@ -199,11 +199,6 @@ def online_write_batch( progress: Optional[Callable[[int], Any]], ) -> None: collection = self._get_collection(config, table) - numeric_vector_list_types = [ - k - for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys() - if k is not None and "list" in k and "string" not in k - ] entity_batch_to_insert = [] for entity_key, values_dict, timestamp, created_ts in data: # need to construct the composite primary key also need to handle the fact that entities are a list @@ -218,15 +213,11 @@ def online_write_batch( created_ts_int = ( int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0 ) - for feature_name in values_dict: - feature_values = values_dict[feature_name] - for proto_val_type in PROTO_VALUE_TO_VALUE_TYPE_MAP: - if feature_values.HasField(proto_val_type): - if proto_val_type in numeric_vector_list_types: - vector_values = getattr(feature_values, proto_val_type).val - else: - vector_values = getattr(feature_values, proto_val_type) - values_dict[feature_name] = vector_values + values_dict = _extract_proto_values_to_dict(values_dict) + entity_dict = _extract_proto_values_to_dict( + dict(zip(entity_key.join_keys, entity_key.entity_values)) + ) + values_dict.update(entity_dict) single_entity_record = { composite_key_name: entity_key_str, @@ -317,28 +308,51 @@ def retrieve_online_documents( expr = f"feature_name == '{requested_feature}'" composite_key_name = ( - "_".join([str(value) for value in table.entity_columns]) + "_pk" + "_".join([str(field.name) for field in table.entity_columns]) + "_pk" ) if requested_features: features_str = ", ".join([f"'{f}'" for f in requested_features]) expr += f" && feature_name in [{features_str}]" + output_fields = ( + [composite_key_name] + requested_features + ["created_ts", "event_ts"] + ) + assert all(field for field in output_fields if field in [f.name for f in collection.schema.fields]), \ + f"field(s) [{[field for field in output_fields if field not in [f.name for f in collection.schema.fields]]}'] not found in collection schema" + + # Note we choose the first vector field as the field to search on. Not ideal but it's something. + ann_search_field = None + for field in collection.schema.fields: + if field.dtype in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]: + ann_search_field = field.name + break + results = collection.search( data=[embedding], - anns_field="vector_value", + anns_field=ann_search_field, param=search_params, limit=top_k, - expr=expr, - output_fields=[composite_key_name] - + requested_features - + ["created_ts", "event_ts"], + # expr=expr, + output_fields=output_fields, consistency_level="Strong", ) result_list = [] for hits in results: for hit in hits: - entity_key_str = hit.entity.get("entity_key") + single_record = {} + for field in output_fields: + val = hit.entity.get(field) + if field == composite_key_name: + val = deserialize_entity_key( + bytes.fromhex(val), + config.entity_key_serialization_version, + ) + entity_key_proto = val + single_record[field] = val + + + entity_key_str = hit.entity.get(composite_key_name) val_bin = hit.entity.get("value") val = ValueProto() val.ParseFromString(val_bin) @@ -350,7 +364,7 @@ def retrieve_online_documents( ) result_list.append( _build_retrieve_online_document_record( - entity_key, + entity_key_proto, val.SerializeToString(), embedding, distance, @@ -365,6 +379,24 @@ def _table_id(project: str, table: FeatureView) -> str: return f"{project}_{table.name}" +def _extract_proto_values_to_dict(input_dict: Dict[str, Any]) -> Dict[str, Any]: + numeric_vector_list_types = [ + k + for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys() + if k is not None and "list" in k and "string" not in k + ] + output_dict = {} + for feature_name, feature_values in input_dict.items(): + for proto_val_type in PROTO_VALUE_TO_VALUE_TYPE_MAP: + if feature_values.HasField(proto_val_type): + if proto_val_type in numeric_vector_list_types: + vector_values = getattr(feature_values, proto_val_type).val + else: + vector_values = getattr(feature_values, proto_val_type) + output_dict[feature_name] = vector_values + return output_dict + + class MilvusTable(InfraObject): """ A Milvus collection managed by Feast. From 67ad68866dcd48b55144a81c43a4e8d602b2d0ca Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Dec 2024 16:58:51 -0500 Subject: [PATCH 24/33] got the retrieval working now too :D Signed-off-by: Francisco Javier Arceo --- Makefile | 2 +- .../milvus_online_store/milvus.py | 51 ++++++++----------- .../online_store/test_universal_online.py | 24 ++------- 3 files changed, 27 insertions(+), 50 deletions(-) diff --git a/Makefile b/Makefile index ef1386fd1a5..bef7437bc8a 100644 --- a/Makefile +++ b/Makefile @@ -358,7 +358,7 @@ test-python-universal-milvus-online: FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.milvus_online_store.milvus_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.milvus \ python -m pytest -n 8 --integration \ - -k "test_retrieve_online_documents2" \ + -k "test_retrieve_online_milvus_ocuments" \ sdk/python/tests --ignore=sdk/python/tests/integration/offline_store/test_dqm_validation.py test-python-universal-singlestore-online: diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index ab41072c842..a0a0ab56f67 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -14,7 +14,6 @@ from feast.feature_view import FeatureView from feast.infra.infra_object import InfraObject from feast.infra.key_encoding_utils import ( - deserialize_entity_key, serialize_entity_key, ) from feast.infra.online_stores.online_store import OnlineStore @@ -34,6 +33,7 @@ ) from feast.utils import ( _build_retrieve_online_document_record, + _serialize_vector_to_float_list, to_naive_utc, ) @@ -317,13 +317,19 @@ def retrieve_online_documents( output_fields = ( [composite_key_name] + requested_features + ["created_ts", "event_ts"] ) - assert all(field for field in output_fields if field in [f.name for f in collection.schema.fields]), \ - f"field(s) [{[field for field in output_fields if field not in [f.name for f in collection.schema.fields]]}'] not found in collection schema" + assert all( + field + for field in output_fields + if field in [f.name for f in collection.schema.fields] + ), f"field(s) [{[field for field in output_fields if field not in [f.name for f in collection.schema.fields]]}'] not found in collection schema" # Note we choose the first vector field as the field to search on. Not ideal but it's something. ann_search_field = None for field in collection.schema.fields: - if field.dtype in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]: + if ( + field.dtype in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR] + and field.name in output_fields + ): ann_search_field = field.name break @@ -342,36 +348,23 @@ def retrieve_online_documents( for hit in hits: single_record = {} for field in output_fields: - val = hit.entity.get(field) - if field == composite_key_name: - val = deserialize_entity_key( - bytes.fromhex(val), - config.entity_key_serialization_version, - ) - entity_key_proto = val - single_record[field] = val - + single_record[field] = hit.entity.get(field) - entity_key_str = hit.entity.get(composite_key_name) - val_bin = hit.entity.get("value") - val = ValueProto() - val.ParseFromString(val_bin) + entity_key_bytes = bytes.fromhex(hit.entity.get(composite_key_name)) + embedding = hit.entity.get(ann_search_field) + serialized_embedding = _serialize_vector_to_float_list(embedding) distance = hit.distance event_ts = datetime.fromtimestamp(hit.entity.get("event_ts") / 1e6) - entity_key = deserialize_entity_key( - bytes.fromhex(entity_key_str), + prepared_result = _build_retrieve_online_document_record( + entity_key_bytes, + # This may have a bug + serialized_embedding.SerializeToString(), + embedding, + distance, + event_ts, config.entity_key_serialization_version, ) - result_list.append( - _build_retrieve_online_document_record( - entity_key_proto, - val.SerializeToString(), - embedding, - distance, - event_ts, - config.entity_key_serialization_version, - ) - ) + result_list.append(prepared_result) return result_list diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 5de41062f1f..f113e255555 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -895,7 +895,7 @@ def test_retrieve_online_documents(vectordb_environment, fake_document_data): @pytest.mark.integration @pytest.mark.universal_online_stores(only=["milvus"]) -def test_retrieve_online_documents2(environment, fake_document_data): +def test_retrieve_online_milvus_ocuments(environment, fake_document_data): print(environment.online_store) fs = environment.feature_store df, data_source = fake_document_data @@ -914,22 +914,6 @@ def test_retrieve_online_documents2(environment, fake_document_data): distance_metric="L2", ).to_dict() assert len(documents["embedding_float"]) == 2 - # - # # assert returned the entity_id - # assert len(documents["item_id"]) == 2 - # - # documents = fs.retrieve_online_documents( - # feature="item_embeddings:embedding_float", - # query=[1.0, 2.0], - # top_k=2, - # distance_metric="L1", - # ).to_dict() - # assert len(documents["embedding_float"]) == 2 - # - # with pytest.raises(ValueError): - # fs.retrieve_online_documents( - # feature="item_embeddings:embedding_float", - # query=[1.0, 2.0], - # top_k=2, - # distance_metric="wrong", - # ).to_dict() + + assert len(documents["item_id"]) == 2 + assert documents["item_id"] == [2, 3] From d1c15cff2c147b6304b150eeb948d3a432fc8865 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Dec 2024 21:34:53 -0500 Subject: [PATCH 25/33] updates to fix linter and new signature for all implementations Signed-off-by: Francisco Javier Arceo --- .../feast/infra/online_stores/milvus_online_store/milvus.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index a0a0ab56f67..2725051ff8a 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -283,8 +283,8 @@ def retrieve_online_documents( self, config: RepoConfig, table: FeatureView, - requested_feature: str, - requested_features: List[str], + requested_feature: Optional[str], + requested_features: Optional[List[str]], embedding: List[float], top_k: int, distance_metric: Optional[str] = None, @@ -315,7 +315,7 @@ def retrieve_online_documents( expr += f" && feature_name in [{features_str}]" output_fields = ( - [composite_key_name] + requested_features + ["created_ts", "event_ts"] + [composite_key_name] + (requested_features if requested_features else []) + ["created_ts", "event_ts"] ) assert all( field From 6d94d7acde75bcb3485ea780e3748d2cf0883391 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Dec 2024 22:23:42 -0500 Subject: [PATCH 26/33] linter Signed-off-by: Francisco Javier Arceo --- .../feast/infra/online_stores/milvus_online_store/milvus.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 2725051ff8a..31e5d1b994f 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -315,7 +315,9 @@ def retrieve_online_documents( expr += f" && feature_name in [{features_str}]" output_fields = ( - [composite_key_name] + (requested_features if requested_features else []) + ["created_ts", "event_ts"] + [composite_key_name] + + (requested_features if requested_features else []) + + ["created_ts", "event_ts"] ) assert all( field From fe518ed9cda3909f071fd97f9946a1dcc80ba58e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 19 Dec 2024 22:45:24 -0500 Subject: [PATCH 27/33] more linting Signed-off-by: Francisco Javier Arceo --- .../feast/infra/online_stores/milvus_online_store/milvus.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 31e5d1b994f..e7fbf526036 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -9,6 +9,7 @@ FieldSchema, connections, ) +from pymilvus.orm.connections import Connections from feast import Entity from feast.feature_view import FeatureView @@ -102,7 +103,7 @@ class MilvusOnlineStore(OnlineStore): _collections: Dictionary to cache Milvus collections. """ - _conn: Optional[connections] = None + _conn: Optional[Connections] = None _collections: Dict[str, Collection] = {} def _connect(self, config: RepoConfig) -> connections: From a639013d5cbc301b3eedf664e4c7c1c1ded5f4bd Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 20 Dec 2024 09:44:32 -0500 Subject: [PATCH 28/33] Removing some unnecessary code Signed-off-by: Francisco Javier Arceo --- .../infra/online_stores/milvus_online_store/milvus.py | 1 - sdk/python/tests/conftest.py | 11 ++--------- .../feature_repos/universal/online_store/milvus.py | 1 - 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index e7fbf526036..a1a4a3a5fe5 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -341,7 +341,6 @@ def retrieve_online_documents( anns_field=ann_search_field, param=search_params, limit=top_k, - # expr=expr, output_fields=output_fields, consistency_level="Strong", ) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index c7116d4e313..a8c03bee4a0 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -207,17 +207,10 @@ def environment(request, worker_id): @pytest.fixture def vectordb_environment(request, worker_id): - db_config = IntegrationTestRepoConfig( - provider="local", - # online_store="milvus", - # online_store_creator=MilvusOnlineStoreCreator, - # offline_store_creator=FileDataSourceCreator, - ) - print(request) e = construct_test_environment( - db_config, - fixture_request=request, + request.param, worker_id=worker_id, + fixture_request=request, entity_key_serialization_version=3, ) diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py b/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py index 7de60db2daf..8ffee04c12f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/milvus.py @@ -32,5 +32,4 @@ def create_online_store(self) -> Dict[str, Any]: } def teardown(self): - # assert 1 == 4 self.container.stop() From c91681f4f1bf8475bdfd0265f848e33e5ad01e90 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 20 Dec 2024 14:50:05 -0500 Subject: [PATCH 29/33] removing change to setup Signed-off-by: Francisco Javier Arceo --- setup.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/setup.py b/setup.py index 7284782e10d..59b881c9715 100644 --- a/setup.py +++ b/setup.py @@ -158,8 +158,6 @@ MILVUS_REQUIRED = ["pymilvus"] -MILVUS_REQUIRED = ["pymilvus"] - CI_REQUIRED = ( [ "build", From bd5ff480b2a73a6dda4b8ecfa240f3cf9d609deb Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 20 Dec 2024 14:50:43 -0500 Subject: [PATCH 30/33] adding sphinx docs Signed-off-by: Francisco Javier Arceo --- .../feast.infra.online_stores.milvus_online_store.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdk/python/docs/source/feast.infra.online_stores.milvus_online_store.rst b/sdk/python/docs/source/feast.infra.online_stores.milvus_online_store.rst index ee9faa55dc0..5ae3015bf37 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.milvus_online_store.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.milvus_online_store.rst @@ -4,6 +4,14 @@ feast.infra.online\_stores.milvus\_online\_store package Submodules ---------- +feast.infra.online\_stores.milvus\_online\_store.milvus module +-------------------------------------------------------------- + +.. automodule:: feast.infra.online_stores.milvus_online_store.milvus + :members: + :undoc-members: + :show-inheritance: + feast.infra.online\_stores.milvus\_online\_store.milvus\_repo\_configuration module ----------------------------------------------------------------------------------- From 32c9a9fa817509b71620a987fda2e3c3050767b0 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 22 Dec 2024 22:19:10 -0500 Subject: [PATCH 31/33] adjusting workflow Signed-off-by: Francisco Javier Arceo --- .github/workflows/pr_local_integration_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr_local_integration_tests.yml b/.github/workflows/pr_local_integration_tests.yml index 2825b96f482..d3488cd08c3 100644 --- a/.github/workflows/pr_local_integration_tests.yml +++ b/.github/workflows/pr_local_integration_tests.yml @@ -50,7 +50,7 @@ jobs: uses: actions/cache@v4 with: path: ${{ steps.uv-cache.outputs.dir }} - key: ${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-uv-${{ hashFiles(format('**/py{0}-ci-requirements.txt', env.PYTHON)) }} + key: ${{ runner.os }}-${{ matrix.python-version }}-uv-${{ hashFiles(format('**/py{0}-ci-requirements.txt', matrix.python-version)) }} - name: Install dependencies run: make install-python-dependencies-ci - name: Test local integration tests From 22bac8ba7e6b48f5290f4051d9ca831bc9513111 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 22 Dec 2024 22:50:24 -0500 Subject: [PATCH 32/33] adding logging to debug Signed-off-by: Francisco Javier Arceo --- .../tests/integration/online_store/test_universal_online.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index f113e255555..b365aa75328 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -614,6 +614,10 @@ def eventually_apply() -> Tuple[None, bool]: online_features = fs.get_online_features( features=features, entity_rows=entity_rows ).to_dict() + + # Debugging print statement + print("Online features values:", online_features["value"]) + assert all(v is None for v in online_features["value"]) From 134b908595d36a50831bb4d86a23f1d9f6991a6a Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Dec 2024 08:27:21 -0500 Subject: [PATCH 33/33] changing to vectordb environment Signed-off-by: Francisco Javier Arceo --- sdk/python/tests/conftest.py | 1 - .../tests/integration/online_store/test_universal_online.py | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index a8c03bee4a0..6e5f1e14870 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -191,7 +191,6 @@ def environment(request, worker_id): request.param, worker_id=worker_id, fixture_request=request, - entity_key_serialization_version=3, ) e.setup() diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index b365aa75328..d337d365e9b 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -899,9 +899,8 @@ def test_retrieve_online_documents(vectordb_environment, fake_document_data): @pytest.mark.integration @pytest.mark.universal_online_stores(only=["milvus"]) -def test_retrieve_online_milvus_ocuments(environment, fake_document_data): - print(environment.online_store) - fs = environment.feature_store +def test_retrieve_online_milvus_documents(vectordb_environment, fake_document_data): + fs = vectordb_environment.feature_store df, data_source = fake_document_data item_embeddings_feature_view = create_item_embeddings_feature_view(data_source) fs.apply([item_embeddings_feature_view, item()])