From 69c80fc5c2851f3cff77877e96c5d2ed99e88e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=A1=B0=EC=98=81=ED=9B=88?= Date: Wed, 7 May 2025 17:06:12 +0900 Subject: [PATCH 1/7] fix: upgrade protobuf version, make `protos` directory beforehand Signed-off-by: iamcodingcat --- Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 75f05a28fc8..e4a7ec64e82 100644 --- a/Makefile +++ b/Makefile @@ -650,7 +650,7 @@ format-ui: # Go SDK & embedded PB_REL = https://github.com/protocolbuffers/protobuf/releases -PB_VERSION = 3.11.2 +PB_VERSION = 30.2 PB_ARCH := $(shell uname -m) ifeq ($(PB_ARCH), arm64) PB_ARCH=aarch_64 @@ -669,6 +669,7 @@ install-go-proto-dependencies: $(TOOL_DIR)/protoc-$(PB_VERSION)-$(OS)-$(PB_ARCH) .PHONY: compile-protos-go compile-protos-go: install-go-proto-dependencies + mkdir -p $(ROOT_DIR)/go/protos $(foreach folder,$(PB_PROTO_FOLDERS), \ protoc --proto_path=$(ROOT_DIR)/protos \ --go_out=$(ROOT_DIR)/go/protos \ From 466cdd016f2dce84badb1f6ee3426e5fcd3955db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=A1=B0=EC=98=81=ED=9B=88?= Date: Wed, 7 May 2025 17:06:58 +0900 Subject: [PATCH 2/7] feat: add aws s3 storage based registry store Signed-off-by: iamcodingcat --- go/internal/feast/registry/local.go | 95 ++++++++++++++++++++++++++ go/internal/feast/registry/registry.go | 4 +- 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/go/internal/feast/registry/local.go b/go/internal/feast/registry/local.go index e5343cd75cd..677d681e99c 100644 --- a/go/internal/feast/registry/local.go +++ b/go/internal/feast/registry/local.go @@ -1,11 +1,19 @@ package registry import ( + "context" + "errors" + "github.com/aws/aws-sdk-go-v2/aws" + awsConfig "github.com/aws/aws-sdk-go-v2/config" "io/ioutil" "os" "path/filepath" + "strings" + "time" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -17,6 +25,12 @@ type FileRegistryStore struct { filePath string } +// A S3RegistryStore is a S3 object storage-based implementation of the RegistryStore interface +type S3RegistryStore struct { + filePath string + s3Client *s3.Client +} + // NewFileRegistryStore creates a FileRegistryStore with the given configuration and infers // the file path from the repo path and registry path. func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistryStore { @@ -30,6 +44,26 @@ func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistry return &lr } +// NewS3RegistryStore creates a S3RegistryStore with the given configuration +func NewS3RegistryStore(config *RegistryConfig, repoPath string) *S3RegistryStore { + var lr S3RegistryStore + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + cfg, err := awsConfig.LoadDefaultConfig(ctx) + if err != nil { + lr = S3RegistryStore{ + filePath: config.Path, + } + } else { + lr = S3RegistryStore{ + filePath: config.Path, + s3Client: s3.NewFromConfig(cfg), + } + } + return &lr +} + // GetRegistryProto reads and parses the registry proto from the file path. func (r *FileRegistryStore) GetRegistryProto() (*core.Registry, error) { registry := &core.Registry{} @@ -64,3 +98,64 @@ func (r *FileRegistryStore) writeRegistry(rp *core.Registry) error { } return nil } + +func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) { + bucket, key, err := r.parseS3Path() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + output, err := r.s3Client.GetObject(ctx, + &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, err + } + defer output.Body.Close() + + data, err := ioutil.ReadAll(output.Body) + if err != nil { + return nil, err + } + + registry := &core.Registry{} + if err := proto.Unmarshal(data, registry); err != nil { + return nil, err + } + return registry, nil +} + +func (r *S3RegistryStore) UpdateRegistryProto(rp *core.Registry) error { + return errors.New("not implemented in S3RegistryStore") +} + +func (r *S3RegistryStore) Teardown() error { + bucket, key, err := r.parseS3Path() + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = r.s3Client.DeleteObject(ctx, + &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return err + } + return nil +} + +func (r *S3RegistryStore) parseS3Path() (string, string, error) { + path := strings.TrimPrefix(r.filePath, "s3://") + parts := strings.SplitN(path, "/", 2) + if len(parts) != 2 { + return "", "", errors.New("invalid S3 file path format") + } + return parts[0], parts[1], nil +} diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index a383dc42c07..b5010419712 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -68,7 +68,7 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string, project string func (r *Registry) InitializeRegistry() error { _, err := r.getRegistryProto() if err != nil { - if _, ok := r.registryStore.(*FileRegistryStore); ok { + if _, ok := r.registryStore.(*FileRegistryStore); ok { // S3에는 굳이 연동할 필요 없어 보임. 오히려 이로 인해 정상적이던 s3 내의 레지스트리 파일이 초기화된 파일로 덮어쓰기 될 수도 있지 않음? log.Error().Err(err).Msg("Registry Initialization Failed") return err } @@ -364,6 +364,8 @@ func getRegistryStoreFromType(registryStoreType string, registryConfig *Registry switch registryStoreType { case "FileRegistryStore": return NewFileRegistryStore(registryConfig, repoPath), nil + case "S3RegistryStore": + return NewS3RegistryStore(registryConfig, repoPath), nil } return nil, errors.New("only FileRegistryStore as a RegistryStore is supported at this moment") } From 57340129a514a2363a6bece05d30287ec3e6358a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=A1=B0=EC=98=81=ED=9B=88?= Date: Wed, 7 May 2025 17:08:26 +0900 Subject: [PATCH 3/7] chore: add aws s3 api related pkgs Signed-off-by: iamcodingcat --- go.mod | 18 ++++++++++++++++++ go.sum | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/go.mod b/go.mod index 05305c1e6c1..18c459373b6 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,24 @@ require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/apache/thrift v0.21.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect + github.com/aws/aws-sdk-go-v2/config v1.29.14 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect + github.com/aws/smithy-go v1.22.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect diff --git a/go.sum b/go.sum index 41abd905c44..9aefbf0aa35 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,42 @@ github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= +github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14= +github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM= +github.com/aws/aws-sdk-go-v2/config v1.29.14/go.mod h1:wVPHWcIFv3WO89w0rE10gzf17ZYy+UVS1Geq8Iei34g= +github.com/aws/aws-sdk-go-v2/credentials v1.17.67 h1:9KxtdcIA/5xPNQyZRgUSpYOE6j9Bc4+D7nZua0KGYOM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.67/go.mod h1:p3C44m+cfnbv763s52gCqrjaqyPikj9Sg47kUVaNZQQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 h1:4nm2G6A4pV9rdlWzGMPv4BNtQp22v1hg3yrtkYpeLl8= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15/go.mod h1:ZH34PJUc8ApjBIfgQCFvkWcUDBtl/WTD+uiYHjd8igA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 h1:BRXS0U76Z8wfF+bnkilA2QwpIch6URlm++yPUt9QPmQ= +github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3/go.mod h1:bNXKFFyaiVvWuR6O16h/I1724+aXe/tAkA9/QS01t5k= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 h1:1Gw+9ajCV1jogloEv1RRnvfRFia2cL6c9cuKV2Ps+G8= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.3/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 h1:hXmVKytPfTy5axZ+fYbR5d0cFmC3JvwLm5kM83luako= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 h1:1XuUZ8mYJw9B6lzAkXhqHlJd/XvaX32evhproijJEZY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.19/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4= +github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= +github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= From 70bd018edcd8978ee3a6fda7a83c4a11e02518a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=A1=B0=EC=98=81=ED=9B=88?= Date: Wed, 7 May 2025 17:28:51 +0900 Subject: [PATCH 4/7] style: remove my custom comment Signed-off-by: iamcodingcat --- go/internal/feast/registry/registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index b5010419712..160dda94fd6 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -68,7 +68,7 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string, project string func (r *Registry) InitializeRegistry() error { _, err := r.getRegistryProto() if err != nil { - if _, ok := r.registryStore.(*FileRegistryStore); ok { // S3에는 굳이 연동할 필요 없어 보임. 오히려 이로 인해 정상적이던 s3 내의 레지스트리 파일이 초기화된 파일로 덮어쓰기 될 수도 있지 않음? + if _, ok := r.registryStore.(*FileRegistryStore); ok { log.Error().Err(err).Msg("Registry Initialization Failed") return err } From 6a8f6143374eb066c78a37a2cd8431d322b6262f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=A1=B0=EC=98=81=ED=9B=88?= Date: Thu, 8 May 2025 18:09:00 +0900 Subject: [PATCH 5/7] refact: separate s3 registry file from `local.go` Signed-off-by: iamcodingcat --- go/internal/feast/registry/local.go | 97 +------------------------- go/internal/feast/registry/s3.go | 103 ++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 96 deletions(-) create mode 100644 go/internal/feast/registry/s3.go diff --git a/go/internal/feast/registry/local.go b/go/internal/feast/registry/local.go index 677d681e99c..58c6426368a 100644 --- a/go/internal/feast/registry/local.go +++ b/go/internal/feast/registry/local.go @@ -1,18 +1,10 @@ package registry import ( - "context" - "errors" - "github.com/aws/aws-sdk-go-v2/aws" - awsConfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/google/uuid" "io/ioutil" "os" "path/filepath" - "strings" - "time" - - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/google/uuid" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -25,12 +17,6 @@ type FileRegistryStore struct { filePath string } -// A S3RegistryStore is a S3 object storage-based implementation of the RegistryStore interface -type S3RegistryStore struct { - filePath string - s3Client *s3.Client -} - // NewFileRegistryStore creates a FileRegistryStore with the given configuration and infers // the file path from the repo path and registry path. func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistryStore { @@ -44,26 +30,6 @@ func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistry return &lr } -// NewS3RegistryStore creates a S3RegistryStore with the given configuration -func NewS3RegistryStore(config *RegistryConfig, repoPath string) *S3RegistryStore { - var lr S3RegistryStore - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - cfg, err := awsConfig.LoadDefaultConfig(ctx) - if err != nil { - lr = S3RegistryStore{ - filePath: config.Path, - } - } else { - lr = S3RegistryStore{ - filePath: config.Path, - s3Client: s3.NewFromConfig(cfg), - } - } - return &lr -} - // GetRegistryProto reads and parses the registry proto from the file path. func (r *FileRegistryStore) GetRegistryProto() (*core.Registry, error) { registry := &core.Registry{} @@ -98,64 +64,3 @@ func (r *FileRegistryStore) writeRegistry(rp *core.Registry) error { } return nil } - -func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) { - bucket, key, err := r.parseS3Path() - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - output, err := r.s3Client.GetObject(ctx, - &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - }) - if err != nil { - return nil, err - } - defer output.Body.Close() - - data, err := ioutil.ReadAll(output.Body) - if err != nil { - return nil, err - } - - registry := &core.Registry{} - if err := proto.Unmarshal(data, registry); err != nil { - return nil, err - } - return registry, nil -} - -func (r *S3RegistryStore) UpdateRegistryProto(rp *core.Registry) error { - return errors.New("not implemented in S3RegistryStore") -} - -func (r *S3RegistryStore) Teardown() error { - bucket, key, err := r.parseS3Path() - if err != nil { - return err - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = r.s3Client.DeleteObject(ctx, - &s3.DeleteObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - }) - if err != nil { - return err - } - return nil -} - -func (r *S3RegistryStore) parseS3Path() (string, string, error) { - path := strings.TrimPrefix(r.filePath, "s3://") - parts := strings.SplitN(path, "/", 2) - if len(parts) != 2 { - return "", "", errors.New("invalid S3 file path format") - } - return parts[0], parts[1], nil -} diff --git a/go/internal/feast/registry/s3.go b/go/internal/feast/registry/s3.go new file mode 100644 index 00000000000..27efbe8ad43 --- /dev/null +++ b/go/internal/feast/registry/s3.go @@ -0,0 +1,103 @@ +package registry + +import ( + "context" + "errors" + "io/ioutil" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsConfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/feast-dev/feast/go/protos/feast/core" + + "google.golang.org/protobuf/proto" +) + +// A S3RegistryStore is a S3 object storage-based implementation of the RegistryStore interface +type S3RegistryStore struct { + filePath string + s3Client *s3.Client +} + +// NewS3RegistryStore creates a S3RegistryStore with the given configuration +func NewS3RegistryStore(config *RegistryConfig, repoPath string) *S3RegistryStore { + var lr S3RegistryStore + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + cfg, err := awsConfig.LoadDefaultConfig(ctx) + if err != nil { + lr = S3RegistryStore{ + filePath: config.Path, + } + } else { + lr = S3RegistryStore{ + filePath: config.Path, + s3Client: s3.NewFromConfig(cfg), + } + } + return &lr +} + +func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) { + bucket, key, err := r.parseS3Path() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + output, err := r.s3Client.GetObject(ctx, + &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, err + } + defer output.Body.Close() + + data, err := ioutil.ReadAll(output.Body) + if err != nil { + return nil, err + } + + registry := &core.Registry{} + if err := proto.Unmarshal(data, registry); err != nil { + return nil, err + } + return registry, nil +} + +func (r *S3RegistryStore) UpdateRegistryProto(rp *core.Registry) error { + return errors.New("not implemented in S3RegistryStore") +} + +func (r *S3RegistryStore) Teardown() error { + bucket, key, err := r.parseS3Path() + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = r.s3Client.DeleteObject(ctx, + &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return err + } + return nil +} + +func (r *S3RegistryStore) parseS3Path() (string, string, error) { + path := strings.TrimPrefix(r.filePath, "s3://") + parts := strings.SplitN(path, "/", 2) + if len(parts) != 2 { + return "", "", errors.New("invalid S3 file path format") + } + return parts[0], parts[1], nil +} From dc576aff5cce9299effbab7cd6ec474437ef189e Mon Sep 17 00:00:00 2001 From: iamcodingcat Date: Fri, 9 May 2025 21:54:09 +0900 Subject: [PATCH 6/7] feat: add if-statement in Makefile on linux arm64 os-platform Signed-off-by: iamcodingcat --- Makefile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Makefile b/Makefile index e4a7ec64e82..b8a752f9a16 100644 --- a/Makefile +++ b/Makefile @@ -655,6 +655,9 @@ PB_ARCH := $(shell uname -m) ifeq ($(PB_ARCH), arm64) PB_ARCH=aarch_64 endif +ifeq ($(PB_ARCH), aarch64) + PB_ARCH=aarch_64 +endif PB_PROTO_FOLDERS=core registry serving types storage $(TOOL_DIR)/protoc-$(PB_VERSION)-$(OS)-$(PB_ARCH).zip: $(TOOL_DIR) From 486262263fc8e5e7be21fb3e908be3840badc622 Mon Sep 17 00:00:00 2001 From: iamcodingcat Date: Sat, 10 May 2025 13:49:28 +0900 Subject: [PATCH 7/7] feat: add test-code for s3 registry store Signed-off-by: iamcodingcat --- go/internal/feast/registry/registry_test.go | 94 +++++++++++++++++++++ go/internal/feast/registry/s3.go | 12 ++- 2 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 go/internal/feast/registry/registry_test.go diff --git a/go/internal/feast/registry/registry_test.go b/go/internal/feast/registry/registry_test.go new file mode 100644 index 00000000000..0801632a70d --- /dev/null +++ b/go/internal/feast/registry/registry_test.go @@ -0,0 +1,94 @@ +package registry + +import ( + "context" + "errors" + "io/ioutil" + "net/url" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func TestGetOnlineFeaturesS3Registry(t *testing.T) { + mockS3Client := &MockS3Client{ + GetObjectFn: func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + return &s3.GetObjectOutput{ + Body: ioutil.NopCloser(strings.NewReader("mock data")), + }, nil + }, + DeleteObjectFn: func(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return &s3.DeleteObjectOutput{}, nil + }, + } + + tests := []struct { + name string + config *RepoConfig + }{ + { + name: "redis with simple features", + config: &RepoConfig{ + Project: "feature_repo", + Registry: map[string]interface{}{ + "path": "s3://test-bucket/path/to/registry.db", + }, + Provider: "aws", + }, + }, + } + for _, test := range tests { + registryConfig, err := test.config.GetRegistryConfig() + if err != nil { + t.Errorf("Error getting registry config. msg: %s", err.Error()) + } + r := &Registry{ + project: test.config.Project, + cachedRegistryProtoTtl: time.Duration(registryConfig.CacheTtlSeconds) * time.Second, + } + _ = registryConfig.RegistryStoreType + registryPath := registryConfig.Path + uri, err := url.Parse(registryPath) + if err != nil { + t.Errorf("Error parsing registry path. msg: %s", err.Error()) + } + if registryStoreType, ok := REGISTRY_STORE_CLASS_FOR_SCHEME[uri.Scheme]; ok { + switch registryStoreType { + case "S3RegistryStore": + registryStore := &S3RegistryStore{ + filePath: registryConfig.Path, + s3Client: mockS3Client, + } + r.registryStore = registryStore + err := r.InitializeRegistry() + if err != nil { + t.Errorf("Error initializing registry. msg: %s. registry path=%q", err.Error(), registryPath) + } + default: + t.Errorf("Only S3RegistryStore is supported on this testing. got=%s", registryStoreType) + } + } + } +} + +// MockS3Client is mock client for testing s3 registry store +type MockS3Client struct { + GetObjectFn func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + DeleteObjectFn func(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) +} + +func (m *MockS3Client) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + if m.GetObjectFn != nil { + return m.GetObjectFn(ctx, params) + } + return nil, errors.New("not implemented") +} + +func (m *MockS3Client) DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + if m.DeleteObjectFn != nil { + return m.DeleteObjectFn(ctx, params) + } + return nil, errors.New("not implemented") +} diff --git a/go/internal/feast/registry/s3.go b/go/internal/feast/registry/s3.go index 27efbe8ad43..0979dac64d0 100644 --- a/go/internal/feast/registry/s3.go +++ b/go/internal/feast/registry/s3.go @@ -15,10 +15,16 @@ import ( "google.golang.org/protobuf/proto" ) +// S3ClientInterface define interface of s3.Client for making mocking s3 client and testing it +type S3ClientInterface interface { + GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) +} + // A S3RegistryStore is a S3 object storage-based implementation of the RegistryStore interface type S3RegistryStore struct { filePath string - s3Client *s3.Client + s3Client S3ClientInterface } // NewS3RegistryStore creates a S3RegistryStore with the given configuration @@ -55,7 +61,7 @@ func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) { Key: aws.String(key), }) if err != nil { - return nil, err + panic(err) } defer output.Body.Close() @@ -88,7 +94,7 @@ func (r *S3RegistryStore) Teardown() error { Key: aws.String(key), }) if err != nil { - return err + panic(err) } return nil }