From 740ddc101aa204b75191e11a63b941c7e9113492 Mon Sep 17 00:00:00 2001 From: Shuchu Han Date: Sat, 14 Mar 2026 23:06:25 -0400 Subject: [PATCH 1/4] feat: Add HTTPs server support. Signed-off-by: Shuchu Han --- go/main.go | 101 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) diff --git a/go/main.go b/go/main.go index f49a27efa46..eb801c5b943 100644 --- a/go/main.go +++ b/go/main.go @@ -47,6 +47,10 @@ func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, return StartHttpServer(fs, host, port, metricsPort, writeLoggedFeaturesCallback, loggingOpts) } +func (s *RealServerStarter) StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort int, certFile string, keyFile string, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + return StartHttpsServer(fs, host, port, metricsPort, certFile, keyFile, writeLoggedFeaturesCallback, loggingOpts) +} + func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { return StartGrpcServer(fs, host, port, metricsPort, writeLoggedFeaturesCallback, loggingOpts) } @@ -58,6 +62,8 @@ func main() { port := 8080 metricsPort := 9090 server := RealServerStarter{} + certFile := "" + keyFile := "" // Current Directory repoPath, err := os.Getwd() if err != nil { @@ -70,6 +76,8 @@ func main() { flag.StringVar(&host, "host", host, "Specify a host for the server") flag.IntVar(&port, "port", port, "Specify a port for the server") flag.IntVar(&metricsPort, "metrics-port", metricsPort, "Specify a port for the metrics server") + flag.StringVar(&certFile, "tls-cert-file", "", "Path to the TLS certificate file") + flag.StringVar(&keyFile, "tls-key-file", "", "Path to the TLS key file" ) flag.Parse() // Initialize tracer @@ -119,8 +127,10 @@ func main() { err = server.StartHttpServer(fs, host, port, metricsPort, nil, loggingOptions) } else if serverType == "grpc" { err = server.StartGrpcServer(fs, host, port, metricsPort, nil, loggingOptions) + } else if serverType == "https" { + err = server.StartHttpsServer(fs, host, port, metricsPort, certFile, keyFile, nil, loggingOptions) } else { - fmt.Println("Unknown server type. Please specify 'http' or 'grpc'.") + fmt.Println("Unknown server type. Please specify 'http' or 'grpc' or 'https'.") } if err != nil { @@ -285,6 +295,95 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort return err } +// StartHttpsServer starts HTTP server with TLS. Requires TLS_CERT_FILE and TLS_KEY_FILE env vars. +func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort int, certFile string, keyFile string, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + if certFile == "" || keyFile == "" { + return fmt.Errorf("TLS_CERT_FILE and TLS_KEY_FILE must be set") + } + + loggingService, err := constructLoggingService(fs, writeLoggedFeaturesCallback, loggingOpts) + if err != nil { + return err + } + + // Try to obtain an http.Handler from the concrete server if possible. + ser := server.NewHttpServer(fs, loggingService) + + // Start metrics server (same as HTTP) + metricsServer := &http.Server{Addr: fmt.Sprintf(":%d", metricsPort)} + go func() { + log.Info().Msgf("Starting metrics server on port %d", metricsPort) + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + metricsServer.Handler = mux + if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Error().Err(err).Msg("Failed to start metrics server") + } + }() + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + + var wg sync.WaitGroup + wg.Add(1) + serverExited := make(chan struct{}) + go func() { + defer wg.Done() + select { + case <-stop: + log.Info().Msg("Stopping the HTTPS server...") + // Try to stop underlying server if it exposes Stop() + if stopper, ok := interface{}(ser).(interface{ Stop() error }); ok { + if err := stopper.Stop(); err != nil { + log.Error().Err(err).Msg("Error when stopping the HTTPS server") + } + } + if err := metricsServer.Shutdown(context.Background()); err != nil { + log.Error().Err(err).Msg("Error stopping metrics server") + } + if loggingService != nil { + loggingService.Stop() + } + log.Info().Msg("HTTPS server terminated") + case <-serverExited: + metricsServer.Shutdown(context.Background()) + if loggingService != nil { + loggingService.Stop() + } + } + }() + + // If the concrete server exposes a Handler, use it with a tls-enabled http.Server. + if hProvider, ok := interface{}(ser).(interface{ Handler() http.Handler }); ok { + handler := hProvider.Handler() + srv := &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: handler} + go func() { + log.Info().Msgf("Starting HTTPS server on host %s, port %d", host, port) + if err := srv.ListenAndServeTLS(certFile, keyFile); err != nil && err != http.ErrServerClosed { + log.Error().Err(err).Msg("HTTPS server failed") + } + }() + close(serverExited) + wg.Wait() + return nil + } + + // If concrete server supports ServeTLS(host,port,cert,key), call it. + if tlsServ, ok := interface{}(ser).(interface { + ServeTLS(string, int, string, string) error + }); ok { + err := tlsServ.ServeTLS(host, port, certFile, keyFile) + close(serverExited) + wg.Wait() + return err + } + + // Fallback: cannot enable TLS for this server implementation. + close(serverExited) + wg.Wait() + return fmt.Errorf("HTTPS not supported by underlying HTTP server implementation") +} + func OTELTracingEnabled() bool { return strings.ToLower(os.Getenv("ENABLE_OTEL_TRACING")) == "true" } From 46038cf6bfb5f92b601bf6eb85512ee50f615526 Mon Sep 17 00:00:00 2001 From: Shuchu Han Date: Sun, 5 Apr 2026 19:50:30 -0400 Subject: [PATCH 2/4] feat: Add serverTSL() method. Signed-off-by: Shuchu Han --- go/internal/feast/server/http_server.go | 14 +++ go/main.go | 161 +++++++++++------------- 2 files changed, 89 insertions(+), 86 deletions(-) diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index adfd40110e7..a96a23a4c62 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -396,6 +396,20 @@ func (s *httpServer) Serve(host string, port int) error { return err } +func (s *httpServer) ServeTLS(host string, port int, certFile string, keyFile string) error { + mux := http.NewServeMux() + mux.Handle("/get-online-features", metricsMiddleware(recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)))) + mux.Handle("/health", metricsMiddleware(http.HandlerFunc(healthCheckHandler))) + s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second} + err := s.server.ListenAndServeTLS(certFile, keyFile) + // Don't return the error if it's caused by graceful shutdown using Stop() + if err == http.ErrServerClosed { + return nil + } + log.Fatal().Stack().Err(err).Msg("Failed to start HTTPS server") + return err +} + func healthCheckHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "Healthy") diff --git a/go/main.go b/go/main.go index eb801c5b943..b86eccc3e21 100644 --- a/go/main.go +++ b/go/main.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/registry" @@ -70,14 +71,14 @@ func main() { log.Error().Stack().Err(err).Msg("Failed to get current directory") } - flag.StringVar(&serverType, "type", serverType, "Specify the server type (http or grpc)") + flag.StringVar(&serverType, "type", serverType, "Specify the server type (http, https or grpc)") flag.StringVar(&repoPath, "chdir", repoPath, "Repository path where feature store yaml file is stored") flag.StringVar(&host, "host", host, "Specify a host for the server") flag.IntVar(&port, "port", port, "Specify a port for the server") flag.IntVar(&metricsPort, "metrics-port", metricsPort, "Specify a port for the metrics server") flag.StringVar(&certFile, "tls-cert-file", "", "Path to the TLS certificate file") - flag.StringVar(&keyFile, "tls-key-file", "", "Path to the TLS key file" ) + flag.StringVar(&keyFile, "tls-key-file", "", "Path to the TLS key file") flag.Parse() // Initialize tracer @@ -244,13 +245,16 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort } ser := server.NewHttpServer(fs, loggingService) log.Info().Msgf("Starting a HTTP server on host %s, port %d", host, port) + // Start metrics server - metricsServer := &http.Server{Addr: fmt.Sprintf(":%d", metricsPort)} + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + metricsServer := &http.Server{ + Addr: fmt.Sprintf(":%d", metricsPort), + Handler: mux, + } go func() { log.Info().Msgf("Starting metrics server on port %d", metricsPort) - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) - metricsServer.Handler = mux if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Error().Err(err).Msg("Failed to start metrics server") } @@ -258,6 +262,7 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(stop) var wg sync.WaitGroup wg.Add(1) @@ -273,7 +278,9 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort log.Error().Err(err).Msg("Error when stopping the HTTP server") } log.Info().Msg("Stopping metrics server...") - if err := metricsServer.Shutdown(context.Background()); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := metricsServer.Shutdown(ctx); err != nil { log.Error().Err(err).Msg("Error stopping metrics server") } if loggingService != nil { @@ -295,6 +302,44 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort return err } +func OTELTracingEnabled() bool { + return strings.ToLower(os.Getenv("ENABLE_OTEL_TRACING")) == "true" +} + +func newExporter(ctx context.Context) (*otlptrace.Exporter, error) { + exp, err := otlptracehttp.New(ctx, + otlptracehttp.WithInsecure()) + if err != nil { + return nil, err + } + return exp, nil +} + +func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) { + serviceName := os.Getenv("OTEL_SERVICE_NAME") + if serviceName == "" { + serviceName = "FeastGoFeatureServer" + } + r, err := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(serviceName), + ), + ) + + if err != nil { + return nil, err + } + + return sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithResource(r), + ), nil +} + + + // StartHttpsServer starts HTTP server with TLS. Requires TLS_CERT_FILE and TLS_KEY_FILE env vars. func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort int, certFile string, keyFile string, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { if certFile == "" || keyFile == "" { @@ -305,17 +350,18 @@ func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort if err != nil { return err } - - // Try to obtain an http.Handler from the concrete server if possible. ser := server.NewHttpServer(fs, loggingService) + log.Info().Msgf("Starting a HTTPS server on host %s, port %d", host, port) - // Start metrics server (same as HTTP) - metricsServer := &http.Server{Addr: fmt.Sprintf(":%d", metricsPort)} + // Start metrics server + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + metricsServer := &http.Server{ + Addr: fmt.Sprintf(":%d", metricsPort), + Handler: mux, + } go func() { log.Info().Msgf("Starting metrics server on port %d", metricsPort) - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) - metricsServer.Handler = mux if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Error().Err(err).Msg("Failed to start metrics server") } @@ -323,6 +369,7 @@ func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(stop) var wg sync.WaitGroup wg.Add(1) @@ -331,21 +378,24 @@ func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort defer wg.Done() select { case <-stop: - log.Info().Msg("Stopping the HTTPS server...") - // Try to stop underlying server if it exposes Stop() - if stopper, ok := interface{}(ser).(interface{ Stop() error }); ok { - if err := stopper.Stop(); err != nil { - log.Error().Err(err).Msg("Error when stopping the HTTPS server") - } + // Received SIGINT/SIGTERM. Perform graceful shutdown. + log.Info().Msg("Stopping the HTTP server...") + err := ser.Stop() + if err != nil { + log.Error().Err(err).Msg("Error when stopping the HTTP server") } - if err := metricsServer.Shutdown(context.Background()); err != nil { + log.Info().Msg("Stopping metrics server...") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := metricsServer.Shutdown(ctx); err != nil { log.Error().Err(err).Msg("Error stopping metrics server") } if loggingService != nil { loggingService.Stop() } - log.Info().Msg("HTTPS server terminated") + log.Info().Msg("HTTP server terminated") case <-serverExited: + // Server exited (e.g. startup error), ensure metrics server is stopped metricsServer.Shutdown(context.Background()) if loggingService != nil { loggingService.Stop() @@ -353,69 +403,8 @@ func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort } }() - // If the concrete server exposes a Handler, use it with a tls-enabled http.Server. - if hProvider, ok := interface{}(ser).(interface{ Handler() http.Handler }); ok { - handler := hProvider.Handler() - srv := &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: handler} - go func() { - log.Info().Msgf("Starting HTTPS server on host %s, port %d", host, port) - if err := srv.ListenAndServeTLS(certFile, keyFile); err != nil && err != http.ErrServerClosed { - log.Error().Err(err).Msg("HTTPS server failed") - } - }() - close(serverExited) - wg.Wait() - return nil - } - - // If concrete server supports ServeTLS(host,port,cert,key), call it. - if tlsServ, ok := interface{}(ser).(interface { - ServeTLS(string, int, string, string) error - }); ok { - err := tlsServ.ServeTLS(host, port, certFile, keyFile) - close(serverExited) - wg.Wait() - return err - } - - // Fallback: cannot enable TLS for this server implementation. + err = ser.ServeTLS(host, port, certFile, keyFile) close(serverExited) wg.Wait() - return fmt.Errorf("HTTPS not supported by underlying HTTP server implementation") -} - -func OTELTracingEnabled() bool { - return strings.ToLower(os.Getenv("ENABLE_OTEL_TRACING")) == "true" -} - -func newExporter(ctx context.Context) (*otlptrace.Exporter, error) { - exp, err := otlptracehttp.New(ctx, - otlptracehttp.WithInsecure()) - if err != nil { - return nil, err - } - return exp, nil -} - -func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) { - serviceName := os.Getenv("OTEL_SERVICE_NAME") - if serviceName == "" { - serviceName = "FeastGoFeatureServer" - } - r, err := resource.Merge( - resource.Default(), - resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceName(serviceName), - ), - ) - - if err != nil { - return nil, err - } - - return sdktrace.NewTracerProvider( - sdktrace.WithBatcher(exp), - sdktrace.WithResource(r), - ), nil -} + return err +} \ No newline at end of file From bd7907ce612414f81c1186ef54d14bfe8e1b8153 Mon Sep 17 00:00:00 2001 From: Shuchu Han Date: Sun, 5 Apr 2026 20:02:01 -0400 Subject: [PATCH 3/4] fix: Lint the Go code. Signed-off-by: Shuchu Han --- go/internal/feast/metrics/metrics.go | 1 - go/internal/feast/onlinestore/postgresonlinestore.go | 2 +- go/main.go | 8 +++----- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/go/internal/feast/metrics/metrics.go b/go/internal/feast/metrics/metrics.go index 804eef6fa1b..d4783f257b7 100644 --- a/go/internal/feast/metrics/metrics.go +++ b/go/internal/feast/metrics/metrics.go @@ -30,7 +30,6 @@ var ( TimeHistogramType = reflect.TypeOf((*TimeHistogram)(nil)).Elem() ) - func RegisterTimeHistogram(name, help, namespace string, labelNames []string, tag reflect.StructTag) (func(prometheus.Labels) interface{}, prometheus.Collector, error) { f, collector, err := prometheusvanilla.BuildHistogram(name, help, namespace, labelNames, tag) if err != nil { diff --git a/go/internal/feast/onlinestore/postgresonlinestore.go b/go/internal/feast/onlinestore/postgresonlinestore.go index 4813f341db7..4077a9e06fa 100644 --- a/go/internal/feast/onlinestore/postgresonlinestore.go +++ b/go/internal/feast/onlinestore/postgresonlinestore.go @@ -194,4 +194,4 @@ func buildPostgresConnString(config map[string]interface{}) string { } return connURL.String() -} \ No newline at end of file +} diff --git a/go/main.go b/go/main.go index b86eccc3e21..a0e72720332 100644 --- a/go/main.go +++ b/go/main.go @@ -251,7 +251,7 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort mux.Handle("/metrics", promhttp.Handler()) metricsServer := &http.Server{ Addr: fmt.Sprintf(":%d", metricsPort), - Handler: mux, + Handler: mux, } go func() { log.Info().Msgf("Starting metrics server on port %d", metricsPort) @@ -338,8 +338,6 @@ func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, err ), nil } - - // StartHttpsServer starts HTTP server with TLS. Requires TLS_CERT_FILE and TLS_KEY_FILE env vars. func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort int, certFile string, keyFile string, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { if certFile == "" || keyFile == "" { @@ -358,7 +356,7 @@ func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort mux.Handle("/metrics", promhttp.Handler()) metricsServer := &http.Server{ Addr: fmt.Sprintf(":%d", metricsPort), - Handler: mux, + Handler: mux, } go func() { log.Info().Msgf("Starting metrics server on port %d", metricsPort) @@ -407,4 +405,4 @@ func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort close(serverExited) wg.Wait() return err -} \ No newline at end of file +} From 5e1b4ee0b45a2a216a78e474f5a8c5452a781975 Mon Sep 17 00:00:00 2001 From: Shuchu Han Date: Sun, 5 Apr 2026 20:36:40 -0400 Subject: [PATCH 4/4] feat: Add TLS configuration. Signed-off-by: Shuchu Han --- go/internal/feast/server/http_server.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index a96a23a4c62..2f60444c849 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -8,6 +8,7 @@ import ( "runtime" "strconv" "time" + "crypto/tls" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/model" @@ -400,7 +401,24 @@ func (s *httpServer) ServeTLS(host string, port int, certFile string, keyFile st mux := http.NewServeMux() mux.Handle("/get-online-features", metricsMiddleware(recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)))) mux.Handle("/health", metricsMiddleware(http.HandlerFunc(healthCheckHandler))) - s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second} + s.server = &http.Server{ + Addr: fmt.Sprintf("%s:%d", host, port), + Handler: mux, + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 15 * time.Second, + TLSConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + // For production, use proper certificates + // Prefer server's cipher suites + PreferServerCipherSuites: true, + CurvePreferences: []tls.CurveID{ + tls.CurveP256, + tls.X25519MLKEM768, + tls.SecP256r1MLKEM768, + }, + }, + } err := s.server.ListenAndServeTLS(certFile, keyFile) // Don't return the error if it's caused by graceful shutdown using Stop() if err == http.ErrServerClosed {