diff --git a/.github/workflows/mirror.yml b/.github/workflows/mirror.yml new file mode 100644 index 0000000..2acd1cd --- /dev/null +++ b/.github/workflows/mirror.yml @@ -0,0 +1,24 @@ +name: mirror + +on: + push: + branches: master + tags: + - 'v*.*.*' + +jobs: + mirror: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + - uses: webfactory/ssh-agent@v0.4.1 + with: + ssh-private-key: ${{ secrets.MIRROR_SSH_KEY }} + - name: Mirror all origin branches and tags to internal repo + run: | + export GIT_SSH_COMMAND="ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no" + git remote add internal ${{ secrets.INTERNAL_REPO }} + git push internal --all -f + git push internal --tags -f diff --git a/common/pom.xml b/common/pom.xml index ce89e9c..99008e1 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -125,7 +125,12 @@ com.google.auth google-auth-library-oauth2-http - + + sh.ory.keto + keto-client + 0.5.7-alpha.1.pre.0 + + org.openapitools diff --git a/common/src/main/java/feast/common/auth/config/SecurityConfig.java b/common/src/main/java/feast/common/auth/config/SecurityConfig.java index aa7f8a2..94a4018 100644 --- a/common/src/main/java/feast/common/auth/config/SecurityConfig.java +++ b/common/src/main/java/feast/common/auth/config/SecurityConfig.java @@ -19,6 +19,7 @@ import feast.common.auth.authentication.DefaultJwtAuthenticationProvider; import feast.common.auth.authorization.AuthorizationProvider; import feast.common.auth.providers.http.HttpAuthorizationProvider; +import feast.common.auth.providers.keto.KetoAuthorizationProvider; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -107,12 +108,40 @@ AccessDecisionManager accessDecisionManager() { AuthorizationProvider authorizationProvider() { if (securityProperties.getAuthentication().isEnabled() && securityProperties.getAuthorization().isEnabled()) { + // Merge authentication and authorization options to create HttpAuthorizationProvider. + Map options = securityProperties.getAuthorization().getOptions(); + + options.putAll(securityProperties.getAuthentication().getOptions()); switch (securityProperties.getAuthorization().getProvider()) { case "http": - // Merge authenticatoin and authorization options to create HttpAuthorizationProvider. - Map options = securityProperties.getAuthorization().getOptions(); - options.putAll(securityProperties.getAuthentication().getOptions()); return new HttpAuthorizationProvider(options); + case "keto": + String subjectClaim = + options.get(SecurityProperties.AuthenticationProperties.SUBJECT_CLAIM); + String flavor = options.get("flavor"); + String action = options.get("action"); + String subjectPrefix = options.get("subjectPrefix"); + String resourcePrefix = options.get("resourcePrefix"); + + KetoAuthorizationProvider.Builder builder = + new KetoAuthorizationProvider.Builder(options.get("authorizationUrl")); + if (subjectClaim != null) { + builder = builder.withSubjectClaim(subjectClaim); + } + if (flavor != null) { + builder = builder.withFlavor(flavor); + } + if (action != null) { + builder = builder.withAction(action); + } + if (subjectPrefix != null) { + builder = builder.withSubjectPrefix(subjectPrefix); + } + if (resourcePrefix != null) { + builder = builder.withResourcePrefix(resourcePrefix); + } + + return builder.build(); default: throw new IllegalArgumentException( "Please configure an Authorization Provider if you have enabled authorization."); diff --git a/common/src/main/java/feast/common/auth/config/SecurityProperties.java b/common/src/main/java/feast/common/auth/config/SecurityProperties.java index 135cc4b..f48d734 100644 --- a/common/src/main/java/feast/common/auth/config/SecurityProperties.java +++ b/common/src/main/java/feast/common/auth/config/SecurityProperties.java @@ -53,7 +53,7 @@ public static class AuthorizationProperties { private boolean enabled; // Named authorization provider to use. - @OneOfStrings({"none", "http"}) + @OneOfStrings({"none", "http", "keto"}) private String provider; // K/V options to initialize the provider with diff --git a/common/src/main/java/feast/common/auth/providers/keto/KetoAuthorizationProvider.java b/common/src/main/java/feast/common/auth/providers/keto/KetoAuthorizationProvider.java new file mode 100644 index 0000000..05fcd3c --- /dev/null +++ b/common/src/main/java/feast/common/auth/providers/keto/KetoAuthorizationProvider.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.common.auth.providers.keto; + +import feast.common.auth.authorization.AuthorizationProvider; +import feast.common.auth.authorization.AuthorizationResult; +import feast.common.auth.utils.AuthUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.security.core.Authentication; +import sh.ory.keto.ApiClient; +import sh.ory.keto.ApiException; +import sh.ory.keto.Configuration; +import sh.ory.keto.api.EnginesApi; +import sh.ory.keto.model.OryAccessControlPolicyAllowedInput; + +public class KetoAuthorizationProvider implements AuthorizationProvider { + + /** Builder for KetoAuthorizationProvider */ + public static class Builder { + private final String url; + private String subjectClaim = "email"; + private String flavor = "glob"; + private String action = "edit"; + private String subjectPrefix = ""; + private String resourcePrefix = ""; + + /** + * Initialized builder for Keto authorization provider. + * + * @param url Url string for Keto server. + */ + public Builder(String url) { + this.url = url; + } + + /** + * Set subject claim for authentication + * + * @param subjectClaim Subject claim. Default: email. + * @return Returns Builder + */ + public Builder withSubjectClaim(String subjectClaim) { + this.subjectClaim = subjectClaim; + return this; + } + + /** + * Set flavor for Keto authorization. One of [exact, glob regex] + * + * @param flavor Keto authorization flavor. Default: glob. + * @return Returns Builder + */ + public Builder withFlavor(String flavor) { + this.flavor = flavor; + return this; + } + + /** + * Set action that corresponds to the permission to edit a Feast project resource. + * + * @param action Keto action. Default: edit. + * @return Returns Builder + */ + public Builder withAction(String action) { + this.action = action; + return this; + } + + /** + * If set, The subject will be prefixed before sending the request to Keto. Example: + * users:someuser@email.com + * + * @param prefix Subject prefix. Default: Empty string. + * @return Returns Builder + */ + public Builder withSubjectPrefix(String prefix) { + this.subjectPrefix = prefix; + return this; + } + + /** + * If set, The resource will be prefixed before sending the request to Keto. Example: + * projects:somefeastproject + * + * @param prefix Resource prefix. Default: Empty string. + * @return Returns Builder + */ + public Builder withResourcePrefix(String prefix) { + this.resourcePrefix = prefix; + return this; + } + + /** + * Build KetoAuthorizationProvider + * + * @return Returns KetoAuthorizationProvider + */ + public KetoAuthorizationProvider build() { + return new KetoAuthorizationProvider(this); + } + } + + private static final Logger log = LoggerFactory.getLogger(KetoAuthorizationProvider.class); + + private final EnginesApi apiInstance; + private final String subjectClaim; + private final String flavor; + private final String action; + private final String subjectPrefix; + private final String resourcePrefix; + + private KetoAuthorizationProvider(Builder builder) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath(builder.url); + apiInstance = new EnginesApi(defaultClient); + subjectClaim = builder.subjectClaim; + flavor = builder.flavor; + action = builder.action; + subjectPrefix = builder.subjectPrefix; + resourcePrefix = builder.resourcePrefix; + } + + @Override + public AuthorizationResult checkAccessToProject(String projectId, Authentication authentication) { + String subject = AuthUtils.getSubjectFromAuth(authentication, subjectClaim); + OryAccessControlPolicyAllowedInput body = new OryAccessControlPolicyAllowedInput(); + body.setAction(action); + body.setSubject(String.format("%s%s", subjectPrefix, subject)); + body.setResource(String.format("%s%s", resourcePrefix, projectId)); + try { + sh.ory.keto.model.AuthorizationResult authResult = + apiInstance.doOryAccessControlPoliciesAllow(flavor, body); + if (authResult == null) { + throw new RuntimeException( + String.format( + "Empty response returned for access to project %s for subject %s", + projectId, subject)); + } + if (authResult.getAllowed()) { + return AuthorizationResult.success(); + } + } catch (ApiException e) { + log.error("API exception has occurred during authorization: {}", e.getMessage(), e); + } + + return AuthorizationResult.failed( + String.format("Access denied to project %s for subject %s", projectId, subject)); + } +} diff --git a/core/pom.xml b/core/pom.xml index 7a34b79..1c33e15 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -298,7 +298,6 @@ sh.ory.keto keto-client 0.4.4-alpha.1 - test com.github.tomakehurst diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index efdf0fc..648195a 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -311,14 +311,14 @@ public void applyFeatureTable( String.format( "ApplyFeatureTable: Unable to apply Feature Table due to a conflict: " + "Ensure that name is unique within Project: (name: %s, project: %s)", - projectName, tableName)); + tableName, projectName)); responseObserver.onError( Status.ALREADY_EXISTS.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } catch (IllegalArgumentException e) { log.error( String.format( "ApplyFeatureTable: Invalid apply Feature Table Request: (name: %s, project: %s)", - projectName, tableName)); + tableName, projectName)); responseObserver.onError( Status.INVALID_ARGUMENT .withDescription(e.getMessage()) @@ -328,7 +328,7 @@ public void applyFeatureTable( log.error( String.format( "ApplyFeatureTable: Unsupported apply Feature Table Request: (name: %s, project: %s)", - projectName, tableName)); + tableName, projectName)); responseObserver.onError( Status.UNIMPLEMENTED.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } catch (Exception e) { diff --git a/core/src/main/java/feast/core/model/FeatureTable.java b/core/src/main/java/feast/core/model/FeatureTable.java index b442d57..479c11e 100644 --- a/core/src/main/java/feast/core/model/FeatureTable.java +++ b/core/src/main/java/feast/core/model/FeatureTable.java @@ -359,13 +359,13 @@ public void delete() { public String protoHash() { List sortedEntities = - this.getEntities().stream().map(entity -> entity.getName()).collect(Collectors.toList()); - Collections.sort(sortedEntities); + this.getEntities().stream().map(EntityV2::getName).sorted().collect(Collectors.toList()); - List sortedFeatures = new ArrayList(this.getFeatures()); List sortedFeatureSpecs = - sortedFeatures.stream().map(featureV2 -> featureV2.toProto()).collect(Collectors.toList()); - sortedFeatures.sort(Comparator.comparing(FeatureV2::getName)); + this.getFeatures().stream() + .sorted(Comparator.comparing(FeatureV2::getName)) + .map(FeatureV2::toProto) + .collect(Collectors.toList()); DataSourceProto.DataSource streamSource = DataSourceProto.DataSource.getDefaultInstance(); if (getStreamSource() != null) { diff --git a/core/src/test/java/feast/core/auth/CoreServiceKetoAuthorizationIT.java b/core/src/test/java/feast/core/auth/CoreServiceKetoAuthorizationIT.java new file mode 100644 index 0000000..0a09cce --- /dev/null +++ b/core/src/test/java/feast/core/auth/CoreServiceKetoAuthorizationIT.java @@ -0,0 +1,352 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.auth; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.testcontainers.containers.wait.strategy.Wait.forHttp; + +import avro.shaded.com.google.common.collect.ImmutableMap; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.junit.WireMockClassRule; +import com.google.protobuf.InvalidProtocolBufferException; +import com.nimbusds.jose.JOSEException; +import com.nimbusds.jose.jwk.JWKSet; +import feast.common.it.BaseIT; +import feast.common.it.DataGenerator; +import feast.common.it.SimpleCoreClient; +import feast.core.auth.infra.JwtHelper; +import feast.core.config.FeastProperties; +import feast.proto.core.CoreServiceGrpc; +import feast.proto.core.EntityProto; +import feast.proto.types.ValueProto; +import io.grpc.CallCredentials; +import io.grpc.Channel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.util.SocketUtils; +import org.testcontainers.containers.DockerComposeContainer; +import sh.ory.keto.ApiClient; +import sh.ory.keto.ApiException; +import sh.ory.keto.Configuration; +import sh.ory.keto.api.EnginesApi; +import sh.ory.keto.model.OryAccessControlPolicy; +import sh.ory.keto.model.OryAccessControlPolicyRole; + +@SpringBootTest( + properties = { + "feast.security.authentication.enabled=true", + "feast.security.authorization.enabled=true", + "feast.security.authorization.provider=keto", + "feast.security.authorization.options.action=actions:any", + "feast.security.authorization.options.subjectPrefix=users:", + "feast.security.authorization.options.resourcePrefix=resources:projects:", + }) +public class CoreServiceKetoAuthorizationIT extends BaseIT { + + @Autowired FeastProperties feastProperties; + + private static final String DEFAULT_FLAVOR = "glob"; + private static int KETO_PORT = 4466; + private static int feast_core_port; + private static int JWKS_PORT = SocketUtils.findAvailableTcpPort(); + + private static JwtHelper jwtHelper = new JwtHelper(); + + static String project = "myproject"; + static String subjectInProject = "good_member@example.com"; + static String subjectIsAdmin = "bossman@example.com"; + static String subjectClaim = "sub"; + + static SimpleCoreClient insecureApiClient; + + @ClassRule public static WireMockClassRule wireMockRule = new WireMockClassRule(JWKS_PORT); + + @Rule public WireMockClassRule instanceRule = wireMockRule; + + @ClassRule + public static DockerComposeContainer environment = + new DockerComposeContainer(new File("src/test/resources/keto/docker-compose.yml")) + .withExposedService("keto_1", KETO_PORT, forHttp("/health/ready").forStatusCode(200)); + + @DynamicPropertySource + static void initialize(DynamicPropertyRegistry registry) { + + // Start Keto and with Docker Compose + environment.start(); + + // Seed Keto with data + String ketoExternalHost = environment.getServiceHost("keto_1", KETO_PORT); + Integer ketoExternalPort = environment.getServicePort("keto_1", KETO_PORT); + String ketoExternalUrl = String.format("http://%s:%s", ketoExternalHost, ketoExternalPort); + try { + seedKeto(ketoExternalUrl); + } catch (ApiException e) { + throw new RuntimeException(String.format("Could not seed Keto store %s", ketoExternalUrl)); + } + + // Start Wiremock Server to act as fake JWKS server + wireMockRule.start(); + JWKSet keySet = jwtHelper.getKeySet(); + String jwksJson = String.valueOf(keySet.toPublicJWKSet().toJSONObject()); + + // When Feast Core looks up a Json Web Token Key Set, we provide our self-signed public key + wireMockRule.stubFor( + WireMock.get(WireMock.urlPathEqualTo("/.well-known/jwks.json")) + .willReturn( + WireMock.aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody(jwksJson))); + + String jwkEndpointURI = + String.format("http://localhost:%s/.well-known/jwks.json", wireMockRule.port()); + + // Initialize dynamic properties + registry.add("feast.security.authentication.options.subjectClaim", () -> subjectClaim); + registry.add("feast.security.authentication.options.jwkEndpointURI", () -> jwkEndpointURI); + registry.add("feast.security.authorization.options.authorizationUrl", () -> ketoExternalUrl); + registry.add("feast.security.authorization.options.flavor", () -> DEFAULT_FLAVOR); + } + + @BeforeAll + public static void globalSetUp(@Value("${grpc.server.port}") int port) { + feast_core_port = port; + // Create insecure Feast Core gRPC client + Channel insecureChannel = + ManagedChannelBuilder.forAddress("localhost", feast_core_port).usePlaintext().build(); + CoreServiceGrpc.CoreServiceBlockingStub insecureCoreService = + CoreServiceGrpc.newBlockingStub(insecureChannel); + insecureApiClient = new SimpleCoreClient(insecureCoreService); + } + + @BeforeEach + public void setUp() { + SimpleCoreClient secureApiClient = getSecureApiClient(subjectIsAdmin); + EntityProto.EntitySpecV2 expectedEntitySpec = + DataGenerator.createEntitySpecV2( + "entity1", + "Entity 1 description", + ValueProto.ValueType.Enum.STRING, + ImmutableMap.of("label_key", "label_value")); + secureApiClient.simpleApplyEntity(project, expectedEntitySpec); + } + + @AfterAll + static void tearDown() { + environment.stop(); + wireMockRule.stop(); + } + + @Test + public void shouldGetVersionFromFeastCoreAlways() { + SimpleCoreClient secureApiClient = + getSecureApiClient("fakeUserThatIsAuthenticated@example.com"); + + String feastCoreVersionSecure = secureApiClient.getFeastCoreVersion(); + String feastCoreVersionInsecure = insecureApiClient.getFeastCoreVersion(); + + assertEquals(feastCoreVersionSecure, feastCoreVersionInsecure); + assertEquals(feastProperties.getVersion(), feastCoreVersionSecure); + } + + @Test + public void shouldNotAllowUnauthenticatedEntityListing() { + Exception exception = + assertThrows( + StatusRuntimeException.class, + () -> { + insecureApiClient.simpleListEntities("8"); + }); + + String expectedMessage = "UNAUTHENTICATED: Authentication failed"; + String actualMessage = exception.getMessage(); + assertEquals(actualMessage, expectedMessage); + } + + @Test + public void shouldAllowAuthenticatedEntityListing() { + SimpleCoreClient secureApiClient = + getSecureApiClient("AuthenticatedUserWithoutAuthorization@example.com"); + EntityProto.EntitySpecV2 expectedEntitySpec = + DataGenerator.createEntitySpecV2( + "entity1", + "Entity 1 description", + ValueProto.ValueType.Enum.STRING, + ImmutableMap.of("label_key", "label_value")); + List listEntitiesResponse = secureApiClient.simpleListEntities("myproject"); + EntityProto.Entity actualEntity = listEntitiesResponse.get(0); + + assert listEntitiesResponse.size() == 1; + assertEquals(actualEntity.getSpec().getName(), expectedEntitySpec.getName()); + } + + @Test + void cantApplyEntityIfNotProjectMember() throws InvalidProtocolBufferException { + String userName = "random_user@example.com"; + SimpleCoreClient secureApiClient = getSecureApiClient(userName); + EntityProto.EntitySpecV2 expectedEntitySpec = + DataGenerator.createEntitySpecV2( + "entity1", + "Entity 1 description", + ValueProto.ValueType.Enum.STRING, + ImmutableMap.of("label_key", "label_value")); + + StatusRuntimeException exception = + assertThrows( + StatusRuntimeException.class, + () -> secureApiClient.simpleApplyEntity(project, expectedEntitySpec)); + + String expectedMessage = + String.format( + "PERMISSION_DENIED: Access denied to project %s for subject %s", project, userName); + String actualMessage = exception.getMessage(); + assertEquals(actualMessage, expectedMessage); + } + + @Test + void canApplyEntityIfProjectMember() { + SimpleCoreClient secureApiClient = getSecureApiClient(subjectInProject); + EntityProto.EntitySpecV2 expectedEntitySpec = + DataGenerator.createEntitySpecV2( + "entity_6", + "Entity 1 description", + ValueProto.ValueType.Enum.STRING, + ImmutableMap.of("label_key", "label_value")); + + secureApiClient.simpleApplyEntity(project, expectedEntitySpec); + + EntityProto.Entity actualEntity = secureApiClient.simpleGetEntity(project, "entity_6"); + + assertEquals(expectedEntitySpec.getName(), actualEntity.getSpec().getName()); + assertEquals(expectedEntitySpec.getValueType(), actualEntity.getSpec().getValueType()); + } + + @Test + void canApplyEntityIfAdmin() { + SimpleCoreClient secureApiClient = getSecureApiClient(subjectIsAdmin); + EntityProto.EntitySpecV2 expectedEntitySpec = + DataGenerator.createEntitySpecV2( + "entity_7", + "Entity 1 description", + ValueProto.ValueType.Enum.STRING, + ImmutableMap.of("label_key", "label_value")); + + secureApiClient.simpleApplyEntity(project, expectedEntitySpec); + + EntityProto.Entity actualEntity = secureApiClient.simpleGetEntity(project, "entity_7"); + + assertEquals(expectedEntitySpec.getName(), actualEntity.getSpec().getName()); + assertEquals(expectedEntitySpec.getValueType(), actualEntity.getSpec().getValueType()); + } + + @TestConfiguration + public static class TestConfig extends BaseTestConfig {} + + private static void seedKeto(String url) throws ApiException { + ApiClient ketoClient = Configuration.getDefaultApiClient(); + ketoClient.setBasePath(url); + EnginesApi enginesApi = new EnginesApi(ketoClient); + + // Add policies + OryAccessControlPolicy adminPolicy = getAdminPolicy(); + enginesApi.upsertOryAccessControlPolicy(DEFAULT_FLAVOR, adminPolicy); + + OryAccessControlPolicy projectPolicy = getMyProjectMemberPolicy(); + enginesApi.upsertOryAccessControlPolicy(DEFAULT_FLAVOR, projectPolicy); + + // Add policy roles + OryAccessControlPolicyRole adminPolicyRole = getAdminPolicyRole(); + enginesApi.upsertOryAccessControlPolicyRole(DEFAULT_FLAVOR, adminPolicyRole); + + OryAccessControlPolicyRole myProjectMemberPolicyRole = getMyProjectMemberPolicyRole(); + enginesApi.upsertOryAccessControlPolicyRole(DEFAULT_FLAVOR, myProjectMemberPolicyRole); + } + + private static OryAccessControlPolicyRole getMyProjectMemberPolicyRole() { + OryAccessControlPolicyRole role = new OryAccessControlPolicyRole(); + role.setId(String.format("roles:%s-project-members", project)); + role.setMembers(Collections.singletonList("users:" + subjectInProject)); + return role; + } + + private static OryAccessControlPolicyRole getAdminPolicyRole() { + OryAccessControlPolicyRole role = new OryAccessControlPolicyRole(); + role.setId("roles:admin"); + role.setMembers(Collections.singletonList("users:" + subjectIsAdmin)); + return role; + } + + private static OryAccessControlPolicy getAdminPolicy() { + OryAccessControlPolicy policy = new OryAccessControlPolicy(); + policy.setId("policies:admin"); + policy.subjects(Collections.singletonList("roles:admin")); + policy.resources(Collections.singletonList("resources:**")); + policy.actions(Collections.singletonList("actions:**")); + policy.effect("allow"); + policy.conditions(null); + return policy; + } + + private static OryAccessControlPolicy getMyProjectMemberPolicy() { + OryAccessControlPolicy policy = new OryAccessControlPolicy(); + policy.setId(String.format("policies:%s-project-members-policy", project)); + policy.subjects(Collections.singletonList(String.format("roles:%s-project-members", project))); + policy.resources( + Arrays.asList( + String.format("resources:projects:%s", project), + String.format("resources:projects:%s:**", project))); + policy.actions(Collections.singletonList("actions:**")); + policy.effect("allow"); + policy.conditions(null); + return policy; + } + + // Create secure Feast Core gRPC client for a specific user + private static SimpleCoreClient getSecureApiClient(String subjectEmail) { + CallCredentials callCredentials = null; + try { + callCredentials = jwtHelper.getCallCredentials(subjectEmail); + } catch (JOSEException e) { + throw new RuntimeException( + String.format("Could not build call credentials: %s", e.getMessage())); + } + Channel secureChannel = + ManagedChannelBuilder.forAddress("localhost", feast_core_port).usePlaintext().build(); + + CoreServiceGrpc.CoreServiceBlockingStub secureCoreService = + CoreServiceGrpc.newBlockingStub(secureChannel).withCallCredentials(callCredentials); + + return new SimpleCoreClient(secureCoreService); + } +} diff --git a/core/src/test/resources/keto/docker-compose.yml b/core/src/test/resources/keto/docker-compose.yml index 4d0fc43..714f83e 100644 --- a/core/src/test/resources/keto/docker-compose.yml +++ b/core/src/test/resources/keto/docker-compose.yml @@ -7,6 +7,7 @@ services: image: oryd/keto:v0.4.3-alpha.2 environment: - DSN=postgres://keto:keto@db:5432/keto?sslmode=disable + - SERVE_CORS_ENABLED=true command: - serve ports: @@ -32,11 +33,11 @@ services: adaptor: depends_on: - - keto + - keto image: gcr.io/kf-feast/feast-keto-auth-server:latest environment: SERVER_PORT: 8080 KETO_URL: http://keto:4466 ports: - - 8080 - restart: on-failure \ No newline at end of file + - 8080 + restart: on-failure diff --git a/datatypes/java/README.md b/datatypes/java/README.md index a04c729..10c3e50 100644 --- a/datatypes/java/README.md +++ b/datatypes/java/README.md @@ -16,7 +16,7 @@ Dependency Coordinates dev.feast datatypes-java - 0.25.0-SNAPSHOT + 0.25.1-SNAPSHOT ``` diff --git a/infra/charts/feast-core/Chart.yaml b/infra/charts/feast-core/Chart.yaml index 9dc00ab..41d988a 100644 --- a/infra/charts/feast-core/Chart.yaml +++ b/infra/charts/feast-core/Chart.yaml @@ -1,10 +1,10 @@ apiVersion: v1 description: "Feast Core: Feature registry for Feast." name: feast-core -version: 0.25.0 -appVersion: 0.25.0 +version: 0.25.2 +appVersion: 0.25.2 keywords: - machine learning - big data - mlops -home: https://github.com/feast-dev/feast-java \ No newline at end of file +home: https://github.com/feast-dev/feast-java diff --git a/infra/charts/feast-core/README.md b/infra/charts/feast-core/README.md index d248710..5455083 100644 --- a/infra/charts/feast-core/README.md +++ b/infra/charts/feast-core/README.md @@ -2,7 +2,7 @@ feast-core ========== Feast Core: Feature registry for Feast. -Current chart version is `0.25.0` +Current chart version is `0.25.1` Source code can be found [here](https://github.com/feast-dev/feast-java) diff --git a/infra/charts/feast-serving/Chart.yaml b/infra/charts/feast-serving/Chart.yaml index 5e2d683..0fbe112 100644 --- a/infra/charts/feast-serving/Chart.yaml +++ b/infra/charts/feast-serving/Chart.yaml @@ -1,10 +1,10 @@ apiVersion: v1 description: "Feast Serving: Online feature serving service for Feast" name: feast-serving -version: 0.25.0 -appVersion: 0.25.0 +version: 0.25.2 +appVersion: 0.25.2 keywords: - machine learning - big data - mlops -home: https://github.com/feast-dev/feast-java \ No newline at end of file +home: https://github.com/feast-dev/feast-java diff --git a/infra/charts/feast-serving/README.md b/infra/charts/feast-serving/README.md index c46fa2f..23d60a3 100644 --- a/infra/charts/feast-serving/README.md +++ b/infra/charts/feast-serving/README.md @@ -2,7 +2,7 @@ feast-serving ============= Feast Serving: Online feature serving service for Feast -Current chart version is `0.25.0` +Current chart version is `0.25.1` Source code can be found [here](https://github.com/feast-dev/feast-java) diff --git a/pom.xml b/pom.xml index 2e002de..d7de307 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ - 0.25.0-SNAPSHOT + 0.25.1-SNAPSHOT https://github.com/feast-dev/feast UTF-8 diff --git a/serving/src/main/java/feast/serving/config/FeastProperties.java b/serving/src/main/java/feast/serving/config/FeastProperties.java index bf04845..3b8548a 100644 --- a/serving/src/main/java/feast/serving/config/FeastProperties.java +++ b/serving/src/main/java/feast/serving/config/FeastProperties.java @@ -21,24 +21,18 @@ // https://www.baeldung.com/configuration-properties-in-spring-boot // https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-external-config.html#boot-features-external-config-typesafe-configuration-properties -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; import feast.common.auth.config.SecurityProperties; import feast.common.auth.config.SecurityProperties.AuthenticationProperties; import feast.common.auth.config.SecurityProperties.AuthorizationProperties; import feast.common.auth.credentials.CoreAuthenticationProperties; import feast.common.logging.config.LoggingProperties; -import feast.proto.core.StoreProto; +import feast.storage.connectors.redis.retriever.RedisClusterStoreConfig; +import feast.storage.connectors.redis.retriever.RedisStoreConfig; +import io.lettuce.core.ReadFrom; +import java.time.Duration; import java.util.*; -import java.util.stream.Collectors; import javax.annotation.PostConstruct; -import javax.validation.ConstraintViolation; -import javax.validation.ConstraintViolationException; -import javax.validation.Validation; -import javax.validation.Validator; -import javax.validation.ValidatorFactory; +import javax.validation.*; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import javax.validation.constraints.Positive; @@ -146,9 +140,6 @@ public void setActiveStore(String activeStore) { */ private List stores = new ArrayList<>(); - /* Job Store properties to retain state of async jobs. */ - private JobStoreProperties jobStore; - /* Metric tracing properties. */ private TracingProperties tracing; @@ -259,8 +250,6 @@ public static class Store { private Map config = new HashMap<>(); - private List subscriptions = new ArrayList<>(); - /** * Gets name of this store. This is unique to this specific instance. * @@ -280,12 +269,12 @@ public void setName(String name) { } /** - * Gets the store type. Example are REDIS or BIGQUERY + * Gets the store type. Example are REDIS or REDIS_CLUSTER * * @return the store type as a String. */ - public String getType() { - return type; + public StoreType getType() { + return StoreType.valueOf(this.type); } /** @@ -297,64 +286,6 @@ public void setType(String type) { this.type = type; } - /** - * Converts this {@link Store} to a {@link StoreProto.Store} - * - * @return {@link StoreProto.Store} with configuration set - * @throws InvalidProtocolBufferException the invalid protocol buffer exception - * @throws JsonProcessingException the json processing exception - */ - public StoreProto.Store toProto() - throws InvalidProtocolBufferException, JsonProcessingException { - List subscriptions = getSubscriptions(); - List subscriptionProtos = - subscriptions.stream().map(Subscription::toProto).collect(Collectors.toList()); - - StoreProto.Store.Builder storeProtoBuilder = - StoreProto.Store.newBuilder() - .setName(name) - .setType(StoreProto.Store.StoreType.valueOf(type)) - .addAllSubscriptions(subscriptionProtos); - - ObjectMapper jsonWriter = new ObjectMapper(); - - // TODO: All of this logic should be moved to the store layer. Only a Map - // should be sent to a store and it should do its own validation. - switch (StoreProto.Store.StoreType.valueOf(type)) { - case REDIS_CLUSTER: - StoreProto.Store.RedisClusterConfig.Builder redisClusterConfig = - StoreProto.Store.RedisClusterConfig.newBuilder(); - JsonFormat.parser().merge(jsonWriter.writeValueAsString(config), redisClusterConfig); - return storeProtoBuilder.setRedisClusterConfig(redisClusterConfig.build()).build(); - case REDIS: - StoreProto.Store.RedisConfig.Builder redisConfig = - StoreProto.Store.RedisConfig.newBuilder(); - JsonFormat.parser().merge(jsonWriter.writeValueAsString(config), redisConfig); - return storeProtoBuilder.setRedisConfig(redisConfig.build()).build(); - default: - throw new InvalidProtocolBufferException("Invalid store set"); - } - } - - /** - * Get the subscriptions to this specific store. The subscriptions indicate which feature sets a - * store subscribes to. - * - * @return List of subscriptions. - */ - public List getSubscriptions() { - return subscriptions; - } - - /** - * Sets the store specific configuration. See getSubscriptions() for more details. - * - * @param subscriptions the subscriptions list - */ - public void setSubscriptions(List subscriptions) { - this.subscriptions = subscriptions; - } - /** * Gets the configuration to this specific store. This is a map of strings. These options are * unique to the store. Please see protos/feast/core/Store.proto for the store specific @@ -366,6 +297,20 @@ public Map getConfig() { return config; } + public RedisClusterStoreConfig getRedisClusterConfig() { + return new RedisClusterStoreConfig( + this.config.get("connection_string"), + ReadFrom.valueOf(this.config.get("read_from")), + Duration.parse(this.config.get("timeout"))); + } + + public RedisStoreConfig getRedisConfig() { + return new RedisStoreConfig( + this.config.get("host"), + Integer.valueOf(this.config.get("port")), + Boolean.valueOf(this.config.getOrDefault("ssl", "false"))); + } + /** * Sets the store config. Please protos/feast/core/Store.proto for the specific options for each * store. @@ -375,129 +320,11 @@ public Map getConfig() { public void setConfig(Map config) { this.config = config; } - - /** - * The Subscription type. - * - *

Note: Please see protos/feast/core/CoreService.proto for details on how to subscribe to - * feature sets. - */ - public static class Subscription { - /** Feast project to subscribe to. */ - String project; - - /** Feature set to subscribe to. */ - String name; - - /** Feature set versions to subscribe to. */ - String version; - - /** Project/Feature set exclude flag to subscribe to. */ - boolean exclude; - - /** - * Gets Feast project subscribed to. - * - * @return the project string - */ - public String getProject() { - return project; - } - - /** - * Sets Feast project to subscribe to for this store. - * - * @param project the project - */ - public void setProject(String project) { - this.project = project; - } - - /** - * Gets the feature set name to subscribe to. - * - * @return the name - */ - public String getName() { - return name; - } - - /** - * Sets the feature set name to subscribe to. - * - * @param name the name - */ - public void setName(String name) { - this.name = name; - } - - /** - * Gets the feature set version that is being subscribed to by this store. - * - * @return the version - */ - public String getVersion() { - return version; - } - - /** - * Sets the feature set version that is being subscribed to by this store. - * - * @param version the version - */ - public void setVersion(String version) { - this.version = version; - } - - /** - * Gets the exclude flag to subscribe to. - * - * @return the exclude flag - */ - public boolean getExclude() { - return exclude; - } - - /** - * Sets the exclude flag to subscribe to. - * - * @param exclude the exclude flag - */ - public void setExclude(boolean exclude) { - this.exclude = exclude; - } - - /** - * Convert this {@link Subscription} to a {@link StoreProto.Store.Subscription}. - * - * @return the store proto . store . subscription - */ - public StoreProto.Store.Subscription toProto() { - return StoreProto.Store.Subscription.newBuilder() - .setName(getName()) - .setProject(getProject()) - .setExclude(getExclude()) - .build(); - } - } } - /** - * Gets job store properties - * - * @return the job store properties - */ - public JobStoreProperties getJobStore() { - return jobStore; - } - - /** - * Set job store properties - * - * @param jobStore Job store properties to set - */ - public void setJobStore(JobStoreProperties jobStore) { - this.jobStore = jobStore; + public enum StoreType { + REDIS, + REDIS_CLUSTER; } /** @@ -532,52 +359,6 @@ public void setLogging(LoggingProperties logging) { this.logging = logging; } - /** The type Job store properties. */ - public static class JobStoreProperties { - - /** Job Store Redis Host */ - private String redisHost; - - /** Job Store Redis Host */ - private int redisPort; - - /** - * Gets redis host. - * - * @return the redis host - */ - public String getRedisHost() { - return redisHost; - } - - /** - * Sets redis host. - * - * @param redisHost the redis host - */ - public void setRedisHost(String redisHost) { - this.redisHost = redisHost; - } - - /** - * Gets redis port. - * - * @return the redis port - */ - public int getRedisPort() { - return redisPort; - } - - /** - * Sets redis port. - * - * @param redisPort the redis port - */ - public void setRedisPort(int redisPort) { - this.redisPort = redisPort; - } - } - /** Trace metric collection properties */ public static class TracingProperties { diff --git a/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java b/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java index 9ec35c3..518f3a1 100644 --- a/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java +++ b/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.InvalidProtocolBufferException; -import feast.proto.core.StoreProto; import feast.serving.service.OnlineServingServiceV2; import feast.serving.service.ServingServiceV2; import feast.serving.specs.CachedSpecService; @@ -39,26 +38,19 @@ public ServingServiceV2 servingServiceV2( throws InvalidProtocolBufferException, JsonProcessingException { ServingServiceV2 servingService = null; FeastProperties.Store store = feastProperties.getActiveStore(); - StoreProto.Store.StoreType storeType = store.toProto().getType(); - switch (storeType) { + switch (store.getType()) { case REDIS_CLUSTER: RedisClientAdapter redisClusterClient = - RedisClusterClient.create(store.toProto().getRedisClusterConfig()); + RedisClusterClient.create(store.getRedisClusterConfig()); OnlineRetrieverV2 redisClusterRetriever = new OnlineRetriever(redisClusterClient); servingService = new OnlineServingServiceV2(redisClusterRetriever, specService, tracer); break; case REDIS: - RedisClientAdapter redisClient = RedisClient.create(store.toProto().getRedisConfig()); + RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig()); OnlineRetrieverV2 redisRetriever = new OnlineRetriever(redisClient); servingService = new OnlineServingServiceV2(redisRetriever, specService, tracer); break; - case UNRECOGNIZED: - case INVALID: - throw new IllegalArgumentException( - String.format( - "Unsupported store type '%s' for store name '%s'", - store.getType(), store.getName())); } return servingService; diff --git a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java index b41a0f5..369d543 100644 --- a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java +++ b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.InvalidProtocolBufferException; -import feast.proto.core.StoreProto; import feast.serving.specs.CachedSpecService; import feast.serving.specs.CoreSpecService; import io.grpc.CallCredentials; @@ -61,13 +60,11 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService( } @Bean - public CachedSpecService specService( - FeastProperties feastProperties, ObjectProvider callCredentials) + public CachedSpecService specService(ObjectProvider callCredentials) throws InvalidProtocolBufferException, JsonProcessingException { CoreSpecService coreService = new CoreSpecService(feastCoreHost, feastCorePort, callCredentials); - StoreProto.Store storeProto = feastProperties.getActiveStore().toProto(); - CachedSpecService cachedSpecStorage = new CachedSpecService(coreService, storeProto); + CachedSpecService cachedSpecStorage = new CachedSpecService(coreService); try { cachedSpecStorage.populateCache(); } catch (Exception e) { diff --git a/serving/src/main/java/feast/serving/controller/HealthServiceController.java b/serving/src/main/java/feast/serving/controller/HealthServiceController.java index 6615cf5..4bee981 100644 --- a/serving/src/main/java/feast/serving/controller/HealthServiceController.java +++ b/serving/src/main/java/feast/serving/controller/HealthServiceController.java @@ -16,7 +16,6 @@ */ package feast.serving.controller; -import feast.proto.core.StoreProto.Store; import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest; import feast.serving.interceptors.GrpcMonitoringInterceptor; import feast.serving.service.ServingServiceV2; @@ -51,7 +50,6 @@ public void check( // Implement similary for batch service. try { - Store store = specService.getStore(); servingService.getFeastServingInfo(GetFeastServingInfoRequest.getDefaultInstance()); responseObserver.onNext( HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index f54e08f..440b224 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -26,8 +26,6 @@ import feast.proto.core.FeatureProto; import feast.proto.core.FeatureTableProto.FeatureTable; import feast.proto.core.FeatureTableProto.FeatureTableSpec; -import feast.proto.core.StoreProto; -import feast.proto.core.StoreProto.Store; import feast.proto.serving.ServingAPIProto; import feast.serving.exception.SpecRetrievalException; import io.grpc.StatusRuntimeException; @@ -47,7 +45,6 @@ public class CachedSpecService { private static final String DEFAULT_PROJECT_NAME = "default"; private final CoreSpecService coreService; - private Store store; private static Gauge cacheLastUpdated = Gauge.build() @@ -69,9 +66,8 @@ public class CachedSpecService { ImmutablePair, FeatureProto.FeatureSpecV2> featureCache; - public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) { + public CachedSpecService(CoreSpecService coreService) { this.coreService = coreService; - this.store = coreService.registerStore(store); CacheLoader, FeatureTableSpec> featureTableCacheLoader = CacheLoader.from(k -> retrieveSingleFeatureTable(k.getLeft(), k.getRight())); @@ -85,15 +81,6 @@ public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) { featureCache = CacheBuilder.newBuilder().build(featureCacheLoader); } - /** - * Get the current store configuration. - * - * @return StoreProto.Store store configuration for this serving instance - */ - public Store getStore() { - return this.store; - } - /** * Reload the store configuration from the given config path, then retrieve the necessary specs * from core to preload the cache. diff --git a/serving/src/main/java/feast/serving/specs/CoreSpecService.java b/serving/src/main/java/feast/serving/specs/CoreSpecService.java index 5429d22..eee50d8 100644 --- a/serving/src/main/java/feast/serving/specs/CoreSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CoreSpecService.java @@ -24,7 +24,6 @@ import feast.proto.core.CoreServiceProto.ListProjectsResponse; import feast.proto.core.CoreServiceProto.UpdateStoreRequest; import feast.proto.core.CoreServiceProto.UpdateStoreResponse; -import feast.proto.core.StoreProto.Store; import io.grpc.CallCredentials; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -53,26 +52,6 @@ public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) { return blockingStub.updateStore(updateStoreRequest); } - /** - * Register the given store entry in Feast Core. If store already exists in Feast Core, updates - * the store entry in feast core. - * - * @param store entry to register/update in Feast Core. - * @return The register/updated store entry - */ - public Store registerStore(Store store) { - UpdateStoreRequest request = UpdateStoreRequest.newBuilder().setStore(store).build(); - try { - UpdateStoreResponse updateStoreResponse = this.updateStore(request); - if (!updateStoreResponse.getStore().equals(store)) { - throw new RuntimeException("Core store config not matching current store config"); - } - return updateStoreResponse.getStore(); - } catch (Exception e) { - throw new RuntimeException("Unable to update store configuration", e); - } - } - public ListProjectsResponse listProjects(ListProjectsRequest listProjectsRequest) { return blockingStub.listProjects(listProjectsRequest); } diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 288ec7e..b20fd8e 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -41,26 +41,19 @@ feast: stores: # Please see https://api.docs.feast.dev/grpc/feast.core.pb.html#Store for configuration options - name: online # Name of the store (referenced by active_store) - type: REDIS # Type of the store. REDIS, REDIS_CLUSTER, BIGQUERY are available options + type: REDIS # Type of the store. REDIS, REDIS_CLUSTER are available options config: # Store specific configuration. See host: localhost port: 6379 # Subscriptions indicate which feature sets needs to be retrieved and used to populate this store - subscriptions: - # Wildcards match all options. No filtering is done. - - name: "*" - project: "*" - name: online_cluster type: REDIS_CLUSTER config: # Store specific configuration. # Connection string specifies the host:port of Redis instances in the redis cluster. connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005" read_from: MASTER - subscriptions: - - name: "*" - project: "*" - version: "*" - + # Redis operation timeout in ISO-8601 format + timeout: PT0.5S tracing: # If true, Feast will provide tracing data (using OpenTracing API) for various RPC method calls # which can be useful to debug performance issues and perform benchmarking diff --git a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java index 57932d4..4e48b64 100644 --- a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java +++ b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java @@ -18,8 +18,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -32,7 +30,6 @@ import feast.proto.core.CoreServiceProto.ListProjectsResponse; import feast.proto.core.FeatureTableProto; import feast.proto.core.FeatureTableProto.FeatureTableSpec; -import feast.proto.core.StoreProto.Store; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; import feast.proto.types.ValueProto; import feast.serving.specs.CachedSpecService; @@ -45,8 +42,6 @@ public class CachedSpecServiceTest { - private Store store; - @Rule public final ExpectedException expectedException = ExpectedException.none(); @Mock CoreSpecService coreService; @@ -63,8 +58,6 @@ public class CachedSpecServiceTest { public void setUp() { initMocks(this); - this.store = Store.newBuilder().build(); - this.setupProject("default"); this.featureTableEntities = ImmutableList.of("entity1"); this.featureTable1Features = @@ -94,8 +87,7 @@ public void setUp() { this.setupFeatureTableAndProject("default"); - when(this.coreService.registerStore(store)).thenReturn(store); - cachedSpecService = new CachedSpecService(this.coreService, this.store); + cachedSpecService = new CachedSpecService(this.coreService); } private void setupProject(String project) { @@ -120,18 +112,6 @@ private void setupFeatureTableAndProject(String project) { .build()); } - @Test - public void shouldRegisterStoreWithCore() { - verify(coreService, times(1)).registerStore(cachedSpecService.getStore()); - } - - @Test - public void shouldPopulateAndReturnStore() { - cachedSpecService.populateCache(); - Store actual = cachedSpecService.getStore(); - assertThat(actual, equalTo(store)); - } - @Test public void shouldPopulateAndReturnDifferentFeatureTables() { // test that CachedSpecService can retrieve fully qualified feature references. diff --git a/storage/connectors/redis/pom.xml b/storage/connectors/redis/pom.xml index bbda8da..f65cbd0 100644 --- a/storage/connectors/redis/pom.xml +++ b/storage/connectors/redis/pom.xml @@ -16,6 +16,14 @@ io.lettuce lettuce-core + 6.0.2.RELEASE + + + + io.netty + netty-transport-native-epoll + 4.1.52.Final + linux-x86_64 diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java index 5a7f4b7..faa8e96 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClient.java @@ -16,7 +16,6 @@ */ package feast.storage.connectors.redis.retriever; -import feast.proto.core.StoreProto; import io.lettuce.core.KeyValue; import io.lettuce.core.RedisFuture; import io.lettuce.core.RedisURI; @@ -46,7 +45,7 @@ private RedisClient(StatefulRedisConnection connection) { this.asyncCommands.setAutoFlushCommands(false); } - public static RedisClientAdapter create(StoreProto.Store.RedisConfig config) { + public static RedisClientAdapter create(RedisStoreConfig config) { RedisURI uri = RedisURI.create(config.getHost(), config.getPort()); diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java index aeb8220..5395b72 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterClient.java @@ -16,36 +16,19 @@ */ package feast.storage.connectors.redis.retriever; -import com.google.common.collect.ImmutableMap; -import feast.proto.core.StoreProto; -import feast.proto.core.StoreProto.Store.RedisClusterConfig; -import feast.storage.connectors.redis.serializer.RedisKeyPrefixSerializerV2; -import feast.storage.connectors.redis.serializer.RedisKeySerializerV2; -import io.lettuce.core.KeyValue; -import io.lettuce.core.ReadFrom; -import io.lettuce.core.RedisFuture; -import io.lettuce.core.RedisURI; +import io.lettuce.core.*; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; import io.lettuce.core.codec.ByteArrayCodec; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import javax.annotation.Nullable; public class RedisClusterClient implements RedisClientAdapter { private final RedisAdvancedClusterAsyncCommands asyncCommands; - private final RedisKeySerializerV2 serializer; - @Nullable private final RedisKeySerializerV2 fallbackSerializer; - - private static final Map PROTO_TO_LETTUCE_TYPES = - ImmutableMap.of( - RedisClusterConfig.ReadFrom.MASTER, ReadFrom.MASTER, - RedisClusterConfig.ReadFrom.MASTER_PREFERRED, ReadFrom.MASTER_PREFERRED, - RedisClusterConfig.ReadFrom.REPLICA, ReadFrom.REPLICA, - RedisClusterConfig.ReadFrom.REPLICA_PREFERRED, ReadFrom.REPLICA_PREFERRED); @Override public RedisFuture>> hmget(byte[] key, byte[]... fields) { @@ -59,19 +42,9 @@ public void flushCommands() { static class Builder { private final StatefulRedisClusterConnection connection; - private final RedisKeySerializerV2 serializer; - @Nullable private RedisKeySerializerV2 fallbackSerializer; - Builder( - StatefulRedisClusterConnection connection, - RedisKeySerializerV2 serializer) { + Builder(StatefulRedisClusterConnection connection) { this.connection = connection; - this.serializer = serializer; - } - - Builder withFallbackSerializer(RedisKeySerializerV2 fallbackSerializer) { - this.fallbackSerializer = fallbackSerializer; - return this; } RedisClusterClient build() { @@ -81,8 +54,6 @@ RedisClusterClient build() { private RedisClusterClient(Builder builder) { this.asyncCommands = builder.connection.async(); - this.serializer = builder.serializer; - this.fallbackSerializer = builder.fallbackSerializer; // allows reading from replicas this.asyncCommands.readOnly(); @@ -91,7 +62,7 @@ private RedisClusterClient(Builder builder) { this.asyncCommands.setAutoFlushCommands(false); } - public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig config) { + public static RedisClientAdapter create(RedisClusterStoreConfig config) { List redisURIList = Arrays.stream(config.getConnectionString().split(",")) .map( @@ -100,22 +71,22 @@ public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig conf return RedisURI.create(hostPortSplit[0], Integer.parseInt(hostPortSplit[1])); }) .collect(Collectors.toList()); - StatefulRedisClusterConnection connection = - io.lettuce.core.cluster.RedisClusterClient.create(redisURIList) - .connect(new ByteArrayCodec()); - connection.setReadFrom(PROTO_TO_LETTUCE_TYPES.get(config.getReadFrom())); + io.lettuce.core.cluster.RedisClusterClient client = + io.lettuce.core.cluster.RedisClusterClient.create(redisURIList); + client.setOptions( + ClusterClientOptions.builder() + .socketOptions(SocketOptions.builder().keepAlive(true).tcpNoDelay(true).build()) + .timeoutOptions(TimeoutOptions.enabled(config.getTimeout())) + .pingBeforeActivateConnection(true) + .topologyRefreshOptions( + ClusterTopologyRefreshOptions.builder().enableAllAdaptiveRefreshTriggers().build()) + .build()); - RedisKeySerializerV2 serializer = new RedisKeyPrefixSerializerV2(config.getKeyPrefix()); - - Builder builder = new Builder(connection, serializer); - - if (config.getEnableFallback()) { - RedisKeySerializerV2 fallbackSerializer = - new RedisKeyPrefixSerializerV2(config.getKeyPrefix()); - builder = builder.withFallbackSerializer(fallbackSerializer); - } + StatefulRedisClusterConnection connection = + client.connect(new ByteArrayCodec()); + connection.setReadFrom(config.getReadFrom()); - return builder.build(); + return new Builder(connection).build(); } } diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java new file mode 100644 index 0000000..c179ffe --- /dev/null +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterStoreConfig.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2021 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.storage.connectors.redis.retriever; + +import io.lettuce.core.ReadFrom; +import java.time.Duration; + +public class RedisClusterStoreConfig { + private final String connectionString; + private final ReadFrom readFrom; + private final Duration timeout; + + public RedisClusterStoreConfig(String connectionString, ReadFrom readFrom, Duration timeout) { + this.connectionString = connectionString; + this.readFrom = readFrom; + this.timeout = timeout; + } + + public String getConnectionString() { + return this.connectionString; + } + + public ReadFrom getReadFrom() { + return this.readFrom; + } + + public Duration getTimeout() { + return this.timeout; + } +} diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java similarity index 54% rename from storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java rename to storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java index 252d6d1..5e4560a 100644 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyProtoSerializerV2.java +++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisStoreConfig.java @@ -1,6 +1,6 @@ /* * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors + * Copyright 2018-2021 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,13 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.storage.connectors.redis.serializer; +package feast.storage.connectors.redis.retriever; -import feast.proto.storage.RedisProto.RedisKeyV2; +public class RedisStoreConfig { + private final String host; + private final Integer port; + private final Boolean ssl; -public class RedisKeyProtoSerializerV2 implements RedisKeySerializerV2 { + public RedisStoreConfig(String host, Integer port, Boolean ssl) { + this.host = host; + this.port = port; + this.ssl = ssl; + } + + public String getHost() { + return this.host; + } + + public Integer getPort() { + return this.port; + } - public byte[] serialize(RedisKeyV2 redisKey) { - return redisKey.toByteArray(); + public Boolean getSsl() { + return this.ssl; } } diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java deleted file mode 100644 index 1c869b4..0000000 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerV2.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.storage.connectors.redis.serializer; - -import feast.proto.storage.RedisProto.RedisKeyV2; - -public class RedisKeyPrefixSerializerV2 implements RedisKeySerializerV2 { - - private final byte[] prefixBytes; - - public RedisKeyPrefixSerializerV2(String prefix) { - this.prefixBytes = prefix.getBytes(); - } - - public byte[] serialize(RedisKeyV2 redisKey) { - byte[] key = redisKey.toByteArray(); - - if (prefixBytes.length == 0) { - return key; - } - - byte[] keyWithPrefix = new byte[prefixBytes.length + key.length]; - System.arraycopy(prefixBytes, 0, keyWithPrefix, 0, prefixBytes.length); - System.arraycopy(key, 0, keyWithPrefix, prefixBytes.length, key.length); - return keyWithPrefix; - } -} diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java deleted file mode 100644 index b79e158..0000000 --- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/serializer/RedisKeySerializerV2.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.storage.connectors.redis.serializer; - -import feast.proto.storage.RedisProto.RedisKeyV2; - -public interface RedisKeySerializerV2 { - - byte[] serialize(RedisKeyV2 key); -} diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java deleted file mode 100644 index e663cf8..0000000 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/serializer/RedisKeyPrefixSerializerTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.storage.connectors.redis.serializer; - -import static org.junit.Assert.*; - -import feast.proto.storage.RedisProto.RedisKeyV2; -import feast.proto.types.ValueProto; -import org.junit.Test; - -public class RedisKeyPrefixSerializerTest { - - private RedisKeyV2 key = - RedisKeyV2.newBuilder() - .addEntityNames("entity1") - .addEntityValues(ValueProto.Value.newBuilder().setInt64Val(1)) - .build(); - - @Test - public void shouldPrependKey() { - RedisKeyPrefixSerializerV2 serializer = new RedisKeyPrefixSerializerV2("namespace:"); - String keyWithPrefix = new String(serializer.serialize(key)); - assertEquals(String.format("namespace:%s", new String(key.toByteArray())), keyWithPrefix); - } - - @Test - public void shouldNotPrependKeyIfEmptyString() { - RedisKeyPrefixSerializerV2 serializer = new RedisKeyPrefixSerializerV2(""); - assertArrayEquals(key.toByteArray(), serializer.serialize(key)); - } -}