config) {
this.config = config;
}
-
- /**
- * The Subscription type.
- *
- * Note: Please see protos/feast/core/CoreService.proto for details on how to subscribe to
- * feature sets.
- */
- public static class Subscription {
- /** Feast project to subscribe to. */
- String project;
-
- /** Feature set to subscribe to. */
- String name;
-
- /** Feature set versions to subscribe to. */
- String version;
-
- /** Project/Feature set exclude flag to subscribe to. */
- boolean exclude;
-
- /**
- * Gets Feast project subscribed to.
- *
- * @return the project string
- */
- public String getProject() {
- return project;
- }
-
- /**
- * Sets Feast project to subscribe to for this store.
- *
- * @param project the project
- */
- public void setProject(String project) {
- this.project = project;
- }
-
- /**
- * Gets the feature set name to subscribe to.
- *
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Sets the feature set name to subscribe to.
- *
- * @param name the name
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Gets the feature set version that is being subscribed to by this store.
- *
- * @return the version
- */
- public String getVersion() {
- return version;
- }
-
- /**
- * Sets the feature set version that is being subscribed to by this store.
- *
- * @param version the version
- */
- public void setVersion(String version) {
- this.version = version;
- }
-
- /**
- * Gets the exclude flag to subscribe to.
- *
- * @return the exclude flag
- */
- public boolean getExclude() {
- return exclude;
- }
-
- /**
- * Sets the exclude flag to subscribe to.
- *
- * @param exclude the exclude flag
- */
- public void setExclude(boolean exclude) {
- this.exclude = exclude;
- }
-
- /**
- * Convert this {@link Subscription} to a {@link StoreProto.Store.Subscription}.
- *
- * @return the store proto . store . subscription
- */
- public StoreProto.Store.Subscription toProto() {
- return StoreProto.Store.Subscription.newBuilder()
- .setName(getName())
- .setProject(getProject())
- .setExclude(getExclude())
- .build();
- }
- }
}
- /**
- * Gets job store properties
- *
- * @return the job store properties
- */
- public JobStoreProperties getJobStore() {
- return jobStore;
- }
-
- /**
- * Set job store properties
- *
- * @param jobStore Job store properties to set
- */
- public void setJobStore(JobStoreProperties jobStore) {
- this.jobStore = jobStore;
+ public enum StoreType {
+ REDIS,
+ REDIS_CLUSTER;
}
/**
@@ -532,52 +359,6 @@ public void setLogging(LoggingProperties logging) {
this.logging = logging;
}
- /** The type Job store properties. */
- public static class JobStoreProperties {
-
- /** Job Store Redis Host */
- private String redisHost;
-
- /** Job Store Redis Host */
- private int redisPort;
-
- /**
- * Gets redis host.
- *
- * @return the redis host
- */
- public String getRedisHost() {
- return redisHost;
- }
-
- /**
- * Sets redis host.
- *
- * @param redisHost the redis host
- */
- public void setRedisHost(String redisHost) {
- this.redisHost = redisHost;
- }
-
- /**
- * Gets redis port.
- *
- * @return the redis port
- */
- public int getRedisPort() {
- return redisPort;
- }
-
- /**
- * Sets redis port.
- *
- * @param redisPort the redis port
- */
- public void setRedisPort(int redisPort) {
- this.redisPort = redisPort;
- }
- }
-
/** Trace metric collection properties */
public static class TracingProperties {
diff --git a/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java b/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java
index 9ec35c3..518f3a1 100644
--- a/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java
+++ b/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java
@@ -18,7 +18,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.InvalidProtocolBufferException;
-import feast.proto.core.StoreProto;
import feast.serving.service.OnlineServingServiceV2;
import feast.serving.service.ServingServiceV2;
import feast.serving.specs.CachedSpecService;
@@ -39,26 +38,19 @@ public ServingServiceV2 servingServiceV2(
throws InvalidProtocolBufferException, JsonProcessingException {
ServingServiceV2 servingService = null;
FeastProperties.Store store = feastProperties.getActiveStore();
- StoreProto.Store.StoreType storeType = store.toProto().getType();
- switch (storeType) {
+ switch (store.getType()) {
case REDIS_CLUSTER:
RedisClientAdapter redisClusterClient =
- RedisClusterClient.create(store.toProto().getRedisClusterConfig());
+ RedisClusterClient.create(store.getRedisClusterConfig());
OnlineRetrieverV2 redisClusterRetriever = new OnlineRetriever(redisClusterClient);
servingService = new OnlineServingServiceV2(redisClusterRetriever, specService, tracer);
break;
case REDIS:
- RedisClientAdapter redisClient = RedisClient.create(store.toProto().getRedisConfig());
+ RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig());
OnlineRetrieverV2 redisRetriever = new OnlineRetriever(redisClient);
servingService = new OnlineServingServiceV2(redisRetriever, specService, tracer);
break;
- case UNRECOGNIZED:
- case INVALID:
- throw new IllegalArgumentException(
- String.format(
- "Unsupported store type '%s' for store name '%s'",
- store.getType(), store.getName()));
}
return servingService;
diff --git a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java
index b41a0f5..369d543 100644
--- a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java
+++ b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java
@@ -18,7 +18,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.InvalidProtocolBufferException;
-import feast.proto.core.StoreProto;
import feast.serving.specs.CachedSpecService;
import feast.serving.specs.CoreSpecService;
import io.grpc.CallCredentials;
@@ -61,13 +60,11 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService(
}
@Bean
- public CachedSpecService specService(
- FeastProperties feastProperties, ObjectProvider callCredentials)
+ public CachedSpecService specService(ObjectProvider callCredentials)
throws InvalidProtocolBufferException, JsonProcessingException {
CoreSpecService coreService =
new CoreSpecService(feastCoreHost, feastCorePort, callCredentials);
- StoreProto.Store storeProto = feastProperties.getActiveStore().toProto();
- CachedSpecService cachedSpecStorage = new CachedSpecService(coreService, storeProto);
+ CachedSpecService cachedSpecStorage = new CachedSpecService(coreService);
try {
cachedSpecStorage.populateCache();
} catch (Exception e) {
diff --git a/serving/src/main/java/feast/serving/controller/HealthServiceController.java b/serving/src/main/java/feast/serving/controller/HealthServiceController.java
index 6615cf5..4bee981 100644
--- a/serving/src/main/java/feast/serving/controller/HealthServiceController.java
+++ b/serving/src/main/java/feast/serving/controller/HealthServiceController.java
@@ -16,7 +16,6 @@
*/
package feast.serving.controller;
-import feast.proto.core.StoreProto.Store;
import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest;
import feast.serving.interceptors.GrpcMonitoringInterceptor;
import feast.serving.service.ServingServiceV2;
@@ -51,7 +50,6 @@ public void check(
// Implement similary for batch service.
try {
- Store store = specService.getStore();
servingService.getFeastServingInfo(GetFeastServingInfoRequest.getDefaultInstance());
responseObserver.onNext(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java
index f54e08f..440b224 100644
--- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java
+++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java
@@ -26,8 +26,6 @@
import feast.proto.core.FeatureProto;
import feast.proto.core.FeatureTableProto.FeatureTable;
import feast.proto.core.FeatureTableProto.FeatureTableSpec;
-import feast.proto.core.StoreProto;
-import feast.proto.core.StoreProto.Store;
import feast.proto.serving.ServingAPIProto;
import feast.serving.exception.SpecRetrievalException;
import io.grpc.StatusRuntimeException;
@@ -47,7 +45,6 @@ public class CachedSpecService {
private static final String DEFAULT_PROJECT_NAME = "default";
private final CoreSpecService coreService;
- private Store store;
private static Gauge cacheLastUpdated =
Gauge.build()
@@ -69,9 +66,8 @@ public class CachedSpecService {
ImmutablePair, FeatureProto.FeatureSpecV2>
featureCache;
- public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) {
+ public CachedSpecService(CoreSpecService coreService) {
this.coreService = coreService;
- this.store = coreService.registerStore(store);
CacheLoader, FeatureTableSpec> featureTableCacheLoader =
CacheLoader.from(k -> retrieveSingleFeatureTable(k.getLeft(), k.getRight()));
@@ -85,15 +81,6 @@ public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) {
featureCache = CacheBuilder.newBuilder().build(featureCacheLoader);
}
- /**
- * Get the current store configuration.
- *
- * @return StoreProto.Store store configuration for this serving instance
- */
- public Store getStore() {
- return this.store;
- }
-
/**
* Reload the store configuration from the given config path, then retrieve the necessary specs
* from core to preload the cache.
diff --git a/serving/src/main/java/feast/serving/specs/CoreSpecService.java b/serving/src/main/java/feast/serving/specs/CoreSpecService.java
index 5429d22..eee50d8 100644
--- a/serving/src/main/java/feast/serving/specs/CoreSpecService.java
+++ b/serving/src/main/java/feast/serving/specs/CoreSpecService.java
@@ -24,7 +24,6 @@
import feast.proto.core.CoreServiceProto.ListProjectsResponse;
import feast.proto.core.CoreServiceProto.UpdateStoreRequest;
import feast.proto.core.CoreServiceProto.UpdateStoreResponse;
-import feast.proto.core.StoreProto.Store;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -53,26 +52,6 @@ public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) {
return blockingStub.updateStore(updateStoreRequest);
}
- /**
- * Register the given store entry in Feast Core. If store already exists in Feast Core, updates
- * the store entry in feast core.
- *
- * @param store entry to register/update in Feast Core.
- * @return The register/updated store entry
- */
- public Store registerStore(Store store) {
- UpdateStoreRequest request = UpdateStoreRequest.newBuilder().setStore(store).build();
- try {
- UpdateStoreResponse updateStoreResponse = this.updateStore(request);
- if (!updateStoreResponse.getStore().equals(store)) {
- throw new RuntimeException("Core store config not matching current store config");
- }
- return updateStoreResponse.getStore();
- } catch (Exception e) {
- throw new RuntimeException("Unable to update store configuration", e);
- }
- }
-
public ListProjectsResponse listProjects(ListProjectsRequest listProjectsRequest) {
return blockingStub.listProjects(listProjectsRequest);
}
diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml
index 288ec7e..b20fd8e 100644
--- a/serving/src/main/resources/application.yml
+++ b/serving/src/main/resources/application.yml
@@ -41,26 +41,19 @@ feast:
stores:
# Please see https://api.docs.feast.dev/grpc/feast.core.pb.html#Store for configuration options
- name: online # Name of the store (referenced by active_store)
- type: REDIS # Type of the store. REDIS, REDIS_CLUSTER, BIGQUERY are available options
+ type: REDIS # Type of the store. REDIS, REDIS_CLUSTER are available options
config: # Store specific configuration. See
host: localhost
port: 6379
# Subscriptions indicate which feature sets needs to be retrieved and used to populate this store
- subscriptions:
- # Wildcards match all options. No filtering is done.
- - name: "*"
- project: "*"
- name: online_cluster
type: REDIS_CLUSTER
config: # Store specific configuration.
# Connection string specifies the host:port of Redis instances in the redis cluster.
connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
read_from: MASTER
- subscriptions:
- - name: "*"
- project: "*"
- version: "*"
-
+ # Redis operation timeout in ISO-8601 format
+ timeout: PT0.5S
tracing:
# If true, Feast will provide tracing data (using OpenTracing API) for various RPC method calls
# which can be useful to debug performance issues and perform benchmarking
diff --git a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java
index 57932d4..4e48b64 100644
--- a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java
+++ b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java
@@ -18,8 +18,6 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
@@ -32,7 +30,6 @@
import feast.proto.core.CoreServiceProto.ListProjectsResponse;
import feast.proto.core.FeatureTableProto;
import feast.proto.core.FeatureTableProto.FeatureTableSpec;
-import feast.proto.core.StoreProto.Store;
import feast.proto.serving.ServingAPIProto.FeatureReferenceV2;
import feast.proto.types.ValueProto;
import feast.serving.specs.CachedSpecService;
@@ -45,8 +42,6 @@
public class CachedSpecServiceTest {
- private Store store;
-
@Rule public final ExpectedException expectedException = ExpectedException.none();
@Mock CoreSpecService coreService;
@@ -63,8 +58,6 @@ public class CachedSpecServiceTest {
public void setUp() {
initMocks(this);
- this.store = Store.newBuilder().build();
-
this.setupProject("default");
this.featureTableEntities = ImmutableList.of("entity1");
this.featureTable1Features =
@@ -94,8 +87,7 @@ public void setUp() {
this.setupFeatureTableAndProject("default");
- when(this.coreService.registerStore(store)).thenReturn(store);
- cachedSpecService = new CachedSpecService(this.coreService, this.store);
+ cachedSpecService = new CachedSpecService(this.coreService);
}
private void setupProject(String project) {
@@ -120,18 +112,6 @@ private void setupFeatureTableAndProject(String project) {
.build());
}
- @Test
- public void shouldRegisterStoreWithCore() {
- verify(coreService, times(1)).registerStore(cachedSpecService.getStore());
- }
-
- @Test
- public void shouldPopulateAndReturnStore() {
- cachedSpecService.populateCache();
- Store actual = cachedSpecService.getStore();
- assertThat(actual, equalTo(store));
- }
-
@Test
public void shouldPopulateAndReturnDifferentFeatureTables() {
// test that CachedSpecService can retrieve fully qualified feature references.
diff --git a/storage/connectors/redis/pom.xml b/storage/connectors/redis/pom.xml
index bbda8da..f65cbd0 100644
--- a/storage/connectors/redis/pom.xml
+++ b/storage/connectors/redis/pom.xml
@@ -16,6 +16,14 @@
io.lettuce
lettuce-core
+ 6.0.2.RELEASE
+
+
+
+ io.netty
+ netty-transport-native-epoll
+ 4.1.52.Final
+ linux-x86_64
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java
index 5a7f4b7..faa8e96 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java
@@ -16,7 +16,6 @@
*/
package feast.storage.connectors.redis.retriever;
-import feast.proto.core.StoreProto;
import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
@@ -46,7 +45,7 @@ private RedisClient(StatefulRedisConnection connection) {
this.asyncCommands.setAutoFlushCommands(false);
}
- public static RedisClientAdapter create(StoreProto.Store.RedisConfig config) {
+ public static RedisClientAdapter create(RedisStoreConfig config) {
RedisURI uri = RedisURI.create(config.getHost(), config.getPort());
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java
index aeb8220..5395b72 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java
@@ -16,36 +16,19 @@
*/
package feast.storage.connectors.redis.retriever;
-import com.google.common.collect.ImmutableMap;
-import feast.proto.core.StoreProto;
-import feast.proto.core.StoreProto.Store.RedisClusterConfig;
-import feast.storage.connectors.redis.serializer.RedisKeyPrefixSerializerV2;
-import feast.storage.connectors.redis.serializer.RedisKeySerializerV2;
-import io.lettuce.core.KeyValue;
-import io.lettuce.core.ReadFrom;
-import io.lettuce.core.RedisFuture;
-import io.lettuce.core.RedisURI;
+import io.lettuce.core.*;
+import io.lettuce.core.cluster.ClusterClientOptions;
+import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
public class RedisClusterClient implements RedisClientAdapter {
private final RedisAdvancedClusterAsyncCommands asyncCommands;
- private final RedisKeySerializerV2 serializer;
- @Nullable private final RedisKeySerializerV2 fallbackSerializer;
-
- private static final Map PROTO_TO_LETTUCE_TYPES =
- ImmutableMap.of(
- RedisClusterConfig.ReadFrom.MASTER, ReadFrom.MASTER,
- RedisClusterConfig.ReadFrom.MASTER_PREFERRED, ReadFrom.MASTER_PREFERRED,
- RedisClusterConfig.ReadFrom.REPLICA, ReadFrom.REPLICA,
- RedisClusterConfig.ReadFrom.REPLICA_PREFERRED, ReadFrom.REPLICA_PREFERRED);
@Override
public RedisFuture>> hmget(byte[] key, byte[]... fields) {
@@ -59,19 +42,9 @@ public void flushCommands() {
static class Builder {
private final StatefulRedisClusterConnection connection;
- private final RedisKeySerializerV2 serializer;
- @Nullable private RedisKeySerializerV2 fallbackSerializer;
- Builder(
- StatefulRedisClusterConnection connection,
- RedisKeySerializerV2 serializer) {
+ Builder(StatefulRedisClusterConnection connection) {
this.connection = connection;
- this.serializer = serializer;
- }
-
- Builder withFallbackSerializer(RedisKeySerializerV2 fallbackSerializer) {
- this.fallbackSerializer = fallbackSerializer;
- return this;
}
RedisClusterClient build() {
@@ -81,8 +54,6 @@ RedisClusterClient build() {
private RedisClusterClient(Builder builder) {
this.asyncCommands = builder.connection.async();
- this.serializer = builder.serializer;
- this.fallbackSerializer = builder.fallbackSerializer;
// allows reading from replicas
this.asyncCommands.readOnly();
@@ -91,7 +62,7 @@ private RedisClusterClient(Builder builder) {
this.asyncCommands.setAutoFlushCommands(false);
}
- public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig config) {
+ public static RedisClientAdapter create(RedisClusterStoreConfig config) {
List redisURIList =
Arrays.stream(config.getConnectionString().split(","))
.map(
@@ -100,22 +71,22 @@ public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig conf
return RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1]));
})
.collect(Collectors.toList());
- StatefulRedisClusterConnection connection =
- io.lettuce.core.cluster.RedisClusterClient.create(redisURIList)
- .connect(new ByteArrayCodec());
- connection.setReadFrom(PROTO_TO_LETTUCE_TYPES.get(config.getReadFrom()));
+ io.lettuce.core.cluster.RedisClusterClient client =
+ io.lettuce.core.cluster.RedisClusterClient.create(redisURIList);
+ client.setOptions(
+ ClusterClientOptions.builder()
+ .socketOptions(SocketOptions.builder().keepAlive(true).tcpNoDelay(true).build())
+ .timeoutOptions(TimeoutOptions.enabled(config.getTimeout()))
+ .pingBeforeActivateConnection(true)
+ .topologyRefreshOptions(
+ ClusterTopologyRefreshOptions.builder().enableAllAdaptiveRefreshTriggers().build())
+ .build());
- RedisKeySerializerV2 serializer = new RedisKeyPrefixSerializerV2(config.getKeyPrefix());
-
- Builder builder = new Builder(connection, serializer);
-
- if (config.getEnableFallback()) {
- RedisKeySerializerV2 fallbackSerializer =
- new RedisKeyPrefixSerializerV2(config.getKeyPrefix());
- builder = builder.withFallbackSerializer(fallbackSerializer);
- }
+ StatefulRedisClusterConnection connection =
+ client.connect(new ByteArrayCodec());
+ connection.setReadFrom(config.getReadFrom());
- return builder.build();
+ return new Builder(connection).build();
}
}
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java
new file mode 100644
index 0000000..c179ffe
--- /dev/null
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java
@@ -0,0 +1,44 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2021 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.redis.retriever;
+
+import io.lettuce.core.ReadFrom;
+import java.time.Duration;
+
+public class RedisClusterStoreConfig {
+ private final String connectionString;
+ private final ReadFrom readFrom;
+ private final Duration timeout;
+
+ public RedisClusterStoreConfig(String connectionString, ReadFrom readFrom, Duration timeout) {
+ this.connectionString = connectionString;
+ this.readFrom = readFrom;
+ this.timeout = timeout;
+ }
+
+ public String getConnectionString() {
+ return this.connectionString;
+ }
+
+ public ReadFrom getReadFrom() {
+ return this.readFrom;
+ }
+
+ public Duration getTimeout() {
+ return this.timeout;
+ }
+}
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java
similarity index 54%
rename from storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java
rename to storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java
index 252d6d1..5e4560a 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java
@@ -1,6 +1,6 @@
/*
* SPDX-License-Identifier: Apache-2.0
- * Copyright 2018-2020 The Feast Authors
+ * Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,13 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package feast.storage.connectors.redis.serializer;
+package feast.storage.connectors.redis.retriever;
-import feast.proto.storage.RedisProto.RedisKeyV2;
+public class RedisStoreConfig {
+ private final String host;
+ private final Integer port;
+ private final Boolean ssl;
-public class RedisKeyProtoSerializerV2 implements RedisKeySerializerV2 {
+ public RedisStoreConfig(String host, Integer port, Boolean ssl) {
+ this.host = host;
+ this.port = port;
+ this.ssl = ssl;
+ }
+
+ public String getHost() {
+ return this.host;
+ }
+
+ public Integer getPort() {
+ return this.port;
+ }
- public byte[] serialize(RedisKeyV2 redisKey) {
- return redisKey.toByteArray();
+ public Boolean getSsl() {
+ return this.ssl;
}
}
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java
deleted file mode 100644
index 1c869b4..0000000
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- * Copyright 2018-2020 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package feast.storage.connectors.redis.serializer;
-
-import feast.proto.storage.RedisProto.RedisKeyV2;
-
-public class RedisKeyPrefixSerializerV2 implements RedisKeySerializerV2 {
-
- private final byte[] prefixBytes;
-
- public RedisKeyPrefixSerializerV2(String prefix) {
- this.prefixBytes = prefix.getBytes();
- }
-
- public byte[] serialize(RedisKeyV2 redisKey) {
- byte[] key = redisKey.toByteArray();
-
- if (prefixBytes.length == 0) {
- return key;
- }
-
- byte[] keyWithPrefix = new byte[prefixBytes.length + key.length];
- System.arraycopy(prefixBytes, 0, keyWithPrefix, 0, prefixBytes.length);
- System.arraycopy(key, 0, keyWithPrefix, prefixBytes.length, key.length);
- return keyWithPrefix;
- }
-}
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java
deleted file mode 100644
index b79e158..0000000
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- * Copyright 2018-2020 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package feast.storage.connectors.redis.serializer;
-
-import feast.proto.storage.RedisProto.RedisKeyV2;
-
-public interface RedisKeySerializerV2 {
-
- byte[] serialize(RedisKeyV2 key);
-}
diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java
deleted file mode 100644
index e663cf8..0000000
--- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- * Copyright 2018-2020 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package feast.storage.connectors.redis.serializer;
-
-import static org.junit.Assert.*;
-
-import feast.proto.storage.RedisProto.RedisKeyV2;
-import feast.proto.types.ValueProto;
-import org.junit.Test;
-
-public class RedisKeyPrefixSerializerTest {
-
- private RedisKeyV2 key =
- RedisKeyV2.newBuilder()
- .addEntityNames("entity1")
- .addEntityValues(ValueProto.Value.newBuilder().setInt64Val(1))
- .build();
-
- @Test
- public void shouldPrependKey() {
- RedisKeyPrefixSerializerV2 serializer = new RedisKeyPrefixSerializerV2("namespace:");
- String keyWithPrefix = new String(serializer.serialize(key));
- assertEquals(String.format("namespace:%s", new String(key.toByteArray())), keyWithPrefix);
- }
-
- @Test
- public void shouldNotPrependKeyIfEmptyString() {
- RedisKeyPrefixSerializerV2 serializer = new RedisKeyPrefixSerializerV2("");
- assertArrayEquals(key.toByteArray(), serializer.serialize(key));
- }
-}