diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 508f3d4bf7a..8587fc98240 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -392,8 +392,8 @@ var ValidOnlineStoreDBStorePersistenceTypes = []string{ // LocalRegistryConfig configures the registry service type LocalRegistryConfig struct { // Creates a registry server container - Server *ServerConfigs `json:"server,omitempty"` - Persistence *RegistryPersistence `json:"persistence,omitempty"` + Server *RegistryServerConfigs `json:"server,omitempty"` + Persistence *RegistryPersistence `json:"persistence,omitempty"` } // RegistryPersistence configures the persistence settings for the registry service @@ -502,6 +502,18 @@ type ServerConfigs struct { VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` } +// RegistryServerConfigs creates a registry server for the feast service, with specified container configurations. +// +kubebuilder:validation:XValidation:rule="self.restAPI == true || self.grpc == true || !has(self.grpc)", message="At least one of restAPI or grpc must be true" +type RegistryServerConfigs struct { + ServerConfigs `json:",inline"` + + // Enable REST API registry server. + RestAPI *bool `json:"restAPI,omitempty"` + + // Enable gRPC registry server. Defaults to true if unset. + GRPC *bool `json:"grpc,omitempty"` +} + // CronJobContainerConfigs k8s container settings for the CronJob type CronJobContainerConfigs struct { ContainerConfigs `json:",inline"` @@ -613,6 +625,7 @@ type ServiceHostnames struct { OfflineStore string `json:"offlineStore,omitempty"` OnlineStore string `json:"onlineStore,omitempty"` Registry string `json:"registry,omitempty"` + RegistryRest string `json:"registryRest,omitempty"` UI string `json:"ui,omitempty"` } diff --git a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go index 6c0bc66d9df..1a893c82cf8 100644 --- a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -525,7 +525,7 @@ func (in *LocalRegistryConfig) DeepCopyInto(out *LocalRegistryConfig) { *out = *in if in.Server != nil { in, out := &in.Server, &out.Server - *out = new(ServerConfigs) + *out = new(RegistryServerConfigs) (*in).DeepCopyInto(*out) } if in.Persistence != nil { @@ -928,6 +928,32 @@ func (in *RegistryPersistence) DeepCopy() *RegistryPersistence { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RegistryServerConfigs) DeepCopyInto(out *RegistryServerConfigs) { + *out = *in + in.ServerConfigs.DeepCopyInto(&out.ServerConfigs) + if in.RestAPI != nil { + in, out := &in.RestAPI, &out.RestAPI + *out = new(bool) + **out = **in + } + if in.GRPC != nil { + in, out := &in.GRPC, &out.GRPC + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryServerConfigs. +func (in *RegistryServerConfigs) DeepCopy() *RegistryServerConfigs { + if in == nil { + return nil + } + out := new(RegistryServerConfigs) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RemoteRegistryConfig) DeepCopyInto(out *RemoteRegistryConfig) { *out = *in diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index f1928bfdeae..b2fed6992d5 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -1924,6 +1924,10 @@ spec: x-kubernetes-map-type: atomic type: object type: array + grpc: + description: Enable gRPC registry server. Defaults + to true if unset. + type: boolean image: type: string imagePullPolicy: @@ -1987,6 +1991,9 @@ spec: of compute resources required. type: object type: object + restAPI: + description: Enable REST API registry server. + type: boolean tls: description: TlsConfigs configures server TLS for a feast service. @@ -2071,6 +2078,9 @@ spec: type: object type: array type: object + x-kubernetes-validations: + - message: At least one of restAPI or grpc must be true + rule: self.restAPI == true || self.grpc == true || !has(self.grpc) type: object remote: description: RemoteRegistryConfig points to a remote feast @@ -5902,6 +5912,10 @@ spec: x-kubernetes-map-type: atomic type: object type: array + grpc: + description: Enable gRPC registry server. Defaults + to true if unset. + type: boolean image: type: string imagePullPolicy: @@ -5965,6 +5979,9 @@ spec: amount of compute resources required. type: object type: object + restAPI: + description: Enable REST API registry server. + type: boolean tls: description: TlsConfigs configures server TLS for a feast service. @@ -6053,6 +6070,11 @@ spec: type: object type: array type: object + x-kubernetes-validations: + - message: At least one of restAPI or grpc must be + true + rule: self.restAPI == true || self.grpc == true + || !has(self.grpc) type: object remote: description: RemoteRegistryConfig points to a remote feast @@ -8051,6 +8073,8 @@ spec: type: string registry: type: string + registryRest: + type: string ui: type: string type: object diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index c54ea39e5ab..8cfbf968b34 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -1932,6 +1932,10 @@ spec: x-kubernetes-map-type: atomic type: object type: array + grpc: + description: Enable gRPC registry server. Defaults + to true if unset. + type: boolean image: type: string imagePullPolicy: @@ -1995,6 +1999,9 @@ spec: of compute resources required. type: object type: object + restAPI: + description: Enable REST API registry server. + type: boolean tls: description: TlsConfigs configures server TLS for a feast service. @@ -2079,6 +2086,9 @@ spec: type: object type: array type: object + x-kubernetes-validations: + - message: At least one of restAPI or grpc must be true + rule: self.restAPI == true || self.grpc == true || !has(self.grpc) type: object remote: description: RemoteRegistryConfig points to a remote feast @@ -5910,6 +5920,10 @@ spec: x-kubernetes-map-type: atomic type: object type: array + grpc: + description: Enable gRPC registry server. Defaults + to true if unset. + type: boolean image: type: string imagePullPolicy: @@ -5973,6 +5987,9 @@ spec: amount of compute resources required. type: object type: object + restAPI: + description: Enable REST API registry server. + type: boolean tls: description: TlsConfigs configures server TLS for a feast service. @@ -6061,6 +6078,11 @@ spec: type: object type: array type: object + x-kubernetes-validations: + - message: At least one of restAPI or grpc must be + true + rule: self.restAPI == true || self.grpc == true + || !has(self.grpc) type: object remote: description: RemoteRegistryConfig points to a remote feast @@ -8059,6 +8081,8 @@ spec: type: string registry: type: string + registryRest: + type: string ui: type: string type: object diff --git a/infra/feast-operator/docs/api/markdown/ref.md b/infra/feast-operator/docs/api/markdown/ref.md index 2e90f0c7b91..9452d9c838b 100644 --- a/infra/feast-operator/docs/api/markdown/ref.md +++ b/infra/feast-operator/docs/api/markdown/ref.md @@ -36,6 +36,7 @@ ContainerConfigs k8s container settings for the server _Appears in:_ - [CronJobContainerConfigs](#cronjobcontainerconfigs) +- [RegistryServerConfigs](#registryserverconfigs) - [ServerConfigs](#serverconfigs) | Field | Description | @@ -76,6 +77,7 @@ DefaultCtrConfigs k8s container settings that are applied by default _Appears in:_ - [ContainerConfigs](#containerconfigs) - [CronJobContainerConfigs](#cronjobcontainerconfigs) +- [RegistryServerConfigs](#registryserverconfigs) - [ServerConfigs](#serverconfigs) | Field | Description | @@ -405,7 +407,7 @@ _Appears in:_ | Field | Description | | --- | --- | -| `server` _[ServerConfigs](#serverconfigs)_ | Creates a registry server container | +| `server` _[RegistryServerConfigs](#registryserverconfigs)_ | Creates a registry server container | | `persistence` _[RegistryPersistence](#registrypersistence)_ | | @@ -555,6 +557,7 @@ OptionalCtrConfigs k8s container settings that are optional _Appears in:_ - [ContainerConfigs](#containerconfigs) - [CronJobContainerConfigs](#cronjobcontainerconfigs) +- [RegistryServerConfigs](#registryserverconfigs) - [ServerConfigs](#serverconfigs) | Field | Description | @@ -669,6 +672,33 @@ _Appears in:_ | `store` _[RegistryDBStorePersistence](#registrydbstorepersistence)_ | | +#### RegistryServerConfigs + + + +RegistryServerConfigs creates a registry server for the feast service, with specified container configurations. + +_Appears in:_ +- [LocalRegistryConfig](#localregistryconfig) + +| Field | Description | +| --- | --- | +| `image` _string_ | | +| `env` _[EnvVar](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#envvar-v1-core)_ | | +| `envFrom` _[EnvFromSource](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#envfromsource-v1-core)_ | | +| `imagePullPolicy` _[PullPolicy](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#pullpolicy-v1-core)_ | | +| `resources` _[ResourceRequirements](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#resourcerequirements-v1-core)_ | | +| `tls` _[TlsConfigs](#tlsconfigs)_ | | +| `logLevel` _string_ | LogLevel sets the logging level for the server +Allowed values: "debug", "info", "warning", "error", "critical". | +| `volumeMounts` _[VolumeMount](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#volumemount-v1-core) array_ | VolumeMounts defines the list of volumes that should be mounted into the feast container. +This allows attaching persistent storage, config files, secrets, or other resources +required by the Feast components. Ensure that each volume mount has a corresponding +volume definition in the Volumes field. | +| `restAPI` _boolean_ | Enable REST API registry server. | +| `grpc` _boolean_ | Enable gRPC registry server. Defaults to true if unset. | + + #### RemoteRegistryConfig @@ -709,9 +739,9 @@ ServerConfigs creates a server for the feast service, with specified container c _Appears in:_ - [FeatureStoreServices](#featurestoreservices) -- [LocalRegistryConfig](#localregistryconfig) - [OfflineStore](#offlinestore) - [OnlineStore](#onlinestore) +- [RegistryServerConfigs](#registryserverconfigs) | Field | Description | | --- | --- | @@ -743,6 +773,7 @@ _Appears in:_ | `offlineStore` _string_ | | | `onlineStore` _string_ | | | `registry` _string_ | | +| `registryRest` _string_ | | | `ui` _string_ | | @@ -753,6 +784,7 @@ _Appears in:_ TlsConfigs configures server TLS for a feast service. in an openshift cluster, this is configured by default using service serving certificates. _Appears in:_ +- [RegistryServerConfigs](#registryserverconfigs) - [ServerConfigs](#serverconfigs) | Field | Description | diff --git a/infra/feast-operator/internal/controller/featurestore_controller_loglevel_test.go b/infra/feast-operator/internal/controller/featurestore_controller_loglevel_test.go index 8b3832d46c2..a02d0894c8c 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_loglevel_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_loglevel_test.go @@ -61,8 +61,10 @@ var _ = Describe("FeatureStore Controller - Feast service LogLevel", func() { Services: &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{ - LogLevel: strPtr("error"), + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{ + LogLevel: strPtr("error"), + }, }, }, }, @@ -196,7 +198,9 @@ var _ = Describe("FeatureStore Controller - Feast service LogLevel", func() { resource.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{}, + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{}, + }, }, }, OfflineStore: &feastdevv1alpha1.OfflineStore{}, diff --git a/infra/feast-operator/internal/controller/featurestore_controller_test.go b/infra/feast-operator/internal/controller/featurestore_controller_test.go index efbd69c78e9..cb45d85e766 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_test.go @@ -19,7 +19,9 @@ package controller import ( "context" "encoding/base64" + "fmt" "reflect" + "strings" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -49,6 +51,10 @@ const domainTls = ".svc.cluster.local:443" var image = "test:latest" +func ptr[T any](v T) *T { + return &v +} + var _ = Describe("FeatureStore Controller", func() { Context("When reconciling a resource", func() { const resourceName = "test-resource" @@ -1213,6 +1219,142 @@ var _ = Describe("FeatureStore Controller", func() { Expect(cond.Message).To(Equal("Error: Remote feast registry of referenced FeatureStore '" + referencedRegistry.Name + "' is not ready")) }) + It("should correctly set container command args for grpc/rest modes", func() { + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + cases := []struct { + name string + grpc *bool + restAPI *bool + expectedArgs []string + }{ + { + name: "default grpc only", + grpc: nil, + restAPI: nil, + expectedArgs: []string{"--grpc"}, + }, + { + name: "explicit grpc true only", + grpc: ptr(true), + restAPI: ptr(false), + expectedArgs: []string{"--grpc"}, + }, + { + name: "rest only", + grpc: ptr(false), + restAPI: ptr(true), + expectedArgs: []string{"--no-grpc", "--rest-api"}, + }, + { + name: "both grpc and rest", + grpc: ptr(true), + restAPI: ptr(true), + expectedArgs: []string{"--grpc", "--rest-api"}, + }, + } + + for _, tc := range cases { + By(fmt.Sprintf("Testing: %s", tc.name)) + + name := strings.ReplaceAll(tc.name, " ", "-") + nsName := types.NamespacedName{ + Name: name, + Namespace: "default", + } + resource := &feastdevv1alpha1.FeatureStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: feastdevv1alpha1.FeatureStoreSpec{ + FeastProject: feastProject, + Services: &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + GRPC: tc.grpc, + RestAPI: tc.restAPI, + }, + }, + }, + }, + }, + } + resource.SetGroupVersionKind(feastdevv1alpha1.GroupVersion.WithKind("FeatureStore")) + err := k8sClient.Create(ctx, resource) + Expect(err).NotTo(HaveOccurred()) + + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: nsName}) + Expect(err).NotTo(HaveOccurred()) + + err = k8sClient.Get(ctx, nsName, resource) + Expect(err).NotTo(HaveOccurred()) + + feast := services.FeastServices{ + Handler: handler.FeastHandler{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + }, + } + + deploy := &appsv1.Deployment{} + objMeta := feast.GetObjectMeta() + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: objMeta.Name, + Namespace: objMeta.Namespace, + }, deploy) + Expect(err).NotTo(HaveOccurred()) + + registryContainer := services.GetRegistryContainer(*deploy) + Expect(registryContainer).NotTo(BeNil()) + + for _, expectedArg := range tc.expectedArgs { + Expect(registryContainer.Command). + To(ContainElement(expectedArg), + "expected %s to be present in container command: %v", expectedArg, registryContainer.Command) + } + Expect(resource.Status.Conditions).NotTo(BeEmpty()) + cond := apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.RegistryReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.RegistryReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.RegistryReadyMessage)) + } + + By("Verifying that creation fails when both REST API and gRPC are disabled") + disabledResource := &feastdevv1alpha1.FeatureStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: "disabled-both", + Namespace: "default", + }, + Spec: feastdevv1alpha1.FeatureStoreSpec{ + FeastProject: feastProject, + Services: &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + RestAPI: ptr(false), + GRPC: ptr(false), + }, + }, + }, + }, + }, + } + disabledResource.SetGroupVersionKind(feastdevv1alpha1.GroupVersion.WithKind("FeatureStore")) + + err := k8sClient.Create(ctx, disabledResource) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("At least one of restAPI or grpc must be true")) + }) + It("should error on reconcile", func() { By("Trying to set the controller OwnerRef of a Deployment that already has a controller") controllerReconciler := &FeatureStoreReconciler{ @@ -1360,6 +1502,59 @@ var _ = Describe("FeatureStore Controller", func() { err = k8sClient.Update(ctx, resource) Expect(err).NotTo(HaveOccurred()) }) + + It("should error if referencing a remote registry without gRPC server enabled", func() { + const remoteStoreName = "remote-featurestore" + remoteNamespacedName := types.NamespacedName{ + Name: remoteStoreName, + Namespace: "default", + } + + // Create remote FeatureStore with gRPC disabled + remote := &feastdevv1alpha1.FeatureStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: remoteStoreName, + Namespace: "default", + }, + Spec: feastdevv1alpha1.FeatureStoreSpec{ + FeastProject: feastProject, + Services: &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + GRPC: ptr(false), + RestAPI: ptr(true), + }, + }, + }, + }, + }, + } + Expect(k8sClient.Create(ctx, remote)).To(Succeed()) + reconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: remoteNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + // Update main FeatureStore to reference the remote registry + Expect(k8sClient.Get(ctx, typeNamespacedName, featurestore)).To(Succeed()) + featurestore.Spec.FeastProject = feastProject + featurestore.Spec.Services.Registry = &feastdevv1alpha1.Registry{ + Remote: &feastdevv1alpha1.RemoteRegistryConfig{ + FeastRef: &feastdevv1alpha1.FeatureStoreRef{Name: remoteStoreName}, + }, + } + Expect(k8sClient.Update(ctx, featurestore)).To(Succeed()) + _, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("must have gRPC server enabled")) + }) }) }) diff --git a/infra/feast-operator/internal/controller/featurestore_controller_test_utils_test.go b/infra/feast-operator/internal/controller/featurestore_controller_test_utils_test.go index dcf684c7733..63df6d46f46 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_test_utils_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_test_utils_test.go @@ -130,7 +130,9 @@ func createFeatureStoreResource(resourceName string, image string, pullPolicy co }, Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{}, + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{}, + }, }, }, UI: &feastdevv1alpha1.ServerConfigs{ diff --git a/infra/feast-operator/internal/controller/featurestore_controller_tls_test.go b/infra/feast-operator/internal/controller/featurestore_controller_tls_test.go index e5b27f8ec56..b7aca319f05 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_tls_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_tls_test.go @@ -82,8 +82,10 @@ var _ = Describe("FeatureStore Controller - Feast service TLS", func() { }, Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{ - TLS: tlsConfigs, + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{ + TLS: tlsConfigs, + }, }, }, }, @@ -463,7 +465,7 @@ var _ = Describe("Test mountCustomCABundle functionality", func() { Spec: feastdevv1alpha1.FeatureStoreSpec{ FeastProject: feastProject, Services: &feastdevv1alpha1.FeatureStoreServices{ - Registry: &feastdevv1alpha1.Registry{Local: &feastdevv1alpha1.LocalRegistryConfig{Server: &feastdevv1alpha1.ServerConfigs{}}}, + Registry: &feastdevv1alpha1.Registry{Local: &feastdevv1alpha1.LocalRegistryConfig{Server: &feastdevv1alpha1.RegistryServerConfigs{ServerConfigs: feastdevv1alpha1.ServerConfigs{}}}}, OnlineStore: &feastdevv1alpha1.OnlineStore{Server: &feastdevv1alpha1.ServerConfigs{}}, OfflineStore: &feastdevv1alpha1.OfflineStore{Server: &feastdevv1alpha1.ServerConfigs{}}, UI: &feastdevv1alpha1.ServerConfigs{}, diff --git a/infra/feast-operator/internal/controller/services/repo_config_test.go b/infra/feast-operator/internal/controller/services/repo_config_test.go index a346ac72e80..bfe09c33e93 100644 --- a/infra/feast-operator/internal/controller/services/repo_config_test.go +++ b/infra/feast-operator/internal/controller/services/repo_config_test.go @@ -374,7 +374,7 @@ func minimalFeatureStoreWithAllServers() *feastdevv1alpha1.FeatureStore { }, Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{}, + Server: &feastdevv1alpha1.RegistryServerConfigs{}, }, }, UI: &feastdevv1alpha1.ServerConfigs{}, diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 768b4df74b1..959814b5a64 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -54,6 +54,11 @@ func (feast *FeastServices) Deploy() error { if feast.noLocalCoreServerConfigured() { return errors.New("at least one local server must be configured. e.g. registry / online / offline") } + if feast.isRegistryServer() { + if !feast.isRegistryGrpcEnabled() && !feast.isRegistryRestEnabled() { + return errors.New("at least one of gRPC or REST API must be enabled for registry service") + } + } openshiftTls, err := feast.checkOpenshiftTls() if err != nil { return err @@ -221,11 +226,52 @@ func (feast *FeastServices) deployFeastServiceByType(feastType FeastServiceType) _ = feast.Handler.DeleteOwnedFeastObj(feast.initPVC(feastType)) } if serviceConfig := feast.getServerConfigs(feastType); serviceConfig != nil { - if err := feast.createService(feastType); err != nil { - return feast.setFeastServiceCondition(err, feastType) + // For registry service, handle both gRPC and REST services + if feastType == RegistryFeastType && feast.isRegistryServer() { + // Create gRPC service if enabled + if feast.isRegistryGrpcEnabled() { + if err := feast.createService(feastType); err != nil { + return feast.setFeastServiceCondition(err, feastType) + } + } else { + // Delete gRPC service if disabled + _ = feast.Handler.DeleteOwnedFeastObj(&corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: feast.GetFeastServiceName(feastType), + Namespace: feast.Handler.FeatureStore.Namespace, + }, + }) + } + + // Create REST service if enabled + if feast.isRegistryRestEnabled() { + if err := feast.createRestService(feastType); err != nil { + return feast.setFeastServiceCondition(err, feastType) + } + } else { + // Delete REST service if disabled + _ = feast.Handler.DeleteOwnedFeastObj(&corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: feast.GetFeastRestServiceName(feastType), + Namespace: feast.Handler.FeatureStore.Namespace, + }, + }) + } + } else { + // For non-registry services, always create service + if err := feast.createService(feastType); err != nil { + return feast.setFeastServiceCondition(err, feastType) + } } } else { _ = feast.Handler.DeleteOwnedFeastObj(feast.initFeastSvc(feastType)) + // Delete REST API service if it exists + _ = feast.Handler.DeleteOwnedFeastObj(&corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: feast.GetFeastRestServiceName(feastType), + Namespace: feast.Handler.FeatureStore.Namespace, + }, + }) } return feast.setFeastServiceCondition(nil, feastType) } @@ -256,7 +302,7 @@ func (feast *FeastServices) createService(feastType FeastServiceType) error { logger := log.FromContext(feast.Handler.Context) svc := feast.initFeastSvc(feastType) if op, err := controllerutil.CreateOrUpdate(feast.Handler.Context, feast.Handler.Client, svc, controllerutil.MutateFn(func() error { - return feast.setService(svc, feastType) + return feast.setService(svc, feastType, false) })); err != nil { return err } else if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated { @@ -396,14 +442,32 @@ func (feast *FeastServices) setContainer(containers *[]corev1.Container, feastTy cmd := feast.getContainerCommand(feastType) container := getContainer(name, workingDir, cmd, serverConfigs.ContainerConfigs, fsYamlB64) tls := feast.getTlsConfigs(feastType) - probeHandler := getProbeHandler(feastType, tls) - container.Ports = []corev1.ContainerPort{ - { + probeHandler := feast.getProbeHandler(feastType, tls) + container.Ports = []corev1.ContainerPort{} + + if feastType == RegistryFeastType { + if feast.isRegistryGrpcEnabled() { + container.Ports = append(container.Ports, corev1.ContainerPort{ + Name: name, + ContainerPort: getTargetPort(feastType, tls), + Protocol: corev1.ProtocolTCP, + }) + } + if feast.isRegistryRestEnabled() { + container.Ports = append(container.Ports, corev1.ContainerPort{ + Name: name + "-rest", + ContainerPort: getTargetRestPort(feastType, tls), + Protocol: corev1.ProtocolTCP, + }) + } + } else { + container.Ports = append(container.Ports, corev1.ContainerPort{ Name: name, ContainerPort: getTargetPort(feastType, tls), Protocol: corev1.ProtocolTCP, - }, + }) } + container.StartupProbe = &corev1.Probe{ ProbeHandler: probeHandler, PeriodSeconds: 3, @@ -499,6 +563,18 @@ func (feast *FeastServices) getContainerCommand(feastType FeastServiceType) []st deploySettings := FeastServiceConstants[feastType] targetPort := deploySettings.TargetHttpPort tls := feast.getTlsConfigs(feastType) + + if feastType == RegistryFeastType && feast.isRegistryServer() { + if feast.isRegistryGrpcEnabled() { + deploySettings.Args = append(deploySettings.Args, "--grpc") + } else { + deploySettings.Args = append(deploySettings.Args, "--no-grpc") + } + if feast.isRegistryRestEnabled() { + deploySettings.Args = append(deploySettings.Args, "--rest-api") + deploySettings.Args = append(deploySettings.Args, "--rest-port", strconv.Itoa(int(getTargetRestPort(feastType, tls)))) + } + } if tls.IsTLS() { targetPort = deploySettings.TargetHttpsPort feastTlsPath := GetTlsPath(feastType) @@ -583,7 +659,7 @@ func (feast *FeastServices) setInitContainer(podSpec *corev1.PodSpec, fsYamlB64 } } -func (feast *FeastServices) setService(svc *corev1.Service, feastType FeastServiceType) error { +func (feast *FeastServices) setService(svc *corev1.Service, feastType FeastServiceType, isRestService bool) error { svc.Labels = feast.getFeastTypeLabels(feastType) if feast.isOpenShiftTls(feastType) { if len(svc.Annotations) == 0 { @@ -599,6 +675,14 @@ func (feast *FeastServices) setService(svc *corev1.Service, feastType FeastServi port = HttpsPort scheme = HttpsScheme } + + var targetPort int32 + if isRestService { + targetPort = getTargetRestPort(feastType, tls) + } else { + targetPort = getTargetPort(feastType, tls) + } + svc.Spec = corev1.ServiceSpec{ Selector: feast.getLabels(), Type: corev1.ServiceTypeClusterIP, @@ -607,7 +691,7 @@ func (feast *FeastServices) setService(svc *corev1.Service, feastType FeastServi Name: scheme, Port: port, Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt(int(getTargetPort(feastType, tls))), + TargetPort: intstr.FromInt(int(targetPort)), }, }, } @@ -615,6 +699,25 @@ func (feast *FeastServices) setService(svc *corev1.Service, feastType FeastServi return controllerutil.SetControllerReference(feast.Handler.FeatureStore, svc, feast.Handler.Scheme) } +// createRestService creates a separate service for the Registry REST API +func (feast *FeastServices) createRestService(feastType FeastServiceType) error { + if feast.isRegistryServer() { + if !feast.isRegistryRestEnabled() { + return nil + } + logger := log.FromContext(feast.Handler.Context) + svc := feast.initFeastRestSvc(feastType) + if op, err := controllerutil.CreateOrUpdate(feast.Handler.Context, feast.Handler.Client, svc, controllerutil.MutateFn(func() error { + return feast.setService(svc, feastType, true) + })); err != nil { + return err + } else if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated { + logger.Info("Successfully reconciled", "Service", svc.Name, "operation", op) + } + } + return nil +} + func (feast *FeastServices) setServiceAccount(sa *corev1.ServiceAccount) error { sa.Labels = feast.getLabels() return controllerutil.SetControllerReference(feast.Handler.FeatureStore, sa, feast.Handler.Scheme) @@ -645,8 +748,8 @@ func (feast *FeastServices) getServerConfigs(feastType FeastServiceType) *feastd return appliedServices.OnlineStore.Server } case RegistryFeastType: - if feast.isLocalRegistry() { - return appliedServices.Registry.Local.Server + if feast.isRegistryServer() { + return &appliedServices.Registry.Local.Server.ServerConfigs } case UIFeastType: return appliedServices.UI @@ -720,6 +823,12 @@ func (feast *FeastServices) setServiceHostnames() error { objMeta := feast.initFeastSvc(RegistryFeastType) feast.Handler.FeatureStore.Status.ServiceHostnames.Registry = objMeta.Name + "." + objMeta.Namespace + domain + getPortStr(feast.Handler.FeatureStore.Status.Applied.Services.Registry.Local.Server.TLS) + if feast.isRegistryRestEnabled() { + // Use the REST API service name + restSvcName := feast.GetFeastRestServiceName(RegistryFeastType) + feast.Handler.FeatureStore.Status.ServiceHostnames.RegistryRest = restSvcName + "." + objMeta.Namespace + domain + + getPortStr(feast.Handler.FeatureStore.Status.Applied.Services.Registry.Local.Server.TLS) + } } else if feast.isRemoteRegistry() { return feast.setRemoteRegistryURL() } @@ -759,6 +868,10 @@ func (feast *FeastServices) setRemoteRegistryURL() error { remoteFeast.isRegistryServer() && apimeta.IsStatusConditionTrue(remoteFeast.Handler.FeatureStore.Status.Conditions, feastdevv1alpha1.RegistryReadyType) && len(remoteFeast.Handler.FeatureStore.Status.ServiceHostnames.Registry) > 0 { + // Check if gRPC server is enabled + if !remoteFeast.isRegistryGrpcEnabled() { + return errors.New("Remote feast registry of referenced FeatureStore '" + remoteFeast.Handler.FeatureStore.Name + "' must have gRPC server enabled") + } feast.Handler.FeatureStore.Status.ServiceHostnames.Registry = remoteFeast.Handler.FeatureStore.Status.ServiceHostnames.Registry } else { return errors.New("Remote feast registry of referenced FeatureStore '" + remoteFeast.Handler.FeatureStore.Name + "' is not ready") @@ -864,6 +977,18 @@ func (feast *FeastServices) initFeastSvc(feastType FeastServiceType) *corev1.Ser return svc } +func (feast *FeastServices) initFeastRestSvc(feastType FeastServiceType) *corev1.Service { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: feast.GetFeastRestServiceName(feastType), + Namespace: feast.Handler.FeatureStore.Namespace, + Labels: feast.getFeastTypeLabels(feastType), + }, + } + svc.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Service")) + return svc +} + func (feast *FeastServices) initFeastSA() *corev1.ServiceAccount { sa := &corev1.ServiceAccount{ ObjectMeta: feast.GetObjectMeta(), @@ -991,8 +1116,37 @@ func getTargetPort(feastType FeastServiceType, tls *feastdevv1alpha1.TlsConfigs) return FeastServiceConstants[feastType].TargetHttpPort } -func getProbeHandler(feastType FeastServiceType, tls *feastdevv1alpha1.TlsConfigs) corev1.ProbeHandler { +func getTargetRestPort(feastType FeastServiceType, tls *feastdevv1alpha1.TlsConfigs) int32 { + if tls.IsTLS() { + return FeastServiceConstants[feastType].TargetRestHttpsPort + } + return FeastServiceConstants[feastType].TargetRestHttpPort +} + +func (feast *FeastServices) getProbeHandler(feastType FeastServiceType, tls *feastdevv1alpha1.TlsConfigs) corev1.ProbeHandler { targetPort := getTargetPort(feastType, tls) + + if feastType == RegistryFeastType { + if feast.isRegistryGrpcEnabled() { + return corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(int(targetPort)), + }, + } + } + if feast.isRegistryRestEnabled() { + targetPort = getTargetRestPort(feastType, tls) + probeHandler := corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt(int(targetPort)), + }, + } + if tls.IsTLS() { + probeHandler.HTTPGet.Scheme = corev1.URISchemeHTTPS + } + return probeHandler + } + } if feastType == OnlineFeastType { probeHandler := corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ @@ -1021,3 +1175,26 @@ func IsDeploymentAvailable(conditions []appsv1.DeploymentCondition) bool { return false } + +// GetFeastRestServiceName returns the feast REST service object name based on service type +func (feast *FeastServices) GetFeastRestServiceName(feastType FeastServiceType) string { + return feast.GetFeastServiceName(feastType) + "-rest" +} + +// isRegistryGrpcEnabled checks if gRPC is enabled for registry service +func (feast *FeastServices) isRegistryGrpcEnabled() bool { + if feast.isRegistryServer() { + registry := feast.Handler.FeatureStore.Status.Applied.Services.Registry + return registry.Local.Server.GRPC != nil && *registry.Local.Server.GRPC + } + return false +} + +// isRegistryRestEnabled checks if REST API is enabled for registry service +func (feast *FeastServices) isRegistryRestEnabled() bool { + if feast.isRegistryServer() { + registry := feast.Handler.FeatureStore.Status.Applied.Services.Registry + return registry.Local.Server.RestAPI != nil && *registry.Local.Server.RestAPI + } + return false +} diff --git a/infra/feast-operator/internal/controller/services/services_test.go b/infra/feast-operator/internal/controller/services/services_test.go new file mode 100644 index 00000000000..0c43aff5954 --- /dev/null +++ b/infra/feast-operator/internal/controller/services/services_test.go @@ -0,0 +1,206 @@ +/* +Copyright 2024 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package services + +import ( + "context" + + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" + "github.com/feast-dev/feast/infra/feast-operator/internal/controller/handler" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func ptr[T any](v T) *T { + return &v +} + +func (feast *FeastServices) refreshFeatureStore(ctx context.Context, key types.NamespacedName) { + fs := &feastdevv1alpha1.FeatureStore{} + Expect(k8sClient.Get(ctx, key, fs)).To(Succeed()) + feast.Handler.FeatureStore = fs +} + +func applySpecToStatus(fs *feastdevv1alpha1.FeatureStore) { + fs.Status.Applied.Services = fs.Spec.Services.DeepCopy() + fs.Status.Applied.FeastProject = fs.Spec.FeastProject + Expect(k8sClient.Status().Update(context.Background(), fs)).To(Succeed()) +} + +var _ = Describe("Registry Service", func() { + var ( + featureStore *feastdevv1alpha1.FeatureStore + feast *FeastServices + typeNamespacedName types.NamespacedName + ctx context.Context + ) + + var setFeatureStoreServerConfig = func(grpcEnabled, restEnabled bool) { + featureStore.Spec.Services.Registry.Local.Server.GRPC = ptr(grpcEnabled) + featureStore.Spec.Services.Registry.Local.Server.RestAPI = ptr(restEnabled) + Expect(k8sClient.Update(ctx, featureStore)).To(Succeed()) + Expect(feast.ApplyDefaults()).To(Succeed()) + applySpecToStatus(featureStore) + feast.refreshFeatureStore(ctx, typeNamespacedName) + } + + BeforeEach(func() { + ctx = context.Background() + typeNamespacedName = types.NamespacedName{ + Name: "testfeaturestore", + Namespace: "default", + } + + featureStore = &feastdevv1alpha1.FeatureStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: typeNamespacedName.Name, + Namespace: typeNamespacedName.Namespace, + }, + Spec: feastdevv1alpha1.FeatureStoreSpec{ + FeastProject: "testproject", + Services: &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{ + ContainerConfigs: feastdevv1alpha1.ContainerConfigs{ + DefaultCtrConfigs: feastdevv1alpha1.DefaultCtrConfigs{ + Image: ptr("test-image"), + }, + }, + }, + GRPC: ptr(true), + RestAPI: ptr(false), + }, + }, + }, + }, + }, + } + + Expect(k8sClient.Create(ctx, featureStore)).To(Succeed()) + applySpecToStatus(featureStore) + + feast = &FeastServices{ + Handler: handler.FeastHandler{ + Client: k8sClient, + Context: ctx, + Scheme: k8sClient.Scheme(), + FeatureStore: featureStore, + }, + } + }) + + AfterEach(func() { + Expect(k8sClient.Delete(ctx, featureStore)).To(Succeed()) + }) + + Describe("Probe Handler Configuration", func() { + It("should configure TCP socket probe when gRPC is enabled", func() { + setFeatureStoreServerConfig(true, false) + probeHandler := feast.getProbeHandler(RegistryFeastType, featureStore.Spec.Services.Registry.Local.Server.TLS) + Expect(probeHandler.TCPSocket).NotTo(BeNil()) + Expect(probeHandler.TCPSocket.Port).To(Equal(intstr.FromInt(int(FeastServiceConstants[RegistryFeastType].TargetHttpPort)))) + }) + + It("should configure HTTP GET probe when REST is enabled", func() { + setFeatureStoreServerConfig(false, true) + probeHandler := feast.getProbeHandler(RegistryFeastType, featureStore.Spec.Services.Registry.Local.Server.TLS) + Expect(probeHandler.HTTPGet).NotTo(BeNil()) + Expect(probeHandler.HTTPGet.Port).To(Equal(intstr.FromInt(int(FeastServiceConstants[RegistryFeastType].TargetRestHttpPort)))) + }) + }) + + Describe("Registry Server Configuration", func() { + It("should enable both gRPC and REST", func() { + setFeatureStoreServerConfig(true, true) + Expect(feast.isRegistryGrpcEnabled()).To(BeTrue()) + Expect(feast.isRegistryRestEnabled()).To(BeTrue()) + }) + + It("should create both gRPC and REST services", func() { + setFeatureStoreServerConfig(true, true) + Expect(feast.deployFeastServiceByType(RegistryFeastType)).To(Succeed()) + Expect(feast.initFeastSvc(RegistryFeastType)).NotTo(BeNil()) + Expect(feast.initFeastRestSvc(RegistryFeastType)).NotTo(BeNil()) + }) + + It("should enable only gRPC", func() { + setFeatureStoreServerConfig(true, false) + Expect(feast.isRegistryGrpcEnabled()).To(BeTrue()) + Expect(feast.isRegistryRestEnabled()).To(BeFalse()) + }) + + It("should create only gRPC service and not REST service", func() { + setFeatureStoreServerConfig(true, false) + Expect(feast.deployFeastServiceByType(RegistryFeastType)).To(Succeed()) + Expect(feast.initFeastSvc(RegistryFeastType)).NotTo(BeNil()) + }) + }) + + Describe("Container Ports Configuration", func() { + It("should configure correct gRPC container ports", func() { + setFeatureStoreServerConfig(true, false) + Expect(feast.deployFeastServiceByType(RegistryFeastType)).To(Succeed()) + deployment := feast.initFeastDeploy() + Expect(deployment).NotTo(BeNil()) + Expect(feast.setDeployment(deployment)).To(Succeed()) + + ports := deployment.Spec.Template.Spec.Containers[0].Ports + Expect(ports).To(HaveLen(1)) + Expect(ports[0].ContainerPort).To(Equal(FeastServiceConstants[RegistryFeastType].TargetHttpPort)) + Expect(ports[0].Name).To(Equal(string(RegistryFeastType))) + }) + + It("should configure correct REST container ports", func() { + setFeatureStoreServerConfig(false, true) + Expect(feast.deployFeastServiceByType(RegistryFeastType)).To(Succeed()) + deployment := feast.initFeastDeploy() + Expect(deployment).NotTo(BeNil()) + Expect(feast.setDeployment(deployment)).To(Succeed()) + + ports := deployment.Spec.Template.Spec.Containers[0].Ports + Expect(ports).To(HaveLen(1)) + Expect(ports[0].ContainerPort).To(Equal(FeastServiceConstants[RegistryFeastType].TargetRestHttpPort)) + Expect(ports[0].Name).To(Equal(string(RegistryFeastType) + "-rest")) + + Expect(deployment.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deployment.Spec.Template.Spec.Containers[0].Ports).To(HaveLen(1)) + Expect(deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort).To(Equal(FeastServiceConstants[RegistryFeastType].TargetRestHttpPort)) + Expect(deployment.Spec.Template.Spec.Containers[0].Ports[0].Name).To(Equal(string(RegistryFeastType) + "-rest")) + }) + + It("should configure correct ports for both services", func() { + setFeatureStoreServerConfig(true, true) + Expect(feast.deployFeastServiceByType(RegistryFeastType)).To(Succeed()) + + deployment := feast.initFeastDeploy() + Expect(deployment).NotTo(BeNil()) + Expect(feast.setDeployment(deployment)).To(Succeed()) + + ports := deployment.Spec.Template.Spec.Containers[0].Ports + Expect(ports).To(HaveLen(2)) + Expect(ports[0].ContainerPort).To(Equal(FeastServiceConstants[RegistryFeastType].TargetHttpPort)) + Expect(ports[0].Name).To(Equal(string(RegistryFeastType))) + Expect(ports[1].ContainerPort).To(Equal(FeastServiceConstants[RegistryFeastType].TargetRestHttpPort)) + Expect(ports[1].Name).To(Equal(string(RegistryFeastType) + "-rest")) + }) + }) +}) diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index 962132ace07..4a84e9532cd 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -108,9 +108,11 @@ var ( TargetHttpsPort: 6567, }, RegistryFeastType: { - Args: []string{"serve_registry"}, - TargetHttpPort: 6570, - TargetHttpsPort: 6571, + Args: []string{"serve_registry"}, + TargetHttpPort: 6570, + TargetHttpsPort: 6571, + TargetRestHttpPort: 6572, + TargetRestHttpsPort: 6573, }, UIFeastType: { Args: []string{"ui", "-h", "0.0.0.0"}, @@ -283,9 +285,11 @@ type AuthzConfig struct { } type deploymentSettings struct { - Args []string - TargetHttpPort int32 - TargetHttpsPort int32 + Args []string + TargetHttpPort int32 + TargetHttpsPort int32 + TargetRestHttpPort int32 + TargetRestHttpsPort int32 } // CustomCertificatesBundle represents a custom CA bundle configuration diff --git a/infra/feast-operator/internal/controller/services/tls.go b/infra/feast-operator/internal/controller/services/tls.go index e3955f7d115..c447d9e99ec 100644 --- a/infra/feast-operator/internal/controller/services/tls.go +++ b/infra/feast-operator/internal/controller/services/tls.go @@ -71,10 +71,18 @@ func (feast *FeastServices) setOpenshiftTls() error { } } if feast.localRegistryOpenshiftTls() { - appliedServices.Registry.Local.Server.TLS = &feastdevv1alpha1.TlsConfigs{ - SecretRef: &corev1.LocalObjectReference{ - Name: feast.initFeastSvc(RegistryFeastType).Name + tlsNameSuffix, - }, + if feast.isRegistryRestEnabled() { + appliedServices.Registry.Local.Server.TLS = &feastdevv1alpha1.TlsConfigs{ + SecretRef: &corev1.LocalObjectReference{ + Name: feast.initFeastRestSvc(RegistryFeastType).Name + tlsNameSuffix, + }, + } + } else { + appliedServices.Registry.Local.Server.TLS = &feastdevv1alpha1.TlsConfigs{ + SecretRef: &corev1.LocalObjectReference{ + Name: feast.initFeastSvc(RegistryFeastType).Name + tlsNameSuffix, + }, + } } } else if remote, err := feast.remoteRegistryOpenshiftTls(); remote { // if the remote registry reference is using openshift's service serving certificates, we can use the injected service CA bundle configMap diff --git a/infra/feast-operator/internal/controller/services/tls_test.go b/infra/feast-operator/internal/controller/services/tls_test.go index caf694a2173..1b63b9d9830 100644 --- a/infra/feast-operator/internal/controller/services/tls_test.go +++ b/infra/feast-operator/internal/controller/services/tls_test.go @@ -56,7 +56,9 @@ var _ = Describe("TLS Config", func() { feast.Handler.FeatureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{}, + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{}, + }, }, }, } @@ -85,7 +87,9 @@ var _ = Describe("TLS Config", func() { feast.Handler.FeatureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{}, + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{}, + }, }, }, } @@ -190,11 +194,13 @@ var _ = Describe("TLS Config", func() { }, Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{ - TLS: &feastdevv1alpha1.TlsConfigs{ - SecretRef: &corev1.LocalObjectReference{}, - SecretKeyNames: feastdevv1alpha1.SecretKeyNames{ - TlsCrt: "test.crt", + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{ + TLS: &feastdevv1alpha1.TlsConfigs{ + SecretRef: &corev1.LocalObjectReference{}, + SecretKeyNames: feastdevv1alpha1.SecretKeyNames{ + TlsCrt: "test.crt", + }, }, }, }, @@ -244,9 +250,11 @@ var _ = Describe("TLS Config", func() { } feast.Handler.FeatureStore.Spec.Services.Registry = &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ - Server: &feastdevv1alpha1.ServerConfigs{ - TLS: &feastdevv1alpha1.TlsConfigs{ - Disable: &disable, + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{ + TLS: &feastdevv1alpha1.TlsConfigs{ + Disable: &disable, + }, }, }, }, @@ -294,19 +302,19 @@ var _ = Describe("TLS Config", func() { // check k8s service objects offlineSvc := feast.initFeastSvc(OfflineFeastType) Expect(offlineSvc.Annotations).To(BeEmpty()) - err = feast.setService(offlineSvc, OfflineFeastType) + err = feast.setService(offlineSvc, OfflineFeastType, false) Expect(err).ToNot(HaveOccurred()) Expect(offlineSvc.Annotations).NotTo(BeEmpty()) Expect(offlineSvc.Spec.Ports[0].Name).To(Equal(HttpsScheme)) onlineSvc := feast.initFeastSvc(OnlineFeastType) - err = feast.setService(onlineSvc, OnlineFeastType) + err = feast.setService(onlineSvc, OnlineFeastType, false) Expect(err).ToNot(HaveOccurred()) Expect(onlineSvc.Annotations).To(BeEmpty()) Expect(onlineSvc.Spec.Ports[0].Name).To(Equal(HttpScheme)) uiSvc := feast.initFeastSvc(UIFeastType) - err = feast.setService(uiSvc, UIFeastType) + err = feast.setService(uiSvc, UIFeastType, false) Expect(err).ToNot(HaveOccurred()) Expect(uiSvc.Annotations).To(BeEmpty()) Expect(uiSvc.Spec.Ports[0].Name).To(Equal(HttpScheme)) @@ -328,6 +336,46 @@ var _ = Describe("TLS Config", func() { Expect(GetUIContainer(*feastDeploy).Command).NotTo(ContainElements(ContainSubstring("--key"))) Expect(GetUIContainer(*feastDeploy).VolumeMounts).To(HaveLen(1)) + // Test REST registry server TLS configuration + feast.Handler.FeatureStore = minimalFeatureStore() + restEnabled := true + grpcEnabled := false + feast.Handler.FeatureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + ServerConfigs: feastdevv1alpha1.ServerConfigs{}, + RestAPI: &restEnabled, + GRPC: &grpcEnabled, + }, + }, + }, + } + testSetIsOpenShift() + err = feast.ApplyDefaults() + Expect(err).ToNot(HaveOccurred()) + + tls = feast.getTlsConfigs(RegistryFeastType) + Expect(tls).NotTo(BeNil()) + Expect(tls.IsTLS()).To(BeTrue()) + Expect(tls.SecretRef).NotTo(BeNil()) + Expect(tls.SecretRef.Name).To(Equal("feast-test-registry-rest-tls")) + Expect(tls.SecretKeyNames).To(Equal(secretKeyNames)) + Expect(getPortStr(tls)).To(Equal("443")) + Expect(GetTlsPath(RegistryFeastType)).To(Equal("/tls/registry/")) + + registryRestSvc := feast.initFeastRestSvc(RegistryFeastType) + err = feast.setService(registryRestSvc, RegistryFeastType, true) + Expect(err).ToNot(HaveOccurred()) + Expect(registryRestSvc.Annotations).NotTo(BeEmpty()) + Expect(registryRestSvc.Spec.Ports[0].Name).To(Equal(HttpsScheme)) + + feastDeploy = feast.initFeastDeploy() + err = feast.setDeployment(feastDeploy) + Expect(err).ToNot(HaveOccurred()) + registryContainer := GetRegistryContainer(*feastDeploy) + Expect(registryContainer).NotTo(BeNil()) + Expect(registryContainer.Command).To(ContainElements(ContainSubstring("--key"))) }) }) }) diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index 41f961e557b..662308056e2 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -125,6 +125,12 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { if services.Registry.Local.Server != nil { setDefaultCtrConfigs(&services.Registry.Local.Server.ContainerConfigs.DefaultCtrConfigs) + // Set default for GRPC: true if nil + if services.Registry.Local.Server.GRPC == nil { + defaultGRPC := true + services.Registry.Local.Server.GRPC = &defaultGRPC + } + } } else if services.Registry.Remote.FeastRef != nil && len(services.Registry.Remote.FeastRef.Namespace) == 0 { services.Registry.Remote.FeastRef.Namespace = cr.Namespace diff --git a/infra/feast-operator/test/api/featurestore_types_test.go b/infra/feast-operator/test/api/featurestore_types_test.go index 12a7406e80d..83ac2906ec0 100644 --- a/infra/feast-operator/test/api/featurestore_types_test.go +++ b/infra/feast-operator/test/api/featurestore_types_test.go @@ -18,6 +18,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func boolPtr(b bool) *bool { + return &b +} + func createFeatureStore() *feastdevv1alpha1.FeatureStore { return &feastdevv1alpha1.FeatureStore{ ObjectMeta: metav1.ObjectMeta{ @@ -336,6 +340,104 @@ func registryStoreWithDBPersistenceType(dbPersistenceType string, featureStore * return fsCopy } +func registryWithRestAPIFalse(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + RestAPI: boolPtr(false), + }, + }, + }, + } + return fsCopy +} + +func registryWithOnlyRestAPI(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + RestAPI: boolPtr(true), + }, + }, + }, + } + return fsCopy +} + +func registryWithOnlyGRPC(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + GRPC: boolPtr(true), + }, + }, + }, + } + return fsCopy +} + +func registryWithBothAPIs(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + RestAPI: boolPtr(true), + GRPC: boolPtr(true), + }, + }, + }, + } + return fsCopy +} + +func registryWithNoAPIs(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{}, + }, + }, + } + return fsCopy +} + +func registryWithBothFalse(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + RestAPI: boolPtr(false), + GRPC: boolPtr(false), + }, + }, + }, + } + return fsCopy +} + +func registryWithGRPCFalse(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Server: &feastdevv1alpha1.RegistryServerConfigs{ + GRPC: boolPtr(false), + }, + }, + }, + } + return fsCopy +} + func quotedSlice(stringSlice []string) string { quotedSlice := make([]string, len(stringSlice)) @@ -476,4 +578,71 @@ var _ = Describe("FeatureStore API", func() { attemptInvalidCreationAndAsserts(ctx, authzConfigWithOidc(authzConfigWithKubernetes(featurestore)), "One selection required between kubernetes or oidc") }) }) + + Context("When creating a Registry", func() { + ctx := context.Background() + + BeforeEach(func() { + By("verifying the custom resource FeatureStore is not there") + resource := &feastdevv1alpha1.FeatureStore{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err != nil && errors.IsNotFound(err)).To(BeTrue()) + }) + AfterEach(func() { + By("Cleaning up the test resource") + resource := &feastdevv1alpha1.FeatureStore{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + if err == nil { + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + } + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err != nil && errors.IsNotFound(err)).To(BeTrue()) + }) + + Context("with valid API configurations", func() { + It("should succeed when restAPI is false and grpc is not specified (defaults to true)", func() { + featurestore := createFeatureStore() + resource := registryWithRestAPIFalse(featurestore) + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + }) + + It("should succeed when restAPI is true and grpc is not specified (defaults to true)", func() { + featurestore := createFeatureStore() + resource := registryWithOnlyRestAPI(featurestore) + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + }) + + It("should succeed when only grpc is true", func() { + featurestore := createFeatureStore() + resource := registryWithOnlyGRPC(featurestore) + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + }) + + It("should succeed when both APIs are true", func() { + featurestore := createFeatureStore() + resource := registryWithBothAPIs(featurestore) + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + }) + + It("should succeed when no APIs are specified (grpc defaults to true)", func() { + featurestore := createFeatureStore() + resource := registryWithNoAPIs(featurestore) + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + }) + }) + + Context("with invalid API configurations", func() { + It("should fail when both APIs are explicitly false", func() { + featurestore := createFeatureStore() + resource := registryWithBothFalse(featurestore) + attemptInvalidCreationAndAsserts(ctx, resource, "At least one of restAPI or grpc must be true") + }) + + It("should fail when grpc is false and restAPI is not specified", func() { + featurestore := createFeatureStore() + resource := registryWithGRPCFalse(featurestore) + attemptInvalidCreationAndAsserts(ctx, resource, "At least one of restAPI or grpc must be true") + }) + }) + }) }) diff --git a/sdk/python/feast/cli/serve.py b/sdk/python/feast/cli/serve.py index 049eb232165..b5ff950a042 100644 --- a/sdk/python/feast/cli/serve.py +++ b/sdk/python/feast/cli/serve.py @@ -232,7 +232,15 @@ def serve_registry_command( for p in servers: p.join() else: - store.serve_registry(port, tls_key_path, tls_cert_path, rest_api) + if grpc: + store.serve_registry(port, tls_key_path, tls_cert_path) + else: + store.serve_registry( + port=rest_port, + tls_key_path=tls_key_path, + tls_cert_path=tls_cert_path, + rest_api=rest_api, + ) def _serve_grpc_registry(