From d97aaea26e661f4c43ba5e39cb4598d978c922c6 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 26 Apr 2021 01:13:37 +0200 Subject: [PATCH 01/22] Add support for Redis as online store Signed-off-by: qooba --- sdk/python/feast/feature_store.py | 5 +- sdk/python/feast/infra/gcp.py | 1 + sdk/python/feast/infra/local.py | 1 + sdk/python/feast/infra/provider.py | 5 + sdk/python/feast/infra/redis_provider.py | 209 +++++++++++++++++++++++ sdk/python/feast/repo_config.py | 18 +- sdk/python/tests/foo_provider.py | 1 + 7 files changed, 237 insertions(+), 3 deletions(-) create mode 100644 sdk/python/feast/infra/redis_provider.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 89825d82057..5e5a1f5ec18 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -541,7 +541,10 @@ def get_online_features( table, union_of_entity_keys, entity_name_to_join_key_map ) read_rows = provider.online_read( - project=self.project, table=table, entity_keys=entity_keys, + project=self.project, + table=table, + entity_keys=entity_keys, + requested_features=requested_features, ) for row_idx, read_row in enumerate(read_rows): row_ts, feature_data = read_row diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 7299dd99f1a..dce073e1d39 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -128,6 +128,7 @@ def online_read( project: str, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], + requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: client = self._initialize_client() diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index f9516dc7819..3827e6aea60 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -131,6 +131,7 @@ def online_read( project: str, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], + requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: conn = self._get_conn() diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 05dac141c85..82eedf6b6e0 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -125,6 +125,7 @@ def online_read( project: str, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], + requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: """ Read feature values given an Entity Key. This is a low level interface, not @@ -144,6 +145,10 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: from feast.infra.gcp import GcpProvider return GcpProvider(config) + elif config.provider == "redis": + from feast.infra.redis_provider import RedisProvider + + return RedisProvider(config) elif config.provider == "local": from feast.infra.local import LocalProvider diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py new file mode 100644 index 00000000000..69cf8587379 --- /dev/null +++ b/sdk/python/feast/infra/redis_provider.py @@ -0,0 +1,209 @@ +import os +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union + +import mmh3 +import pandas as pd +from redis import Redis +from rediscluster import RedisCluster + +from feast import FeatureTable, utils +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.offline_stores.helpers import get_offline_store_from_sources +from feast.infra.provider import ( + Provider, + RetrievalJob, + _convert_arrow_to_proto, + _get_column_names, + _run_field_mapping, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.registry import Registry +from feast.repo_config import RedisOnlineStoreConfig, RepoConfig + + +class RedisProvider(Provider): + _db_path: Path + + def __init__(self, config: RepoConfig): + assert isinstance(config.online_store, RedisOnlineStoreConfig) + + def _get_client(self): + if os.environ["REDIS_TYPE"] == "REDIS_CLUSTER": + return RedisCluster( + host=os.environ["REDIS_HOST"], + port=os.environ["REDIS_PORT"], + decode_responses=True, + ) + else: + return Redis( + host=os.environ["REDIS_HOST"], port=os.environ["REDIS_PORT"], db=0 + ) + + def update_infra( + self, + project: str, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + client = self._get_client() + # TODO + + def teardown_infra( + self, + project: str, + tables: Sequence[Union[FeatureTable, FeatureView]], + entities: Sequence[Entity], + ) -> None: + # according to the repos_operations.py we can delete the whole project + client = self._get_client() + keys = client.keys("{project}:*") + client.unlink(*keys) + + def online_write_batch( + self, + project: str, + table: Union[FeatureTable, FeatureView], + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + client = self._get_client() + + entity_hset = {} + feature_view = table.name + + for entity_key, values, timestamp, created_ts in data: + redis_key_bin = _redis_key(project, entity_key) + timestamp = utils.make_tzaware(timestamp).strftime("%Y-%m-%d %H:%M:%S") + entity_hset[f"_ts:{feature_view}"] = timestamp + + if created_ts is not None: + created_ts = utils.make_tzaware(created_ts).strftime( + "%Y-%m-%d %H:%M:%S" + ) + entity_hset[f"_created_ts:{feature_view}"] = created_ts + + for feature_name, val in values.items(): + f_key = _mmh3(f"{feature_view}:{feature_name}") + entity_hset[f_key] = val.SerializeToString() + + client.hset(redis_key_bin, mapping=entity_hset) + + def online_read( + self, + project: str, + table: Union[FeatureTable, FeatureView], + entity_keys: List[EntityKeyProto], + requested_features: List[str] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + + client = self._get_client() + feature_view = table.name + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + + for entity_key in entity_keys: + redis_key_bin = _redis_key(project, entity_key) + hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] + ts_key = f"_ts:{feature_view}" + hset_keys.append(ts_key) + values = client.hmget(redis_key_bin, hset_keys) + + requested_features.append(ts_key) + res_val = dict(zip(requested_features, values)) + res_ts = res_val.pop(ts_key) + + res = {} + for feature_name, val_bin in res_val.items(): + val = ValueProto() + val.ParseFromString(val_bin) + res[feature_name] = val + + if not res: + result.append((None, None)) + else: + result.append((res_ts, res)) + return result + + def materialize_single_feature_view( + self, + feature_view: FeatureView, + start_date: datetime, + end_date: datetime, + registry: Registry, + project: str, + ) -> None: + entities = [] + for entity_name in feature_view.entities: + entities.append(registry.get_entity(entity_name, project)) + + ( + join_key_columns, + feature_name_columns, + event_timestamp_column, + created_timestamp_column, + ) = _get_column_names(feature_view, entities) + + start_date = utils.make_tzaware(start_date) + end_date = utils.make_tzaware(end_date) + + offline_store = get_offline_store_from_sources([feature_view.input]) + table = offline_store.pull_latest_from_table_or_query( + data_source=feature_view.input, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + event_timestamp_column=event_timestamp_column, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + + if feature_view.input.field_mapping is not None: + table = _run_field_mapping(table, feature_view.input.field_mapping) + + join_keys = [entity.join_key for entity in entities] + rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) + + self.online_write_batch(project, feature_view, rows_to_write, None) + + feature_view.materialization_intervals.append((start_date, end_date)) + registry.apply_feature_view(feature_view, project) + + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: Registry, + project: str, + ) -> RetrievalJob: + offline_store = get_offline_store_from_sources( + [feature_view.input for feature_view in feature_views] + ) + return offline_store.get_historical_features( + config=config, + feature_views=feature_views, + feature_refs=feature_refs, + entity_df=entity_df, + registry=registry, + project=project, + ) + + +def _redis_key(project: str, entity_key: EntityKeyProto) -> str: + key = _mmh3(serialize_entity_key(entity_key)) + return f"{project}:{key}" + + +def _mmh3(key: str) -> str: + return mmh3.hash_bytes(key).hex() diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 89268584f36..c404003fd0c 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -52,7 +52,16 @@ class DatastoreOnlineStoreConfig(FeastBaseModel): """ (optional) Amount of feature rows per batch being written into Datastore""" -OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig] +class RedisOnlineStoreConfig(FeastBaseModel): + """Online store config for Redis store""" + + type: Literal["redis"] = "redis" + """Online store type selector""" + + +OnlineStoreConfig = Union[ + DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig, RedisOnlineStoreConfig +] class FileOfflineStoreConfig(FeastBaseModel): @@ -101,7 +110,7 @@ class RepoConfig(FeastBaseModel): """ provider: StrictStr - """ str: local or gcp """ + """ str: local or gcp or redis """ online_store: OnlineStoreConfig = SqliteOnlineStoreConfig() """ OnlineStoreConfig: Online store configuration (optional depending on provider) """ @@ -141,6 +150,9 @@ def _validate_online_store_config(cls, values): values["online_store"]["type"] = "sqlite" elif values["provider"] == "gcp": values["online_store"]["type"] = "datastore" + elif values["provider"] == "redis": + values["online_store"]["type"] = "redis" + online_store_type = values["online_store"]["type"] @@ -153,6 +165,8 @@ def _validate_online_store_config(cls, values): SqliteOnlineStoreConfig(**values["online_store"]) elif online_store_type == "datastore": DatastoreOnlineStoreConfig(**values["online_store"]) + elif online_store_type == "redis": + RedisOnlineStoreConfig(**values["online_store"]) else: raise ValueError(f"Invalid online store type {online_store_type}") except ValidationError as e: diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index b8705eef99c..b313ad9cc7d 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -70,6 +70,7 @@ def online_read( project: str, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], + requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: pass From ae9ed8ebf5deeb6b991f07bd6b9717403618bd58 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 26 Apr 2021 01:49:22 +0200 Subject: [PATCH 02/22] Add support for Redis as online store Signed-off-by: qooba --- sdk/python/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index e2bb02f10d0..970d191d18e 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -56,6 +56,7 @@ "tabulate==0.8.*", "toml==0.10.*", "tqdm==4.*", + "redis-py-cluster==2.1.2", ] GCP_REQUIRED = [ From e165f003a81ad386359626c2e5a5d55602befd60 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 26 Apr 2021 22:36:03 +0200 Subject: [PATCH 03/22] Add redis provider Signed-off-by: qooba --- sdk/python/feast/infra/redis_provider.py | 56 ++++++++++++++++++------ 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py index 69cf8587379..bccc12c88a7 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis_provider.py @@ -1,3 +1,4 @@ +import json import os from datetime import datetime from pathlib import Path @@ -32,18 +33,6 @@ class RedisProvider(Provider): def __init__(self, config: RepoConfig): assert isinstance(config.online_store, RedisOnlineStoreConfig) - def _get_client(self): - if os.environ["REDIS_TYPE"] == "REDIS_CLUSTER": - return RedisCluster( - host=os.environ["REDIS_HOST"], - port=os.environ["REDIS_PORT"], - decode_responses=True, - ) - else: - return Redis( - host=os.environ["REDIS_HOST"], port=os.environ["REDIS_PORT"], db=0 - ) - def update_infra( self, project: str, @@ -54,7 +43,6 @@ def update_infra( partial: bool, ): client = self._get_client() - # TODO def teardown_infra( self, @@ -178,6 +166,48 @@ def materialize_single_feature_view( feature_view.materialization_intervals.append((start_date, end_date)) registry.apply_feature_view(feature_view, project) + def _get_cs(self): + """ + Reads Redis connections string using format + for RedisCluster: + redis1:6379,redis2:6379,decode_responses=true,skip_full_coverage_check=true,ssl=true,password=... + for Redis: + redis_master:6379,db=0,ssl=true,password=... + """ + connection_string = os.environ["REDIS_CONNECTION_STRING"] + startup_nodes = [ + dict(zip(["host", "port"], c.split(":"))) + for c in connection_string.split(",") + if not "=" in c + ] + params = {} + for c in connection_string.split(","): + if "=" in c: + kv = c.split("=") + try: + kv[1] = json.loads(kv[1]) + except json.JSONDecodeError: + ... + + it = iter(kv) + params.update(dict(zip(it, it))) + + return startup_nodes, params + + def _get_client(self): + """ + Creates the Redis client RedisCluster or Redis depending on configuration + """ + startup_nodes, kwargs = self._get_cs() + + if os.environ["REDIS_TYPE"] == "REDIS_CLUSTER": + kwargs["startup_nodes"] = startup_nodes + return RedisCluster(**kwargs) + else: + kwargs["host"] = startup_nodes[0]["host"] + kwargs["port"] = startup_nodes[0]["port"] + return Redis(**kwargs) + @staticmethod def get_historical_features( config: RepoConfig, From df8f3cba48a86c3c247466cfa70335659d5ea8b3 Mon Sep 17 00:00:00 2001 From: qooba Date: Tue, 27 Apr 2021 03:02:19 +0200 Subject: [PATCH 04/22] Add redis online provider Signed-off-by: qooba --- sdk/python/feast/infra/redis_provider.py | 50 ++++++++++++++++-------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py index bccc12c88a7..18c42e92ea2 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis_provider.py @@ -3,12 +3,12 @@ from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union - +import struct import mmh3 import pandas as pd from redis import Redis from rediscluster import RedisCluster - +from google.protobuf.timestamp_pb2 import Timestamp from feast import FeatureTable, utils from feast.entity import Entity from feast.feature_view import FeatureView @@ -22,10 +22,12 @@ _run_field_mapping, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry from feast.repo_config import RedisOnlineStoreConfig, RepoConfig +EX_SECONDS=253402300799 class RedisProvider(Provider): _db_path: Path @@ -52,8 +54,8 @@ def teardown_infra( ) -> None: # according to the repos_operations.py we can delete the whole project client = self._get_client() - keys = client.keys("{project}:*") - client.unlink(*keys) + #keys = client.keys("{project}:*") + #client.unlink(*keys) def online_write_batch( self, @@ -69,16 +71,17 @@ def online_write_batch( entity_hset = {} feature_view = table.name + ex=Timestamp() + ex.seconds=EX_SECONDS + ex_str=ex.SerializeToString() + for entity_key, values, timestamp, created_ts in data: redis_key_bin = _redis_key(project, entity_key) - timestamp = utils.make_tzaware(timestamp).strftime("%Y-%m-%d %H:%M:%S") - entity_hset[f"_ts:{feature_view}"] = timestamp - - if created_ts is not None: - created_ts = utils.make_tzaware(created_ts).strftime( - "%Y-%m-%d %H:%M:%S" - ) - entity_hset[f"_created_ts:{feature_view}"] = created_ts + timestamp = int(utils.make_tzaware(timestamp).timestamp()) + ts=Timestamp() + ts.seconds=timestamp + entity_hset[f"_ts:{feature_view}"] = ts.SerializeToString() + entity_hset[f"_ex:{feature_view}"] = ex_str for feature_name, val in values.items(): f_key = _mmh3(f"{feature_view}:{feature_name}") @@ -113,7 +116,7 @@ def online_read( res = {} for feature_name, val_bin in res_val.items(): val = ValueProto() - val.ParseFromString(val_bin) + val.FromString(val_bin) res[feature_name] = val if not res: @@ -231,9 +234,22 @@ def get_historical_features( def _redis_key(project: str, entity_key: EntityKeyProto) -> str: - key = _mmh3(serialize_entity_key(entity_key)) - return f"{project}:{key}" - + redis_key = RedisKeyProto( + project=project, + entity_names=entity_key.join_keys, + entity_values=entity_key.entity_values, + ) + #key = _mmh3(serialize_entity_key(entity_key)) + return redis_key.SerializeToString() def _mmh3(key: str) -> str: - return mmh3.hash_bytes(key).hex() + """ + Calculate murmur3_32 hash which is equal to scala version which is using little endian: + https://stackoverflow.com/questions/29932956/murmur3-hash-different-result-between-python-and-java-implementation + https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x + """ + key_hash = mmh3.hash(key,signed=False) + bytes.fromhex(struct.pack(' Date: Tue, 27 Apr 2021 22:44:00 +0200 Subject: [PATCH 05/22] Add redis online provider Signed-off-by: qooba --- sdk/python/feast/infra/redis_provider.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py index 18c42e92ea2..9302c3dbf2a 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis_provider.py @@ -54,8 +54,8 @@ def teardown_infra( ) -> None: # according to the repos_operations.py we can delete the whole project client = self._get_client() - #keys = client.keys("{project}:*") - #client.unlink(*keys) + keys = client.keys("*{project}:*") + client.unlink(*keys) def online_write_batch( self, @@ -108,15 +108,19 @@ def online_read( ts_key = f"_ts:{feature_view}" hset_keys.append(ts_key) values = client.hmget(redis_key_bin, hset_keys) - requested_features.append(ts_key) res_val = dict(zip(requested_features, values)) - res_ts = res_val.pop(ts_key) + + res_ts = Timestamp() + ts_val = res_val.pop(ts_key) + if ts_val: + res_ts.ParseFromString(ts_val) res = {} for feature_name, val_bin in res_val.items(): val = ValueProto() - val.FromString(val_bin) + if val_bin: + val.ParseFromString(val_bin) res[feature_name] = val if not res: @@ -235,10 +239,10 @@ def get_historical_features( def _redis_key(project: str, entity_key: EntityKeyProto) -> str: redis_key = RedisKeyProto( - project=project, - entity_names=entity_key.join_keys, - entity_values=entity_key.entity_values, - ) + project=project, + entity_names=entity_key.join_keys, + entity_values=entity_key.entity_values, + ) #key = _mmh3(serialize_entity_key(entity_key)) return redis_key.SerializeToString() @@ -249,7 +253,7 @@ def _mmh3(key: str) -> str: https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x """ key_hash = mmh3.hash(key,signed=False) - bytes.fromhex(struct.pack(' Date: Tue, 27 Apr 2021 22:48:04 +0200 Subject: [PATCH 06/22] Add redis online provider Signed-off-by: qooba --- sdk/python/feast/infra/redis_provider.py | 31 ++++++++++++------------ 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py index 9302c3dbf2a..352b577bf2b 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis_provider.py @@ -1,14 +1,16 @@ import json import os +import struct from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union -import struct + import mmh3 import pandas as pd +from google.protobuf.timestamp_pb2 import Timestamp from redis import Redis from rediscluster import RedisCluster -from google.protobuf.timestamp_pb2 import Timestamp + from feast import FeatureTable, utils from feast.entity import Entity from feast.feature_view import FeatureView @@ -21,13 +23,14 @@ _get_column_names, _run_field_mapping, ) -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry from feast.repo_config import RedisOnlineStoreConfig, RepoConfig -EX_SECONDS=253402300799 +EX_SECONDS = 253402300799 + class RedisProvider(Provider): _db_path: Path @@ -71,15 +74,15 @@ def online_write_batch( entity_hset = {} feature_view = table.name - ex=Timestamp() - ex.seconds=EX_SECONDS - ex_str=ex.SerializeToString() + ex = Timestamp() + ex.seconds = EX_SECONDS + ex_str = ex.SerializeToString() for entity_key, values, timestamp, created_ts in data: redis_key_bin = _redis_key(project, entity_key) timestamp = int(utils.make_tzaware(timestamp).timestamp()) - ts=Timestamp() - ts.seconds=timestamp + ts = Timestamp() + ts.seconds = timestamp entity_hset[f"_ts:{feature_view}"] = ts.SerializeToString() entity_hset[f"_ex:{feature_view}"] = ex_str @@ -243,17 +246,15 @@ def _redis_key(project: str, entity_key: EntityKeyProto) -> str: entity_names=entity_key.join_keys, entity_values=entity_key.entity_values, ) - #key = _mmh3(serialize_entity_key(entity_key)) + # key = _mmh3(serialize_entity_key(entity_key)) return redis_key.SerializeToString() + def _mmh3(key: str) -> str: """ Calculate murmur3_32 hash which is equal to scala version which is using little endian: https://stackoverflow.com/questions/29932956/murmur3-hash-different-result-between-python-and-java-implementation https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x """ - key_hash = mmh3.hash(key,signed=False) - return bytes.fromhex(struct.pack(' Date: Thu, 29 Apr 2021 01:57:49 +0200 Subject: [PATCH 07/22] Add redis online provider Signed-off-by: qooba --- sdk/python/feast/infra/redis_provider.py | 64 ++++++++++++------------ 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py index 352b577bf2b..3e4373cc22b 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis_provider.py @@ -14,7 +14,6 @@ from feast import FeatureTable, utils from feast.entity import Entity from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.offline_stores.helpers import get_offline_store_from_sources from feast.infra.provider import ( Provider, @@ -47,7 +46,7 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - client = self._get_client() + pass def teardown_infra( self, @@ -80,9 +79,8 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: redis_key_bin = _redis_key(project, entity_key) - timestamp = int(utils.make_tzaware(timestamp).timestamp()) ts = Timestamp() - ts.seconds = timestamp + ts.seconds = int(utils.make_tzaware(timestamp).timestamp()) entity_hset[f"_ts:{feature_view}"] = ts.SerializeToString() entity_hset[f"_ex:{feature_view}"] = ex_str @@ -105,31 +103,33 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - for entity_key in entity_keys: - redis_key_bin = _redis_key(project, entity_key) - hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] - ts_key = f"_ts:{feature_view}" - hset_keys.append(ts_key) - values = client.hmget(redis_key_bin, hset_keys) - requested_features.append(ts_key) - res_val = dict(zip(requested_features, values)) - - res_ts = Timestamp() - ts_val = res_val.pop(ts_key) - if ts_val: - res_ts.ParseFromString(ts_val) - - res = {} - for feature_name, val_bin in res_val.items(): - val = ValueProto() - if val_bin: - val.ParseFromString(val_bin) - res[feature_name] = val - - if not res: - result.append((None, None)) - else: - result.append((res_ts, res)) + if requested_features: + for entity_key in entity_keys: + redis_key_bin = _redis_key(project, entity_key) + hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] + ts_key = f"_ts:{feature_view}" + hset_keys.append(ts_key) + values = client.hmget(redis_key_bin, hset_keys) + requested_features.append(ts_key) + res_val = dict(zip(requested_features, values)) + + res_ts = Timestamp() + ts_val = res_val.pop(ts_key) + if ts_val: + res_ts.ParseFromString(ts_val) + + res = {} + for feature_name, val_bin in res_val.items(): + val = ValueProto() + if val_bin: + val.ParseFromString(val_bin) + res[feature_name] = val + + if not res: + result.append((None, None)) + else: + timestamp = datetime.fromtimestamp(res_ts.seconds) + result.append((timestamp, res)) return result def materialize_single_feature_view( @@ -188,7 +188,7 @@ def _get_cs(self): startup_nodes = [ dict(zip(["host", "port"], c.split(":"))) for c in connection_string.split(",") - if not "=" in c + if "=" not in c ] params = {} for c in connection_string.split(","): @@ -240,7 +240,7 @@ def get_historical_features( ) -def _redis_key(project: str, entity_key: EntityKeyProto) -> str: +def _redis_key(project: str, entity_key: EntityKeyProto): redis_key = RedisKeyProto( project=project, entity_names=entity_key.join_keys, @@ -250,7 +250,7 @@ def _redis_key(project: str, entity_key: EntityKeyProto) -> str: return redis_key.SerializeToString() -def _mmh3(key: str) -> str: +def _mmh3(key: str): """ Calculate murmur3_32 hash which is equal to scala version which is using little endian: https://stackoverflow.com/questions/29932956/murmur3-hash-different-result-between-python-and-java-implementation From e434e59a848b450875e2cda8b3198961968f36aa Mon Sep 17 00:00:00 2001 From: qooba Date: Fri, 7 May 2021 00:06:43 +0200 Subject: [PATCH 08/22] Add redis online provider - integration tests Signed-off-by: qooba --- .github/workflows/integration_tests.yml | 15 ++++- sdk/python/tests/test_cli_redis.py | 55 +++++++++++++++++++ .../test_offline_online_store_consistency.py | 39 +++++++++++++ 3 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 sdk/python/tests/test_cli_redis.py diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 70abce32dfe..596fa80307c 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -16,6 +16,16 @@ jobs: env: OS: ${{ matrix.os }} PYTHON: ${{ matrix.python-version }} + services: + redis: + image: redis + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - uses: actions/checkout@v2 - name: Setup Python @@ -43,6 +53,9 @@ jobs: run: make install-python-ci-dependencies - name: Test python run: FEAST_TELEMETRY=False pytest --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests --integration + env: + REDIS_TYPE: REDIS + REDIS_CONNECTION_STRING: localhost:6379,db=0 - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: @@ -51,4 +64,4 @@ jobs: flags: integrationtests env_vars: OS,PYTHON fail_ci_if_error: true - verbose: true \ No newline at end of file + verbose: true diff --git a/sdk/python/tests/test_cli_redis.py b/sdk/python/tests/test_cli_redis.py new file mode 100644 index 00000000000..c1a1a4f4cd7 --- /dev/null +++ b/sdk/python/tests/test_cli_redis.py @@ -0,0 +1,55 @@ +import random +import string +import tempfile +from pathlib import Path +from textwrap import dedent + +import pytest + +from feast.feature_store import FeatureStore +from tests.cli_utils import CliRunner +from tests.online_read_write_test import basic_rw_test + + +@pytest.mark.integration +def test_basic() -> None: + project_id = "".join( + random.choice(string.ascii_lowercase + string.digits) for _ in range(10) + ) + runner = CliRunner() + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + repo_path = Path(repo_dir_name) + data_path = Path(data_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text( + dedent( + f""" + project: {project_id} + registry: {data_path / "registry.db"} + provider: redis + """ + ) + ) + + repo_example = repo_path / "example.py" + repo_example.write_text( + (Path(__file__).parent / "example_feature_repo_1.py").read_text() + ) + + result = runner.run(["apply"], cwd=repo_path) + assert result.returncode == 0 + + # Doing another apply should be a no op, and should not cause errors + result = runner.run(["apply"], cwd=repo_path) + assert result.returncode == 0 + + basic_rw_test( + FeatureStore(repo_path=str(repo_path), config=None), + view_name="driver_locations", + ) + + result = runner.run(["teardown"], cwd=repo_path) + assert result.returncode == 0 diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py index c39fb427541..b20effc9434 100644 --- a/sdk/python/tests/test_offline_online_store_consistency.py +++ b/sdk/python/tests/test_offline_online_store_consistency.py @@ -146,6 +146,39 @@ def prep_local_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: yield fs, fv +@contextlib.contextmanager +def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: + with tempfile.NamedTemporaryFile(suffix=".parquet") as f: + df = create_dataset() + f.close() + df.to_parquet(f.name) + file_source = FileSource( + file_format=ParquetFormat(), + file_url=f"file://{f.name}", + event_timestamp_column="ts", + created_timestamp_column="created_ts", + date_partition_column="", + field_mapping={"ts_1": "ts", "id": "driver_id"}, + ) + fv = get_feature_view(file_source) + e = Entity( + name="driver", + description="id for driver", + join_key="driver_id", + value_type=ValueType.INT32, + ) + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory(): + config = RepoConfig( + registry=str(Path(repo_dir_name) / "registry.db"), + project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}", + provider="redis", + ) + fs = FeatureStore(config=config) + fs.apply([fv, e]) + + yield fs, fv + + # Checks that both offline & online store values are as expected def check_offline_and_online_features( fs: FeatureStore, @@ -221,6 +254,12 @@ def test_bq_offline_online_store_consistency(bq_source_type: str): run_offline_online_store_consistency_test(fs, fv) +@pytest.mark.integration +def test_redis_offline_online_store_consistency(): + with prep_redis_fs_and_fv() as (fs, fv): + run_offline_online_store_consistency_test(fs, fv) + + def test_local_offline_online_store_consistency(): with prep_local_fs_and_fv() as (fs, fv): run_offline_online_store_consistency_test(fs, fv) From 8ccfc6bd0bedc5c263b6fb7c8bc810debea018ac Mon Sep 17 00:00:00 2001 From: qooba Date: Fri, 7 May 2021 01:41:48 +0200 Subject: [PATCH 09/22] Add redis online provider - integration tests Signed-off-by: qooba --- sdk/python/feast/infra/redis_provider.py | 67 ++++++++++++++---------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py index 3e4373cc22b..5ebe2e0ab8e 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis_provider.py @@ -56,8 +56,15 @@ def teardown_infra( ) -> None: # according to the repos_operations.py we can delete the whole project client = self._get_client() - keys = client.keys("*{project}:*") - client.unlink(*keys) + + tables_join_keys = [[e for e in t.entities] for t in tables] + for table_join_keys in tables_join_keys: + redis_key_bin = _redis_key( + project, EntityKeyProto(join_keys=table_join_keys) + ) + keys = client.keys(f"{redis_key_bin}*") + if keys: + client.unlink(*keys) def online_write_batch( self, @@ -103,33 +110,35 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - if requested_features: - for entity_key in entity_keys: - redis_key_bin = _redis_key(project, entity_key) - hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] - ts_key = f"_ts:{feature_view}" - hset_keys.append(ts_key) - values = client.hmget(redis_key_bin, hset_keys) - requested_features.append(ts_key) - res_val = dict(zip(requested_features, values)) - - res_ts = Timestamp() - ts_val = res_val.pop(ts_key) - if ts_val: - res_ts.ParseFromString(ts_val) - - res = {} - for feature_name, val_bin in res_val.items(): - val = ValueProto() - if val_bin: - val.ParseFromString(val_bin) - res[feature_name] = val - - if not res: - result.append((None, None)) - else: - timestamp = datetime.fromtimestamp(res_ts.seconds) - result.append((timestamp, res)) + if not requested_features: + requested_features = [f.name for f in table.features] + + for entity_key in entity_keys: + redis_key_bin = _redis_key(project, entity_key) + hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] + ts_key = f"_ts:{feature_view}" + hset_keys.append(ts_key) + values = client.hmget(redis_key_bin, hset_keys) + requested_features.append(ts_key) + res_val = dict(zip(requested_features, values)) + + res_ts = Timestamp() + ts_val = res_val.pop(ts_key) + if ts_val: + res_ts.ParseFromString(ts_val) + + res = {} + for feature_name, val_bin in res_val.items(): + val = ValueProto() + if val_bin: + val.ParseFromString(val_bin) + res[feature_name] = val + + if not res: + result.append((None, None)) + else: + timestamp = datetime.fromtimestamp(res_ts.seconds) + result.append((timestamp, res)) return result def materialize_single_feature_view( From be9a17fd822827c85d026c2f2d08a624ef32b557 Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 12 May 2021 22:49:27 +0200 Subject: [PATCH 10/22] Add redis provider Signed-off-by: qooba --- sdk/python/feast/infra/redis_provider.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py index 5ebe2e0ab8e..e9997f4b038 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis_provider.py @@ -10,6 +10,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from redis import Redis from rediscluster import RedisCluster +from tqdm import tqdm from feast import FeatureTable, utils from feast.entity import Entity @@ -96,6 +97,8 @@ def online_write_batch( entity_hset[f_key] = val.SerializeToString() client.hset(redis_key_bin, mapping=entity_hset) + if progress: + progress(1) def online_read( self, @@ -148,6 +151,7 @@ def materialize_single_feature_view( end_date: datetime, registry: Registry, project: str, + tqdm_builder: Callable[[int], tqdm], ) -> None: entities = [] for entity_name in feature_view.entities: @@ -180,7 +184,10 @@ def materialize_single_feature_view( join_keys = [entity.join_key for entity in entities] rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - self.online_write_batch(project, feature_view, rows_to_write, None) + with tqdm_builder(len(rows_to_write)) as pbar: + self.online_write_batch( + project, feature_view, rows_to_write, lambda x: pbar.update(x) + ) feature_view.materialization_intervals.append((start_date, end_date)) registry.apply_feature_view(feature_view, project) From 7606fe71fa6a945d1384f509037da5c5b0b955e7 Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 12 May 2021 23:44:32 +0200 Subject: [PATCH 11/22] Add redis provider Signed-off-by: qooba --- .github/workflows/pr_integration_tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 8172c62b30d..0a624a2a778 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -76,3 +76,4 @@ jobs: env_vars: OS,PYTHON fail_ci_if_error: true verbose: true + From 2a95e3e37899e293b09c52725545ccab15dcfd5c Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 24 May 2021 23:14:46 +0000 Subject: [PATCH 12/22] correct redis provider Signed-off-by: qooba --- sdk/python/feast/infra/redis_provider.py | 21 +++++++++++++-------- sdk/python/feast/repo_config.py | 18 ++++++++++++++++++ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis_provider.py index e9997f4b038..d4099b226aa 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis_provider.py @@ -1,8 +1,6 @@ import json -import os import struct from datetime import datetime -from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import mmh3 @@ -27,16 +25,24 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry -from feast.repo_config import RedisOnlineStoreConfig, RepoConfig +from feast.repo_config import RedisOnlineStoreConfig, RedisType, RepoConfig EX_SECONDS = 253402300799 class RedisProvider(Provider): - _db_path: Path + _redis_type: Optional[RedisType] + _redis_connection_string: str def __init__(self, config: RepoConfig): assert isinstance(config.online_store, RedisOnlineStoreConfig) + if config and config.online_store: + if config.online_store.redis_type: + self._redis_type = config.online_store.redis_type + if config.online_store.redis_connection_string: + self._redis_connection_string = ( + config.online_store.redis_connection_string + ) def update_infra( self, @@ -63,7 +69,7 @@ def teardown_infra( redis_key_bin = _redis_key( project, EntityKeyProto(join_keys=table_join_keys) ) - keys = client.keys(f"{redis_key_bin}*") + keys = [k for k in client.scan_iter(match=f"{redis_key_bin}*", count=100)] if keys: client.unlink(*keys) @@ -200,7 +206,7 @@ def _get_cs(self): for Redis: redis_master:6379,db=0,ssl=true,password=... """ - connection_string = os.environ["REDIS_CONNECTION_STRING"] + connection_string = self._redis_connection_string startup_nodes = [ dict(zip(["host", "port"], c.split(":"))) for c in connection_string.split(",") @@ -225,8 +231,7 @@ def _get_client(self): Creates the Redis client RedisCluster or Redis depending on configuration """ startup_nodes, kwargs = self._get_cs() - - if os.environ["REDIS_TYPE"] == "REDIS_CLUSTER": + if self._redis_type == RedisType.redis_cluster: kwargs["startup_nodes"] = startup_nodes return RedisCluster(**kwargs) else: diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index c404003fd0c..0dc2f47a114 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,3 +1,5 @@ +import os +from enum import Enum from pathlib import Path import yaml @@ -52,12 +54,24 @@ class DatastoreOnlineStoreConfig(FeastBaseModel): """ (optional) Amount of feature rows per batch being written into Datastore""" +class RedisType(str, Enum): + redis = "redis" + redis_cluster = "redis_cluster" + + class RedisOnlineStoreConfig(FeastBaseModel): """Online store config for Redis store""" type: Literal["redis"] = "redis" """Online store type selector""" + redis_type: RedisType = RedisType.redis + """Redis type: redis or redis_cluster""" + + redis_connection_string: Optional[StrictStr] = None + """Redis connection string from REDIS_CONNECTION_STRING environment variable + format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """ + OnlineStoreConfig = Union[ DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig, RedisOnlineStoreConfig @@ -152,6 +166,10 @@ def _validate_online_store_config(cls, values): values["online_store"]["type"] = "datastore" elif values["provider"] == "redis": values["online_store"]["type"] = "redis" + values["online_store"]["redis_connection_string"] = os.environ.get( + "REDIS_CONNECTION_STRING" + ) + online_store_type = values["online_store"]["type"] From 435c8566f03b37b178f1fc25722eb6858260143d Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 26 May 2021 21:00:23 +0000 Subject: [PATCH 13/22] correct redis provider Signed-off-by: qooba --- .github/workflows/pr_integration_tests.yml | 7 +++--- sdk/python/feast/infra/provider.py | 2 +- .../infra/{redis_provider.py => redis.py} | 24 +++++++------------ sdk/python/feast/repo_config.py | 12 +++++----- 4 files changed, 19 insertions(+), 26 deletions(-) rename sdk/python/feast/infra/{redis_provider.py => redis.py} (92%) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 0a624a2a778..a172763df38 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -18,6 +18,9 @@ jobs: matrix: python-version: [ 3.7, 3.8, 3.9 ] os: [ ubuntu-latest ] + env: + OS: ${{ matrix.os }} + PYTHON: ${{ matrix.python-version }} services: redis: image: redis @@ -28,9 +31,6 @@ jobs: --health-interval 10s --health-timeout 5s --health-retries 5 - env: - OS: ${{ matrix.os }} - PYTHON: ${{ matrix.python-version }} steps: - uses: actions/checkout@v2 with: @@ -76,4 +76,3 @@ jobs: env_vars: OS,PYTHON fail_ci_if_error: true verbose: true - diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 82eedf6b6e0..6a378d0759c 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -146,7 +146,7 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: return GcpProvider(config) elif config.provider == "redis": - from feast.infra.redis_provider import RedisProvider + from feast.infra.redis import RedisProvider return RedisProvider(config) elif config.provider == "local": diff --git a/sdk/python/feast/infra/redis_provider.py b/sdk/python/feast/infra/redis.py similarity index 92% rename from sdk/python/feast/infra/redis_provider.py rename to sdk/python/feast/infra/redis.py index d4099b226aa..fb452ffdaef 100644 --- a/sdk/python/feast/infra/redis_provider.py +++ b/sdk/python/feast/infra/redis.py @@ -13,7 +13,7 @@ from feast import FeatureTable, utils from feast.entity import Entity from feast.feature_view import FeatureView -from feast.infra.offline_stores.helpers import get_offline_store_from_sources +from feast.infra.offline_stores.helpers import get_offline_store_from_config from feast.infra.provider import ( Provider, RetrievalJob, @@ -36,13 +36,11 @@ class RedisProvider(Provider): def __init__(self, config: RepoConfig): assert isinstance(config.online_store, RedisOnlineStoreConfig) - if config and config.online_store: - if config.online_store.redis_type: - self._redis_type = config.online_store.redis_type - if config.online_store.redis_connection_string: - self._redis_connection_string = ( - config.online_store.redis_connection_string - ) + if config.online_store.redis_type: + self._redis_type = config.online_store.redis_type + if config.online_store.redis_connection_string: + self._redis_connection_string = config.online_store.redis_connection_string + self.offline_store = get_offline_store_from_config(config.offline_store) def update_infra( self, @@ -173,8 +171,7 @@ def materialize_single_feature_view( start_date = utils.make_tzaware(start_date) end_date = utils.make_tzaware(end_date) - offline_store = get_offline_store_from_sources([feature_view.input]) - table = offline_store.pull_latest_from_table_or_query( + table = self.offline_store.pull_latest_from_table_or_query( data_source=feature_view.input, join_key_columns=join_key_columns, feature_name_columns=feature_name_columns, @@ -239,8 +236,8 @@ def _get_client(self): kwargs["port"] = startup_nodes[0]["port"] return Redis(**kwargs) - @staticmethod def get_historical_features( + self, config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], @@ -248,10 +245,7 @@ def get_historical_features( registry: Registry, project: str, ) -> RetrievalJob: - offline_store = get_offline_store_from_sources( - [feature_view.input for feature_view in feature_views] - ) - return offline_store.get_historical_features( + return self.offline_store.get_historical_features( config=config, feature_views=feature_views, feature_refs=feature_refs, diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 0dc2f47a114..b98fcadce61 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -166,16 +166,16 @@ def _validate_online_store_config(cls, values): values["online_store"]["type"] = "datastore" elif values["provider"] == "redis": values["online_store"]["type"] = "redis" - values["online_store"]["redis_connection_string"] = os.environ.get( - "REDIS_CONNECTION_STRING" - ) - + if values["online_store"]["type"] == "redis": + values["online_store"]["redis_connection_string"] = os.environ.get( + "REDIS_CONNECTION_STRING" + ) online_store_type = values["online_store"]["type"] # Make sure the user hasn't provided the wrong type - assert online_store_type in ["datastore", "sqlite"] + assert online_store_type in ["datastore", "sqlite", "redis"] # Validate the dict to ensure one of the union types match try: @@ -209,7 +209,7 @@ def _validate_offline_store_config(cls, values): # Set the default type if "type" not in values["offline_store"]: - if values["provider"] == "local": + if values["provider"] == "local" or values["provider"] == "redis": values["offline_store"]["type"] = "file" elif values["provider"] == "gcp": values["offline_store"]["type"] = "bigquery" From d3294390cc63e25a3369346508f5b4a4eb0c5304 Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 26 May 2021 21:20:12 +0000 Subject: [PATCH 14/22] correct redis provider test Signed-off-by: qooba --- sdk/python/tests/test_cli_redis.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/tests/test_cli_redis.py b/sdk/python/tests/test_cli_redis.py index c1a1a4f4cd7..7de958bd68a 100644 --- a/sdk/python/tests/test_cli_redis.py +++ b/sdk/python/tests/test_cli_redis.py @@ -30,6 +30,8 @@ def test_basic() -> None: project: {project_id} registry: {data_path / "registry.db"} provider: redis + offline_store: + type: bigquery """ ) ) From b5269e7809d1bc86c548971e3b80885cee1a0b11 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 30 May 2021 19:28:48 +0000 Subject: [PATCH 15/22] correct redis provider Signed-off-by: qooba --- sdk/python/feast/infra/redis.py | 5 ++--- sdk/python/setup.py | 6 +++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/redis.py b/sdk/python/feast/infra/redis.py index fb452ffdaef..6c7687a6203 100644 --- a/sdk/python/feast/infra/redis.py +++ b/sdk/python/feast/infra/redis.py @@ -195,7 +195,7 @@ def materialize_single_feature_view( feature_view.materialization_intervals.append((start_date, end_date)) registry.apply_feature_view(feature_view, project) - def _get_cs(self): + def _parse_connection_string(self): """ Reads Redis connections string using format for RedisCluster: @@ -227,7 +227,7 @@ def _get_client(self): """ Creates the Redis client RedisCluster or Redis depending on configuration """ - startup_nodes, kwargs = self._get_cs() + startup_nodes, kwargs = self._parse_connection_string() if self._redis_type == RedisType.redis_cluster: kwargs["startup_nodes"] = startup_nodes return RedisCluster(**kwargs) @@ -261,7 +261,6 @@ def _redis_key(project: str, entity_key: EntityKeyProto): entity_names=entity_key.join_keys, entity_values=entity_key.entity_values, ) - # key = _mmh3(serialize_entity_key(entity_key)) return redis_key.SerializeToString() diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 970d191d18e..27ebcc1d29b 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -56,7 +56,6 @@ "tabulate==0.8.*", "toml==0.10.*", "tqdm==4.*", - "redis-py-cluster==2.1.2", ] GCP_REQUIRED = [ @@ -67,6 +66,10 @@ "google-cloud-core==1.4.*", ] +REDIS_REQUIRED = [ + "redis-py-cluster==2.1.2", +] + CI_REQUIRED = [ "cryptography==3.3.2", "flake8", @@ -193,6 +196,7 @@ def run(self): "dev": ["mypy-protobuf==1.*", "grpcio-testing==1.*"], "ci": CI_REQUIRED, "gcp": GCP_REQUIRED, + "redis": REDIS_REQUIRED, }, include_package_data=True, license="Apache", From f602b9a57b74499ea7e058cd83cafdc27ad13305 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 30 May 2021 19:41:39 +0000 Subject: [PATCH 16/22] correct redis provider Signed-off-by: qooba --- sdk/python/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 27ebcc1d29b..36886c25649 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -102,6 +102,7 @@ "google-cloud-datastore>=2.1.*", "google-cloud-storage>=1.20.*", "google-cloud-core==1.4.*", + "redis-py-cluster==2.1.2", ] # README file from Feast repo root directory From bbde0b0ade2d1d45b67ce63f4c9e7316ba5542a4 Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 2 Jun 2021 20:57:41 +0000 Subject: [PATCH 17/22] change redis connection string Signed-off-by: qooba --- .github/workflows/integration_tests.yml | 3 --- .github/workflows/pr_integration_tests.yml | 3 --- sdk/python/feast/repo_config.py | 7 +------ sdk/python/tests/test_cli_redis.py | 3 +++ sdk/python/tests/test_offline_online_store_consistency.py | 7 +++++++ 5 files changed, 11 insertions(+), 12 deletions(-) diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 596fa80307c..d82d882ff26 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -53,9 +53,6 @@ jobs: run: make install-python-ci-dependencies - name: Test python run: FEAST_TELEMETRY=False pytest --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests --integration - env: - REDIS_TYPE: REDIS - REDIS_CONNECTION_STRING: localhost:6379,db=0 - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index a172763df38..240d87069ec 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -64,9 +64,6 @@ jobs: run: make install-python-ci-dependencies - name: Test python run: FEAST_TELEMETRY=False pytest --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests --integration - env: - REDIS_TYPE: REDIS - REDIS_CONNECTION_STRING: localhost:6379,db=0 - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index b98fcadce61..2da6cee93fd 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -68,7 +68,7 @@ class RedisOnlineStoreConfig(FeastBaseModel): redis_type: RedisType = RedisType.redis """Redis type: redis or redis_cluster""" - redis_connection_string: Optional[StrictStr] = None + redis_connection_string: StrictStr """Redis connection string from REDIS_CONNECTION_STRING environment variable format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """ @@ -167,11 +167,6 @@ def _validate_online_store_config(cls, values): elif values["provider"] == "redis": values["online_store"]["type"] = "redis" - if values["online_store"]["type"] == "redis": - values["online_store"]["redis_connection_string"] = os.environ.get( - "REDIS_CONNECTION_STRING" - ) - online_store_type = values["online_store"]["type"] # Make sure the user hasn't provided the wrong type diff --git a/sdk/python/tests/test_cli_redis.py b/sdk/python/tests/test_cli_redis.py index 7de958bd68a..112ed5b5b2a 100644 --- a/sdk/python/tests/test_cli_redis.py +++ b/sdk/python/tests/test_cli_redis.py @@ -32,6 +32,9 @@ def test_basic() -> None: provider: redis offline_store: type: bigquery + online_store: + redis_type: redis + redis_connection_string: localhost:6379,db=0 """ ) ) diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py index b20effc9434..f1c43b71a13 100644 --- a/sdk/python/tests/test_offline_online_store_consistency.py +++ b/sdk/python/tests/test_offline_online_store_consistency.py @@ -21,6 +21,8 @@ DatastoreOnlineStoreConfig, RepoConfig, SqliteOnlineStoreConfig, + RedisOnlineStoreConfig, + RedisType, ) from feast.value_type import ValueType @@ -172,6 +174,11 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: registry=str(Path(repo_dir_name) / "registry.db"), project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}", provider="redis", + online_store=RedisOnlineStoreConfig( + redis_type=RedisType.redis, + redis_connection_string="localhost:6379,db=0", + ), + ) fs = FeatureStore(config=config) fs.apply([fv, e]) From d86f23cffa67c8bc683c705aabcc6b6b0f216962 Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 2 Jun 2021 21:04:26 +0000 Subject: [PATCH 18/22] change redis connection string Signed-off-by: qooba --- sdk/python/feast/repo_config.py | 1 - sdk/python/tests/test_offline_online_store_consistency.py | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 2da6cee93fd..d12ab858617 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,4 +1,3 @@ -import os from enum import Enum from pathlib import Path diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py index f1c43b71a13..30e3b275d0a 100644 --- a/sdk/python/tests/test_offline_online_store_consistency.py +++ b/sdk/python/tests/test_offline_online_store_consistency.py @@ -19,10 +19,10 @@ from feast.feature_view import FeatureView from feast.repo_config import ( DatastoreOnlineStoreConfig, - RepoConfig, - SqliteOnlineStoreConfig, RedisOnlineStoreConfig, RedisType, + RepoConfig, + SqliteOnlineStoreConfig, ) from feast.value_type import ValueType @@ -178,7 +178,6 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: redis_type=RedisType.redis, redis_connection_string="localhost:6379,db=0", ), - ) fs = FeatureStore(config=config) fs.apply([fv, e]) From b0c8101c729ba17ecf86f27a2bd5e5063a93986f Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 7 Jun 2021 20:33:24 +0000 Subject: [PATCH 19/22] fix redis materialize Signed-off-by: qooba --- sdk/python/feast/infra/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/redis.py b/sdk/python/feast/infra/redis.py index 6c7687a6203..68b76fd5b71 100644 --- a/sdk/python/feast/infra/redis.py +++ b/sdk/python/feast/infra/redis.py @@ -271,4 +271,4 @@ def _mmh3(key: str): https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x """ key_hash = mmh3.hash(key, signed=False) - return bytes.fromhex(struct.pack(" Date: Tue, 8 Jun 2021 16:06:28 -0700 Subject: [PATCH 20/22] Rename connection string and set defaults Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/redis.py | 19 +++++++++++++------ sdk/python/feast/repo_config.py | 4 ++-- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/redis.py b/sdk/python/feast/infra/redis.py index 68b76fd5b71..f4200918b3f 100644 --- a/sdk/python/feast/infra/redis.py +++ b/sdk/python/feast/infra/redis.py @@ -6,8 +6,15 @@ import mmh3 import pandas as pd from google.protobuf.timestamp_pb2 import Timestamp -from redis import Redis -from rediscluster import RedisCluster + +try: + from redis import Redis + from rediscluster import RedisCluster +except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("redis", str(e)) + from tqdm import tqdm from feast import FeatureTable, utils @@ -32,14 +39,14 @@ class RedisProvider(Provider): _redis_type: Optional[RedisType] - _redis_connection_string: str + _connection_string: str def __init__(self, config: RepoConfig): assert isinstance(config.online_store, RedisOnlineStoreConfig) if config.online_store.redis_type: self._redis_type = config.online_store.redis_type - if config.online_store.redis_connection_string: - self._redis_connection_string = config.online_store.redis_connection_string + if config.online_store.connection_string: + self._connection_string = config.online_store.connection_string self.offline_store = get_offline_store_from_config(config.offline_store) def update_infra( @@ -203,7 +210,7 @@ def _parse_connection_string(self): for Redis: redis_master:6379,db=0,ssl=true,password=... """ - connection_string = self._redis_connection_string + connection_string = self._connection_string startup_nodes = [ dict(zip(["host", "port"], c.split(":"))) for c in connection_string.split(",") diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index d12ab858617..9c5bbca8dd0 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -67,8 +67,8 @@ class RedisOnlineStoreConfig(FeastBaseModel): redis_type: RedisType = RedisType.redis """Redis type: redis or redis_cluster""" - redis_connection_string: StrictStr - """Redis connection string from REDIS_CONNECTION_STRING environment variable + connection_string: StrictStr = "localhost:6379" + """Connection string containing the host, port, and configuration parameters for Redis format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """ From 53f8f72483cdcdc566869a49b0b9de54be38e136 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Tue, 8 Jun 2021 17:11:44 -0700 Subject: [PATCH 21/22] Fix argument in redis test Signed-off-by: Willem Pienaar --- sdk/python/tests/test_offline_online_store_consistency.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py index 30e3b275d0a..b6d2e399e05 100644 --- a/sdk/python/tests/test_offline_online_store_consistency.py +++ b/sdk/python/tests/test_offline_online_store_consistency.py @@ -175,8 +175,7 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}", provider="redis", online_store=RedisOnlineStoreConfig( - redis_type=RedisType.redis, - redis_connection_string="localhost:6379,db=0", + redis_type=RedisType.redis, connection_string="localhost:6379,db=0", ), ) fs = FeatureStore(config=config) From e12ecff1fb9cc29309dce0f2b3e597ce3dcaaacd Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Wed, 9 Jun 2021 10:55:12 -0700 Subject: [PATCH 22/22] Fix broken connection string Signed-off-by: Willem Pienaar --- sdk/python/tests/test_cli_redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/test_cli_redis.py b/sdk/python/tests/test_cli_redis.py index 112ed5b5b2a..ed330461559 100644 --- a/sdk/python/tests/test_cli_redis.py +++ b/sdk/python/tests/test_cli_redis.py @@ -34,7 +34,7 @@ def test_basic() -> None: type: bigquery online_store: redis_type: redis - redis_connection_string: localhost:6379,db=0 + connection_string: localhost:6379,db=0 """ ) )