diff --git a/go.mod b/go.mod index 18c459373b6..7a65f5b744b 100644 --- a/go.mod +++ b/go.mod @@ -6,15 +6,21 @@ toolchain go1.22.5 require ( github.com/apache/arrow/go/v17 v17.0.0 + github.com/aws/aws-sdk-go-v2 v1.36.4 + github.com/aws/aws-sdk-go-v2/config v1.29.14 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3 + github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 github.com/ghodss/yaml v1.0.0 github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.6.0 github.com/mattn/go-sqlite3 v1.14.23 github.com/pkg/errors v0.9.1 github.com/redis/go-redis/v9 v9.6.1 + github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225 github.com/rs/zerolog v1.33.0 github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.9.0 + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 google.golang.org/grpc v1.67.0 google.golang.org/protobuf v1.34.2 ) @@ -23,20 +29,18 @@ require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/apache/thrift v0.21.0 // indirect - github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect - github.com/aws/aws-sdk-go-v2/config v1.29.14 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect @@ -66,7 +70,6 @@ require ( golang.org/x/text v0.18.0 // indirect golang.org/x/tools v0.25.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9aefbf0aa35..7778a906ecd 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= +github.com/aws/aws-sdk-go-v2 v1.36.4 h1:GySzjhVvx0ERP6eyfAbAuAXLtAda5TEy19E5q5W8I9E= +github.com/aws/aws-sdk-go-v2 v1.36.4/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14= github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM= @@ -18,16 +20,24 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mln github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 h1:o1v1VFfPcDVlK3ll1L5xHsaQAFdNtZ5GXnNR7SwueC4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35/go.mod h1:rZUQNYMNG+8uZxz9FOerQJ+FceCiodXvixpeRtdESrU= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 h1:R5b82ubO2NntENm3SAm0ADME+H630HomNJdgv+yZ3xw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35/go.mod h1:FuA+nmgMRfkzVKYDNEqQadvEMxtxl9+RLT9ribCwEMs= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3 h1:2FCJAT5wyPs5JjAFoLgaEB0MIiWvXiJ0T6PZiKDkJoo= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3/go.mod h1:rUOhTo9+gtTYTMnGD+xiiks/2Z8vssPP+uSMNhJBbmI= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 h1:4nm2G6A4pV9rdlWzGMPv4BNtQp22v1hg3yrtkYpeLl8= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16 h1:TLsOzHW9zlJoMgjcKQI/7bolyv/DL0796y4NigWgaw8= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16/go.mod h1:mNoiR5qsO9TxXZ6psjjQ3M+Zz7hURFTumXHF+UKjyAU= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg= @@ -94,6 +104,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225 h1:ZMsPCp7oYgjoIFt1c+sM2qojxZXotSYcMF8Ur9/LJlM= +github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225/go.mod h1:XEESr+X1SY8ZSuc3jqsTlb3clCkqQJ4DcF3Qxv1N3PM= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= diff --git a/go/internal/feast/onlinestore/dynamodbonlinestore.go b/go/internal/feast/onlinestore/dynamodbonlinestore.go new file mode 100644 index 00000000000..36aed52cddf --- /dev/null +++ b/go/internal/feast/onlinestore/dynamodbonlinestore.go @@ -0,0 +1,240 @@ +package onlinestore + +import ( + "context" + "encoding/hex" + "fmt" + awsConfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + dtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" + "github.com/roberson-io/mmh3" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + "runtime" + "sync" + "time" +) + +type batchResult struct { + index int + response *dynamodb.BatchGetItemOutput + err error +} + +type DynamodbOnlineStore struct { + // Feast project name + // TODO: Should we remove project as state that is tracked at the store level? + project string + + client *dynamodb.Client + + config *registry.RepoConfig + + // dynamodb configuration + consistentRead *bool + batchSize *int +} + +func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*DynamodbOnlineStore, error) { + store := DynamodbOnlineStore{ + project: project, + config: config, + } + + // aws configuration + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + cfg, err := awsConfig.LoadDefaultConfig(ctx) + if err != nil { + panic(err) + } + store.client = dynamodb.NewFromConfig(cfg) + + // dynamodb configuration + consistentRead, ok := onlineStoreConfig["consistent_reads"].(bool) + if !ok { + consistentRead = false + } + store.consistentRead = &consistentRead + + var batchSize int + if batchSizeFloat, ok := onlineStoreConfig["batch_size"].(float64); ok { + batchSize = int(batchSizeFloat) + } else { + batchSize = 40 + } + store.batchSize = &batchSize + + return &store, nil +} + +func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) { + // prevent resource waste in case context is canceled earlier + if ctx.Err() != nil { + return nil, ctx.Err() + } + + results := make([][]FeatureData, len(entityKeys)) + + // serialize entity key into entity hash id + entityIndexMap := make(map[string]int) + entityIds := make([]string, 0, len(entityKeys)) + unprocessedEntityIds := make(map[string]bool) + for i, entityKey := range entityKeys { + serKey, err := serializeEntityKey(entityKey, d.config.EntityKeySerializationVersion) + if err != nil { + return nil, err + } + entityId := hex.EncodeToString(mmh3.Hashx64_128(*serKey, 0)) + entityIds = append(entityIds, entityId) + entityIndexMap[entityId] = i + unprocessedEntityIds[entityId] = false + } + + // metadata from feature views, feature names + featureMap, featureNamesIndex, err := makeFeatureMeta(featureViewNames, featureNames) + if err != nil { + return nil, err + } + + // initialize `FeatureData` slice + featureCount := len(featureNamesIndex) + for i := 0; i < len(results); i++ { + results[i] = make([]FeatureData, featureCount) + } + + // controls the maximum number of concurrent goroutines sending requests to DynamoDB using a semaphore + cpuCount := runtime.NumCPU() + sem := semaphore.NewWeighted(int64(cpuCount * 2)) + + var mu sync.Mutex + for featureViewName, featureNames := range featureMap { + tableName := fmt.Sprintf("%s.%s", d.project, featureViewName) + + var batchGetItemInputs []*dynamodb.BatchGetItemInput + batchSize := *d.batchSize + for i := 0; i < len(entityIds); i += batchSize { + end := i + batchSize + if end > len(entityIds) { + end = len(entityIds) + } + batchEntityIds := entityIds[i:end] + entityIdBatch := make([]map[string]dtypes.AttributeValue, len(batchEntityIds)) + for i, entityId := range batchEntityIds { + entityIdBatch[i] = map[string]dtypes.AttributeValue{ + "entity_id": &dtypes.AttributeValueMemberS{Value: entityId}, + } + } + batchGetItemInput := &dynamodb.BatchGetItemInput{ + RequestItems: map[string]dtypes.KeysAndAttributes{ + tableName: { + Keys: entityIdBatch, + ConsistentRead: d.consistentRead, + }, + }, + } + batchGetItemInputs = append(batchGetItemInputs, batchGetItemInput) + } + + // goroutines sending requests to DynamoDB + errGroup, ctx := errgroup.WithContext(ctx) + for i, batchGetItemInput := range batchGetItemInputs { + _, batchGetItemInput := i, batchGetItemInput + errGroup.Go(func() error { + if err := sem.Acquire(ctx, 1); err != nil { + return err + } + defer sem.Release(1) + + resp, err := d.client.BatchGetItem(ctx, batchGetItemInput) + if err != nil { + return err + } + + // in case there is no entity id of a feature view in dynamodb + batchSize := len(resp.Responses[tableName]) + if batchSize == 0 { + return nil + } + + // process response from dynamodb + for j := 0; j < batchSize; j++ { + entityId := resp.Responses[tableName][j]["entity_id"].(*dtypes.AttributeValueMemberS).Value + timestampString := resp.Responses[tableName][j]["event_ts"].(*dtypes.AttributeValueMemberS).Value + t, err := time.Parse("2006-01-02 15:04:05-07:00", timestampString) + if err != nil { + return err + } + timeStamp := timestamppb.New(t) + + featureValues := resp.Responses[tableName][j]["values"].(*dtypes.AttributeValueMemberM).Value + entityIndex := entityIndexMap[entityId] + + for _, featureName := range featureNames { + featureValue := featureValues[featureName].(*dtypes.AttributeValueMemberB).Value + var value types.Value + if err := proto.Unmarshal(featureValue, &value); err != nil { + return err + } + featureIndex := featureNamesIndex[featureName] + + mu.Lock() + results[entityIndex][featureIndex] = FeatureData{Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName}, + Timestamp: timestamppb.Timestamp{Seconds: timeStamp.Seconds, Nanos: timeStamp.Nanos}, + Value: types.Value{Val: value.Val}, + } + mu.Unlock() + } + + mu.Lock() + delete(unprocessedEntityIds, entityId) + mu.Unlock() + } + return nil + }) + } + if err := errGroup.Wait(); err != nil { + return nil, err + } + + // process null imputation for entity ids that don't exist in dynamodb + currentTime := timestamppb.Now() // TODO: should use a different timestamp? + for entityId, _ := range unprocessedEntityIds { + entityIndex := entityIndexMap[entityId] + for _, featureName := range featureNames { + featureIndex := featureNamesIndex[featureName] + results[entityIndex][featureIndex] = FeatureData{Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName}, + Timestamp: timestamppb.Timestamp{Seconds: currentTime.Seconds, Nanos: currentTime.Nanos}, + Value: types.Value{Val: &types.Value_NullVal{NullVal: types.Null_NULL}}, + } + } + } + } + + return results, nil +} + +func (d *DynamodbOnlineStore) Destruct() { + +} + +func makeFeatureMeta(featureViewNames []string, featureNames []string) (map[string][]string, map[string]int, error) { + if len(featureViewNames) != len(featureNames) { + return nil, nil, fmt.Errorf("the lengths of featureViewNames and featureNames must be the same. got=%d, %d", len(featureViewNames), len(featureNames)) + } + featureMap := make(map[string][]string) + featureNamesIndex := make(map[string]int) + for i := 0; i < len(featureViewNames); i++ { + featureViewName := featureViewNames[i] + featureName := featureNames[i] + + featureMap[featureViewName] = append(featureMap[featureViewName], featureName) + featureNamesIndex[featureName] = i + } + return featureMap, featureNamesIndex, nil +} diff --git a/go/internal/feast/onlinestore/dynamodbonlinestore_test.go b/go/internal/feast/onlinestore/dynamodbonlinestore_test.go new file mode 100644 index 00000000000..1687d765dd9 --- /dev/null +++ b/go/internal/feast/onlinestore/dynamodbonlinestore_test.go @@ -0,0 +1,23 @@ +package onlinestore + +import ( + "testing" + + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/stretchr/testify/assert" +) + +func TestNewDynamodbOnlineStore(t *testing.T) { + var config = map[string]interface{}{ + "batch_size": 40, + "region": "us-east-1", + "max_pool_connections": 4, + "consistent_reads": "true", + } + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + _, err := NewDynamodbOnlineStore("test", rc, config) + assert.Nil(t, err) +} diff --git a/go/internal/feast/onlinestore/onlinestore.go b/go/internal/feast/onlinestore/onlinestore.go index 2f30e16d674..5fd23c52ee9 100644 --- a/go/internal/feast/onlinestore/onlinestore.go +++ b/go/internal/feast/onlinestore/onlinestore.go @@ -2,7 +2,9 @@ package onlinestore import ( "context" + "encoding/binary" "fmt" + "sort" "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/protos/feast/serving" @@ -61,7 +63,72 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) { } else if onlineStoreType == "redis" { onlineStore, err := NewRedisOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err + } else if onlineStoreType == "dynamodb" { + onlineStore, err := NewDynamodbOnlineStore(config.Project, config, config.OnlineStore) + return onlineStore, err } else { return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType) } } + +func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) { + // Serialize entity key to a bytestring so that it can be used as a lookup key in a hash table. + + // Ensure that we have the right amount of join keys and entity values + if len(entityKey.JoinKeys) != len(entityKey.EntityValues) { + return nil, fmt.Errorf("the amount of join key names and entity values don't match: %s vs %s", entityKey.JoinKeys, entityKey.EntityValues) + } + + // Make sure that join keys are sorted so that we have consistent key building + m := make(map[string]*types.Value) + + for i := 0; i < len(entityKey.JoinKeys); i++ { + m[entityKey.JoinKeys[i]] = entityKey.EntityValues[i] + } + + keys := make([]string, 0, len(m)) + for k := range entityKey.JoinKeys { + keys = append(keys, entityKey.JoinKeys[k]) + } + sort.Strings(keys) + + // Build the key + length := 5 * len(keys) + bufferList := make([][]byte, length) + + for i := 0; i < len(keys); i++ { + offset := i * 2 + byteBuffer := make([]byte, 4) + binary.LittleEndian.PutUint32(byteBuffer, uint32(types.ValueType_Enum_value["STRING"])) + bufferList[offset] = byteBuffer + bufferList[offset+1] = []byte(keys[i]) + } + + for i := 0; i < len(keys); i++ { + offset := (2 * len(keys)) + (i * 3) + value := m[keys[i]].GetVal() + + valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion) + if err != nil { + return valueBytes, err + } + + typeBuffer := make([]byte, 4) + binary.LittleEndian.PutUint32(typeBuffer, uint32(valueTypeBytes)) + + lenBuffer := make([]byte, 4) + binary.LittleEndian.PutUint32(lenBuffer, uint32(len(*valueBytes))) + + bufferList[offset+0] = typeBuffer + bufferList[offset+1] = lenBuffer + bufferList[offset+2] = *valueBytes + } + + // Convert from an array of byte arrays to a single byte array + var entityKeyBuffer []byte + for i := 0; i < len(bufferList); i++ { + entityKeyBuffer = append(entityKeyBuffer, bufferList[i]...) + } + + return &entityKeyBuffer, nil +} diff --git a/go/internal/feast/onlinestore/redisonlinestore.go b/go/internal/feast/onlinestore/redisonlinestore.go index 95377d7548a..d5713d5e4d0 100644 --- a/go/internal/feast/onlinestore/redisonlinestore.go +++ b/go/internal/feast/onlinestore/redisonlinestore.go @@ -7,8 +7,6 @@ import ( "errors" "fmt" - //"os" - "sort" "strconv" "strings" @@ -341,80 +339,6 @@ func buildRedisKey(project string, entityKey *types.EntityKey, entityKeySerializ return &fullKey, nil } -func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) { - // Serialize entity key to a bytestring so that it can be used as a lookup key in a hash table. - - if entityKeySerializationVersion < 3 { - return nil, fmt.Errorf("Serialization of entity key with version < 3 is removed. Please use version 3 by setting entity_key_serialization_version=3. To reserializa your online store featrues refer - https://github.com/feast-dev/feast/blob/master/docs/how-to-guides/entity-reserialization-of-from-v2-to-v3.md") - } - - // Ensure that we have the right amount of join keys and entity values - if len(entityKey.JoinKeys) != len(entityKey.EntityValues) { - return nil, fmt.Errorf("the amount of join key names and entity values don't match: %s vs %s", entityKey.JoinKeys, entityKey.EntityValues) - } - - // Make sure that join keys are sorted so that we have consistent key building - m := make(map[string]*types.Value) - - for i := 0; i < len(entityKey.JoinKeys); i++ { - m[entityKey.JoinKeys[i]] = entityKey.EntityValues[i] - } - - keys := make([]string, 0, len(m)) - for k := range entityKey.JoinKeys { - keys = append(keys, entityKey.JoinKeys[k]) - } - sort.Strings(keys) - - // Build the key - length := 5 * len(keys) - bufferList := make([][]byte, length) - - // For entityKeySerializationVersion 3 and above, we add the number of join keys - // as the first 4 bytes of the serialized key. - if entityKeySerializationVersion >= 3 { - byteBuffer := make([]byte, 4) - binary.LittleEndian.PutUint32(byteBuffer, uint32(len(keys))) - bufferList = append([][]byte{byteBuffer}, bufferList...) - } - - for i := 0; i < len(keys); i++ { - offset := i * 2 - byteBuffer := make([]byte, 4) - binary.LittleEndian.PutUint32(byteBuffer, uint32(types.ValueType_Enum_value["STRING"])) - bufferList[offset] = byteBuffer - bufferList[offset+1] = []byte(keys[i]) - } - - for i := 0; i < len(keys); i++ { - offset := (2 * len(keys)) + (i * 3) - value := m[keys[i]].GetVal() - - valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion) - if err != nil { - return valueBytes, err - } - - typeBuffer := make([]byte, 4) - binary.LittleEndian.PutUint32(typeBuffer, uint32(valueTypeBytes)) - - lenBuffer := make([]byte, 4) - binary.LittleEndian.PutUint32(lenBuffer, uint32(len(*valueBytes))) - - bufferList[offset+0] = typeBuffer - bufferList[offset+1] = lenBuffer - bufferList[offset+2] = *valueBytes - } - - // Convert from an array of byte arrays to a single byte array - var entityKeyBuffer []byte - for i := 0; i < len(bufferList); i++ { - entityKeyBuffer = append(entityKeyBuffer, bufferList[i]...) - } - - return &entityKeyBuffer, nil -} - func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) { // TODO: Implement support for other types (at least the major types like ints, strings, bytes) switch x := (value).(type) {