diff --git a/.gitignore b/.gitignore index 0f3165e8414..a1d4195fb04 100644 --- a/.gitignore +++ b/.gitignore @@ -105,7 +105,7 @@ coverage.xml .hypothesis/ .pytest_cache/ infra/scripts/*.conf -go/cmd/server/logging/feature_repo/data/ +go/internal/test/feature_repo/data/ # Translations *.mo diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 636ccd403b3..c67f9ef7f4e 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -3,20 +3,23 @@ package embedded import ( "context" "fmt" - "github.com/feast-dev/feast/go/internal/feast/server" - "github.com/feast-dev/feast/go/internal/feast/server/logging" - "github.com/feast-dev/feast/go/protos/feast/serving" - "google.golang.org/grpc" "log" "net" "os" "os/signal" "syscall" + "google.golang.org/grpc" + + "github.com/feast-dev/feast/go/internal/feast/server" + "github.com/feast-dev/feast/go/internal/feast/server/logging" + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/array" "github.com/apache/arrow/go/v8/arrow/cdata" "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlineserving" diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 5e10f4978e0..4ecd781b746 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlineserving" "github.com/feast-dev/feast/go/internal/feast/onlinestore" @@ -287,3 +288,32 @@ func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*p } return fs.onlineStore.OnlineRead(ctx, entityRowsValue, requestedFeatureViewNames, requestedFeatureNames) } + +func (fs 
*FeatureStore) GetFcosMap() (map[string]*model.Entity, map[string]*model.FeatureView, map[string]*model.OnDemandFeatureView, error) { + odfvs, err := fs.ListOnDemandFeatureViews() + if err != nil { + return nil, nil, nil, err + } + fvs, err := fs.ListFeatureViews() + if err != nil { + return nil, nil, nil, err + } + entities, err := fs.ListEntities(true) + if err != nil { + return nil, nil, nil, err + } + + entityMap := make(map[string]*model.Entity) + for _, entity := range entities { + entityMap[entity.Name] = entity + } + fvMap := make(map[string]*model.FeatureView) + for _, fv := range fvs { + fvMap[fv.Base.Name] = fv + } + odfvMap := make(map[string]*model.OnDemandFeatureView) + for _, odfv := range odfvs { + odfvMap[odfv.Base.Name] = odfv + } + return entityMap, fvMap, odfvMap, nil +} diff --git a/go/internal/feast/featurestore_test.go b/go/internal/feast/featurestore_test.go index c8f9049c4a5..dd08bc287e9 100644 --- a/go/internal/feast/featurestore_test.go +++ b/go/internal/feast/featurestore_test.go @@ -6,10 +6,11 @@ import ( "runtime" "testing" + "github.com/stretchr/testify/assert" + "github.com/feast-dev/feast/go/internal/feast/onlinestore" "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/protos/feast/types" - "github.com/stretchr/testify/assert" ) // Return absolute path to the test_repo registry regardless of the working directory diff --git a/go/internal/feast/model/featureservice.go b/go/internal/feast/model/featureservice.go index 5619dd90426..ce2781efc28 100644 --- a/go/internal/feast/model/featureservice.go +++ b/go/internal/feast/model/featureservice.go @@ -1,8 +1,9 @@ package model import ( - "github.com/feast-dev/feast/go/protos/feast/core" timestamppb "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/feast-dev/feast/go/protos/feast/core" ) type FeatureService struct { @@ -11,6 +12,11 @@ type FeatureService struct { CreatedTimestamp *timestamppb.Timestamp LastUpdatedTimestamp 
*timestamppb.Timestamp Projections []*FeatureViewProjection + LoggingConfig *FeatureServiceLoggingConfig +} + +type FeatureServiceLoggingConfig struct { + SampleRate float32 } func NewFeatureServiceFromProto(proto *core.FeatureService) *FeatureService { @@ -18,10 +24,17 @@ func NewFeatureServiceFromProto(proto *core.FeatureService) *FeatureService { for index, projectionProto := range proto.Spec.Features { projections[index] = NewFeatureViewProjectionFromProto(projectionProto) } + var loggingConfig *FeatureServiceLoggingConfig + if proto.GetSpec().GetLoggingConfig() != nil { + loggingConfig = &FeatureServiceLoggingConfig{ + SampleRate: proto.GetSpec().GetLoggingConfig().SampleRate, + } + } return &FeatureService{Name: proto.Spec.Name, Project: proto.Spec.Project, CreatedTimestamp: proto.Meta.CreatedTimestamp, LastUpdatedTimestamp: proto.Meta.LastUpdatedTimestamp, Projections: projections, + LoggingConfig: loggingConfig, } } diff --git a/go/internal/feast/model/featureview.go b/go/internal/feast/model/featureview.go index 85fc7a60eeb..6c198f99947 100644 --- a/go/internal/feast/model/featureview.go +++ b/go/internal/feast/model/featureview.go @@ -1,9 +1,10 @@ package model import ( + durationpb "google.golang.org/protobuf/types/known/durationpb" + "github.com/feast-dev/feast/go/protos/feast/core" "github.com/feast-dev/feast/go/protos/feast/types" - durationpb "google.golang.org/protobuf/types/known/durationpb" ) const ( diff --git a/go/internal/feast/model/ondemandfeatureview.go b/go/internal/feast/model/ondemandfeatureview.go index b7a352cc205..b637cd75ed0 100644 --- a/go/internal/feast/model/ondemandfeatureview.go +++ b/go/internal/feast/model/ondemandfeatureview.go @@ -49,8 +49,10 @@ func (fs *OnDemandFeatureView) NewWithProjection(projection *FeatureViewProjecti } func NewOnDemandFeatureViewFromBase(base *BaseFeatureView) *OnDemandFeatureView { - - featureView := &OnDemandFeatureView{Base: base} + featureView := &OnDemandFeatureView{ + Base: base, + 
SourceFeatureViewProjections: map[string]*FeatureViewProjection{}, + SourceRequestDataSources: map[string]*core.DataSource_RequestDataOptions{}} return featureView } diff --git a/go/internal/feast/onlineserving/serving.go b/go/internal/feast/onlineserving/serving.go index 381ba5f0f2f..1d0567c3547 100644 --- a/go/internal/feast/onlineserving/serving.go +++ b/go/internal/feast/onlineserving/serving.go @@ -9,14 +9,15 @@ import ( "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlinestore" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" - "github.com/golang/protobuf/proto" - "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" ) /* diff --git a/go/internal/feast/onlineserving/serving_test.go b/go/internal/feast/onlineserving/serving_test.go index 2f4cf8eabaa..0a00f546f9e 100644 --- a/go/internal/feast/onlineserving/serving_test.go +++ b/go/internal/feast/onlineserving/serving_test.go @@ -3,12 +3,13 @@ package onlineserving import ( "testing" - "github.com/feast-dev/feast/go/internal/feast/model" - "github.com/feast-dev/feast/go/protos/feast/core" - "github.com/feast-dev/feast/go/protos/feast/types" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/protos/feast/core" + "github.com/feast-dev/feast/go/protos/feast/types" ) func TestGroupingFeatureRefs(t *testing.T) { diff --git a/go/internal/feast/onlinestore/onlinestore.go 
b/go/internal/feast/onlinestore/onlinestore.go index 433b96c08e4..64a05f144ce 100644 --- a/go/internal/feast/onlinestore/onlinestore.go +++ b/go/internal/feast/onlinestore/onlinestore.go @@ -3,11 +3,13 @@ package onlinestore import ( "context" "fmt" + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" - "github.com/golang/protobuf/ptypes/timestamp" ) type FeatureData struct { diff --git a/go/internal/feast/onlinestore/redisonlinestore.go b/go/internal/feast/onlinestore/redisonlinestore.go index 9049eae1033..df04856cbfa 100644 --- a/go/internal/feast/onlinestore/redisonlinestore.go +++ b/go/internal/feast/onlinestore/redisonlinestore.go @@ -9,12 +9,13 @@ import ( "strconv" "strings" - "github.com/feast-dev/feast/go/protos/feast/serving" - "github.com/feast-dev/feast/go/protos/feast/types" "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" "github.com/spaolacci/murmur3" timestamppb "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" ) type redisType int diff --git a/go/internal/feast/onlinestore/sqliteonlinestore.go b/go/internal/feast/onlinestore/sqliteonlinestore.go index f8c53255455..94ba0c0d568 100644 --- a/go/internal/feast/onlinestore/sqliteonlinestore.go +++ b/go/internal/feast/onlinestore/sqliteonlinestore.go @@ -5,19 +5,21 @@ import ( "database/sql" "encoding/hex" "errors" - "github.com/feast-dev/feast/go/internal/feast/registry" "strings" "sync" "time" + "github.com/feast-dev/feast/go/internal/feast/registry" + "context" "fmt" - "github.com/feast-dev/feast/go/protos/feast/serving" - "github.com/feast-dev/feast/go/protos/feast/types" _ "github.com/mattn/go-sqlite3" "google.golang.org/protobuf/proto" timestamppb "google.golang.org/protobuf/types/known/timestamppb" + + 
"github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" ) type SqliteOnlineStore struct { diff --git a/go/internal/feast/onlinestore/sqliteonlinestore_test.go b/go/internal/feast/onlinestore/sqliteonlinestore_test.go index cbee9cd91c2..5af1c1f4ce4 100644 --- a/go/internal/feast/onlinestore/sqliteonlinestore_test.go +++ b/go/internal/feast/onlinestore/sqliteonlinestore_test.go @@ -8,17 +8,18 @@ import ( "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/stretchr/testify/assert" + "github.com/feast-dev/feast/go/internal/test" "github.com/feast-dev/feast/go/protos/feast/types" - "github.com/stretchr/testify/assert" ) func TestSqliteAndFeatureRepoSetup(t *testing.T) { - dir := "../../test" + dir := t.TempDir() feature_repo_path := filepath.Join(dir, "feature_repo") err := test.SetupCleanFeatureRepo(dir) assert.Nil(t, err) - defer test.CleanUpRepo(dir) + config, err := registry.NewRepoConfigFromFile(feature_repo_path) assert.Nil(t, err) assert.Equal(t, "feature_repo", config.Project) @@ -33,10 +34,10 @@ func TestSqliteAndFeatureRepoSetup(t *testing.T) { } func TestSqliteOnlineRead(t *testing.T) { - dir := "../../test" + dir := t.TempDir() feature_repo_path := filepath.Join(dir, "feature_repo") test.SetupCleanFeatureRepo(dir) - defer test.CleanUpRepo(dir) + config, err := registry.NewRepoConfigFromFile(feature_repo_path) assert.Nil(t, err) store, err := NewSqliteOnlineStore("feature_repo", config, config.OnlineStore) diff --git a/go/internal/feast/registry/local.go b/go/internal/feast/registry/local.go index 22db73a3206..8b35e5756b6 100644 --- a/go/internal/feast/registry/local.go +++ b/go/internal/feast/registry/local.go @@ -1,13 +1,15 @@ package registry import ( - "github.com/feast-dev/feast/go/protos/feast/core" - "github.com/golang/protobuf/proto" - "github.com/google/uuid" - "google.golang.org/protobuf/types/known/timestamppb" "io/ioutil" "os" "path/filepath" + + "github.com/golang/protobuf/proto" 
+ "github.com/google/uuid" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/feast-dev/feast/go/protos/feast/core" ) // A LocalRegistryStore is a file-based implementation of the RegistryStore interface. diff --git a/go/internal/feast/registry/repoconfig.go b/go/internal/feast/registry/repoconfig.go index e5efd899de9..59d125b1bfc 100644 --- a/go/internal/feast/registry/repoconfig.go +++ b/go/internal/feast/registry/repoconfig.go @@ -2,9 +2,10 @@ package registry import ( "encoding/json" - "github.com/ghodss/yaml" "io/ioutil" "path/filepath" + + "github.com/ghodss/yaml" ) const ( diff --git a/go/internal/feast/registry/repoconfig_test.go b/go/internal/feast/registry/repoconfig_test.go index c3336fd618f..848977886c9 100644 --- a/go/internal/feast/registry/repoconfig_test.go +++ b/go/internal/feast/registry/repoconfig_test.go @@ -1,10 +1,11 @@ package registry import ( - "github.com/stretchr/testify/assert" "os" "path/filepath" "testing" + + "github.com/stretchr/testify/assert" ) func TestNewRepoConfig(t *testing.T) { diff --git a/go/internal/feast/server/grpc_server.go b/go/internal/feast/server/grpc_server.go index 08f624b6bda..c5aabd52d97 100644 --- a/go/internal/feast/server/grpc_server.go +++ b/go/internal/feast/server/grpc_server.go @@ -2,13 +2,15 @@ package server import ( "context" + "fmt" + + "github.com/google/uuid" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" - "github.com/google/uuid" ) const feastServerVersion = "0.0.1" @@ -55,7 +57,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques FeatureNames: &serving.FeatureList{Val: make([]string, 0)}, }, } - // Entities are currently part of the features as a value and the order that we add it to the resp MetaData + // JoinKeys are currently part 
of the features as a value and the order that we add it to the resp MetaData // Need to figure out a way to map the correct entities to the correct ordering entityValuesMap := make(map[string][]*prototypes.Value, 0) featureNames := make([]string, len(featureVectors)) @@ -76,8 +78,17 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques EventTimestamps: vector.Timestamps, }) } + if featuresOrService.FeatureService != nil && s.loggingService != nil { - go s.loggingService.GenerateLogs(featuresOrService.FeatureService, entityValuesMap, resp.Results[len(request.Entities):], request.RequestContext, requestId) + logger, err := s.loggingService.GetOrCreateLogger(featuresOrService.FeatureService) + if err != nil { + fmt.Printf("Couldn't instantiate logger for feature service %s: %+v", featuresOrService.FeatureService.Name, err) + } + + err = logger.Log(entityValuesMap, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId) + if err != nil { + fmt.Printf("LoggerImpl error[%s]: %+v", featuresOrService.FeatureService.Name, err) + } } return resp, nil } diff --git a/go/internal/feast/server/grpc_server_test.go b/go/internal/feast/server/grpc_server_test.go index 090a8738111..52960321319 100644 --- a/go/internal/feast/server/grpc_server_test.go +++ b/go/internal/feast/server/grpc_server_test.go @@ -2,6 +2,7 @@ package server import ( "context" + "io/ioutil" "net" "os" "path/filepath" @@ -10,20 +11,23 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/apache/arrow/go/v8/arrow/array" "github.com/apache/arrow/go/v8/arrow/memory" "github.com/apache/arrow/go/v8/parquet/file" "github.com/apache/arrow/go/v8/parquet/pqarrow" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" + "github.com/feast-dev/feast/go/internal/feast" 
"github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/internal/test" "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/test/bufconn" ) // Return absolute path to the test_repo directory regardless of the working directory @@ -41,27 +45,12 @@ func getRepoPath(basePath string) string { } // Starts a new grpc server, registers the serving service and returns a client. -func getClient(ctx context.Context, offlineStoreType string, basePath string, enableLogging bool) (serving.ServingServiceClient, func()) { +func getClient(ctx context.Context, offlineStoreType string, basePath string, logPath string) (serving.ServingServiceClient, func()) { buffer := 1024 * 1024 listener := bufconn.Listen(buffer) server := grpc.NewServer() config, err := registry.NewRepoConfigFromFile(getRepoPath(basePath)) - - // TODO(kevjumba): either add this officially or talk in design review about what the correct solution for what do with path. - // Currently in python we use the path in FileSource but it is not specified in configuration unless it is using file_url? 
- if enableLogging { - if config.OfflineStore == nil { - config.OfflineStore = map[string]interface{}{} - } - absPath, err := filepath.Abs(filepath.Join(getRepoPath(basePath), "log.parquet")) - if err != nil { - panic(err) - } - config.OfflineStore["path"] = absPath - config.OfflineStore["storeType"] = offlineStoreType - } - if err != nil { panic(err) } @@ -69,7 +58,20 @@ func getClient(ctx context.Context, offlineStoreType string, basePath string, en if err != nil { panic(err) } - loggingService, err := logging.NewLoggingService(fs, 1000, "test_service", enableLogging) + + var logSink logging.LogSink + if logPath != "" { + logSink, err = logging.NewFileLogSink(logPath) + if err != nil { + panic(err) + } + } + loggingService, err := logging.NewLoggingService(fs, logSink, logging.LoggingOptions{ + WriteInterval: 10 * time.Millisecond, + FlushInterval: logging.DefaultOptions.FlushInterval, + EmitTimeout: logging.DefaultOptions.EmitTimeout, + ChannelCapacity: logging.DefaultOptions.ChannelCapacity, + }) if err != nil { panic(err) } @@ -99,12 +101,13 @@ func getClient(ctx context.Context, offlineStoreType string, basePath string, en func TestGetFeastServingInfo(t *testing.T) { ctx := context.Background() // Pregenerated using `feast init`. - dir := "logging/" + dir := "../../test/" err := test.SetupInitializedRepo(dir) - assert.Nil(t, err) defer test.CleanUpInitializedRepo(dir) - client, closer := getClient(ctx, "", dir, false) + require.Nil(t, err) + + client, closer := getClient(ctx, "", dir, "") defer closer() response, err := client.GetFeastServingInfo(ctx, &serving.GetFeastServingInfoRequest{}) assert.Nil(t, err) @@ -114,12 +117,13 @@ func TestGetFeastServingInfo(t *testing.T) { func TestGetOnlineFeaturesSqlite(t *testing.T) { ctx := context.Background() // Pregenerated using `feast init`. 
- dir := "logging/" + dir := "../../test/" err := test.SetupInitializedRepo(dir) - assert.Nil(t, err) defer test.CleanUpInitializedRepo(dir) - client, closer := getClient(ctx, "", dir, false) + require.Nil(t, err) + + client, closer := getClient(ctx, "", dir, "") defer closer() entities := make(map[string]*types.RepeatedValue) entities["driver_id"] = &types.RepeatedValue{ @@ -173,12 +177,14 @@ func TestGetOnlineFeaturesSqlite(t *testing.T) { func TestGetOnlineFeaturesSqliteWithLogging(t *testing.T) { ctx := context.Background() // Pregenerated using `feast init`. - dir := "logging/" + dir := "../../test/" err := test.SetupInitializedRepo(dir) - assert.Nil(t, err) defer test.CleanUpInitializedRepo(dir) - client, closer := getClient(ctx, "file", dir, true) + require.Nil(t, err) + + logPath := t.TempDir() + client, closer := getClient(ctx, "file", dir, logPath) defer closer() entities := make(map[string]*types.RepeatedValue) entities["driver_id"] = &types.RepeatedValue{ @@ -207,18 +213,20 @@ func TestGetOnlineFeaturesSqliteWithLogging(t *testing.T) { // TODO(kevjumba): implement for timestamp and status expectedLogValues, _, _ := GetExpectedLogRows(featureNames, response.Results[len(request.Entities):]) expectedLogValues["driver_id"] = entities["driver_id"] - logPath, err := filepath.Abs(filepath.Join(dir, "feature_repo", "log.parquet")) + // Wait for logger to flush. 
- assert.Eventually(t, func() bool { - var _, err = os.Stat(logPath) - if os.IsNotExist(err) { + require.Eventually(t, func() bool { + files, err := ioutil.ReadDir(logPath) + if err != nil || len(files) == 0 { return false - } else { - return true } - }, 1*time.Second, logging.DEFAULT_LOG_FLUSH_INTERVAL) - assert.Nil(t, err) - pf, err := file.OpenParquetFile(logPath, false) + stat, err := os.Stat(filepath.Join(logPath, files[0].Name())) + return err == nil && stat.Size() > 0 + }, 1*time.Second, 100*time.Millisecond) + + files, err := ioutil.ReadDir(logPath) + logFile := filepath.Join(logPath, files[0].Name()) + pf, err := file.OpenParquetFile(logFile, false) assert.Nil(t, err) reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) @@ -232,26 +240,23 @@ func TestGetOnlineFeaturesSqliteWithLogging(t *testing.T) { for tr.Next() { rec := tr.Record() assert.NotNil(t, rec) - values, err := test.GetProtoFromRecord(rec) + actualValues, err := test.GetProtoFromRecord(rec) assert.Nil(t, err) - assert.Equal(t, len(values)-1 /*request id column not counted*/, len(expectedLogValues)) - // Need to iterate through and compare because certain values in types.RepeatedValues aren't accurately being compared. - for name, val := range values { + // Need to iterate through and compare because certain actualValues in types.RepeatedValues aren't accurately being compared. + for name, val := range expectedLogValues { if name == "RequestId" { // Ensure there are request ids for each entity. 
- assert.Equal(t, len(val.Val), len(response.Results[0].Values)) + assert.Equal(t, len(val.Val), len(actualValues[name].Val)) } else { - assert.Equal(t, len(val.Val), len(expectedLogValues[name].Val)) + assert.Equal(t, len(val.Val), len(actualValues[name].Val)) for idx, featureVal := range val.Val { - assert.Equal(t, featureVal.Val, expectedLogValues[name].Val[idx].Val) + assert.Equal(t, featureVal.Val, actualValues[name].Val[idx].Val) } } } } - err = test.CleanUpFile(logPath) - assert.Nil(t, err) } // Generate the expected log rows based on the resulting feature vector returned from GetOnlineFeatures. diff --git a/go/internal/feast/server/logging/featureserviceschema.go b/go/internal/feast/server/logging/featureserviceschema.go new file mode 100644 index 00000000000..5047346c2ca --- /dev/null +++ b/go/internal/feast/server/logging/featureserviceschema.go @@ -0,0 +1,97 @@ +package logging + +import ( + "fmt" + + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/protos/feast/types" +) + +type FeatureServiceSchema struct { + JoinKeys []string + Features []string + RequestData []string + + JoinKeysTypes map[string]types.ValueType_Enum + FeaturesTypes map[string]types.ValueType_Enum + RequestDataTypes map[string]types.ValueType_Enum +} + +func GenerateSchemaFromFeatureService(fs FeatureStore, featureServiceName string) (*FeatureServiceSchema, error) { + entityMap, fvMap, odFvMap, err := fs.GetFcosMap() + if err != nil { + return nil, err + } + + featureService, err := fs.GetFeatureService(featureServiceName) + if err != nil { + return nil, err + } + + return generateSchema(featureService, entityMap, fvMap, odFvMap) +} + +func generateSchema(featureService *model.FeatureService, entityMap map[string]*model.Entity, fvMap map[string]*model.FeatureView, odFvMap map[string]*model.OnDemandFeatureView) (*FeatureServiceSchema, error) { + joinKeys := make([]string, 0) + features := make([]string, 0) + requestData := make([]string, 0) + + 
joinKeysSet := make(map[string]interface{}) + + entityJoinKeyToType := make(map[string]types.ValueType_Enum) + allFeatureTypes := make(map[string]types.ValueType_Enum) + requestDataTypes := make(map[string]types.ValueType_Enum) + + for _, featureProjection := range featureService.Projections { + // Create copies of FeatureView that may contains the same *FeatureView but + // each differentiated by a *FeatureViewProjection + featureViewName := featureProjection.Name + if fv, ok := fvMap[featureViewName]; ok { + for _, f := range featureProjection.Features { + fullFeatureName := getFullFeatureName(featureProjection.NameToUse(), f.Name) + features = append(features, fullFeatureName) + allFeatureTypes[fullFeatureName] = f.Dtype + } + for _, entityName := range fv.Entities { + entity := entityMap[entityName] + var joinKey string + if joinKeyAlias, ok := featureProjection.JoinKeyMap[entity.JoinKey]; ok { + joinKey = joinKeyAlias + } else { + joinKey = entity.JoinKey + } + + if _, ok := joinKeysSet[joinKey]; !ok { + joinKeys = append(joinKeys, joinKey) + } + + joinKeysSet[joinKey] = nil + entityJoinKeyToType[joinKey] = entity.ValueType + } + } else if odFv, ok := odFvMap[featureViewName]; ok { + for _, f := range featureProjection.Features { + fullFeatureName := getFullFeatureName(featureProjection.NameToUse(), f.Name) + features = append(features, fullFeatureName) + allFeatureTypes[fullFeatureName] = f.Dtype + } + for paramName, paramType := range odFv.GetRequestDataSchema() { + requestData = append(requestData, paramName) + requestDataTypes[paramName] = paramType + } + } else { + return nil, fmt.Errorf("no such feature view %s found (referenced from feature service %s)", + featureViewName, featureService.Name) + } + } + + schema := &FeatureServiceSchema{ + JoinKeys: joinKeys, + Features: features, + RequestData: requestData, + + JoinKeysTypes: entityJoinKeyToType, + FeaturesTypes: allFeatureTypes, + RequestDataTypes: requestDataTypes, + } + return schema, nil +} diff 
--git a/go/internal/feast/server/logging/featureserviceschema_test.go b/go/internal/feast/server/logging/featureserviceschema_test.go new file mode 100644 index 00000000000..efcd5ec7fcc --- /dev/null +++ b/go/internal/feast/server/logging/featureserviceschema_test.go @@ -0,0 +1,204 @@ +package logging + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/internal/test" + "github.com/feast-dev/feast/go/protos/feast/core" + "github.com/feast-dev/feast/go/protos/feast/types" +) + +func buildFCOMaps(entities []*model.Entity, fvs []*model.FeatureView, odFvs []*model.OnDemandFeatureView) (map[string]*model.Entity, map[string]*model.FeatureView, map[string]*model.OnDemandFeatureView) { + entityMap := make(map[string]*model.Entity) + fvMap := make(map[string]*model.FeatureView) + odFvMap := make(map[string]*model.OnDemandFeatureView) + + for _, entity := range entities { + entityMap[entity.Name] = entity + } + + for _, fv := range fvs { + fvMap[fv.Base.Name] = fv + } + + for _, fv := range odFvs { + odFvMap[fv.Base.Name] = fv + } + + return entityMap, fvMap, odFvMap +} + +func TestSchemaTypeRetrieval(t *testing.T) { + featureService, entities, fvs, odfvs := InitializeFeatureRepoVariablesForTest() + entityMap, fvMap, odFvMap := buildFCOMaps(entities, fvs, odfvs) + + expectedFeatureNames := make([]string, 0) + expectedRequestData := make([]string, 0) + + for _, featureView := range fvs { + for _, f := range featureView.Base.Features { + expectedFeatureNames = append(expectedFeatureNames, getFullFeatureName(featureView.Base.Name, f.Name)) + } + } + for _, odFv := range odfvs { + for _, f := range odFv.Base.Features { + expectedFeatureNames = append(expectedFeatureNames, getFullFeatureName(odFv.Base.Name, f.Name)) + } + for _, dataSource := range odFv.SourceRequestDataSources { + for _, field := range dataSource.Schema { + expectedRequestData = 
append(expectedRequestData, field.Name) + } + } + } + + schema, err := generateSchema(featureService, entityMap, fvMap, odFvMap) + assert.Nil(t, err) + + assert.Equal(t, expectedFeatureNames, schema.Features) + assert.Equal(t, []string{"driver_id"}, schema.JoinKeys) + assert.Equal(t, schema.JoinKeysTypes["driver_id"], types.ValueType_INT64) + + types := []types.ValueType_Enum{*types.ValueType_INT64.Enum(), *types.ValueType_FLOAT.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum()} + for idx, featureName := range expectedFeatureNames { + assert.Contains(t, schema.FeaturesTypes, featureName) + assert.Equal(t, schema.FeaturesTypes[featureName], types[idx]) + } +} + +func TestSchemaRetrievalIgnoresEntitiesNotInFeatureService(t *testing.T) { + featureService, entities, fvs, odfvs := InitializeFeatureRepoVariablesForTest() + entityMap, fvMap, odFvMap := buildFCOMaps(entities, fvs, odfvs) + + //Remove entities in featureservice + for _, featureView := range fvs { + featureView.Entities = []string{} + } + + schema, err := generateSchema(featureService, entityMap, fvMap, odFvMap) + assert.Nil(t, err) + assert.Empty(t, schema.JoinKeysTypes) +} + +func TestSchemaUsesOrderInFeatureService(t *testing.T) { + featureService, entities, fvs, odfvs := InitializeFeatureRepoVariablesForTest() + entityMap, fvMap, odFvMap := buildFCOMaps(entities, fvs, odfvs) + + expectedFeatureNames := make([]string, 0) + + // Source of truth for order of featureNames + for _, featureView := range fvs { + for _, f := range featureView.Base.Features { + expectedFeatureNames = append(expectedFeatureNames, getFullFeatureName(featureView.Base.Name, f.Name)) + } + } + for _, featureView := range odfvs { + for _, f := range featureView.Base.Features { + expectedFeatureNames = append(expectedFeatureNames, getFullFeatureName(featureView.Base.Name, f.Name)) + } + } + + rand.Seed(time.Now().UnixNano()) + // Shuffle the featureNames 
in incorrect order + for _, featureView := range fvs { + rand.Shuffle(len(featureView.Base.Features), func(i, j int) { + featureView.Base.Features[i], featureView.Base.Features[j] = featureView.Base.Features[j], featureView.Base.Features[i] + }) + } + for _, featureView := range odfvs { + rand.Shuffle(len(featureView.Base.Features), func(i, j int) { + featureView.Base.Features[i], featureView.Base.Features[j] = featureView.Base.Features[j], featureView.Base.Features[i] + }) + } + + schema, err := generateSchema(featureService, entityMap, fvMap, odFvMap) + assert.Nil(t, err) + + // Ensure the same results + assert.Equal(t, expectedFeatureNames, schema.Features) + assert.Equal(t, []string{"driver_id"}, schema.JoinKeys) + +} + +// Initialize all dummy featureservice, entities and featureviews/on demand featureviews for testing. +func InitializeFeatureRepoVariablesForTest() (*model.FeatureService, []*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView) { + f1 := test.CreateNewFeature( + "int64", + types.ValueType_INT64, + ) + f2 := test.CreateNewFeature( + "float32", + types.ValueType_FLOAT, + ) + projection1 := test.CreateNewFeatureViewProjection( + "featureView1", + "", + []*model.Feature{f1, f2}, + map[string]string{}, + ) + baseFeatureView1 := test.CreateBaseFeatureView( + "featureView1", + []*model.Feature{f1, f2}, + projection1, + ) + featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}) + entity1 := test.CreateNewEntity("driver_id", types.ValueType_INT64, "driver_id") + f3 := test.CreateNewFeature( + "int32", + types.ValueType_INT32, + ) + f4 := test.CreateNewFeature( + "double", + types.ValueType_DOUBLE, + ) + projection2 := test.CreateNewFeatureViewProjection( + "featureView2", + "", + []*model.Feature{f3, f4}, + map[string]string{}, + ) + baseFeatureView2 := test.CreateBaseFeatureView( + "featureView2", + []*model.Feature{f3, f4}, + projection2, + ) + featureView2 := test.CreateFeatureView(baseFeatureView2, nil, 
[]string{"driver_id"}) + + f5 := test.CreateNewFeature( + "odfv_f1", + types.ValueType_INT32, + ) + f6 := test.CreateNewFeature( + "odfv_f2", + types.ValueType_DOUBLE, + ) + projection3 := test.CreateNewFeatureViewProjection( + "od_bf1", + "", + []*model.Feature{f5, f6}, + map[string]string{}, + ) + od_bf1 := test.CreateBaseFeatureView( + "od_bf1", + []*model.Feature{f5, f6}, + projection3, + ) + odfv := model.NewOnDemandFeatureViewFromBase(od_bf1) + odfv.SourceRequestDataSources["input"] = &core.DataSource_RequestDataOptions{ + Schema: []*core.FeatureSpecV2{ + {Name: "param1", ValueType: types.ValueType_FLOAT}, + }, + } + featureService := test.CreateNewFeatureService( + "test_service", + "test_project", + nil, + nil, + []*model.FeatureViewProjection{projection1, projection2, projection3}, + ) + return featureService, []*model.Entity{entity1}, []*model.FeatureView{featureView1, featureView2}, []*model.OnDemandFeatureView{odfv} +} diff --git a/go/internal/feast/server/logging/filelogsink.go b/go/internal/feast/server/logging/filelogsink.go new file mode 100644 index 00000000000..1d9afcd5234 --- /dev/null +++ b/go/internal/feast/server/logging/filelogsink.go @@ -0,0 +1,55 @@ +package logging + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "github.com/pkg/errors" + + "github.com/apache/arrow/go/v8/arrow" + "github.com/google/uuid" + + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/parquet" + "github.com/apache/arrow/go/v8/parquet/pqarrow" +) + +type FileLogSink struct { + path string +} + +// FileLogSink is currently only used for testing. It will be instantiated during go unit tests to log to file +// and the parquet files will be cleaned up after the test is run. 
+func NewFileLogSink(path string) (*FileLogSink, error) { + if path == "" { + return nil, errors.New("need path for file log sink") + } + + absPath, err := filepath.Abs(path) + if err != nil { + return nil, err + } + return &FileLogSink{path: absPath}, nil +} + +func (s *FileLogSink) Write(record arrow.Record) error { + fileName, _ := uuid.NewUUID() + + var writer io.Writer + writer, err := os.Create(filepath.Join(s.path, fmt.Sprintf("%s.parquet", fileName.String()))) + if err != nil { + return err + } + table := array.NewTableFromRecords(record.Schema(), []arrow.Record{record}) + + props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)) + arrProps := pqarrow.DefaultWriterProps() + return pqarrow.WriteTable(table, writer, 100, props, arrProps) +} + +func (s *FileLogSink) Flush() error { + // files are already flushed during Write + return nil +} diff --git a/go/internal/feast/server/logging/filelogstorage.go b/go/internal/feast/server/logging/filelogstorage.go deleted file mode 100644 index 19e9569e69d..00000000000 --- a/go/internal/feast/server/logging/filelogstorage.go +++ /dev/null @@ -1,86 +0,0 @@ -package logging - -import ( - "errors" - "fmt" - "io" - "os" - "path/filepath" - - "github.com/apache/arrow/go/v8/arrow/array" - "github.com/apache/arrow/go/v8/parquet" - "github.com/apache/arrow/go/v8/parquet/pqarrow" - "github.com/feast-dev/feast/go/internal/feast/registry" -) - -type FileLogStorage struct { - // Feast project name - project string - path string -} - -func GetFileConfig(config *registry.RepoConfig) (*OfflineLogStoreConfig, error) { - fileConfig := OfflineLogStoreConfig{ - storeType: "file", - } - if onlineStorePath, ok := config.OfflineStore["path"]; ok { - path, success := onlineStorePath.(string) - if !success { - return &fileConfig, fmt.Errorf("path, %s, cannot be converted to string", path) - } - fileConfig.path = path - } else { - return nil, errors.New("need path for file log storage") - } - return &fileConfig, nil -} - -// 
This offline store is currently only used for testing. It will be instantiated during go unit tests to log to file -// and the parquet files will be cleaned up after the test is run. -func NewFileOfflineStore(project string, offlineStoreConfig *OfflineLogStoreConfig) (*FileLogStorage, error) { - store := FileLogStorage{project: project} - var absPath string - var err error - // TODO(kevjumba) remove this default catch. - if offlineStoreConfig.path != "" { - absPath, err = filepath.Abs(offlineStoreConfig.path) - } else { - return nil, errors.New("need path for file log storage") - } - if err != nil { - return nil, err - } - store.path = absPath - return &store, nil -} - -func openLogFile(absPath string) (*os.File, error) { - var _, err = os.Stat(absPath) - - // create file if not exists - if os.IsNotExist(err) { - var file, err = os.Create(absPath) - if err != nil { - return nil, err - } - return file, nil - } else { - return nil, fmt.Errorf("path %s already exists", absPath) - } -} - -func (f *FileLogStorage) FlushToStorage(tbl array.Table) error { - w, err := openLogFile(f.path) - var writer io.Writer = w - if err != nil { - return err - } - props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)) - arrProps := pqarrow.DefaultWriterProps() - err = pqarrow.WriteTable(tbl, writer, 100, props, arrProps) - if err != nil { - return err - } - return nil - -} diff --git a/go/internal/feast/server/logging/filelogstorage_test.go b/go/internal/feast/server/logging/filelogstorage_test.go deleted file mode 100644 index 1da7dd38ad2..00000000000 --- a/go/internal/feast/server/logging/filelogstorage_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package logging - -import ( - "context" - "path/filepath" - - "testing" - - "github.com/apache/arrow/go/v8/arrow/array" - "github.com/apache/arrow/go/v8/arrow/memory" - "github.com/apache/arrow/go/v8/parquet/file" - "github.com/apache/arrow/go/v8/parquet/pqarrow" - "github.com/feast-dev/feast/go/internal/test" - 
"github.com/stretchr/testify/assert" -) - -func TestFlushToStorage(t *testing.T) { - ctx := context.Background() - table, expectedSchema, expectedColumns, err := GetTestArrowTableAndExpectedResults() - defer table.Release() - assert.Nil(t, err) - offlineStoreConfig := OfflineLogStoreConfig{ - storeType: "file", - path: "./log.parquet", - } - fileStore, err := NewFileOfflineStore("test", &offlineStoreConfig) - assert.Nil(t, err) - err = fileStore.FlushToStorage(array.Table(table)) - assert.Nil(t, err) - logPath, err := filepath.Abs(offlineStoreConfig.path) - assert.Nil(t, err) - pf, err := file.OpenParquetFile(logPath, false) - assert.Nil(t, err) - - reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) - assert.Nil(t, err) - - tbl, err := reader.ReadTable(ctx) - assert.Nil(t, err) - tr := array.NewTableReader(tbl, -1) - defer tbl.Release() - - defer tr.Release() - for tr.Next() { - rec := tr.Record() - assert.NotNil(t, rec) - for _, field := range rec.Schema().Fields() { - assert.Contains(t, expectedSchema, field.Name) - assert.Equal(t, field.Type, expectedSchema[field.Name]) - } - values, err := test.GetProtoFromRecord(rec) - - assert.Nil(t, err) - for name, val := range values { - if name == "RequestId" { - // Ensure there are request ids in record. 
- assert.Greater(t, len(val.Val), 0) - } else { - assert.Equal(t, len(val.Val), len(expectedColumns[name].Val)) - for idx, featureVal := range val.Val { - assert.Equal(t, featureVal.Val, expectedColumns[name].Val[idx].Val) - } - } - } - } - - err = test.CleanUpFile(logPath) - assert.Nil(t, err) - -} diff --git a/go/internal/feast/server/logging/logger.go b/go/internal/feast/server/logging/logger.go new file mode 100644 index 00000000000..346bfdbf619 --- /dev/null +++ b/go/internal/feast/server/logging/logger.go @@ -0,0 +1,276 @@ +package logging + +import ( + "fmt" + "log" + "math/rand" + "sync" + "time" + + "github.com/apache/arrow/go/v8/arrow" + "github.com/pkg/errors" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" +) + +type Log struct { + // Example: val{int64_val: 5017}, val{int64_val: 1003} + EntityValue []*types.Value + RequestData []*types.Value + + FeatureValues []*types.Value + FeatureStatuses []serving.FieldStatus + EventTimestamps []*timestamppb.Timestamp + + RequestId string + LogTimestamp time.Time +} + +type LogSink interface { + // Write is used to unload logs from the memory buffer. + // Logs are not guaranteed to be flushed to the sink at this point. + // The data can just be written to local disk (depending on implementation). + Write(data arrow.Record) error + + // Flush actually sends data to the sink. + // We want to control the amount of interaction with the sink, since it could be a costly operation. + // Also, some sinks like BigQuery might have quotas that limit the number of write requests per day. 
+ Flush() error +} + +type Logger interface { + Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error +} + +type LoggerImpl struct { + featureServiceName string + + buffer *MemoryBuffer + schema *FeatureServiceSchema + + logCh chan *Log + signalCh chan interface{} + + sink LogSink + config LoggerConfig + + isStopped bool + cond *sync.Cond +} + +type LoggerConfig struct { + LoggingOptions + + SampleRate float32 +} + +func NewLoggerConfig(sampleRate float32, opts LoggingOptions) LoggerConfig { + return LoggerConfig{ + LoggingOptions: opts, + SampleRate: sampleRate, + } +} + +func NewLogger(schema *FeatureServiceSchema, featureServiceName string, sink LogSink, config LoggerConfig) (*LoggerImpl, error) { + logger := &LoggerImpl{ + featureServiceName: featureServiceName, + + logCh: make(chan *Log, config.ChannelCapacity), + signalCh: make(chan interface{}, 2), + sink: sink, + + buffer: &MemoryBuffer{ + logs: make([]*Log, 0), + schema: schema, + }, + schema: schema, + config: config, + + isStopped: false, + cond: sync.NewCond(&sync.Mutex{}), + } + + logger.startLoggerLoop() + return logger, nil +} + +func (l *LoggerImpl) EmitLog(log *Log) error { + select { + case l.logCh <- log: + return nil + case <-time.After(l.config.EmitTimeout): + return fmt.Errorf("could not add to log channel with capacity %d. Operation timed out. 
Current log channel length is %d", cap(l.logCh), len(l.logCh)) + } +} + +func (l *LoggerImpl) startLoggerLoop() { + go func() { + for { + if err := l.loggerLoop(); err != nil { + log.Printf("LoggerImpl[%s] recovered from panic: %+v", l.featureServiceName, err) + + // Sleep for a couple of milliseconds to avoid CPU load from a potential infinite panic-recovery loop + time.Sleep(5 * time.Millisecond) + continue // try again + } + + // graceful stop + return + } + }() +} + +// loggerLoop appends logs from the logging channel to the in-memory buffer one at a time, writes the buffer to the sink +// after WriteInterval and flushes the sink after FlushInterval (both timers are re-created on every loop iteration); a stop signal writes, flushes and exits. +func (l *LoggerImpl) loggerLoop() (lErr error) { + defer func() { + // Recover from panic in the logger loop, so that it doesn't bring down the entire feature server + if r := recover(); r != nil { + rErr, ok := r.(error) + if !ok { + rErr = fmt.Errorf("%v", r) + } + lErr = errors.WithStack(rErr) + } + }() + for { + shouldStop := false + + select { + case <-l.signalCh: + err := l.buffer.writeBatch(l.sink) + if err != nil { + log.Printf("Log write failed: %+v", err) + } + err = l.sink.Flush() + if err != nil { + log.Printf("Log flush failed: %+v", err) + } + shouldStop = true + case <-time.After(l.config.WriteInterval): + err := l.buffer.writeBatch(l.sink) + if err != nil { + log.Printf("Log write failed: %+v", err) + } + case <-time.After(l.config.FlushInterval): + err := l.sink.Flush() + if err != nil { + log.Printf("Log flush failed: %+v", err) + } + case logItem := <-l.logCh: + err := l.buffer.Append(logItem) + if err != nil { + log.Printf("Append log failed: %+v", err) + } + } + + if shouldStop { + break + } + } + + // Notify all waiters for graceful stop + l.cond.L.Lock() + l.isStopped = true + l.cond.Broadcast() + l.cond.L.Unlock() + return nil +} + +// Stop the loop goroutine gracefully +func (l *LoggerImpl) Stop() { + select { + case l.signalCh <- nil: + default: + } +} + +func (l 
*LoggerImpl) WaitUntilStopped() { + l.cond.L.Lock() + defer l.cond.L.Unlock() + for !l.isStopped { + l.cond.Wait() + } +} + +func getFullFeatureName(featureViewName string, featureName string) string { + return fmt.Sprintf("%s__%s", featureViewName, featureName) +} + +func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error { + if len(featureVectors) == 0 { + return nil + } + + if rand.Float32() > l.config.SampleRate { + return nil + } + + numFeatures := len(l.schema.Features) + // Should be equivalent to how many entities there are(each feature row has (entity) number of features) + numRows := len(featureVectors[0].Values) + + featureNameToVectorIdx := make(map[string]int) + for idx, name := range featureNames { + featureNameToVectorIdx[name] = idx + } + + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + featureValues := make([]*types.Value, numFeatures) + featureStatuses := make([]serving.FieldStatus, numFeatures) + eventTimestamps := make([]*timestamppb.Timestamp, numFeatures) + + for idx, featureName := range l.schema.Features { + featureIdx, ok := featureNameToVectorIdx[featureName] + if !ok { + return errors.Errorf("Missing feature %s in log data", featureName) + } + featureValues[idx] = featureVectors[featureIdx].Values[rowIdx] + featureStatuses[idx] = featureVectors[featureIdx].Statuses[rowIdx] + eventTimestamps[idx] = featureVectors[featureIdx].EventTimestamps[rowIdx] + } + + entityValues := make([]*types.Value, len(l.schema.JoinKeys)) + for idx, joinKey := range l.schema.JoinKeys { + rows, ok := joinKeyToEntityValues[joinKey] + if !ok { + return errors.Errorf("Missing join key %s in log data", joinKey) + } + entityValues[idx] = rows[rowIdx] + } + + requestDataValues := make([]*types.Value, len(l.schema.RequestData)) + for idx, requestParam := range l.schema.RequestData { + rows, ok 
:= requestData[requestParam] + if !ok { + return errors.Errorf("Missing request parameter %s in log data", requestParam) + } + requestDataValues[idx] = rows.Val[rowIdx] + } + + newLog := Log{ + EntityValue: entityValues, + RequestData: requestDataValues, + + FeatureValues: featureValues, + FeatureStatuses: featureStatuses, + EventTimestamps: eventTimestamps, + + RequestId: requestId, + LogTimestamp: time.Now(), + } + err := l.EmitLog(&newLog) + if err != nil { + return err + } + } + return nil +} + +type DummyLoggerImpl struct{} + +func (l *DummyLoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error { + return nil +} diff --git a/go/internal/feast/server/logging/logger_test.go b/go/internal/feast/server/logging/logger_test.go new file mode 100644 index 00000000000..0c8e33ef6fe --- /dev/null +++ b/go/internal/feast/server/logging/logger_test.go @@ -0,0 +1,137 @@ +package logging + +import ( + "context" + "io/ioutil" + "path/filepath" + "testing" + "time" + + "github.com/apache/arrow/go/v8/arrow" + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/apache/arrow/go/v8/parquet/file" + "github.com/apache/arrow/go/v8/parquet/pqarrow" + "github.com/stretchr/testify/require" + + "github.com/feast-dev/feast/go/protos/feast/types" + + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/feast-dev/feast/go/protos/feast/serving" +) + +type DummySink struct{} + +func (s *DummySink) Write(rec arrow.Record) error { + return nil +} + +func (s *DummySink) Flush() error { + return nil +} + +func TestLoggingChannelTimeout(t *testing.T) { + config := LoggerConfig{ + SampleRate: 1.0, + LoggingOptions: LoggingOptions{ + ChannelCapacity: 1, + EmitTimeout: DefaultOptions.EmitTimeout, + FlushInterval: 
DefaultOptions.FlushInterval, + WriteInterval: DefaultOptions.WriteInterval, + }, + } + logger, err := NewLogger(nil, "testFS", &DummySink{}, config) + + // stop log processing to check buffering channel + logger.Stop() + logger.WaitUntilStopped() + + assert.Nil(t, err) + assert.Empty(t, logger.buffer.logs) + ts := timestamppb.New(time.Now()) + newLog := Log{ + FeatureStatuses: []serving.FieldStatus{serving.FieldStatus_PRESENT}, + EventTimestamps: []*timestamppb.Timestamp{ts, ts}, + } + err = logger.EmitLog(&newLog) + assert.Nil(t, err) + + newLog2 := Log{ + FeatureStatuses: []serving.FieldStatus{serving.FieldStatus_PRESENT}, + EventTimestamps: []*timestamppb.Timestamp{ts, ts}, + } + err = logger.EmitLog(&newLog2) + // The channel times out and doesn't hang. + assert.NotNil(t, err) +} + +func TestLogAndFlushToFile(t *testing.T) { + sink, err := NewFileLogSink(t.TempDir()) + assert.Nil(t, err) + + schema := &FeatureServiceSchema{ + JoinKeys: []string{"driver_id"}, + Features: []string{"view__feature"}, + JoinKeysTypes: map[string]types.ValueType_Enum{"driver_id": types.ValueType_INT32}, + FeaturesTypes: map[string]types.ValueType_Enum{"view__feature": types.ValueType_DOUBLE}, + } + config := LoggerConfig{ + SampleRate: 1.0, + LoggingOptions: LoggingOptions{ + ChannelCapacity: DefaultOptions.ChannelCapacity, + EmitTimeout: DefaultOptions.EmitTimeout, + FlushInterval: DefaultOptions.FlushInterval, + WriteInterval: 10 * time.Millisecond, + }, + } + logger, err := NewLogger(schema, "testFS", sink, config) + assert.Nil(t, err) + + assert.Nil(t, logger.Log( + map[string][]*types.Value{"driver_id": {{Val: &types.Value_Int32Val{Int32Val: 111}}}}, + []*serving.GetOnlineFeaturesResponse_FeatureVector{ + { + Values: []*types.Value{{Val: &types.Value_DoubleVal{DoubleVal: 2.0}}}, + Statuses: []serving.FieldStatus{serving.FieldStatus_PRESENT}, + EventTimestamps: []*timestamppb.Timestamp{timestamppb.Now()}, + }, + }, + []string{"view__feature"}, + 
map[string]*types.RepeatedValue{}, + "req-id", + )) + + require.Eventually(t, func() bool { + files, _ := ioutil.ReadDir(sink.path) + return len(files) > 0 + }, 60*time.Second, 100*time.Millisecond) + + files, _ := ioutil.ReadDir(sink.path) + + pf, err := file.OpenParquetFile(filepath.Join(sink.path, files[0].Name()), false) + assert.Nil(t, err) + + reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) + assert.Nil(t, err) + + tbl, err := reader.ReadTable(context.Background()) + assert.Nil(t, err) + tr := array.NewTableReader(tbl, -1) + defer tbl.Release() + + fieldNameToIdx := make(map[string]int) + for idx, field := range tbl.Schema().Fields() { + fieldNameToIdx[field.Name] = idx + } + + tr.Next() + rec := tr.Record() + + assert.Equal(t, "req-id", rec.Column(fieldNameToIdx[LOG_REQUEST_ID_FIELD]).(*array.String).Value(0)) + assert.EqualValues(t, 111, rec.Column(fieldNameToIdx["driver_id"]).(*array.Int32).Value(0)) + assert.EqualValues(t, 2.0, rec.Column(fieldNameToIdx["view__feature"]).(*array.Float64).Value(0)) + assert.EqualValues(t, serving.FieldStatus_PRESENT, rec.Column(fieldNameToIdx["view__feature__status"]).(*array.Int32).Value(0)) + +} diff --git a/go/internal/feast/server/logging/logging.go b/go/internal/feast/server/logging/logging.go deleted file mode 100644 index 010644709ae..00000000000 --- a/go/internal/feast/server/logging/logging.go +++ /dev/null @@ -1,392 +0,0 @@ -package logging - -import ( - "errors" - "fmt" - "log" - "time" - - "github.com/apache/arrow/go/v8/arrow" - "github.com/apache/arrow/go/v8/arrow/array" - "github.com/apache/arrow/go/v8/arrow/memory" - "github.com/feast-dev/feast/go/internal/feast" - "github.com/feast-dev/feast/go/internal/feast/model" - "github.com/feast-dev/feast/go/protos/feast/serving" - "github.com/feast-dev/feast/go/protos/feast/types" - gotypes "github.com/feast-dev/feast/go/types" - "google.golang.org/protobuf/types/known/timestamppb" -) - -const 
DEFAULT_LOG_FLUSH_INTERVAL = 100 * time.Millisecond -const DEFAULT_LOG_INSERT_TIMEOUT = 20 * time.Millisecond - -type Log struct { - // Example: val{int64_val: 5017}, val{int64_val: 1003} - EntityValue []*types.Value - - FeatureValues []*types.Value - FeatureStatuses []serving.FieldStatus - EventTimestamps []*timestamppb.Timestamp - RequestContext map[string]*types.Value - RequestId string -} - -type MemoryBuffer struct { - featureService *model.FeatureService - logs []*Log -} - -type LoggingService struct { - memoryBuffer *MemoryBuffer - logChannel chan *Log - fs *feast.FeatureStore - offlineLogStorage OfflineLogStorage - logInsertTTl time.Duration - logFlushInterval time.Duration -} - -func NewLoggingService(fs *feast.FeatureStore, logChannelCapacity int, featureServiceName string, enableLogProcessing bool) (*LoggingService, error) { - var featureService *model.FeatureService = nil - var err error - if fs != nil { - featureService, err = fs.GetFeatureService(featureServiceName) - if err != nil { - return nil, err - } - - } - - loggingService := &LoggingService{ - logChannel: make(chan *Log, logChannelCapacity), - memoryBuffer: &MemoryBuffer{ - logs: make([]*Log, 0), - featureService: featureService, - }, - fs: fs, - logInsertTTl: DEFAULT_LOG_INSERT_TIMEOUT, - logFlushInterval: DEFAULT_LOG_FLUSH_INTERVAL, - } - - if fs != nil { - offlineLogStorage, err := NewOfflineStore(fs.GetRepoConfig()) - loggingService.offlineLogStorage = offlineLogStorage - if err != nil { - return nil, err - } - } - - // Start goroutine to process logs - if enableLogProcessing { - go loggingService.processLogs() - - } - return loggingService, nil -} - -func (s *LoggingService) EmitLog(l *Log) error { - select { - case s.logChannel <- l: - return nil - case <-time.After(s.logInsertTTl): - return fmt.Errorf("could not add to log channel with capacity %d. Operation timed out. 
Current log channel length is %d", cap(s.logChannel), len(s.logChannel)) - } -} - -func (s *LoggingService) processLogs() { - // start a periodic flush - // TODO(kevjumba): set param so users can configure flushing duration - ticker := time.NewTicker(s.logFlushInterval) - defer ticker.Stop() - - for { - s.PerformPeriodicAppendToMemoryBufferAndLogFlush(ticker) - } -} - -// Select that either ingests new logs that are added to the logging channel, one at a time to add -// to the in-memory buffer or flushes all of them synchronously to the OfflineStorage on a time interval. -func (s *LoggingService) PerformPeriodicAppendToMemoryBufferAndLogFlush(t *time.Ticker) { - select { - case t := <-t.C: - s.flushLogsToOfflineStorage(t) - case new_log := <-s.logChannel: - log.Printf("Adding %s to memory.\n", new_log.FeatureValues) - s.memoryBuffer.logs = append(s.memoryBuffer.logs, new_log) - } -} - -// Acquires the logging schema from the feature service, converts the memory buffer array of rows of logs and flushes -// them to the offline storage. -func (s *LoggingService) flushLogsToOfflineStorage(t time.Time) error { - offlineStoreType, ok := getOfflineStoreType(s.fs.GetRepoConfig().OfflineStore) - if !ok { - return fmt.Errorf("could not get offline storage type for config: %s", s.fs.GetRepoConfig().OfflineStore) - } - if offlineStoreType == "file" { - entityMap, featureViews, odfvs, err := s.GetFcos() - if err != nil { - return err - } - schema, err := GetSchemaFromFeatureService(s.memoryBuffer.featureService, entityMap, featureViews, odfvs) - if err != nil { - return err - } - table, err := ConvertMemoryBufferToArrowTable(s.memoryBuffer, schema) - if err != nil { - return err - } - s.offlineLogStorage.FlushToStorage(table) - if err != nil { - return err - } - s.memoryBuffer.logs = s.memoryBuffer.logs[:0] - } else { - // Currently don't support any other offline flushing. 
- return errors.New("currently only file type is supported for offline log storage") - } - return nil -} - -// Takes memory buffer of logs in array row and converts them to columnar with generated fcoschema generated by GetFcoSchema -// and writes them to arrow table. -// Returns arrow table that contains all of the logs in columnar format. -func ConvertMemoryBufferToArrowTable(memoryBuffer *MemoryBuffer, fcoSchema *Schema) (array.Table, error) { - arrowMemory := memory.NewGoAllocator() - - columnNameToProtoValueArray := make(map[string][]*types.Value) - columnNameToStatus := make(map[string][]int32) - columnNameToTimestamp := make(map[string][]int64) - entityNameToEntityValues := make(map[string][]*types.Value) - - strBuilder := array.NewStringBuilder(arrowMemory) - - for _, l := range memoryBuffer.logs { - // EntityTypes maps an entity name to the specific type and also which index in the entityValues array it is - // e.g if an Entity Key is {driver_id, customer_id}, then the driver_id entitytype would be dtype=int64, index=0. - // It's in the order of the entities as given by the schema. 
- for idx, joinKey := range fcoSchema.Entities { - if _, ok := entityNameToEntityValues[joinKey]; !ok { - entityNameToEntityValues[joinKey] = make([]*types.Value, 0) - } - entityNameToEntityValues[joinKey] = append(entityNameToEntityValues[joinKey], l.EntityValue[idx]) - } - - // Contains both fv and odfv feature value types => they are processed in order of how the appear in the featureService - for idx, featureName := range fcoSchema.Features { - // populate the proto value arrays with values from memory buffer in separate columns one for each feature name - if _, ok := columnNameToProtoValueArray[featureName]; !ok { - columnNameToProtoValueArray[featureName] = make([]*types.Value, 0) - columnNameToStatus[featureName] = make([]int32, 0) - columnNameToTimestamp[featureName] = make([]int64, 0) - } - columnNameToProtoValueArray[featureName] = append(columnNameToProtoValueArray[featureName], l.FeatureValues[idx]) - columnNameToStatus[featureName] = append(columnNameToStatus[featureName], int32(l.FeatureStatuses[idx])) - columnNameToTimestamp[featureName] = append(columnNameToTimestamp[featureName], l.EventTimestamps[idx].AsTime().UnixNano()/int64(time.Millisecond)) - } - strBuilder.Append(l.RequestId) - } - - fields := make([]arrow.Field, 0) - columns := make([]array.Interface, 0) - for _, entityName := range fcoSchema.Entities { - protoArr := entityNameToEntityValues[entityName] - if len(protoArr) == 0 { - break - } - valArrowArray, err := gotypes.ProtoValuesToArrowArray(protoArr, arrowMemory, len(columnNameToProtoValueArray)) - if err != nil { - return nil, err - } - arrowType, err := gotypes.ValueTypeEnumToArrowType(fcoSchema.EntityTypes[entityName]) - if err != nil { - return nil, err - } - fields = append(fields, arrow.Field{ - Name: entityName, - Type: arrowType, - }) - columns = append(columns, valArrowArray) - } - - for _, featureName := range fcoSchema.Features { - - protoArr := columnNameToProtoValueArray[featureName] - if len(protoArr) == 0 { - break - } - 
arrowArray, err := gotypes.ProtoValuesToArrowArray(protoArr, arrowMemory, len(columnNameToProtoValueArray)) - if err != nil { - return nil, err - } - - arrowType, err := gotypes.ValueTypeEnumToArrowType(fcoSchema.FeaturesTypes[featureName]) - - if err != nil { - return nil, err - } - fields = append(fields, arrow.Field{ - Name: featureName, - Type: arrowType, - }) - columns = append(columns, arrowArray) - } - fields = append(fields, arrow.Field{ - Name: "RequestId", - Type: &arrow.StringType{}, - }) - - columns = append(columns, strBuilder.NewArray()) - schema := arrow.NewSchema( - fields, - nil, - ) - - result := array.Record(array.NewRecord(schema, columns, int64(len(memoryBuffer.logs)))) - - tbl := array.NewTableFromRecords(schema, []array.Record{result}) - return array.Table(tbl), nil -} - -type Schema struct { - Entities []string - Features []string - EntityTypes map[string]types.ValueType_Enum - FeaturesTypes map[string]types.ValueType_Enum -} - -func GetSchemaFromFeatureService(featureService *model.FeatureService, entityMap map[string]*model.Entity, featureViews []*model.FeatureView, onDemandFeatureViews []*model.OnDemandFeatureView) (*Schema, error) { - fvs := make(map[string]*model.FeatureView) - odFvs := make(map[string]*model.OnDemandFeatureView) - - joinKeys := make([]string, 0) - // All joinkeys in the featureService are put in here - joinKeysSet := make(map[string]interface{}) - entityJoinKeyToType := make(map[string]types.ValueType_Enum) - var entities []string - for _, featureView := range featureViews { - fvs[featureView.Base.Name] = featureView - entities = featureView.Entities - } - - for _, onDemandFeatureView := range onDemandFeatureViews { - odFvs[onDemandFeatureView.Base.Name] = onDemandFeatureView - } - - allFeatureTypes := make(map[string]types.ValueType_Enum) - features := make([]string, 0) - for _, featureProjection := range featureService.Projections { - // Create copies of FeatureView that may contains the same *FeatureView but - // 
each differentiated by a *FeatureViewProjection - featureViewName := featureProjection.Name - if fv, ok := fvs[featureViewName]; ok { - for _, f := range featureProjection.Features { - features = append(features, GetFullFeatureName(featureViewName, f.Name)) - allFeatureTypes[GetFullFeatureName(featureViewName, f.Name)] = f.Dtype - } - for _, entityName := range fv.Entities { - entity := entityMap[entityName] - if joinKeyAlias, ok := featureProjection.JoinKeyMap[entity.JoinKey]; ok { - joinKeysSet[joinKeyAlias] = nil - } else { - joinKeysSet[entity.JoinKey] = nil - } - } - } else if _, ok := odFvs[featureViewName]; ok { - for _, f := range featureProjection.Features { - // TODO(kevjumba) check in test here. - features = append(features, GetFullFeatureName(featureViewName, f.Name)) - allFeatureTypes[GetFullFeatureName(featureViewName, f.Name)] = f.Dtype - } - } else { - return nil, fmt.Errorf("no such feature view found in feature service %s", featureViewName) - } - } - - // Only get entities in the current feature service. 
- for _, entity := range entities { - if _, ok := joinKeysSet[entity]; ok { - joinKeys = append(joinKeys, entityMap[entity].JoinKey) - entityJoinKeyToType[entityMap[entity].JoinKey] = entityMap[entity].ValueType - } - } - - schema := &Schema{ - Entities: joinKeys, - Features: features, - EntityTypes: entityJoinKeyToType, - FeaturesTypes: allFeatureTypes, - } - return schema, nil -} - -func GetFullFeatureName(featureViewName string, featureName string) string { - return fmt.Sprintf("%s__%s", featureViewName, featureName) -} - -func (s *LoggingService) GetFcos() (map[string]*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView, error) { - odfvs, err := s.fs.ListOnDemandFeatureViews() - if err != nil { - return nil, nil, nil, err - } - fvs, err := s.fs.ListFeatureViews() - if err != nil { - return nil, nil, nil, err - } - entities, err := s.fs.ListEntities(true) - if err != nil { - return nil, nil, nil, err - } - entityMap := make(map[string]*model.Entity) - for _, entity := range entities { - entityMap[entity.Name] = entity - } - return entityMap, fvs, odfvs, nil -} - -func (l *LoggingService) GenerateLogs(featureService *model.FeatureService, joinKeyToEntityValues map[string][]*types.Value, features []*serving.GetOnlineFeaturesResponse_FeatureVector, requestData map[string]*types.RepeatedValue, requestId string) error { - if len(features) <= 0 { - return nil - } - - entitySet, featureViews, odfvs, err := l.GetFcos() - if err != nil { - return err - } - schema, err := GetSchemaFromFeatureService(featureService, entitySet, featureViews, odfvs) - - if err != nil { - return err - } - - numFeatures := len(schema.Features) - // Should be equivalent to how many entities there are(each feature row has (entity) number of features) - numRows := len(features[0].Values) - - for row_idx := 0; row_idx < numRows; row_idx++ { - featureValueLogRow := make([]*types.Value, numFeatures) - featureStatusLogRow := make([]serving.FieldStatus, numFeatures) - eventTimestampLogRow 
:= make([]*timestamppb.Timestamp, numFeatures) - for idx := 0; idx < len(features); idx++ { - featureValueLogRow[idx] = features[idx].Values[row_idx] - featureStatusLogRow[idx] = features[idx].Statuses[row_idx] - eventTimestampLogRow[idx] = features[idx].EventTimestamps[row_idx] - } - valuesPerEntityRow := make([]*types.Value, 0) - // ensure that the entity values are in the order that the schema defines which is the order that ListEntities returns the entities - for _, joinKey := range schema.Entities { - valuesPerEntityRow = append(valuesPerEntityRow, joinKeyToEntityValues[joinKey][row_idx]) - } - newLog := Log{ - EntityValue: valuesPerEntityRow, - FeatureValues: featureValueLogRow, - FeatureStatuses: featureStatusLogRow, - EventTimestamps: eventTimestampLogRow, - RequestId: requestId, - } - err := l.EmitLog(&newLog) - if err != nil { - return err - } - } - return nil -} diff --git a/go/internal/feast/server/logging/logging_test.go b/go/internal/feast/server/logging/logging_test.go deleted file mode 100644 index 68da0bf498f..00000000000 --- a/go/internal/feast/server/logging/logging_test.go +++ /dev/null @@ -1,402 +0,0 @@ -package logging - -import ( - "math/rand" - "reflect" - "testing" - "time" - - "github.com/apache/arrow/go/v8/arrow" - "github.com/apache/arrow/go/v8/arrow/array" - "github.com/feast-dev/feast/go/internal/feast/model" - "github.com/feast-dev/feast/go/internal/test" - "github.com/feast-dev/feast/go/protos/feast/serving" - "github.com/feast-dev/feast/go/protos/feast/types" - gotypes "github.com/feast-dev/feast/go/types" - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/types/known/timestamppb" -) - -func TestLoggingChannelTimeout(t *testing.T) { - // Pregenerated using `feast init`. 
- loggingService, err := NewLoggingService(nil, 1, "", false) - assert.Nil(t, err) - assert.Empty(t, loggingService.memoryBuffer.logs) - ts := timestamppb.New(time.Now()) - newLog := Log{ - FeatureStatuses: []serving.FieldStatus{serving.FieldStatus_PRESENT}, - EventTimestamps: []*timestamppb.Timestamp{ts, ts}, - } - loggingService.EmitLog(&newLog) - newTs := timestamppb.New(time.Now()) - - newLog2 := Log{ - FeatureStatuses: []serving.FieldStatus{serving.FieldStatus_PRESENT}, - EventTimestamps: []*timestamppb.Timestamp{newTs, newTs}, - } - err = loggingService.EmitLog(&newLog2) - // The channel times out and doesn't hang. - assert.NotNil(t, err) -} - -func TestSchemaTypeRetrieval(t *testing.T) { - featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest() - entityMap := make(map[string]*model.Entity) - expectedEntityNames := make([]string, 0) - expectedFeatureNames := make([]string, 0) - for _, entity := range entities { - entityMap[entity.Name] = entity - expectedEntityNames = append(expectedEntityNames, entity.Name) - } - for _, featureView := range featureViews { - for _, f := range featureView.Base.Features { - expectedFeatureNames = append(expectedFeatureNames, GetFullFeatureName(featureView.Base.Name, f.Name)) - } - } - for _, featureView := range odfvs { - for _, f := range featureView.Base.Features { - expectedFeatureNames = append(expectedFeatureNames, GetFullFeatureName(featureView.Base.Name, f.Name)) - } - } - - schema, err := GetSchemaFromFeatureService(featureService, entityMap, featureViews, odfvs) - assert.Nil(t, err) - - assert.Equal(t, expectedFeatureNames, schema.Features) - assert.Equal(t, expectedEntityNames, schema.Entities) - for _, entityName := range expectedEntityNames { - assert.Contains(t, schema.EntityTypes, entityName) - } - assert.True(t, reflect.DeepEqual(schema.EntityTypes["driver_id"], types.ValueType_INT64)) - - types := []types.ValueType_Enum{*types.ValueType_INT64.Enum(), 
*types.ValueType_FLOAT.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum()} - for idx, featureName := range expectedFeatureNames { - assert.Contains(t, schema.FeaturesTypes, featureName) - assert.Equal(t, schema.FeaturesTypes[featureName], types[idx]) - } -} - -func TestSchemaRetrievalIgnoresEntitiesNotInFeatureService(t *testing.T) { - featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest() - //Remove entities in featureservice - for _, featureView := range featureViews { - featureView.Entities = []string{} - } - entityMap := make(map[string]*model.Entity) - for _, entity := range entities { - entityMap[entity.Name] = entity - } - schema, err := GetSchemaFromFeatureService(featureService, entityMap, featureViews, odfvs) - assert.Nil(t, err) - assert.Empty(t, schema.EntityTypes) -} - -func TestSchemaUsesOrderInFeatureService(t *testing.T) { - featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest() - expectedEntityNames := make([]string, 0) - expectedFeatureNames := make([]string, 0) - entityMap := make(map[string]*model.Entity) - for _, entity := range entities { - entityMap[entity.Name] = entity - } - for _, entity := range entities { - entityMap[entity.Name] = entity - expectedEntityNames = append(expectedEntityNames, entity.Name) - } - // Source of truth for order of featureNames - for _, featureView := range featureViews { - for _, f := range featureView.Base.Features { - expectedFeatureNames = append(expectedFeatureNames, GetFullFeatureName(featureView.Base.Name, f.Name)) - } - } - for _, featureView := range odfvs { - for _, f := range featureView.Base.Features { - expectedFeatureNames = append(expectedFeatureNames, GetFullFeatureName(featureView.Base.Name, f.Name)) - } - } - - rand.Seed(time.Now().UnixNano()) - // Shuffle the featureNames in incorrect order - for _, featureView := range featureViews { - 
rand.Shuffle(len(featureView.Base.Features), func(i, j int) { - featureView.Base.Features[i], featureView.Base.Features[j] = featureView.Base.Features[j], featureView.Base.Features[i] - }) - } - for _, featureView := range odfvs { - rand.Shuffle(len(featureView.Base.Features), func(i, j int) { - featureView.Base.Features[i], featureView.Base.Features[j] = featureView.Base.Features[j], featureView.Base.Features[i] - }) - } - - schema, err := GetSchemaFromFeatureService(featureService, entityMap, featureViews, odfvs) - assert.Nil(t, err) - - // Ensure the same results - assert.Equal(t, expectedFeatureNames, schema.Features) - assert.Equal(t, expectedEntityNames, schema.Entities) - for _, entityName := range expectedEntityNames { - assert.Contains(t, schema.EntityTypes, entityName) - } - assert.True(t, reflect.DeepEqual(schema.EntityTypes["driver_id"], types.ValueType_INT64)) - - types := []types.ValueType_Enum{*types.ValueType_INT64.Enum(), *types.ValueType_FLOAT.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum()} - for idx, featureName := range expectedFeatureNames { - assert.Contains(t, schema.FeaturesTypes, featureName) - assert.Equal(t, schema.FeaturesTypes[featureName], types[idx]) - } -} - -func TestSerializeToArrowTable(t *testing.T) { - table, expectedSchema, expectedColumns, err := GetTestArrowTableAndExpectedResults() - assert.Nil(t, err) - defer table.Release() - tr := array.NewTableReader(table, -1) - - defer tr.Release() - for tr.Next() { - rec := tr.Record() - assert.NotNil(t, rec) - for _, field := range rec.Schema().Fields() { - assert.Contains(t, expectedSchema, field.Name) - assert.Equal(t, field.Type, expectedSchema[field.Name]) - } - values, err := test.GetProtoFromRecord(rec) - - assert.Nil(t, err) - for name, val := range values { - if name == "RequestId" { - continue - } - assert.Equal(t, len(val.Val), len(expectedColumns[name].Val)) - for idx, featureVal := range 
val.Val { - assert.Equal(t, featureVal.Val, expectedColumns[name].Val[idx].Val) - } - } - } -} - -// Initialize all dummy featureservice, entities and featureviews/on demand featureviews for testing. -func InitializeFeatureRepoVariablesForTest() (*model.FeatureService, []*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView) { - f1 := test.CreateNewFeature( - "int64", - types.ValueType_INT64, - ) - f2 := test.CreateNewFeature( - "float32", - types.ValueType_FLOAT, - ) - projection1 := test.CreateNewFeatureViewProjection( - "featureView1", - "", - []*model.Feature{f1, f2}, - map[string]string{}, - ) - baseFeatureView1 := test.CreateBaseFeatureView( - "featureView1", - []*model.Feature{f1, f2}, - projection1, - ) - featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}) - entity1 := test.CreateNewEntity("driver_id", types.ValueType_INT64, "driver_id") - f3 := test.CreateNewFeature( - "int32", - types.ValueType_INT32, - ) - f4 := test.CreateNewFeature( - "double", - types.ValueType_DOUBLE, - ) - projection2 := test.CreateNewFeatureViewProjection( - "featureView2", - "", - []*model.Feature{f3, f4}, - map[string]string{}, - ) - baseFeatureView2 := test.CreateBaseFeatureView( - "featureView2", - []*model.Feature{f3, f4}, - projection2, - ) - featureView2 := test.CreateFeatureView(baseFeatureView2, nil, []string{"driver_id"}) - - f5 := test.CreateNewFeature( - "odfv_f1", - types.ValueType_INT32, - ) - f6 := test.CreateNewFeature( - "odfv_f2", - types.ValueType_DOUBLE, - ) - projection3 := test.CreateNewFeatureViewProjection( - "od_bf1", - "", - []*model.Feature{f5, f6}, - map[string]string{}, - ) - od_bf1 := test.CreateBaseFeatureView( - "od_bf1", - []*model.Feature{f5, f6}, - projection3, - ) - odfv := model.NewOnDemandFeatureViewFromBase(od_bf1) - featureService := test.CreateNewFeatureService( - "test_service", - "test_project", - nil, - nil, - []*model.FeatureViewProjection{projection1, projection2, projection3}, - ) - return 
featureService, []*model.Entity{entity1}, []*model.FeatureView{featureView1, featureView2}, []*model.OnDemandFeatureView{odfv} -} - -// Create dummy FeatureService, Entities, and FeatureViews add them to the logger and convert the logs to Arrow table. -// Returns arrow table, expected test schema, and expected columns. -func GetTestArrowTableAndExpectedResults() (array.Table, map[string]arrow.DataType, map[string]*types.RepeatedValue, error) { - featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest() - entityMap := make(map[string]*model.Entity) - for _, entity := range entities { - entityMap[entity.Name] = entity - } - schema, err := GetSchemaFromFeatureService(featureService, entityMap, featureViews, odfvs) - if err != nil { - return nil, nil, nil, err - } - - ts := timestamppb.New(time.Now()) - log1 := Log{ - EntityValue: []*types.Value{ - {Val: &types.Value_Int64Val{Int64Val: 1001}}, - }, - FeatureValues: []*types.Value{ - /* normal feature values */ - {Val: &types.Value_Int64Val{Int64Val: rand.Int63()}}, - {Val: &types.Value_FloatVal{FloatVal: rand.Float32()}}, - {Val: &types.Value_Int32Val{Int32Val: rand.Int31()}}, - {Val: &types.Value_DoubleVal{DoubleVal: rand.Float64()}}, - /* odfv values */ - {Val: &types.Value_Int32Val{Int32Val: rand.Int31()}}, - {Val: &types.Value_DoubleVal{DoubleVal: rand.Float64()}}, - }, - FeatureStatuses: []serving.FieldStatus{ - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - }, - EventTimestamps: []*timestamppb.Timestamp{ - ts, ts, ts, ts, ts, ts, - }, - } - log2 := Log{ - EntityValue: []*types.Value{ - {Val: &types.Value_Int64Val{Int64Val: 1003}}, - }, - FeatureValues: []*types.Value{ - /* normal feature values */ - {Val: &types.Value_Int64Val{Int64Val: rand.Int63()}}, - {Val: &types.Value_FloatVal{FloatVal: rand.Float32()}}, - {Val: 
&types.Value_Int32Val{Int32Val: rand.Int31()}}, - {Val: &types.Value_DoubleVal{DoubleVal: rand.Float64()}}, - /* odfv values */ - {Val: &types.Value_Int32Val{Int32Val: rand.Int31()}}, - {Val: &types.Value_DoubleVal{DoubleVal: rand.Float64()}}, - }, - FeatureStatuses: []serving.FieldStatus{ - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - serving.FieldStatus_PRESENT, - }, - EventTimestamps: []*timestamppb.Timestamp{ - ts, ts, ts, ts, ts, ts, - }, - } - - expectedSchema := make(map[string]arrow.DataType) - for joinKey, entityType := range schema.EntityTypes { - arrowType, err := gotypes.ValueTypeEnumToArrowType(entityType) - if err != nil { - return nil, nil, nil, err - } - expectedSchema[joinKey] = arrowType - } - expectedSchema["RequestId"] = arrow.BinaryTypes.String - for featureName, featureType := range schema.FeaturesTypes { - arrowType, err := gotypes.ValueTypeEnumToArrowType(featureType) - if err != nil { - return nil, nil, nil, err - } - expectedSchema[featureName] = arrowType - } - - expectedColumns := map[string]*types.RepeatedValue{ - "driver_id": { - Val: []*types.Value{ - log1.EntityValue[0], - log2.EntityValue[0]}, - }, - "featureView1__int64": { - Val: []*types.Value{ - log1.FeatureValues[0], - log2.FeatureValues[0]}, - }, - "featureView1__float32": { - Val: []*types.Value{ - log1.FeatureValues[1], - log2.FeatureValues[1]}, - }, - "featureView2__int32": { - Val: []*types.Value{ - log1.FeatureValues[2], - log2.FeatureValues[2]}, - }, - "featureView2__double": { - Val: []*types.Value{ - log1.FeatureValues[3], - log2.FeatureValues[3]}, - }, - "od_bf1__odfv_f1": { - Val: []*types.Value{ - log1.FeatureValues[4], - log2.FeatureValues[4]}, - }, - "od_bf1__odfv_f2": { - Val: []*types.Value{ - log1.FeatureValues[5], - log2.FeatureValues[5]}, - }, - } - loggingService, err := SetupLoggingServiceWithLogs([]*Log{&log1, &log2}) - if err != nil { - return 
nil, nil, nil, err - } - - table, err := ConvertMemoryBufferToArrowTable(loggingService.memoryBuffer, schema) - - if err != nil { - return nil, nil, nil, err - } - return table, expectedSchema, expectedColumns, nil -} - -func SetupLoggingServiceWithLogs(logs []*Log) (*LoggingService, error) { - loggingService, err := NewLoggingService(nil, len(logs), "", false) - if err != nil { - return nil, err - } - dummyTicker := time.NewTicker(10 * time.Second) - // stop the ticker so that the logs are not flushed to offline storage - dummyTicker.Stop() - for _, log := range logs { - loggingService.EmitLog(log) - } - // manually handle flushing logs - for i := 0; i < len(logs); i++ { - loggingService.PerformPeriodicAppendToMemoryBufferAndLogFlush(dummyTicker) - } - return loggingService, nil -} diff --git a/go/internal/feast/server/logging/memorybuffer.go b/go/internal/feast/server/logging/memorybuffer.go new file mode 100644 index 00000000000..80a5e03228a --- /dev/null +++ b/go/internal/feast/server/logging/memorybuffer.go @@ -0,0 +1,162 @@ +package logging + +import ( + "fmt" + "time" + + "github.com/apache/arrow/go/v8/arrow" + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/arrow/memory" + + "github.com/feast-dev/feast/go/protos/feast/types" + gotypes "github.com/feast-dev/feast/go/types" +) + +type MemoryBuffer struct { + logs []*Log + schema *FeatureServiceSchema +} + +const ( + LOG_TIMESTAMP_FIELD = "__log_timestamp" + LOG_DATE_FIELD = "__log_date" + LOG_REQUEST_ID_FIELD = "__request_id" +) + +// Acquires the logging schema from the feature service, converts the memory buffer array of rows of logs and flushes +// them to the offline storage. 
+func (b *MemoryBuffer) writeBatch(sink LogSink) error { + if len(b.logs) == 0 { + return nil + } + + record, err := b.convertToArrowRecord() + + if err != nil { + return err + } + err = sink.Write(record) + if err != nil { + return err + } + + b.logs = b.logs[:0] + return nil +} + +func (b *MemoryBuffer) Append(log *Log) error { + b.logs = append(b.logs, log) + return nil +} + +func (b *MemoryBuffer) getArrowSchema() (*arrow.Schema, error) { + fields := make([]arrow.Field, 0) + + for _, joinKey := range b.schema.JoinKeys { + arrowType, err := gotypes.ValueTypeEnumToArrowType(b.schema.JoinKeysTypes[joinKey]) + if err != nil { + return nil, err + } + + fields = append(fields, arrow.Field{Name: joinKey, Type: arrowType}) + } + + for _, requestParam := range b.schema.RequestData { + arrowType, err := gotypes.ValueTypeEnumToArrowType(b.schema.RequestDataTypes[requestParam]) + if err != nil { + return nil, err + } + + fields = append(fields, arrow.Field{Name: requestParam, Type: arrowType}) + } + + for _, featureName := range b.schema.Features { + arrowType, err := gotypes.ValueTypeEnumToArrowType(b.schema.FeaturesTypes[featureName]) + if err != nil { + return nil, err + } + + fields = append(fields, arrow.Field{Name: featureName, Type: arrowType}) + fields = append(fields, arrow.Field{ + Name: fmt.Sprintf("%s__timestamp", featureName), + Type: arrow.FixedWidthTypes.Timestamp_s}) + fields = append(fields, arrow.Field{ + Name: fmt.Sprintf("%s__status", featureName), + Type: arrow.PrimitiveTypes.Int32}) + } + + fields = append(fields, arrow.Field{Name: LOG_TIMESTAMP_FIELD, Type: arrow.FixedWidthTypes.Timestamp_us}) + fields = append(fields, arrow.Field{Name: LOG_DATE_FIELD, Type: arrow.FixedWidthTypes.Date32}) + fields = append(fields, arrow.Field{Name: LOG_REQUEST_ID_FIELD, Type: arrow.BinaryTypes.String}) + + return arrow.NewSchema(fields, nil), nil +} + +// convertToArrowRecord Takes memory buffer of logs in array row and converts them to columnar with generated 
fcoschema generated by GetFcoSchema +// and writes them to arrow table. +// Returns arrow table that contains all of the logs in columnar format. +func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) { + arrowMemory := memory.NewGoAllocator() + numRows := len(b.logs) + + arrowSchema, err := b.getArrowSchema() + if err != nil { + return nil, err + } + + columns := make(map[string][]*types.Value) + fieldNameToIdx := make(map[string]int) + for idx, field := range arrowSchema.Fields() { + fieldNameToIdx[field.Name] = idx + } + + builder := array.NewRecordBuilder(arrowMemory, arrowSchema) + defer builder.Release() + + builder.Reserve(numRows) + + for rowIdx, logRow := range b.logs { + for colIdx, joinKey := range b.schema.JoinKeys { + if _, ok := columns[joinKey]; !ok { + columns[joinKey] = make([]*types.Value, numRows) + } + columns[joinKey][rowIdx] = logRow.EntityValue[colIdx] + } + for colIdx, requestParam := range b.schema.RequestData { + if _, ok := columns[requestParam]; !ok { + columns[requestParam] = make([]*types.Value, numRows) + } + columns[requestParam][rowIdx] = logRow.RequestData[colIdx] + } + for colIdx, featureName := range b.schema.Features { + if _, ok := columns[featureName]; !ok { + columns[featureName] = make([]*types.Value, numRows) + } + columns[featureName][rowIdx] = logRow.FeatureValues[colIdx] + + timestamp := arrow.Timestamp(logRow.EventTimestamps[colIdx].GetSeconds()) + timestampFieldIdx := fieldNameToIdx[fmt.Sprintf("%s__timestamp", featureName)] + statusFieldIdx := fieldNameToIdx[fmt.Sprintf("%s__status", featureName)] + + builder.Field(timestampFieldIdx).(*array.TimestampBuilder).UnsafeAppend(timestamp) + builder.Field(statusFieldIdx).(*array.Int32Builder).UnsafeAppend(int32(logRow.FeatureStatuses[colIdx])) + } + + logTimestamp := arrow.Timestamp(logRow.LogTimestamp.UnixMicro()) + logDate := arrow.Date32(logRow.LogTimestamp.Truncate(24 * time.Hour).Unix()) + + 
builder.Field(fieldNameToIdx[LOG_TIMESTAMP_FIELD]).(*array.TimestampBuilder).UnsafeAppend(logTimestamp) + builder.Field(fieldNameToIdx[LOG_DATE_FIELD]).(*array.Date32Builder).UnsafeAppend(logDate) + builder.Field(fieldNameToIdx[LOG_REQUEST_ID_FIELD]).(*array.StringBuilder).Append(logRow.RequestId) + } + + for columnName, protoArray := range columns { + fieldIdx := fieldNameToIdx[columnName] + err := gotypes.CopyProtoValuesToArrowArray(builder.Field(fieldIdx), protoArray) + if err != nil { + return nil, err + } + } + + return builder.NewRecord(), nil +} diff --git a/go/internal/feast/server/logging/memorybuffer_test.go b/go/internal/feast/server/logging/memorybuffer_test.go new file mode 100644 index 00000000000..f652f2c99a6 --- /dev/null +++ b/go/internal/feast/server/logging/memorybuffer_test.go @@ -0,0 +1,174 @@ +package logging + +import ( + "math/rand" + "testing" + "time" + + "github.com/apache/arrow/go/v8/arrow" + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" +) + +func TestArrowSchemaGeneration(t *testing.T) { + schema := &FeatureServiceSchema{ + JoinKeys: []string{ + "driver_id", + }, + Features: []string{ + "featureView1__int64", + "featureView1__float32", + "featureView2__int32", + "featureView2__double", + }, + JoinKeysTypes: map[string]types.ValueType_Enum{ + "driver_id": types.ValueType_INT32, + }, + FeaturesTypes: map[string]types.ValueType_Enum{ + "featureView1__int64": types.ValueType_INT64, + "featureView1__float32": types.ValueType_FLOAT, + "featureView2__int32": types.ValueType_INT32, + "featureView2__double": types.ValueType_DOUBLE, + }, + } + + b := &MemoryBuffer{ + schema: schema, + } + + expectedArrowSchema := []arrow.Field{ + {Name: "driver_id", Type: arrow.PrimitiveTypes.Int32}, + {Name: 
"featureView1__int64", Type: arrow.PrimitiveTypes.Int64}, + {Name: "featureView1__int64__timestamp", Type: arrow.FixedWidthTypes.Timestamp_s}, + {Name: "featureView1__int64__status", Type: arrow.PrimitiveTypes.Int32}, + {Name: "featureView1__float32", Type: arrow.PrimitiveTypes.Float32}, + {Name: "featureView1__float32__timestamp", Type: arrow.FixedWidthTypes.Timestamp_s}, + {Name: "featureView1__float32__status", Type: arrow.PrimitiveTypes.Int32}, + {Name: "featureView2__int32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "featureView2__int32__timestamp", Type: arrow.FixedWidthTypes.Timestamp_s}, + {Name: "featureView2__int32__status", Type: arrow.PrimitiveTypes.Int32}, + {Name: "featureView2__double", Type: arrow.PrimitiveTypes.Float64}, + {Name: "featureView2__double__timestamp", Type: arrow.FixedWidthTypes.Timestamp_s}, + {Name: "featureView2__double__status", Type: arrow.PrimitiveTypes.Int32}, + {Name: "__log_timestamp", Type: arrow.FixedWidthTypes.Timestamp_us}, + {Name: "__log_date", Type: arrow.FixedWidthTypes.Date32}, + {Name: "__request_id", Type: arrow.BinaryTypes.String}, + } + + actualSchema, err := b.getArrowSchema() + assert.Nil(t, err) + assert.Equal(t, expectedArrowSchema, actualSchema.Fields()) +} + +func TestSerializeToArrowTable(t *testing.T) { + schema := &FeatureServiceSchema{ + JoinKeys: []string{ + "driver_id", + }, + Features: []string{ + "featureView1__int64", + "featureView1__float32", + }, + JoinKeysTypes: map[string]types.ValueType_Enum{ + "driver_id": types.ValueType_INT32, + }, + FeaturesTypes: map[string]types.ValueType_Enum{ + "featureView1__int64": types.ValueType_INT64, + "featureView1__float32": types.ValueType_FLOAT, + }, + } + + ts := timestamppb.New(time.Now()) + b := &MemoryBuffer{ + schema: schema, + logs: []*Log{ + { + EntityValue: []*types.Value{ + {Val: &types.Value_Int64Val{Int64Val: 1001}}, + }, + FeatureValues: []*types.Value{ + {Val: &types.Value_Int64Val{Int64Val: rand.Int63()}}, + {Val: 
&types.Value_FloatVal{FloatVal: rand.Float32()}}, + }, + FeatureStatuses: []serving.FieldStatus{ + serving.FieldStatus_PRESENT, + serving.FieldStatus_OUTSIDE_MAX_AGE, + }, + EventTimestamps: []*timestamppb.Timestamp{ + ts, ts, + }, + RequestId: "aaa", + LogTimestamp: time.Now(), + }, + { + EntityValue: []*types.Value{ + {Val: &types.Value_Int64Val{Int64Val: 1003}}, + }, + FeatureValues: []*types.Value{ + {Val: &types.Value_Int64Val{Int64Val: rand.Int63()}}, + {Val: &types.Value_FloatVal{FloatVal: rand.Float32()}}, + }, + FeatureStatuses: []serving.FieldStatus{ + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + }, + EventTimestamps: []*timestamppb.Timestamp{ + ts, ts, + }, + RequestId: "bbb", + LogTimestamp: time.Now(), + }, + }, + } + + pool := memory.NewGoAllocator() + arrowSchema, _ := b.getArrowSchema() + builder := array.NewRecordBuilder(pool, arrowSchema) + defer builder.Release() + + // join key: driver_id + builder.Field(0).(*array.Int32Builder).AppendValues( + []int32{b.logs[0].EntityValue[0].GetInt32Val(), b.logs[1].EntityValue[0].GetInt32Val()}, []bool{true, true}) + + // feature int64 + builder.Field(1).(*array.Int64Builder).AppendValues( + []int64{b.logs[0].FeatureValues[0].GetInt64Val(), b.logs[1].FeatureValues[0].GetInt64Val()}, []bool{true, true}) + builder.Field(2).(*array.TimestampBuilder).AppendValues( + []arrow.Timestamp{arrow.Timestamp(ts.GetSeconds()), arrow.Timestamp(ts.GetSeconds())}, []bool{true, true}) + builder.Field(3).(*array.Int32Builder).AppendValues( + []int32{int32(serving.FieldStatus_PRESENT), int32(serving.FieldStatus_PRESENT)}, []bool{true, true}) + + // feature float + builder.Field(4).(*array.Float32Builder).AppendValues( + []float32{b.logs[0].FeatureValues[1].GetFloatVal(), b.logs[1].FeatureValues[1].GetFloatVal()}, []bool{true, true}) + builder.Field(5).(*array.TimestampBuilder).AppendValues( + []arrow.Timestamp{arrow.Timestamp(ts.GetSeconds()), arrow.Timestamp(ts.GetSeconds())}, []bool{true, true}) + 
builder.Field(6).(*array.Int32Builder).AppendValues( + []int32{int32(serving.FieldStatus_OUTSIDE_MAX_AGE), int32(serving.FieldStatus_PRESENT)}, []bool{true, true}) + + // log timestamp + builder.Field(7).(*array.TimestampBuilder).AppendValues( + []arrow.Timestamp{arrow.Timestamp(b.logs[0].LogTimestamp.UnixMicro()), arrow.Timestamp(b.logs[1].LogTimestamp.UnixMicro())}, []bool{true, true}) + + // log date + today := time.Now().Truncate(24 * time.Hour) + builder.Field(8).(*array.Date32Builder).AppendValues( + []arrow.Date32{arrow.Date32(today.Unix()), arrow.Date32(today.Unix())}, []bool{true, true}) + + // request id + builder.Field(9).(*array.StringBuilder).AppendValues( + []string{b.logs[0].RequestId, b.logs[1].RequestId}, []bool{true, true}) + + record, err := b.convertToArrowRecord() + expectedRecord := builder.NewRecord() + assert.Nil(t, err) + for colIdx := 0; colIdx < int(record.NumCols()); colIdx++ { + assert.Equal(t, expectedRecord.Column(colIdx), record.Column(colIdx), "Columns with idx %d are not equal", colIdx) + } + +} diff --git a/go/internal/feast/server/logging/offlinelogstorage.go b/go/internal/feast/server/logging/offlinelogstorage.go deleted file mode 100644 index 1a0f4142554..00000000000 --- a/go/internal/feast/server/logging/offlinelogstorage.go +++ /dev/null @@ -1,46 +0,0 @@ -package logging - -import ( - "errors" - - "github.com/apache/arrow/go/v8/arrow/array" - "github.com/feast-dev/feast/go/internal/feast/registry" -) - -type OfflineLogStoreConfig struct { - storeType string - project string - path string -} - -type OfflineLogStorage interface { - // Todo: Maybe we can add a must implement function that retrieves the correct config based on type - FlushToStorage(array.Table) error -} - -func getOfflineStoreType(offlineStoreConfig map[string]interface{}) (string, bool) { - if onlineStoreType, ok := offlineStoreConfig["storeType"]; !ok { - // Assume file for case of no specified. 
- return "", true - } else { - result, ok := onlineStoreType.(string) - return result, ok - } -} - -func NewOfflineStore(config *registry.RepoConfig) (OfflineLogStorage, error) { - offlineStoreType, _ := getOfflineStoreType(config.OfflineStore) - if offlineStoreType == "" { - // No offline store specified. - return nil, nil - } else if offlineStoreType == "file" { - fileConfig, err := GetFileConfig(config) - if err != nil { - return nil, err - } - offlineStore, err := NewFileOfflineStore(config.Project, fileConfig) - return offlineStore, err - } else { - return nil, errors.New("no offline storage besides file is currently supported") - } -} diff --git a/go/internal/feast/server/logging/service.go b/go/internal/feast/server/logging/service.go new file mode 100644 index 00000000000..a06698638ae --- /dev/null +++ b/go/internal/feast/server/logging/service.go @@ -0,0 +1,100 @@ +package logging + +import ( + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/feast-dev/feast/go/internal/feast/model" +) + +type FeatureStore interface { + GetFcosMap() (map[string]*model.Entity, map[string]*model.FeatureView, map[string]*model.OnDemandFeatureView, error) + GetFeatureService(name string) (*model.FeatureService, error) +} + +type LoggingOptions struct { + // How many log items can be buffered in channel + ChannelCapacity int + + // Waiting time when inserting new log into the channel + EmitTimeout time.Duration + + // Interval on which logs buffered in memory will be written to sink + WriteInterval time.Duration + + // Interval on which sink will be flushed + // (see LogSink interface for better explanation on differences with Write) + FlushInterval time.Duration +} + +type LoggingService struct { + // feature service name -> LoggerImpl + loggers map[string]*LoggerImpl + + fs FeatureStore + sink LogSink + opts LoggingOptions + + creationLock *sync.Mutex +} + +var ( + DefaultOptions = LoggingOptions{ + ChannelCapacity: 1000, + FlushInterval: 10 * time.Minute, + 
WriteInterval: 10 * time.Second, + EmitTimeout: 10 * time.Millisecond, + } +) + +func NewLoggingService(fs FeatureStore, sink LogSink, opts ...LoggingOptions) (*LoggingService, error) { + if len(opts) == 0 { + opts = append(opts, DefaultOptions) + } + + return &LoggingService{ + fs: fs, + loggers: make(map[string]*LoggerImpl), + sink: sink, + opts: opts[0], + creationLock: &sync.Mutex{}, + }, nil +} + +func (s *LoggingService) GetOrCreateLogger(featureService *model.FeatureService) (Logger, error) { + if logger, ok := s.loggers[featureService.Name]; ok { + return logger, nil + } + + if featureService.LoggingConfig == nil { + return nil, errors.New("Only feature services with configured logging can be used") + } + + s.creationLock.Lock() + defer s.creationLock.Unlock() + + // could be created by another go-routine on this point + if logger, ok := s.loggers[featureService.Name]; ok { + return logger, nil + } + + if s.sink == nil { + return &DummyLoggerImpl{}, nil + } + + config := NewLoggerConfig(featureService.LoggingConfig.SampleRate, s.opts) + schema, err := GenerateSchemaFromFeatureService(s.fs, featureService.Name) + if err != nil { + return nil, err + } + + logger, err := NewLogger(schema, featureService.Name, s.sink, config) + if err != nil { + return nil, err + } + s.loggers[featureService.Name] = logger + + return logger, nil +} diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 319bed3b2c2..3dfbdc7f1b7 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -3,18 +3,20 @@ package transformation import ( "errors" "fmt" + "strings" + "unsafe" + "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/array" "github.com/apache/arrow/go/v8/arrow/cdata" "github.com/apache/arrow/go/v8/arrow/memory" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/feast-dev/feast/go/internal/feast/model" 
"github.com/feast-dev/feast/go/internal/feast/onlineserving" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" - "google.golang.org/protobuf/types/known/timestamppb" - "strings" - "unsafe" ) /* diff --git a/go/internal/feast/server/logging/feature_repo/__init__.py b/go/internal/test/feature_repo/__init__.py similarity index 100% rename from go/internal/feast/server/logging/feature_repo/__init__.py rename to go/internal/test/feature_repo/__init__.py diff --git a/go/internal/feast/server/logging/feature_repo/driver_stats.parquet b/go/internal/test/feature_repo/driver_stats.parquet similarity index 100% rename from go/internal/feast/server/logging/feature_repo/driver_stats.parquet rename to go/internal/test/feature_repo/driver_stats.parquet diff --git a/go/internal/feast/server/logging/feature_repo/example.py b/go/internal/test/feature_repo/example.py similarity index 85% rename from go/internal/feast/server/logging/feature_repo/example.py rename to go/internal/test/feature_repo/example.py index f78470efd55..2b1d74ad32e 100644 --- a/go/internal/feast/server/logging/feature_repo/example.py +++ b/go/internal/test/feature_repo/example.py @@ -3,6 +3,8 @@ from google.protobuf.duration_pb2 import Duration from feast import Entity, Feature, FeatureView, FileSource, ValueType, FeatureService +from feast.feature_logging import LoggingConfig +from feast.infra.offline_stores.file_source import FileLoggingDestination # Read data from parquet files. Parquet is convenient for local development mode. For # production, you can use your favorite DWH, such as BigQuery. 
See Feast documentation @@ -36,5 +38,6 @@ driver_stats_fs = FeatureService( name="test_service", - features=[driver_hourly_stats_view] + features=[driver_hourly_stats_view], + logging_config=LoggingConfig(destination=FileLoggingDestination(path="")) ) \ No newline at end of file diff --git a/go/internal/feast/server/logging/feature_repo/feature_store.yaml b/go/internal/test/feature_repo/feature_store.yaml similarity index 100% rename from go/internal/feast/server/logging/feature_repo/feature_store.yaml rename to go/internal/test/feature_repo/feature_store.yaml diff --git a/go/internal/test/go_integration_test_utils.go b/go/internal/test/go_integration_test_utils.go index d66a5461930..6d236a43190 100644 --- a/go/internal/test/go_integration_test_utils.go +++ b/go/internal/test/go_integration_test_utils.go @@ -19,6 +19,7 @@ import ( "time" "github.com/apache/arrow/go/v8/arrow/array" + "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/protos/feast/types" gotypes "github.com/feast-dev/feast/go/types" @@ -137,10 +138,10 @@ func SetupInitializedRepo(basePath string) error { // var stderr bytes.Buffer // var stdout bytes.Buffer applyCommand.Dir = featureRepoPath - err = applyCommand.Run() + out, err := applyCommand.Output() if err != nil { + log.Println(string(out)) return err - } t := time.Now() @@ -151,7 +152,7 @@ func SetupInitializedRepo(basePath string) error { materializeCommand := exec.Command("feast", "materialize-incremental", formattedTime) materializeCommand.Env = os.Environ() materializeCommand.Dir = featureRepoPath - out, err := materializeCommand.Output() + out, err = materializeCommand.Output() if err != nil { log.Println(string(out)) return err @@ -175,22 +176,14 @@ func CleanUpInitializedRepo(basePath string) { } } -func CleanUpRepo(basePath string) { - featureRepoPath, err := filepath.Abs(filepath.Join(basePath, "feature_repo")) - if err != nil { - log.Fatal(err) - } - err = os.RemoveAll(featureRepoPath) - if err != nil 
{ - log.Fatal(err) - } -} - -func GetProtoFromRecord(rec array.Record) (map[string]*types.RepeatedValue, error) { +func GetProtoFromRecord(rec arrow.Record) (map[string]*types.RepeatedValue, error) { r := make(map[string]*types.RepeatedValue) schema := rec.Schema() for idx, column := range rec.Columns() { field := schema.Field(idx) + if field.Type.ID() == arrow.FixedWidthTypes.Timestamp_ms.ID() || field.Type.ID() == arrow.FixedWidthTypes.Date32.ID() { + continue + } values, err := gotypes.ArrowValuesToProtoValues(column) if err != nil { return nil, err @@ -200,10 +193,6 @@ func GetProtoFromRecord(rec array.Record) (map[string]*types.RepeatedValue, erro return r, nil } -func CleanUpFile(absPath string) error { - return os.Remove(absPath) -} - func CreateBaseFeatureView(name string, features []*model.Feature, projection *model.FeatureViewProjection) *model.BaseFeatureView { return &model.BaseFeatureView{ Name: name, diff --git a/go/types/typeconversion.go b/go/types/typeconversion.go index 416eb2ac273..30fcc1e393c 100644 --- a/go/types/typeconversion.go +++ b/go/types/typeconversion.go @@ -6,6 +6,7 @@ import ( "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/array" "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/feast-dev/feast/go/protos/feast/types" ) @@ -89,7 +90,7 @@ func ValueTypeEnumToArrowType(t types.ValueType_Enum) (arrow.DataType, error) { } } -func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) error { +func CopyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) error { switch fieldBuilder := builder.(type) { case *array.BooleanBuilder: for _, v := range values { @@ -307,7 +308,7 @@ func ProtoValuesToArrowArray(protoValues []*types.Value, arrowAllocator memory.A if fieldType != nil { builder := array.NewBuilder(arrowAllocator, fieldType) - err = copyProtoValuesToArrowArray(builder, protoValues) + err = CopyProtoValuesToArrowArray(builder, protoValues) if err != nil { return 
nil, err } diff --git a/go/types/typeconversion_test.go b/go/types/typeconversion_test.go index 05fc32f63ac..1f89593ea01 100644 --- a/go/types/typeconversion_test.go +++ b/go/types/typeconversion_test.go @@ -1,12 +1,14 @@ package types import ( + "testing" + "time" + "github.com/apache/arrow/go/v8/arrow/memory" - "github.com/feast-dev/feast/go/protos/feast/types" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - "testing" - "time" + + "github.com/feast-dev/feast/go/protos/feast/types" ) var ( diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index e35dbdd9c7a..acc965ac444 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -143,9 +143,11 @@ def to_data_source(self) -> DataSource: class LoggingConfig: destination: LoggingDestination + sample_rate: float - def __init__(self, destination: LoggingDestination): + def __init__(self, destination: LoggingDestination, sample_rate: float = 1.0): self.destination = destination + self.sample_rate = sample_rate @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]: @@ -157,8 +159,12 @@ def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig proto_kind = config_proto.custom_destination.kind destination_class = _DestinationRegistry.classes_by_proto_attr_name[proto_kind] - return LoggingConfig(destination=destination_class.from_proto(config_proto)) + return LoggingConfig( + destination=destination_class.from_proto(config_proto), + sample_rate=config_proto.sample_rate, + ) def to_proto(self) -> LoggingConfigProto: proto = self.destination.to_proto() + proto.sample_rate = self.sample_rate return proto