diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 87e1cd64841..a8448571c04 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -71,16 +71,55 @@ type FeatureStoreServices struct { // OfflineStore configures the deployed offline store service type OfflineStore struct { ServiceConfigs `json:",inline"` + Persistence *OfflineStorePersistence `json:"persistence,omitempty"` +} + +// OfflineStorePersistence configures the persistence settings for the offline store service +type OfflineStorePersistence struct { + FilePersistence *OfflineStoreFilePersistence `json:"file,omitempty"` +} + +// OfflineStorePersistence configures the file-based persistence for the offline store service +type OfflineStoreFilePersistence struct { + // +kubebuilder:validation:Enum=dask;duckdb + Type string `json:"type,omitempty"` +} + +var ValidOfflineStoreFilePersistenceTypes = []string{ + "dask", + "duckdb", } // OnlineStore configures the deployed online store service type OnlineStore struct { ServiceConfigs `json:",inline"` + Persistence *OnlineStorePersistence `json:"persistence,omitempty"` +} + +// OnlineStorePersistence configures the persistence settings for the online store service +type OnlineStorePersistence struct { + FilePersistence *OnlineStoreFilePersistence `json:"file,omitempty"` +} + +// OnlineStoreFilePersistence configures the file-based persistence for the offline store service +type OnlineStoreFilePersistence struct { + Path string `json:"path,omitempty"` } // LocalRegistryConfig configures the deployed registry service type LocalRegistryConfig struct { ServiceConfigs `json:",inline"` + Persistence *RegistryPersistence `json:"persistence,omitempty"` +} + +// RegistryPersistence configures the persistence settings for the registry service +type RegistryPersistence struct { + FilePersistence *RegistryFilePersistence `json:"file,omitempty"` +} + +// RegistryFilePersistence configures the file-based persistence for the registry service +type RegistryFilePersistence struct { + Path string `json:"path,omitempty"` } // Registry configures the registry service. One selection is required. Local is the default setting. diff --git a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go index f37c8942ad2..6ff9957bb84 100644 --- a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -198,6 +198,11 @@ func (in *FeatureStoreStatus) DeepCopy() *FeatureStoreStatus { func (in *LocalRegistryConfig) DeepCopyInto(out *LocalRegistryConfig) { *out = *in in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + if in.Persistence != nil { + in, out := &in.Persistence, &out.Persistence + *out = new(RegistryPersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalRegistryConfig. @@ -214,6 +219,11 @@ func (in *LocalRegistryConfig) DeepCopy() *LocalRegistryConfig { func (in *OfflineStore) DeepCopyInto(out *OfflineStore) { *out = *in in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + if in.Persistence != nil { + in, out := &in.Persistence, &out.Persistence + *out = new(OfflineStorePersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OfflineStore. @@ -226,10 +236,50 @@ func (in *OfflineStore) DeepCopy() *OfflineStore { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OfflineStoreFilePersistence) DeepCopyInto(out *OfflineStoreFilePersistence) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OfflineStoreFilePersistence. +func (in *OfflineStoreFilePersistence) DeepCopy() *OfflineStoreFilePersistence { + if in == nil { + return nil + } + out := new(OfflineStoreFilePersistence) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OfflineStorePersistence) DeepCopyInto(out *OfflineStorePersistence) { + *out = *in + if in.FilePersistence != nil { + in, out := &in.FilePersistence, &out.FilePersistence + *out = new(OfflineStoreFilePersistence) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OfflineStorePersistence. +func (in *OfflineStorePersistence) DeepCopy() *OfflineStorePersistence { + if in == nil { + return nil + } + out := new(OfflineStorePersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OnlineStore) DeepCopyInto(out *OnlineStore) { *out = *in in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + if in.Persistence != nil { + in, out := &in.Persistence, &out.Persistence + *out = new(OnlineStorePersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnlineStore. @@ -242,6 +292,41 @@ func (in *OnlineStore) DeepCopy() *OnlineStore { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OnlineStoreFilePersistence) DeepCopyInto(out *OnlineStoreFilePersistence) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnlineStoreFilePersistence. +func (in *OnlineStoreFilePersistence) DeepCopy() *OnlineStoreFilePersistence { + if in == nil { + return nil + } + out := new(OnlineStoreFilePersistence) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OnlineStorePersistence) DeepCopyInto(out *OnlineStorePersistence) { + *out = *in + if in.FilePersistence != nil { + in, out := &in.FilePersistence, &out.FilePersistence + *out = new(OnlineStoreFilePersistence) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnlineStorePersistence. +func (in *OnlineStorePersistence) DeepCopy() *OnlineStorePersistence { + if in == nil { + return nil + } + out := new(OnlineStorePersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OptionalConfigs) DeepCopyInto(out *OptionalConfigs) { *out = *in @@ -303,6 +388,41 @@ func (in *Registry) DeepCopy() *Registry { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RegistryFilePersistence) DeepCopyInto(out *RegistryFilePersistence) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryFilePersistence. +func (in *RegistryFilePersistence) DeepCopy() *RegistryFilePersistence { + if in == nil { + return nil + } + out := new(RegistryFilePersistence) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RegistryPersistence) DeepCopyInto(out *RegistryPersistence) { + *out = *in + if in.FilePersistence != nil { + in, out := &in.FilePersistence, &out.FilePersistence + *out = new(RegistryFilePersistence) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryPersistence. +func (in *RegistryPersistence) DeepCopy() *RegistryPersistence { + if in == nil { + return nil + } + out := new(RegistryPersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RemoteRegistryConfig) DeepCopyInto(out *RemoteRegistryConfig) { *out = *in diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index b4c17b5eb80..a6d4551da5c 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -181,6 +181,21 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OfflineStorePersistence configures the persistence + settings for the offline store service + properties: + file: + description: OfflineStorePersistence configures the file-based + persistence for the offline store service + properties: + type: + enum: + - dask + - duckdb + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -361,6 +376,18 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OnlineStorePersistence configures the persistence + settings for the online store service + properties: + file: + description: OnlineStoreFilePersistence configures the + file-based persistence for the offline store service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -546,6 +573,18 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: RegistryPersistence configures the persistence + settings for the registry service + properties: + file: + description: RegistryFilePersistence configures the + file-based persistence for the registry service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -780,6 +819,21 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OfflineStorePersistence configures the persistence + settings for the offline store service + properties: + file: + description: OfflineStorePersistence configures the + file-based persistence for the offline store service + properties: + type: + enum: + - dask + - duckdb + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -962,6 +1016,19 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OnlineStorePersistence configures the persistence + settings for the online store service + properties: + file: + description: OnlineStoreFilePersistence configures + the file-based persistence for the offline store + service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -1151,6 +1218,19 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: RegistryPersistence configures the persistence + settings for the registry service + properties: + file: + description: RegistryFilePersistence configures + the file-based persistence for the registry + service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/infra/feast-operator/config/samples/v1alpha1_featurestore_ephemeral_persistence.yaml b/infra/feast-operator/config/samples/v1alpha1_featurestore_ephemeral_persistence.yaml new file mode 100644 index 00000000000..512fed9d4c0 --- /dev/null +++ b/infra/feast-operator/config/samples/v1alpha1_featurestore_ephemeral_persistence.yaml @@ -0,0 +1,20 @@ +apiVersion: feast.dev/v1alpha1 +kind: FeatureStore +metadata: + name: sample-ephemeral-persistence +spec: + feastProject: my_project + services: + onlineStore: + persistence: + file: + path: /data/online_store.db + offlineStore: + persistence: + file: + type: dask + registry: + local: + persistence: + file: + path: /data/registry.db diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index 4d66fbdc734..ceb6b34e5c4 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -189,6 +189,21 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OfflineStorePersistence configures the persistence + settings for the offline store service + properties: + file: + description: OfflineStorePersistence configures the file-based + persistence for the offline store service + properties: + type: + enum: + - dask + - duckdb + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -369,6 +384,18 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OnlineStorePersistence configures the persistence + settings for the online store service + properties: + file: + description: OnlineStoreFilePersistence configures the + file-based persistence for the offline store service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -554,6 +581,18 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: RegistryPersistence configures the persistence + settings for the registry service + properties: + file: + description: RegistryFilePersistence configures the + file-based persistence for the registry service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -788,6 +827,21 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OfflineStorePersistence configures the persistence + settings for the offline store service + properties: + file: + description: OfflineStorePersistence configures the + file-based persistence for the offline store service + properties: + type: + enum: + - dask + - duckdb + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -970,6 +1024,19 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OnlineStorePersistence configures the persistence + settings for the online store service + properties: + file: + description: OnlineStoreFilePersistence configures + the file-based persistence for the offline store + service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -1159,6 +1226,19 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: RegistryPersistence configures the persistence + settings for the registry service + properties: + file: + description: RegistryFilePersistence configures + the file-based persistence for the registry + service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/infra/feast-operator/go.mod b/infra/feast-operator/go.mod index 65d2aaac502..4e544d819e4 100644 --- a/infra/feast-operator/go.mod +++ b/infra/feast-operator/go.mod @@ -5,6 +5,8 @@ go 1.21 require ( github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/gomega v1.30.0 + gopkg.in/yaml.v3 v3.0.1 + k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 k8s.io/client-go v0.29.2 sigs.k8s.io/controller-runtime v0.17.3 @@ -60,8 +62,6 @@ require ( google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.2 // indirect k8s.io/apiextensions-apiserver v0.29.2 // indirect k8s.io/component-base v0.29.2 // indirect k8s.io/klog/v2 v2.110.1 // indirect diff --git a/infra/feast-operator/internal/controller/featurestore_controller.go b/infra/feast-operator/internal/controller/featurestore_controller.go index 244bbcaae80..5da9bad3aa6 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller.go +++ b/infra/feast-operator/internal/controller/featurestore_controller.go @@ -34,7 +34,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" "github.com/feast-dev/feast/infra/feast-operator/internal/controller/services" ) @@ -78,10 +77,10 @@ func (r *FeatureStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request currentStatus := cr.Status.DeepCopy() // initial status defaults must occur before feast deployment - applyDefaultsToStatus(cr) + services.ApplyDefaultsToStatus(cr) result, recErr = r.deployFeast(ctx, cr) if cr.DeletionTimestamp == nil && !reflect.DeepEqual(currentStatus, cr.Status) { - if err := r.Client.Status().Update(ctx, cr); err != nil { + if err = r.Client.Status().Update(ctx, cr); err != nil { if apierrors.IsConflict(err) { logger.Info("FeatureStore object modified, retry syncing status") // Re-queue and preserve existing recErr @@ -184,39 +183,3 @@ func (r *FeatureStoreReconciler) mapFeastRefsToFeastRequests(ctx context.Context return requests } - -func applyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { - cr.Status.FeastVersion = feastversion.FeastVersion - applied := cr.Spec.DeepCopy() - if applied.Services == nil { - applied.Services = &feastdevv1alpha1.FeatureStoreServices{} - } - - // default to registry service deployment - if applied.Services.Registry == nil { - applied.Services.Registry = &feastdevv1alpha1.Registry{} - } - // if remote registry not set, proceed w/ local registry defaults - if applied.Services.Registry.Remote == nil { - // if local registry not set, apply an empty pointer struct - if applied.Services.Registry.Local == nil { - applied.Services.Registry.Local = &feastdevv1alpha1.LocalRegistryConfig{} - } - setServiceDefaultConfigs(&applied.Services.Registry.Local.ServiceConfigs.DefaultConfigs) - } - if applied.Services.OfflineStore != nil { - setServiceDefaultConfigs(&applied.Services.OfflineStore.ServiceConfigs.DefaultConfigs) - } - if applied.Services.OnlineStore != nil { - setServiceDefaultConfigs(&applied.Services.OnlineStore.ServiceConfigs.DefaultConfigs) - } - - // overwrite status.applied with every reconcile - applied.DeepCopyInto(&cr.Status.Applied) -} - -func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { - if defaultConfigs.Image == nil { - defaultConfigs.Image = &services.DefaultImage - } -} diff --git a/infra/feast-operator/internal/controller/featurestore_controller_test.go b/infra/feast-operator/internal/controller/featurestore_controller_test.go index 10b5f64c567..aab260d9991 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_test.go @@ -19,6 +19,7 @@ package controller import ( "context" "encoding/base64" + "fmt" "reflect" . "github.com/onsi/ginkgo/v2" @@ -233,7 +234,7 @@ var _ = Describe("FeatureStore Controller", func() { EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, Registry: services.RegistryConfig{ RegistryType: services.RegistryFileConfigType, - Path: services.LocalRegistryPath, + Path: services.DefaultRegistryPath, }, } Expect(repoConfig).To(Equal(testConfig)) @@ -441,16 +442,25 @@ var _ = Describe("FeatureStore Controller", func() { Expect(resource.Status.Applied.FeastProject).To(Equal(resource.Spec.FeastProject)) Expect(resource.Status.Applied.Services).NotTo(BeNil()) Expect(resource.Status.Applied.Services.OfflineStore).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence.FilePersistence.Type).To(Equal("dask")) Expect(resource.Status.Applied.Services.OfflineStore.ImagePullPolicy).To(BeNil()) Expect(resource.Status.Applied.Services.OfflineStore.Resources).To(BeNil()) Expect(resource.Status.Applied.Services.OfflineStore.Image).To(Equal(&services.DefaultImage)) Expect(resource.Status.Applied.Services.OnlineStore).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence.FilePersistence.Path).To(Equal(services.DefaultOnlinePath)) Expect(resource.Status.Applied.Services.OnlineStore.Env).To(Equal(&[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}})) Expect(resource.Status.Applied.Services.OnlineStore.ImagePullPolicy).To(Equal(&pullPolicy)) Expect(resource.Status.Applied.Services.OnlineStore.Resources).NotTo(BeNil()) Expect(resource.Status.Applied.Services.OnlineStore.Image).To(Equal(&image)) Expect(resource.Status.Applied.Services.Registry).NotTo(BeNil()) Expect(resource.Status.Applied.Services.Registry.Local).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.Path).To(Equal(services.DefaultRegistryPath)) Expect(resource.Status.Applied.Services.Registry.Local.ImagePullPolicy).To(BeNil()) Expect(resource.Status.Applied.Services.Registry.Local.Resources).To(BeNil()) Expect(resource.Status.Applied.Services.Registry.Local.Image).To(Equal(&services.DefaultImage)) @@ -590,7 +600,7 @@ var _ = Describe("FeatureStore Controller", func() { EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, Registry: services.RegistryConfig{ RegistryType: services.RegistryFileConfigType, - Path: services.LocalRegistryPath, + Path: services.DefaultRegistryPath, }, } Expect(repoConfig).To(Equal(testConfig)) @@ -666,7 +676,7 @@ var _ = Describe("FeatureStore Controller", func() { EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, OfflineStore: offlineRemote, OnlineStore: services.OnlineStoreConfig{ - Path: services.LocalOnlinePath, + Path: services.DefaultOnlinePath, Type: services.OnlineSqliteConfigType, }, Registry: regRemote, @@ -1187,6 +1197,445 @@ var _ = Describe("FeatureStore Controller", func() { Expect(err).NotTo(HaveOccurred()) }) }) + + Context("When deploying a resource with all ephemeral services", func() { + const resourceName = "services-ephemeral" + image := "test:latest" + var pullPolicy = corev1.PullAlways + var testEnvVarName = "testEnvVarName" + var testEnvVarValue = "testEnvVarValue" + + ctx := context.Background() + + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", + } + featurestore := &feastdevv1alpha1.FeatureStore{} + onlineStorePath := "/data/online.db" + registryPath := "/data/registry.db" + offlineType := "duckdb" + + BeforeEach(func() { + By("creating the custom resource for the Kind FeatureStore") + err := k8sClient.Get(ctx, typeNamespacedName, featurestore) + if err != nil && errors.IsNotFound(err) { + resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) + resource.Spec.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ + Type: offlineType, + }, + } + resource.Spec.Services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ + Path: onlineStorePath, + }, + } + resource.Spec.Services.Registry = &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: registryPath, + }, + }, + }, + } + + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + } + }) + AfterEach(func() { + resource := &feastdevv1alpha1.FeatureStore{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + By("Cleanup the specific resource instance FeatureStore") + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + }) + + It("should successfully reconcile the resource", func() { + By("Reconciling the created resource") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource := &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + feast := services.FeastServices{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + } + Expect(resource.Status).NotTo(BeNil()) + Expect(resource.Status.FeastVersion).To(Equal(feastversion.FeastVersion)) + Expect(resource.Status.ClientConfigMap).To(Equal(feast.GetFeastServiceName(services.ClientFeastType))) + Expect(resource.Status.Applied.FeastProject).To(Equal(resource.Spec.FeastProject)) + Expect(resource.Status.Applied.Services).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence.FilePersistence.Type).To(Equal(offlineType)) + Expect(resource.Status.Applied.Services.OfflineStore.ImagePullPolicy).To(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Resources).To(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Image).To(Equal(&services.DefaultImage)) + Expect(resource.Status.Applied.Services.OnlineStore).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence.FilePersistence.Path).To(Equal(onlineStorePath)) + Expect(resource.Status.Applied.Services.OnlineStore.Env).To(Equal(&[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}})) + Expect(resource.Status.Applied.Services.OnlineStore.ImagePullPolicy).To(Equal(&pullPolicy)) + Expect(resource.Status.Applied.Services.OnlineStore.Resources).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Image).To(Equal(&image)) + Expect(resource.Status.Applied.Services.Registry).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.Path).To(Equal(registryPath)) + Expect(resource.Status.Applied.Services.Registry.Local.ImagePullPolicy).To(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Resources).To(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Image).To(Equal(&services.DefaultImage)) + + domain := ".svc.cluster.local:80" + Expect(resource.Status.ServiceHostnames.OfflineStore).To(Equal(feast.GetFeastServiceName(services.OfflineFeastType) + "." + resource.Namespace + domain)) + Expect(resource.Status.ServiceHostnames.OnlineStore).To(Equal(feast.GetFeastServiceName(services.OnlineFeastType) + "." + resource.Namespace + domain)) + Expect(resource.Status.ServiceHostnames.Registry).To(Equal(feast.GetFeastServiceName(services.RegistryFeastType) + "." + resource.Namespace + domain)) + + Expect(resource.Status.Conditions).NotTo(BeEmpty()) + cond := apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.ReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.ReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.ReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.RegistryReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.RegistryReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.RegistryReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.ClientReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.ClientReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.ClientReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.OfflineStoreReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.OfflineStoreReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.OfflineStoreReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.OnlineStoreReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.OnlineStoreReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.OnlineStoreReadyMessage)) + + Expect(resource.Status.Phase).To(Equal(feastdevv1alpha1.ReadyPhase)) + + deploy := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Replicas).To(Equal(&services.DefaultReplicas)) + Expect(controllerutil.HasControllerReference(deploy)).To(BeTrue()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + + svc := &corev1.Service{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + svc) + Expect(err).NotTo(HaveOccurred()) + Expect(controllerutil.HasControllerReference(svc)).To(BeTrue()) + Expect(svc.Spec.Ports[0].TargetPort).To(Equal(intstr.FromInt(int(services.FeastServiceConstants[services.RegistryFeastType].TargetPort)))) + }) + + It("should properly encode a feature_store.yaml config", func() { + By("Reconciling the created resource") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource := &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + req, err := labels.NewRequirement(services.NameLabelKey, selection.Equals, []string{resource.Name}) + Expect(err).NotTo(HaveOccurred()) + labelSelector := labels.NewSelector().Add(*req) + listOpts := &client.ListOptions{Namespace: resource.Namespace, LabelSelector: labelSelector} + deployList := appsv1.DeploymentList{} + err = k8sClient.List(ctx, &deployList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(deployList.Items).To(HaveLen(3)) + + svcList := corev1.ServiceList{} + err = k8sClient.List(ctx, &svcList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(svcList.Items).To(HaveLen(3)) + + cmList := corev1.ConfigMapList{} + err = k8sClient.List(ctx, &cmList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(cmList.Items).To(HaveLen(1)) + + feast := services.FeastServices{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + } + + // check registry config + deploy := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(1)) + env := getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err := feast.GetServiceFeatureStoreYamlBase64(services.RegistryFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err := base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfig := &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfig) + Expect(err).NotTo(HaveOccurred()) + testConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + Registry: services.RegistryConfig{ + RegistryType: services.RegistryFileConfigType, + Path: registryPath, + }, + } + Expect(repoConfig).To(Equal(testConfig)) + + // check offline config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(1)) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OfflineFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfigOffline := &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfigOffline) + Expect(err).NotTo(HaveOccurred()) + regRemote := services.RegistryConfig{ + RegistryType: services.RegistryRemoteConfigType, + Path: fmt.Sprintf("feast-%s-registry.default.svc.cluster.local:80", resourceName), + } + offlineConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + OfflineStore: services.OfflineStoreConfig{ + Type: services.OfflineDuckDbConfigType, + }, + Registry: regRemote, + } + Expect(repoConfigOffline).To(Equal(offlineConfig)) + + // check online config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(3)) + Expect(deploy.Spec.Template.Spec.Containers[0].ImagePullPolicy).To(Equal(corev1.PullAlways)) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OnlineFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfigOnline := &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfigOnline) + Expect(err).NotTo(HaveOccurred()) + offlineRemote := services.OfflineStoreConfig{ + Host: fmt.Sprintf("feast-%s-offline.default.svc.cluster.local", resourceName), + Type: services.OfflineRemoteConfigType, + Port: services.HttpPort, + } + onlineConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + OfflineStore: offlineRemote, + OnlineStore: services.OnlineStoreConfig{ + Path: onlineStorePath, + Type: services.OnlineSqliteConfigType, + }, + Registry: regRemote, + } + Expect(repoConfigOnline).To(Equal(onlineConfig)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(3)) + + // check client config + cm := &corev1.ConfigMap{} + name := feast.GetFeastServiceName(services.ClientFeastType) + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: name, + Namespace: resource.Namespace, + }, + cm) + Expect(err).NotTo(HaveOccurred()) + repoConfigClient := &services.RepoConfig{} + err = yaml.Unmarshal([]byte(cm.Data[services.FeatureStoreYamlCmKey]), repoConfigClient) + Expect(err).NotTo(HaveOccurred()) + clientConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + OfflineStore: offlineRemote, + OnlineStore: services.OnlineStoreConfig{ + Path: fmt.Sprintf("http://feast-%s-online.default.svc.cluster.local:80", resourceName), + Type: services.OnlineRemoteConfigType, + }, + Registry: regRemote, + } + Expect(repoConfigClient).To(Equal(clientConfig)) + + // change paths and reconcile + resourceNew := resource.DeepCopy() + newOnlineStorePath := "/data/new_online.db" + newRegistryPath := "/data/new_registry.db" + resourceNew.Spec.Services.OnlineStore.Persistence.FilePersistence.Path = newOnlineStorePath + resourceNew.Spec.Services.Registry.Local.Persistence.FilePersistence.Path = newRegistryPath + err = k8sClient.Update(ctx, resourceNew) + Expect(err).NotTo(HaveOccurred()) + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource = &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + feast.FeatureStore = resource + + // check registry config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.RegistryFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfig = &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfig) + Expect(err).NotTo(HaveOccurred()) + testConfig.Registry.Path = newRegistryPath + Expect(repoConfig).To(Equal(testConfig)) + + // check offline config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OfflineFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfigOffline = &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfigOffline) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfigOffline).To(Equal(offlineConfig)) + + // check online config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OnlineFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + + repoConfigOnline = &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfigOnline) + Expect(err).NotTo(HaveOccurred()) + onlineConfig.OnlineStore.Path = newOnlineStorePath + Expect(repoConfigOnline).To(Equal(onlineConfig)) + }) + }) }) func createFeatureStoreResource(resourceName string, image string, pullPolicy corev1.PullPolicy, envVars *[]corev1.EnvVar) *feastdevv1alpha1.FeatureStore { diff --git a/infra/feast-operator/internal/controller/services/repo_config.go b/infra/feast-operator/internal/controller/services/repo_config.go index 3137417f3ac..e087038ba52 100644 --- a/infra/feast-operator/internal/controller/services/repo_config.go +++ b/infra/feast-operator/internal/controller/services/repo_config.go @@ -35,48 +35,74 @@ func (feast *FeastServices) GetServiceFeatureStoreYamlBase64(feastType FeastServ } func (feast *FeastServices) getServiceFeatureStoreYaml(feastType FeastServiceType) ([]byte, error) { - return yaml.Marshal(feast.getServiceRepoConfig(feastType)) + repoConfig, err := feast.getServiceRepoConfig(feastType) + if err != nil { + return nil, err + } + return yaml.Marshal(repoConfig) } -func (feast *FeastServices) getServiceRepoConfig(feastType FeastServiceType) RepoConfig { - appliedSpec := feast.FeatureStore.Status.Applied +func (feast *FeastServices) getServiceRepoConfig(feastType FeastServiceType) (RepoConfig, error) { + return getServiceRepoConfig(feastType, feast.FeatureStore) +} - repoConfig := feast.getClientRepoConfig() +func getServiceRepoConfig(feastType FeastServiceType, featureStore *feastdevv1alpha1.FeatureStore) (RepoConfig, error) { + appliedSpec := featureStore.Status.Applied + + repoConfig := getClientRepoConfig(featureStore) + isLocalRegistry := isLocalRegistry(featureStore) if appliedSpec.Services != nil { // Offline server has an `offline_store` section and a remote `registry` if feastType == OfflineFeastType && appliedSpec.Services.OfflineStore != nil { + fileType := string(OfflineDaskConfigType) + if appliedSpec.Services.OfflineStore.Persistence != nil && + appliedSpec.Services.OfflineStore.Persistence.FilePersistence != nil && + len(appliedSpec.Services.OfflineStore.Persistence.FilePersistence.Type) > 0 { + fileType = appliedSpec.Services.OfflineStore.Persistence.FilePersistence.Type + } + repoConfig.OfflineStore = OfflineStoreConfig{ - Type: OfflineDaskConfigType, + Type: OfflineConfigType(fileType), } repoConfig.OnlineStore = OnlineStoreConfig{} } // Online server has an `online_store` section, a remote `registry` and a remote `offline_store` if feastType == OnlineFeastType && appliedSpec.Services.OnlineStore != nil { + path := DefaultOnlinePath + if appliedSpec.Services.OnlineStore.Persistence != nil && appliedSpec.Services.OnlineStore.Persistence.FilePersistence != nil { + path = appliedSpec.Services.OnlineStore.Persistence.FilePersistence.Path + } + repoConfig.OnlineStore = OnlineStoreConfig{ Type: OnlineSqliteConfigType, - Path: LocalOnlinePath, + Path: path, } } // Registry server only has a `registry` section - if feastType == RegistryFeastType && feast.isLocalRegistry() { + if feastType == RegistryFeastType && isLocalRegistry { + path := DefaultRegistryPath + if appliedSpec.Services != nil && appliedSpec.Services.Registry != nil && appliedSpec.Services.Registry.Local != nil && + appliedSpec.Services.Registry.Local.Persistence != nil && appliedSpec.Services.Registry.Local.Persistence.FilePersistence != nil { + path = appliedSpec.Services.Registry.Local.Persistence.FilePersistence.Path + } repoConfig.Registry = RegistryConfig{ RegistryType: RegistryFileConfigType, - Path: LocalRegistryPath, + Path: path, } repoConfig.OfflineStore = OfflineStoreConfig{} repoConfig.OnlineStore = OnlineStoreConfig{} } } - return repoConfig + return repoConfig, nil } func (feast *FeastServices) getClientFeatureStoreYaml() ([]byte, error) { - return yaml.Marshal(feast.getClientRepoConfig()) + return yaml.Marshal(getClientRepoConfig(feast.FeatureStore)) } -func (feast *FeastServices) getClientRepoConfig() RepoConfig { - status := feast.FeatureStore.Status +func getClientRepoConfig(featureStore *feastdevv1alpha1.FeatureStore) RepoConfig { + status := featureStore.Status clientRepoConfig := RepoConfig{ Project: status.Applied.FeastProject, Provider: LocalProviderType, diff --git a/infra/feast-operator/internal/controller/services/repo_config_test.go b/infra/feast-operator/internal/controller/services/repo_config_test.go new file mode 100644 index 00000000000..285fc05c566 --- /dev/null +++ b/infra/feast-operator/internal/controller/services/repo_config_test.go @@ -0,0 +1,203 @@ +/* +Copyright 2024 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package services + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" +) + +var projectName = "test-project" + +var _ = Describe("Repo Config", func() { + Context("When creating the RepoConfig of a FeatureStore", func() { + + It("should successfully create the repo configs", func() { + By("Having the minimal created resource") + featureStore := minimalFeatureStore() + ApplyDefaultsToStatus(featureStore) + var repoConfig RepoConfig + repoConfig, err := getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + expectedRegistryConfig := RegistryConfig{ + RegistryType: "file", + Path: DefaultRegistryPath, + } + Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) + + By("Having the local registry resource") + featureStore = minimalFeatureStore() + featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: "file.db", + }, + }, + }, + }, + } + ApplyDefaultsToStatus(featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + expectedRegistryConfig = RegistryConfig{ + RegistryType: "file", + Path: "file.db", + } + Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) + + By("Having the remote registry resource") + featureStore = minimalFeatureStore() + featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Remote: &feastdevv1alpha1.RemoteRegistryConfig{ + FeastRef: &feastdevv1alpha1.FeatureStoreRef{ + Name: "registry", + Namespace: "remoteNS", + }, + }, + }, + } + ApplyDefaultsToStatus(featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + By("Having the all the services") + featureStore = minimalFeatureStore() + featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + OfflineStore: &feastdevv1alpha1.OfflineStore{ + Persistence: &feastdevv1alpha1.OfflineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ + Type: "duckdb", + }, + }, + }, + OnlineStore: &feastdevv1alpha1.OnlineStore{ + Persistence: &feastdevv1alpha1.OnlineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ + Path: "/data/online.db", + }, + }, + }, + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: "/data/registry.db", + }, + }, + }, + }, + } + ApplyDefaultsToStatus(featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + expectedOfflineConfig := OfflineStoreConfig{ + Type: "duckdb", + } + Expect(repoConfig.OfflineStore).To(Equal(expectedOfflineConfig)) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + expectedOnlineConfig := OnlineStoreConfig{ + Type: "sqlite", + Path: "/data/online.db", + } + Expect(repoConfig.OnlineStore).To(Equal(expectedOnlineConfig)) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + expectedRegistryConfig = RegistryConfig{ + RegistryType: "file", + Path: "/data/registry.db", + } + Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) + }) + }) +}) + +func emptyOnlineStoreConfig() OnlineStoreConfig { + return OnlineStoreConfig{} +} + +func emptyOfflineStoreConfig() OfflineStoreConfig { + return OfflineStoreConfig{} +} + +func emptyRegistryConfig() RegistryConfig { + return RegistryConfig{} +} + +func minimalFeatureStore() *feastdevv1alpha1.FeatureStore { + return &feastdevv1alpha1.FeatureStore{ + Spec: feastdevv1alpha1.FeatureStoreSpec{ + FeastProject: projectName, + }, + } +} diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 5e1778322f3..a205c838ea1 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -43,6 +43,13 @@ func (feast *FeastServices) Deploy() error { services := feast.FeatureStore.Status.Applied.Services if services != nil { if services.OfflineStore != nil { + if services.OfflineStore.Persistence != nil && + services.OfflineStore.Persistence.FilePersistence != nil && + len(services.OfflineStore.Persistence.FilePersistence.Type) > 0 { + if err := checkOfflineStoreFilePersistenceType(services.OfflineStore.Persistence.FilePersistence.Type); err != nil { + return err + } + } if err := feast.deployFeastServiceByType(OfflineFeastType); err != nil { return err } @@ -329,8 +336,7 @@ func (feast *FeastServices) setRemoteRegistryURL() error { } func (feast *FeastServices) isLocalRegistry() bool { - appliedServices := feast.FeatureStore.Status.Applied.Services - return appliedServices != nil && appliedServices.Registry != nil && appliedServices.Registry.Local != nil + return isLocalRegistry(feast.FeatureStore) } func (feast *FeastServices) isRemoteRegistry() bool { diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index c2348666179..222bf62dc0e 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -30,8 +30,8 @@ const ( FeastPrefix = "feast-" FeatureStoreYamlEnvVar = "FEATURE_STORE_YAML_BASE64" FeatureStoreYamlCmKey = "feature_store.yaml" - LocalRegistryPath = "/tmp/registry.db" - LocalOnlinePath = "/tmp/online_store.db" + DefaultRegistryPath = "/tmp/registry.db" + DefaultOnlinePath = "/tmp/online_store.db" svcDomain = ".svc.cluster.local" HttpPort = 80 @@ -42,6 +42,7 @@ const ( OfflineRemoteConfigType OfflineConfigType = "remote" OfflineDaskConfigType OfflineConfigType = "dask" + OfflineDuckDbConfigType OfflineConfigType = "duckdb" OnlineRemoteConfigType OnlineConfigType = "remote" OnlineSqliteConfigType OnlineConfigType = "sqlite" diff --git a/infra/feast-operator/internal/controller/services/suite_test.go b/infra/feast-operator/internal/controller/services/suite_test.go new file mode 100644 index 00000000000..5f76f5e6ff7 --- /dev/null +++ b/infra/feast-operator/internal/controller/services/suite_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2024 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package services + +import ( + "fmt" + "path/filepath" + "runtime" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" + //+kubebuilder:scaffold:imports +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var k8sClient client.Client +var testEnv *envtest.Environment + +func TestServices(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "Controller Services Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + + // The BinaryAssetsDirectory is only required if you want to run the tests directly + // without call the makefile target test. If not informed it will look for the + // default path defined in controller-runtime which is /usr/local/kubebuilder/. + // Note that you must have the required binaries setup under the bin directory to perform + // the tests directly. When we run make test it will be setup and used automatically. + BinaryAssetsDirectory: filepath.Join("..", "..", "..", "bin", "k8s", + fmt.Sprintf("1.29.0-%s-%s", runtime.GOOS, runtime.GOARCH)), + } + + cfg, err := testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + err = feastdevv1alpha1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + //+kubebuilder:scaffold:scheme + + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + err := testEnv.Stop() + Expect(err).NotTo(HaveOccurred()) +}) diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go new file mode 100644 index 00000000000..c2b29361a84 --- /dev/null +++ b/infra/feast-operator/internal/controller/services/util.go @@ -0,0 +1,84 @@ +package services + +import ( + "fmt" + "slices" + + "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" +) + +func isLocalRegistry(featureStore *feastdevv1alpha1.FeatureStore) bool { + appliedServices := featureStore.Status.Applied.Services + return appliedServices != nil && appliedServices.Registry != nil && appliedServices.Registry.Local != nil +} + +func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { + cr.Status.FeastVersion = feastversion.FeastVersion + applied := cr.Spec.DeepCopy() + if applied.Services == nil { + applied.Services = &feastdevv1alpha1.FeatureStoreServices{} + } + services := applied.Services + + // default to registry service deployment + if services.Registry == nil { + services.Registry = &feastdevv1alpha1.Registry{} + } + // if remote registry not set, proceed w/ local registry defaults + if services.Registry.Remote == nil { + // if local registry not set, apply an empty pointer struct + if services.Registry.Local == nil { + services.Registry.Local = &feastdevv1alpha1.LocalRegistryConfig{} + } + if services.Registry.Local.Persistence == nil { + services.Registry.Local.Persistence = &feastdevv1alpha1.RegistryPersistence{} + } + if services.Registry.Local.Persistence.FilePersistence == nil { + services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{} + } + if len(services.Registry.Local.Persistence.FilePersistence.Path) == 0 { + services.Registry.Local.Persistence.FilePersistence.Path = DefaultRegistryPath + } + setServiceDefaultConfigs(&services.Registry.Local.ServiceConfigs.DefaultConfigs) + } + if services.OfflineStore != nil { + setServiceDefaultConfigs(&services.OfflineStore.ServiceConfigs.DefaultConfigs) + if services.OfflineStore.Persistence == nil { + services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{} + } + if services.OfflineStore.Persistence.FilePersistence == nil { + services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{} + } + if len(services.OfflineStore.Persistence.FilePersistence.Type) == 0 { + services.OfflineStore.Persistence.FilePersistence.Type = string(OfflineDaskConfigType) + } + } + if services.OnlineStore != nil { + setServiceDefaultConfigs(&services.OnlineStore.ServiceConfigs.DefaultConfigs) + if services.OnlineStore.Persistence == nil { + services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{} + } + if services.OnlineStore.Persistence.FilePersistence == nil { + services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{} + } + if len(services.OnlineStore.Persistence.FilePersistence.Path) == 0 { + services.OnlineStore.Persistence.FilePersistence.Path = DefaultOnlinePath + } + } + // overwrite status.applied with every reconcile + applied.DeepCopyInto(&cr.Status.Applied) +} + +func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { + if defaultConfigs.Image == nil { + defaultConfigs.Image = &DefaultImage + } +} + +func checkOfflineStoreFilePersistenceType(value string) error { + if slices.Contains(feastdevv1alpha1.ValidOfflineStoreFilePersistenceTypes, value) { + return nil + } + return fmt.Errorf("invalid file type %s for offline store", value) +} diff --git a/infra/feast-operator/internal/controller/suite_test.go b/infra/feast-operator/internal/controller/suite_test.go index 57091df5c00..38da27cc9c5 100644 --- a/infra/feast-operator/internal/controller/suite_test.go +++ b/infra/feast-operator/internal/controller/suite_test.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -39,7 +38,6 @@ import ( // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var cfg *rest.Config var k8sClient client.Client var testEnv *envtest.Environment @@ -66,9 +64,7 @@ var _ = BeforeSuite(func() { fmt.Sprintf("1.29.0-%s-%s", runtime.GOOS, runtime.GOARCH)), } - var err error - // cfg is defined in this file globally. - cfg, err = testEnv.Start() + cfg, err := testEnv.Start() Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil())