From 9d5022b8d765d7308aadef597bbce787963bf194 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 20:37:46 -0700 Subject: [PATCH 01/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py new file mode 100644 index 00000000000..97ed354f0f5 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -0,0 +1,210 @@ +import faiss +import numpy as np +from typing import Sequence, Tuple, List, Optional, Dict, Any, Callable, Union +from feast import Entity, FeatureView, RepoConfig +from feast.infra.online_stores.online_store import OnlineStore +from feast.repo_config import FeastConfigBaseModel +from feast.protos.feast.types.EntityKey_pb2 import EntityKey +from feast.protos.feast.types.Value_pb2 import Value +from datetime import datetime +from google.protobuf.timestamp_pb2 import Timestamp +import logging + +from protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from protos.feast.types.Value_pb2 import Value as ValueProto + + +class FaissOnlineStoreConfig(FeastConfigBaseModel): + dimension: int + index_path: str + index_type: str = "IVFFlat" + nlist: int = 100 + + +class InMemoryStore: + def __init__(self): + self._index = None + self.feature_names = [] + self.entity_keys = {} + + def update(self, + feature_names: List[str], + entity_keys: Dict[Tuple[str, ...], int]): + self.feature_names = feature_names + self.entity_keys = entity_keys + + def delete(self, + entity_keys: List[EntityKey]): + for entity_key in entity_keys: + del self.entity_keys[entity_key] + + def read(self, + entity_keys: List[EntityKey]): + return [self.entity_keys.get(entity_key, None) for entity_key in entity_keys] + + def teardown(self): + self._index = None + self.feature_names = [] + self.entity_keys = {} + + +class FaissOnlineStore(OnlineStore): + + def __init__(self, + config: Optional[Dict[str, Any]] = None): + self._index = None + self._in_memory_store = InMemoryStore() + self._config = FaissOnlineStoreConfig(**config) if config else None + self._logger = logging.getLogger(__name__) + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + feature_views = tables_to_keep + if not feature_views: + return + + feature_names = [f.name for f in feature_views[0].features] + dimension = len(feature_names) + + if self._index is None or not partial: + quantizer = faiss.IndexFlatL2(dimension) + self._index = faiss.IndexIVFFlat(quantizer, dimension, self._config.nlist) + self._index.train(np.random.rand(self._config.nlist * 100, dimension).astype(np.float32)) + self._in_memory_store = InMemoryStore() + + self._in_memory_store.update(feature_names, {}) + + def teardown(self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity]): + # reset index + self._index = None + self._in_memory_store.teardown() + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: + if self._index is 
None: + return [(None, None)] * len(entity_keys) + + results = [] + for entity_key in entity_keys: + entity_key_tuple = tuple(f"{field.name}:{field.value.string_val}" for field in entity_key.join_keys) + idx = self._in_memory_store.entity_keys.get(entity_key_tuple, -1) + if idx == -1: + results.append((None, None)) + else: + feature_vector = self._index.reconstruct(int(idx)) + feature_dict = { + name: Value(double_val=value) + for name, value in zip(self._in_memory_store.feature_names, feature_vector) + } + results.append((None, feature_dict)) + return results + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], + progress: Optional[Callable[[int], Any]], + ) -> None: + if self._index is None: + self._logger.warning("Index is not initialized. Skipping write operation.") + return + + feature_vectors = [] + entity_key_tuples = [] + + for entity_key, feature_dict, _, _ in data: + entity_key_tuple = tuple(f"{field.name}:{field.value.string_val}" for field in entity_key.join_keys) + feature_vector = np.array([ + feature_dict[name].double_val for name in self._in_memory_store.feature_names + ], dtype=np.float32) + + feature_vectors.append(feature_vector) + entity_key_tuples.append(entity_key_tuple) + + feature_vectors = np.array(feature_vectors) + + existing_indices = [self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples] + mask = np.array(existing_indices) != -1 + if np.any(mask): + self._index.remove_ids(np.array([idx for idx in existing_indices if idx != -1])) + + new_indices = np.arange(self._index.ntotal, self._index.ntotal + len(feature_vectors)) + self._index.add(feature_vectors) + + for ekt, idx in zip(entity_key_tuples, new_indices): + self._in_memory_store.entity_keys[ekt] = idx + + if progress: + progress(len(data)) + + def retrieve_online_documents( + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + distance_metric: Optional[str] = None, + ) -> List[ + Tuple[ + Optional[datetime], + Optional[Value], + Optional[Value], + Optional[Value], + ] + ]: + if self._index is None: + self._logger.warning("Index is not initialized. 
Returning empty result.") + return [] + + query_vector = np.array(embedding, dtype=np.float32).reshape(1, -1) + distances, indices = self._index.search(query_vector, top_k) + + results = [] + for i, idx in enumerate(indices[0]): + if idx == -1: + continue + + feature_vector = self._index.reconstruct(int(idx)) + + timestamp = Timestamp() + timestamp.GetCurrentTime() + + feature_value = Value(string_val=",".join(map(str, feature_vector))) + vector_value = Value(string_val=",".join(map(str, feature_vector))) + distance_value = Value(float_val=distances[0][i]) + + results.append( + ( + timestamp.ToDatetime(), + feature_value, + vector_value, + distance_value, + ) + ) + + return results + + async def online_read_async(self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None) -> List[ + Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + pass From 1f10a728160bcc8ae02afd033f54d9c8b8593db5 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 20:38:53 -0700 Subject: [PATCH 02/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 142 ++++++++++-------- 1 file changed, 80 insertions(+), 62 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index 97ed354f0f5..685b21c2ee0 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -1,17 +1,18 @@ +import logging +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple + import faiss import numpy as np -from typing import Sequence, Tuple, List, Optional, Dict, Any, Callable, Union +from google.protobuf.timestamp_pb2 import Timestamp +from protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from protos.feast.types.Value_pb2 import Value as ValueProto + from feast import Entity, FeatureView, RepoConfig from feast.infra.online_stores.online_store import OnlineStore -from feast.repo_config import FeastConfigBaseModel from feast.protos.feast.types.EntityKey_pb2 import EntityKey from feast.protos.feast.types.Value_pb2 import Value -from datetime import datetime -from google.protobuf.timestamp_pb2 import Timestamp -import logging - -from protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel class FaissOnlineStoreConfig(FeastConfigBaseModel): @@ -27,19 +28,15 @@ def __init__(self): self.feature_names = [] self.entity_keys = {} - def update(self, - feature_names: List[str], - entity_keys: Dict[Tuple[str, ...], int]): + def update(self, feature_names: List[str], entity_keys: Dict[Tuple[str, ...], int]): self.feature_names = feature_names self.entity_keys = entity_keys - def delete(self, - entity_keys: List[EntityKey]): + def delete(self, entity_keys: List[EntityKey]): for entity_key in entity_keys: del self.entity_keys[entity_key] - def read(self, - entity_keys: List[EntityKey]): + def read(self, entity_keys: List[EntityKey]): return [self.entity_keys.get(entity_key, None) for entity_key in entity_keys] def teardown(self): @@ -49,22 +46,20 @@ def teardown(self): class FaissOnlineStore(OnlineStore): - - def __init__(self, - config: Optional[Dict[str, Any]] = None): + def __init__(self, config: Optional[Dict[str, Any]] = None): 
self._index = None self._in_memory_store = InMemoryStore() self._config = FaissOnlineStoreConfig(**config) if config else None self._logger = logging.getLogger(__name__) def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): feature_views = tables_to_keep if not feature_views: @@ -76,32 +71,39 @@ def update( if self._index is None or not partial: quantizer = faiss.IndexFlatL2(dimension) self._index = faiss.IndexIVFFlat(quantizer, dimension, self._config.nlist) - self._index.train(np.random.rand(self._config.nlist * 100, dimension).astype(np.float32)) + self._index.train( + np.random.rand(self._config.nlist * 100, dimension).astype(np.float32) + ) self._in_memory_store = InMemoryStore() self._in_memory_store.update(feature_names, {}) - def teardown(self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity]): + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): # reset index self._index = None self._in_memory_store.teardown() def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKey], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: if self._index is None: return [(None, None)] * len(entity_keys) results = [] for entity_key in entity_keys: - entity_key_tuple = tuple(f"{field.name}:{field.value.string_val}" for field in entity_key.join_keys) + entity_key_tuple = tuple( + f"{field.name}:{field.value.string_val}" + for field in entity_key.join_keys + ) idx = self._in_memory_store.entity_keys.get(entity_key_tuple, -1) if idx == -1: results.append((None, None)) @@ -109,17 +111,19 @@ def online_read( feature_vector = self._index.reconstruct(int(idx)) feature_dict = { name: Value(double_val=value) - for name, value in zip(self._in_memory_store.feature_names, feature_vector) + for name, value in zip( + self._in_memory_store.feature_names, feature_vector + ) } results.append((None, feature_dict)) return results def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], + progress: Optional[Callable[[int], Any]], ) -> None: if self._index is None: self._logger.warning("Index is not initialized. 
Skipping write operation.") @@ -129,22 +133,35 @@ def online_write_batch( entity_key_tuples = [] for entity_key, feature_dict, _, _ in data: - entity_key_tuple = tuple(f"{field.name}:{field.value.string_val}" for field in entity_key.join_keys) - feature_vector = np.array([ - feature_dict[name].double_val for name in self._in_memory_store.feature_names - ], dtype=np.float32) + entity_key_tuple = tuple( + f"{field.name}:{field.value.string_val}" + for field in entity_key.join_keys + ) + feature_vector = np.array( + [ + feature_dict[name].double_val + for name in self._in_memory_store.feature_names + ], + dtype=np.float32, + ) feature_vectors.append(feature_vector) entity_key_tuples.append(entity_key_tuple) feature_vectors = np.array(feature_vectors) - existing_indices = [self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples] + existing_indices = [ + self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples + ] mask = np.array(existing_indices) != -1 if np.any(mask): - self._index.remove_ids(np.array([idx for idx in existing_indices if idx != -1])) + self._index.remove_ids( + np.array([idx for idx in existing_indices if idx != -1]) + ) - new_indices = np.arange(self._index.ntotal, self._index.ntotal + len(feature_vectors)) + new_indices = np.arange( + self._index.ntotal, self._index.ntotal + len(feature_vectors) + ) self._index.add(feature_vectors) for ekt, idx in zip(entity_key_tuples, new_indices): @@ -154,13 +171,13 @@ def online_write_batch( progress(len(data)) def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, - distance_metric: Optional[str] = None, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], @@ -201,10 +218,11 @@ def retrieve_online_documents( return results - async def online_read_async(self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None) -> List[ - Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + async def online_read_async( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: pass From ab231a7cf1ece2792bdda56e158841499f0478fe Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 20:56:01 -0700 Subject: [PATCH 03/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 209 +++++++----------- 1 file changed, 85 insertions(+), 124 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index 685b21c2ee0..c3af4cf78a2 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -1,18 +1,14 @@ -import logging -from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple - -import faiss -import numpy as np -from google.protobuf.timestamp_pb2 import Timestamp -from protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from protos.feast.types.Value_pb2 import Value as ValueProto - -from feast import Entity, FeatureView, RepoConfig from 
feast.infra.online_stores.online_store import OnlineStore +from feast import Entity, FeatureView, RepoConfig from feast.protos.feast.types.EntityKey_pb2 import EntityKey from feast.protos.feast.types.Value_pb2 import Value from feast.repo_config import FeastConfigBaseModel +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple +from datetime import datetime +import faiss +import numpy as np +import logging +from google.protobuf.timestamp_pb2 import Timestamp class FaissOnlineStoreConfig(FeastConfigBaseModel): @@ -23,146 +19,127 @@ class FaissOnlineStoreConfig(FeastConfigBaseModel): class InMemoryStore: + feature_names: List[str] + entity_keys: Dict[Tuple[str, ...], int] + def __init__(self): - self._index = None self.feature_names = [] self.entity_keys = {} - def update(self, feature_names: List[str], entity_keys: Dict[Tuple[str, ...], int]): + def update(self, + feature_names: List[str], + entity_keys: Dict[Tuple[str, ...], int]): self.feature_names = feature_names self.entity_keys = entity_keys - def delete(self, entity_keys: List[EntityKey]): + def delete(self, + entity_keys: List[EntityKey]): for entity_key in entity_keys: del self.entity_keys[entity_key] - def read(self, entity_keys: List[EntityKey]): + def read(self, + entity_keys: List[EntityKey]): return [self.entity_keys.get(entity_key, None) for entity_key in entity_keys] def teardown(self): - self._index = None self.feature_names = [] self.entity_keys = {} class FaissOnlineStore(OnlineStore): - def __init__(self, config: Optional[Dict[str, Any]] = None): - self._index = None - self._in_memory_store = InMemoryStore() - self._config = FaissOnlineStoreConfig(**config) if config else None - self._logger = logging.getLogger(__name__) + _index: Optional[faiss.IndexIVFFlat] = None + _in_memory_store: InMemoryStore = InMemoryStore() + _config: Optional[FaissOnlineStoreConfig] = None + _logger: logging.Logger = logging.getLogger(__name__) + + def _get_index(self, + config: RepoConfig) -> faiss.IndexIVFFlat: + if self._index is None: + dimension = config.online_store.dimension + quantizer = faiss.IndexFlatL2(dimension) + self._index = faiss.IndexIVFFlat(quantizer, dimension, self._config.nlist) + self._index.train(np.random.rand(self._config.nlist * 100, dimension).astype(np.float32)) + return self._index def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): feature_views = tables_to_keep if not feature_views: return feature_names = [f.name for f in feature_views[0].features] - dimension = len(feature_names) - - if self._index is None or not partial: - quantizer = faiss.IndexFlatL2(dimension) - self._index = faiss.IndexIVFFlat(quantizer, dimension, self._config.nlist) - self._index.train( - np.random.rand(self._config.nlist * 100, dimension).astype(np.float32) - ) - self._in_memory_store = InMemoryStore() - self._in_memory_store.update(feature_names, {}) + self._config = FaissOnlineStoreConfig(**config.online_store.dict()) + self._get_index(config) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): - 
# reset index self._index = None self._in_memory_store.teardown() def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKey], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: - if self._index is None: - return [(None, None)] * len(entity_keys) - + index = self._get_index(config) results = [] for entity_key in entity_keys: - entity_key_tuple = tuple( - f"{field.name}:{field.value.string_val}" - for field in entity_key.join_keys - ) + entity_key_tuple = tuple(f"{field.name}:{field.value.string_val}" for field in entity_key.join_keys) idx = self._in_memory_store.entity_keys.get(entity_key_tuple, -1) if idx == -1: results.append((None, None)) else: - feature_vector = self._index.reconstruct(int(idx)) + feature_vector = index.reconstruct(int(idx)) feature_dict = { name: Value(double_val=value) - for name, value in zip( - self._in_memory_store.feature_names, feature_vector - ) + for name, value in zip(self._in_memory_store.feature_names, feature_vector) } results.append((None, feature_dict)) return results def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], + progress: Optional[Callable[[int], Any]], ) -> None: - if self._index is None: - self._logger.warning("Index is not initialized. Skipping write operation.") - return - + index = self._get_index(config) feature_vectors = [] entity_key_tuples = [] for entity_key, feature_dict, _, _ in data: - entity_key_tuple = tuple( - f"{field.name}:{field.value.string_val}" - for field in entity_key.join_keys - ) - feature_vector = np.array( - [ - feature_dict[name].double_val - for name in self._in_memory_store.feature_names - ], - dtype=np.float32, - ) + entity_key_tuple = tuple(f"{field.name}:{field.value.string_val}" for field in entity_key.join_keys) + feature_vector = np.array([ + feature_dict[name].double_val for name in self._in_memory_store.feature_names + ], dtype=np.float32) feature_vectors.append(feature_vector) entity_key_tuples.append(entity_key_tuple) feature_vectors = np.array(feature_vectors) - existing_indices = [ - self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples - ] + existing_indices = [self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples] mask = np.array(existing_indices) != -1 if np.any(mask): - self._index.remove_ids( - np.array([idx for idx in existing_indices if idx != -1]) - ) + index.remove_ids(np.array([idx for idx in existing_indices if idx != -1])) - new_indices = np.arange( - self._index.ntotal, self._index.ntotal + len(feature_vectors) - ) - self._index.add(feature_vectors) + new_indices = np.arange(index.ntotal, index.ntotal + len(feature_vectors)) + index.add(feature_vectors) for ekt, idx in zip(entity_key_tuples, new_indices): self._in_memory_store.entity_keys[ekt] = idx @@ -171,34 +148,24 @@ def online_write_batch( progress(len(data)) def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, - distance_metric: Optional[str] = None, - ) -> List[ - Tuple[ - 
Optional[datetime], - Optional[Value], - Optional[Value], - Optional[Value], - ] - ]: - if self._index is None: - self._logger.warning("Index is not initialized. Returning empty result.") - return [] - + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + distance_metric: Optional[str] = None, + ) -> List[Tuple[Optional[datetime], Optional[Value], Optional[Value], Optional[Value]]]: + index = self._get_index(config) query_vector = np.array(embedding, dtype=np.float32).reshape(1, -1) - distances, indices = self._index.search(query_vector, top_k) + distances, indices = index.search(query_vector, top_k) results = [] for i, idx in enumerate(indices[0]): if idx == -1: continue - feature_vector = self._index.reconstruct(int(idx)) + feature_vector = index.reconstruct(int(idx)) timestamp = Timestamp() timestamp.GetCurrentTime() @@ -207,22 +174,16 @@ def retrieve_online_documents( vector_value = Value(string_val=",".join(map(str, feature_vector))) distance_value = Value(float_val=distances[0][i]) - results.append( - ( - timestamp.ToDatetime(), - feature_value, - vector_value, - distance_value, - ) - ) + results.append((timestamp.ToDatetime(), feature_value, vector_value, distance_value)) return results async def online_read_async( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None, - ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: + # Implement async read if needed pass From b45ea90f56da10de164ed29cf06cd78596d28d98 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 20:58:39 -0700 Subject: [PATCH 04/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 143 ++++++++++++------ 1 file changed, 97 insertions(+), 46 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index c3af4cf78a2..3f70a275129 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -1,15 +1,17 @@ -from feast.infra.online_stores.online_store import OnlineStore -from feast import Entity, FeatureView, RepoConfig -from feast.protos.feast.types.EntityKey_pb2 import EntityKey -from feast.protos.feast.types.Value_pb2 import Value -from feast.repo_config import FeastConfigBaseModel -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple +import logging from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union + import faiss import numpy as np -import logging from google.protobuf.timestamp_pb2 import Timestamp +from feast import Entity, FeatureView, RepoConfig +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey +from feast.protos.feast.types.Value_pb2 import Value +from feast.repo_config import FeastConfigBaseModel + class FaissOnlineStoreConfig(FeastConfigBaseModel): dimension: int @@ -19,12 +21,9 @@ class FaissOnlineStoreConfig(FeastConfigBaseModel): class InMemoryStore: - feature_names: List[str] - entity_keys: Dict[Tuple[str, ...], int] - def __init__(self): - 
self.feature_names = [] - self.entity_keys = {} + self.feature_names: List[str] = [] + self.entity_keys: Dict[Tuple[str, ...], int] = {} def update(self, feature_names: List[str], @@ -33,13 +32,14 @@ def update(self, self.entity_keys = entity_keys def delete(self, - entity_keys: List[EntityKey]): + entity_keys: List[Tuple[str, ...]]): for entity_key in entity_keys: - del self.entity_keys[entity_key] + if entity_key in self.entity_keys: + del self.entity_keys[entity_key] def read(self, - entity_keys: List[EntityKey]): - return [self.entity_keys.get(entity_key, None) for entity_key in entity_keys] + entity_keys: List[Tuple[str, ...]]) -> List[Optional[int]]: + return [self.entity_keys.get(entity_key) for entity_key in entity_keys] def teardown(self): self.feature_names = [] @@ -54,11 +54,8 @@ class FaissOnlineStore(OnlineStore): def _get_index(self, config: RepoConfig) -> faiss.IndexIVFFlat: - if self._index is None: - dimension = config.online_store.dimension - quantizer = faiss.IndexFlatL2(dimension) - self._index = faiss.IndexIVFFlat(quantizer, dimension, self._config.nlist) - self._index.train(np.random.rand(self._config.nlist * 100, dimension).astype(np.float32)) + if self._index is None or self._config is None: + raise ValueError("Index is not initialized") return self._index def update( @@ -75,9 +72,16 @@ def update( return feature_names = [f.name for f in feature_views[0].features] - self._in_memory_store.update(feature_names, {}) + dimension = len(feature_names) + self._config = FaissOnlineStoreConfig(**config.online_store.dict()) - self._get_index(config) + if self._index is None or not partial: + quantizer = faiss.IndexFlatL2(dimension) + self._index = faiss.IndexIVFFlat(quantizer, dimension, self._config.nlist) + self._index.train(np.random.rand(self._config.nlist * 100, dimension).astype(np.float32)) + self._in_memory_store = InMemoryStore() + + self._in_memory_store.update(feature_names, {}) def teardown( self, @@ -95,18 +99,25 @@ def online_read( entity_keys: List[EntityKey], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: - index = self._get_index(config) - results = [] + if self._index is None: + return [(None, None)] * len(entity_keys) + + results: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]] = [] for entity_key in entity_keys: - entity_key_tuple = tuple(f"{field.name}:{field.value.string_val}" for field in entity_key.join_keys) + entity_key_tuple = tuple( + f"{field.name}:{field.value.string_val}" + for field in entity_key.join_keys + ) idx = self._in_memory_store.entity_keys.get(entity_key_tuple, -1) if idx == -1: results.append((None, None)) else: - feature_vector = index.reconstruct(int(idx)) + feature_vector = self._index.reconstruct(int(idx)) feature_dict = { name: Value(double_val=value) - for name, value in zip(self._in_memory_store.feature_names, feature_vector) + for name, value in zip( + self._in_memory_store.feature_names, feature_vector + ) } results.append((None, feature_dict)) return results @@ -118,28 +129,44 @@ def online_write_batch( data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], progress: Optional[Callable[[int], Any]], ) -> None: - index = self._get_index(config) + if self._index is None: + self._logger.warning("Index is not initialized. 
Skipping write operation.") + return + feature_vectors = [] entity_key_tuples = [] for entity_key, feature_dict, _, _ in data: - entity_key_tuple = tuple(f"{field.name}:{field.value.string_val}" for field in entity_key.join_keys) - feature_vector = np.array([ - feature_dict[name].double_val for name in self._in_memory_store.feature_names - ], dtype=np.float32) + entity_key_tuple = tuple( + f"{field.name}:{field.value.string_val}" + for field in entity_key.join_keys + ) + feature_vector = np.array( + [ + feature_dict[name].double_val + for name in self._in_memory_store.feature_names + ], + dtype=np.float32, + ) feature_vectors.append(feature_vector) entity_key_tuples.append(entity_key_tuple) - feature_vectors = np.array(feature_vectors) + feature_vectors_array = np.array(feature_vectors) - existing_indices = [self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples] + existing_indices = [ + self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples + ] mask = np.array(existing_indices) != -1 if np.any(mask): - index.remove_ids(np.array([idx for idx in existing_indices if idx != -1])) + self._index.remove_ids( + np.array([idx for idx in existing_indices if idx != -1]) + ) - new_indices = np.arange(index.ntotal, index.ntotal + len(feature_vectors)) - index.add(feature_vectors) + new_indices = np.arange( + self._index.ntotal, self._index.ntotal + len(feature_vectors_array) + ) + self._index.add(feature_vectors_array) for ekt, idx in zip(entity_key_tuples, new_indices): self._in_memory_store.entity_keys[ekt] = idx @@ -155,17 +182,34 @@ def retrieve_online_documents( embedding: List[float], top_k: int, distance_metric: Optional[str] = None, - ) -> List[Tuple[Optional[datetime], Optional[Value], Optional[Value], Optional[Value]]]: - index = self._get_index(config) - query_vector = np.array(embedding, dtype=np.float32).reshape(1, -1) - distances, indices = index.search(query_vector, top_k) + ) -> List[ + Tuple[ + Optional[datetime], + Optional[Value], + Optional[Value], + Optional[Value], + ] + ]: + if self._index is None: + self._logger.warning("Index is not initialized. 
Returning empty result.") + return [] - results = [] + query_vector = np.array(embedding, dtype=np.float32).reshape(1, -1) + distances, indices = self._index.search(query_vector, top_k) + + results: List[ + Tuple[ + Optional[datetime], + Optional[Value], + Optional[Value], + Optional[Value], + ] + ] = [] for i, idx in enumerate(indices[0]): if idx == -1: continue - feature_vector = index.reconstruct(int(idx)) + feature_vector = self._index.reconstruct(int(idx)) timestamp = Timestamp() timestamp.GetCurrentTime() @@ -174,7 +218,14 @@ def retrieve_online_documents( vector_value = Value(string_val=",".join(map(str, feature_vector))) distance_value = Value(float_val=distances[0][i]) - results.append((timestamp.ToDatetime(), feature_value, vector_value, distance_value)) + results.append( + ( + timestamp.ToDatetime(), + feature_value, + vector_value, + distance_value, + ) + ) return results @@ -186,4 +237,4 @@ async def online_read_async( requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: # Implement async read if needed - pass + raise NotImplementedError("Async read is not implemented for FaissOnlineStore") From bc84eb5c239bd4e62fb397bdbd5e7f914b90d600 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 21:01:26 -0700 Subject: [PATCH 05/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 93 +++++++++---------- 1 file changed, 45 insertions(+), 48 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index 3f70a275129..ce26eca0ebf 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -1,6 +1,6 @@ import logging from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple import faiss import numpy as np @@ -25,20 +25,16 @@ def __init__(self): self.feature_names: List[str] = [] self.entity_keys: Dict[Tuple[str, ...], int] = {} - def update(self, - feature_names: List[str], - entity_keys: Dict[Tuple[str, ...], int]): + def update(self, feature_names: List[str], entity_keys: Dict[Tuple[str, ...], int]): self.feature_names = feature_names self.entity_keys = entity_keys - def delete(self, - entity_keys: List[Tuple[str, ...]]): + def delete(self, entity_keys: List[Tuple[str, ...]]): for entity_key in entity_keys: if entity_key in self.entity_keys: del self.entity_keys[entity_key] - def read(self, - entity_keys: List[Tuple[str, ...]]) -> List[Optional[int]]: + def read(self, entity_keys: List[Tuple[str, ...]]) -> List[Optional[int]]: return [self.entity_keys.get(entity_key) for entity_key in entity_keys] def teardown(self): @@ -52,20 +48,19 @@ class FaissOnlineStore(OnlineStore): _config: Optional[FaissOnlineStoreConfig] = None _logger: logging.Logger = logging.getLogger(__name__) - def _get_index(self, - config: RepoConfig) -> faiss.IndexIVFFlat: + def _get_index(self, config: RepoConfig) -> faiss.IndexIVFFlat: if self._index is None or self._config is None: raise ValueError("Index is not initialized") return self._index def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: 
RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): feature_views = tables_to_keep if not feature_views: @@ -78,31 +73,33 @@ def update( if self._index is None or not partial: quantizer = faiss.IndexFlatL2(dimension) self._index = faiss.IndexIVFFlat(quantizer, dimension, self._config.nlist) - self._index.train(np.random.rand(self._config.nlist * 100, dimension).astype(np.float32)) + self._index.train( + np.random.rand(self._config.nlist * 100, dimension).astype(np.float32) + ) self._in_memory_store = InMemoryStore() self._in_memory_store.update(feature_names, {}) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): self._index = None self._in_memory_store.teardown() def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKey], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: if self._index is None: return [(None, None)] * len(entity_keys) - results: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]] = [] + results = [] for entity_key in entity_keys: entity_key_tuple = tuple( f"{field.name}:{field.value.string_val}" @@ -123,11 +120,11 @@ def online_read( return results def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], + progress: Optional[Callable[[int], Any]], ) -> None: if self._index is None: self._logger.warning("Index is not initialized. 
Skipping write operation.") @@ -152,7 +149,7 @@ def online_write_batch( feature_vectors.append(feature_vector) entity_key_tuples.append(entity_key_tuple) - feature_vectors_array = np.array(feature_vectors) + feature_vectors = np.array(feature_vectors) existing_indices = [ self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples @@ -164,9 +161,9 @@ def online_write_batch( ) new_indices = np.arange( - self._index.ntotal, self._index.ntotal + len(feature_vectors_array) + self._index.ntotal, self._index.ntotal + len(feature_vectors) ) - self._index.add(feature_vectors_array) + self._index.add(feature_vectors) for ekt, idx in zip(entity_key_tuples, new_indices): self._in_memory_store.entity_keys[ekt] = idx @@ -175,13 +172,13 @@ def online_write_batch( progress(len(data)) def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, - distance_metric: Optional[str] = None, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], @@ -230,11 +227,11 @@ def retrieve_online_documents( return results async def online_read_async( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKey], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: # Implement async read if needed raise NotImplementedError("Async read is not implemented for FaissOnlineStore") From 14ffa550fe45dee28375544c8e12577f37ae4bee Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 21:10:49 -0700 Subject: [PATCH 06/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index ce26eca0ebf..6167f20f3a4 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -99,12 +99,9 @@ def online_read( if self._index is None: return [(None, None)] * len(entity_keys) - results = [] + results: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]] = [] for entity_key in entity_keys: - entity_key_tuple = tuple( - f"{field.name}:{field.value.string_val}" - for field in entity_key.join_keys - ) + entity_key_tuple = tuple(entity_key.name, entity_key.join_keys) idx = self._in_memory_store.entity_keys.get(entity_key_tuple, -1) if idx == -1: results.append((None, None)) @@ -134,10 +131,7 @@ def online_write_batch( entity_key_tuples = [] for entity_key, feature_dict, _, _ in data: - entity_key_tuple = tuple( - f"{field.name}:{field.value.string_val}" - for field in entity_key.join_keys - ) + entity_key_tuple = (entity_key.name, entity_key.join_keys) feature_vector = np.array( [ feature_dict[name].double_val @@ -149,7 +143,7 @@ def online_write_batch( feature_vectors.append(feature_vector) entity_key_tuples.append(entity_key_tuple) - feature_vectors = np.array(feature_vectors) + feature_vectors_array = np.array(feature_vectors) existing_indices = [ self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples @@ 
-161,9 +155,9 @@ def online_write_batch( ) new_indices = np.arange( - self._index.ntotal, self._index.ntotal + len(feature_vectors) + self._index.ntotal, self._index.ntotal + len(feature_vectors_array) ) - self._index.add(feature_vectors) + self._index.add(feature_vectors_array) for ekt, idx in zip(entity_key_tuples, new_indices): self._in_memory_store.entity_keys[ekt] = idx From 7d7a1c6320338dbd4dbbbfa34f54615843684b1a Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 21:17:07 -0700 Subject: [PATCH 07/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index 6167f20f3a4..fb8ab690c91 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -11,6 +11,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey from feast.protos.feast.types.Value_pb2 import Value from feast.repo_config import FeastConfigBaseModel +from feast.infra.key_encoding_utils import serialize_entity_key, deserialize_entity_key class FaissOnlineStoreConfig(FeastConfigBaseModel): @@ -99,10 +100,10 @@ def online_read( if self._index is None: return [(None, None)] * len(entity_keys) - results: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]] = [] + results = [] for entity_key in entity_keys: - entity_key_tuple = tuple(entity_key.name, entity_key.join_keys) - idx = self._in_memory_store.entity_keys.get(entity_key_tuple, -1) + serialized_key = serialize_entity_key(entity_key, entity_key_serialization_version=2) + idx = self._in_memory_store.entity_keys.get(serialized_key, -1) if idx == -1: results.append((None, None)) else: @@ -128,10 +129,10 @@ def online_write_batch( return feature_vectors = [] - entity_key_tuples = [] + serialized_keys = [] for entity_key, feature_dict, _, _ in data: - entity_key_tuple = (entity_key.name, entity_key.join_keys) + serialized_key = serialize_entity_key(entity_key, entity_key_serialization_version=2) feature_vector = np.array( [ feature_dict[name].double_val @@ -141,12 +142,12 @@ def online_write_batch( ) feature_vectors.append(feature_vector) - entity_key_tuples.append(entity_key_tuple) + serialized_keys.append(serialized_key) feature_vectors_array = np.array(feature_vectors) existing_indices = [ - self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples + self._in_memory_store.entity_keys.get(sk, -1) for sk in serialized_keys ] mask = np.array(existing_indices) != -1 if np.any(mask): @@ -159,8 +160,8 @@ def online_write_batch( ) self._index.add(feature_vectors_array) - for ekt, idx in zip(entity_key_tuples, new_indices): - self._in_memory_store.entity_keys[ekt] = idx + for sk, idx in zip(serialized_keys, new_indices): + self._in_memory_store.entity_keys[sk] = idx if progress: progress(len(data)) From 36ed17642f8a7a4c607a4b00f7cc25b2c43f9ba1 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 21:17:32 -0700 Subject: [PATCH 08/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 89 ++++++++++--------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index fb8ab690c91..37a325fc159 
100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -7,11 +7,11 @@ from google.protobuf.timestamp_pb2 import Timestamp from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey from feast.protos.feast.types.Value_pb2 import Value from feast.repo_config import FeastConfigBaseModel -from feast.infra.key_encoding_utils import serialize_entity_key, deserialize_entity_key class FaissOnlineStoreConfig(FeastConfigBaseModel): @@ -26,16 +26,20 @@ def __init__(self): self.feature_names: List[str] = [] self.entity_keys: Dict[Tuple[str, ...], int] = {} - def update(self, feature_names: List[str], entity_keys: Dict[Tuple[str, ...], int]): + def update(self, + feature_names: List[str], + entity_keys: Dict[Tuple[str, ...], int]): self.feature_names = feature_names self.entity_keys = entity_keys - def delete(self, entity_keys: List[Tuple[str, ...]]): + def delete(self, + entity_keys: List[Tuple[str, ...]]): for entity_key in entity_keys: if entity_key in self.entity_keys: del self.entity_keys[entity_key] - def read(self, entity_keys: List[Tuple[str, ...]]) -> List[Optional[int]]: + def read(self, + entity_keys: List[Tuple[str, ...]]) -> List[Optional[int]]: return [self.entity_keys.get(entity_key) for entity_key in entity_keys] def teardown(self): @@ -49,19 +53,20 @@ class FaissOnlineStore(OnlineStore): _config: Optional[FaissOnlineStoreConfig] = None _logger: logging.Logger = logging.getLogger(__name__) - def _get_index(self, config: RepoConfig) -> faiss.IndexIVFFlat: + def _get_index(self, + config: RepoConfig) -> faiss.IndexIVFFlat: if self._index is None or self._config is None: raise ValueError("Index is not initialized") return self._index def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): feature_views = tables_to_keep if not feature_views: @@ -82,27 +87,29 @@ def update( self._in_memory_store.update(feature_names, {}) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): self._index = None self._in_memory_store.teardown() def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKey], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: if self._index is None: return [(None, None)] * len(entity_keys) results = [] for entity_key in entity_keys: - serialized_key = serialize_entity_key(entity_key, entity_key_serialization_version=2) + serialized_key = serialize_entity_key( + entity_key, entity_key_serialization_version=2 + ) idx = self._in_memory_store.entity_keys.get(serialized_key, -1) if idx == -1: results.append((None, None)) @@ -118,11 +125,11 @@ def online_read( 
return results def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], + progress: Optional[Callable[[int], Any]], ) -> None: if self._index is None: self._logger.warning("Index is not initialized. Skipping write operation.") @@ -132,7 +139,9 @@ def online_write_batch( serialized_keys = [] for entity_key, feature_dict, _, _ in data: - serialized_key = serialize_entity_key(entity_key, entity_key_serialization_version=2) + serialized_key = serialize_entity_key( + entity_key, entity_key_serialization_version=2 + ) feature_vector = np.array( [ feature_dict[name].double_val @@ -167,13 +176,13 @@ def online_write_batch( progress(len(data)) def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, - distance_metric: Optional[str] = None, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], @@ -222,11 +231,11 @@ def retrieve_online_documents( return results async def online_read_async( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKey], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: # Implement async read if needed raise NotImplementedError("Async read is not implemented for FaissOnlineStore") From 2d096ba6a70dd5bb82b2e0887b817ec2c6d574f7 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 21:18:53 -0700 Subject: [PATCH 09/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../contrib/faiss_online_store.py | 81 +++++++++---------- 1 file changed, 38 insertions(+), 43 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index 37a325fc159..eac37fcb9af 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -24,22 +24,18 @@ class FaissOnlineStoreConfig(FeastConfigBaseModel): class InMemoryStore: def __init__(self): self.feature_names: List[str] = [] - self.entity_keys: Dict[Tuple[str, ...], int] = {} + self.entity_keys: Dict[str, int] = {} - def update(self, - feature_names: List[str], - entity_keys: Dict[Tuple[str, ...], int]): + def update(self, feature_names: List[str], entity_keys: Dict[str, int]): self.feature_names = feature_names self.entity_keys = entity_keys - def delete(self, - entity_keys: List[Tuple[str, ...]]): + def delete(self, entity_keys: List[str]): for entity_key in entity_keys: if entity_key in self.entity_keys: del self.entity_keys[entity_key] - def read(self, - entity_keys: List[Tuple[str, ...]]) -> List[Optional[int]]: + def read(self, entity_keys: List[str]) -> List[Optional[int]]: return [self.entity_keys.get(entity_key) for entity_key in entity_keys] def teardown(self): @@ -53,20 +49,19 @@ class FaissOnlineStore(OnlineStore): _config: Optional[FaissOnlineStoreConfig] = None _logger: logging.Logger = 
logging.getLogger(__name__) - def _get_index(self, - config: RepoConfig) -> faiss.IndexIVFFlat: + def _get_index(self, config: RepoConfig) -> faiss.IndexIVFFlat: if self._index is None or self._config is None: raise ValueError("Index is not initialized") return self._index def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): feature_views = tables_to_keep if not feature_views: @@ -87,20 +82,20 @@ def update( self._in_memory_store.update(feature_names, {}) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): self._index = None self._in_memory_store.teardown() def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKey], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: if self._index is None: return [(None, None)] * len(entity_keys) @@ -125,11 +120,11 @@ def online_read( return results def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]], + progress: Optional[Callable[[int], Any]], ) -> None: if self._index is None: self._logger.warning("Index is not initialized. 
Skipping write operation.") @@ -176,13 +171,13 @@ def online_write_batch( progress(len(data)) def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, - distance_metric: Optional[str] = None, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], @@ -231,11 +226,11 @@ def retrieve_online_documents( return results async def online_read_async( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKey], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKey], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]: # Implement async read if needed raise NotImplementedError("Async read is not implemented for FaissOnlineStore") From 05b54c87690bdd135a85342bf56ee4c53c56a974 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 21:23:15 -0700 Subject: [PATCH 10/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../infra/online_stores/contrib/faiss_online_store.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index eac37fcb9af..9103d57c4b7 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -7,7 +7,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from feast import Entity, FeatureView, RepoConfig -from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.key_encoding_utils import serialize_entity_key, deserialize_entity_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey from feast.protos.feast.types.Value_pb2 import Value @@ -102,9 +102,7 @@ def online_read( results = [] for entity_key in entity_keys: - serialized_key = serialize_entity_key( - entity_key, entity_key_serialization_version=2 - ) + serialized_key = serialize_entity_key(entity_key, config.entity_key_serialization_version).hex() idx = self._in_memory_store.entity_keys.get(serialized_key, -1) if idx == -1: results.append((None, None)) @@ -134,9 +132,7 @@ def online_write_batch( serialized_keys = [] for entity_key, feature_dict, _, _ in data: - serialized_key = serialize_entity_key( - entity_key, entity_key_serialization_version=2 - ) + serialized_key = serialize_entity_key(entity_key, config.entity_key_serialization_version).hex() feature_vector = np.array( [ feature_dict[name].double_val From 086b02ed854dc652b12e00c6522896958b0366f6 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 21:24:57 -0700 Subject: [PATCH 11/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../feast/infra/online_stores/contrib/faiss_online_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index 9103d57c4b7..1fbc7216957 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -100,7 +100,7 @@ def 
online_read( if self._index is None: return [(None, None)] * len(entity_keys) - results = [] + results: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = [] for entity_key in entity_keys: serialized_key = serialize_entity_key(entity_key, config.entity_key_serialization_version).hex() idx = self._in_memory_store.entity_keys.get(serialized_key, -1) From 2485e2b9fcc1253846e45250dcbda850e4bdef68 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Wed, 28 Aug 2024 21:25:13 -0700 Subject: [PATCH 12/14] add faiss & in memory online store Signed-off-by: cmuhao --- .../infra/online_stores/contrib/faiss_online_store.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py index 1fbc7216957..f69ca899d62 100644 --- a/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/faiss_online_store.py @@ -7,7 +7,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from feast import Entity, FeatureView, RepoConfig -from feast.infra.key_encoding_utils import serialize_entity_key, deserialize_entity_key +from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey from feast.protos.feast.types.Value_pb2 import Value @@ -102,7 +102,9 @@ def online_read( results: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = [] for entity_key in entity_keys: - serialized_key = serialize_entity_key(entity_key, config.entity_key_serialization_version).hex() + serialized_key = serialize_entity_key( + entity_key, config.entity_key_serialization_version + ).hex() idx = self._in_memory_store.entity_keys.get(serialized_key, -1) if idx == -1: results.append((None, None)) @@ -132,7 +134,9 @@ def online_write_batch( serialized_keys = [] for entity_key, feature_dict, _, _ in data: - serialized_key = serialize_entity_key(entity_key, config.entity_key_serialization_version).hex() + serialized_key = serialize_entity_key( + entity_key, config.entity_key_serialization_version + ).hex() feature_vector = np.array( [ feature_dict[name].double_val From a109184943c6cf174ec12ab93a726571ae781cc7 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Fri, 6 Sep 2024 01:47:10 -0700 Subject: [PATCH 13/14] add dependency Signed-off-by: cmuhao --- README.md | 6 +++++- setup.py | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 10c20050d3f..6f17e7fa6c6 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,7 @@ The list below contains the functionality that contributors are planning to deve * [x] On-demand Transformations (Beta release. See [RFC](https://docs.google.com/document/d/1lgfIw0Drc65LpaxbUu49RCeJgMew547meSJttnUqz7c/edit#)) * [x] Streaming Transformations (Alpha release. See [RFC](https://docs.google.com/document/d/1UzEyETHUaGpn0ap4G82DHluiCj7zEbrQLkJJkKSv4e8/edit)) * [ ] Batch transformation (In progress. See [RFC](https://docs.google.com/document/d/1964OkzuBljifDvkV-0fakp2uaijnVzdwWNGdz7Vz50A/edit)) + * [ ] Persistent On-demand Transformations (Beta release. 
See [GitHub Issue](https://github.com/feast-dev/feast/issues/4376)) * **Streaming** * [x] [Custom streaming ingestion job support](https://docs.feast.dev/how-to-guides/customizing-feast/creating-a-custom-provider) * [x] [Push based streaming data ingestion to online store](https://docs.feast.dev/reference/data-sources/push) @@ -208,6 +209,9 @@ The list below contains the functionality that contributors are planning to deve * [x] Amundsen integration (see [Feast extractor](https://github.com/amundsen-io/amundsen/blob/main/databuilder/databuilder/extractor/feast_extractor.py)) * [x] DataHub integration (see [DataHub Feast docs](https://datahubproject.io/docs/generated/ingestion/sources/feast/)) * [x] Feast Web UI (Beta release. See [docs](https://docs.feast.dev/reference/alpha-web-ui)) + * [ ] Feast Lineage Explorer +* **Natural Language Processing** + * [x] Vector Search (Alpha release. See [RFC](https://docs.google.com/document/d/18IWzLEA9i2lDWnbfbwXnMCg3StlqaLVI-uRpQjr_Vos/edit#heading=h.9gaqqtox9jg6)) ## 🎓 Important Resources @@ -230,4 +234,4 @@ Thanks goes to these incredible people: - + \ No newline at end of file diff --git a/setup.py b/setup.py index 6da5e8226af..9667212acc6 100644 --- a/setup.py +++ b/setup.py @@ -155,6 +155,8 @@ MSSQL_REQUIRED = ["ibis-framework[mssql]>=9.0.0,<10"] +FAISS_REQUIRED = ["faiss-cpu>=1.7.0,<2"] + CI_REQUIRED = ( [ "build", @@ -219,6 +221,7 @@ + SQLITE_VEC_REQUIRED + SINGLESTORE_REQUIRED + OPENTELEMETRY + + FAISS_REQUIRED ) DOCS_REQUIRED = CI_REQUIRED From 44763db48237f5f9483c23002a598e0f84f97c81 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Mon, 9 Sep 2024 21:03:57 -0700 Subject: [PATCH 14/14] update package name Signed-off-by: cmuhao --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 9667212acc6..1dead275519 100644 --- a/setup.py +++ b/setup.py @@ -392,6 +392,7 @@ def run(self): "sqlite_vec": SQLITE_VEC_REQUIRED, "singlestore": SINGLESTORE_REQUIRED, "opentelemetry": OPENTELEMETRY, + "faiss": FAISS_REQUIRED, }, include_package_data=True, license="Apache",
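
Taken together, the series leaves FaissOnlineStore holding a trained faiss.IndexIVFFlat plus an in-memory map from serialized entity keys to sequential FAISS row ids. The sketch below illustrates that index lifecycle (train, add, search, reconstruct) in isolation; it is a minimal example under assumed values (dimension=8, nlist=16, random placeholder vectors), not part of the patch series. One caveat worth noting when reading the store's online_read path: stock FAISS requires make_direct_map() on an IVF index before reconstruct() will succeed, and the patches never call it.

    import faiss
    import numpy as np

    dimension, nlist = 8, 16

    # Mirrors FaissOnlineStore.update(): an IVFFlat index must be trained before use.
    quantizer = faiss.IndexFlatL2(dimension)
    index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
    index.train(np.random.rand(nlist * 100, dimension).astype(np.float32))

    # Mirrors online_write_batch(): vectors are added and addressed by sequential row ids.
    vectors = np.random.rand(5, dimension).astype(np.float32)
    start = index.ntotal
    index.add(vectors)
    row_ids = np.arange(start, index.ntotal)

    # Assumption not present in the patches: an IVF index needs a direct map before
    # reconstruct(); without this call, reconstruct() raises a RuntimeError.
    index.make_direct_map()
    recovered = index.reconstruct(int(row_ids[0]))  # equals vectors[0], since IVFFlat stores raw vectors

    # Mirrors retrieve_online_documents(): nearest-neighbor search, top-3 results.
    distances, indices = index.search(vectors[0:1], 3)
    print(indices[0], distances[0])

If this store were wired into a Feast repository, the online_store section of feature_store.yaml would presumably point type at the fully qualified class path (feast.infra.online_stores.contrib.faiss_online_store.FaissOnlineStore) and supply the dimension, index_path, index_type, and nlist fields that FaissOnlineStoreConfig declares; the series itself adds no such registration or documentation, so treat that wiring as hypothetical.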