-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Add TLS support for Go Feature Server #6229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -194,4 +194,4 @@ func buildPostgresConnString(config map[string]interface{}) string { | |
| } | ||
|
|
||
| return connURL.String() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -11,6 +11,7 @@ import ( | |||||||||||||||
| "strings" | ||||||||||||||||
| "sync" | ||||||||||||||||
| "syscall" | ||||||||||||||||
| "time" | ||||||||||||||||
|
|
||||||||||||||||
| "github.com/feast-dev/feast/go/internal/feast" | ||||||||||||||||
| "github.com/feast-dev/feast/go/internal/feast/registry" | ||||||||||||||||
|
|
@@ -47,6 +48,10 @@ func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, | |||||||||||||||
| return StartHttpServer(fs, host, port, metricsPort, writeLoggedFeaturesCallback, loggingOpts) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| func (s *RealServerStarter) StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort int, certFile string, keyFile string, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { | ||||||||||||||||
| return StartHttpsServer(fs, host, port, metricsPort, certFile, keyFile, writeLoggedFeaturesCallback, loggingOpts) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { | ||||||||||||||||
| return StartGrpcServer(fs, host, port, metricsPort, writeLoggedFeaturesCallback, loggingOpts) | ||||||||||||||||
| } | ||||||||||||||||
|
|
@@ -58,18 +63,22 @@ func main() { | |||||||||||||||
| port := 8080 | ||||||||||||||||
| metricsPort := 9090 | ||||||||||||||||
| server := RealServerStarter{} | ||||||||||||||||
| certFile := "" | ||||||||||||||||
| keyFile := "" | ||||||||||||||||
| // Current Directory | ||||||||||||||||
| repoPath, err := os.Getwd() | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| log.Error().Stack().Err(err).Msg("Failed to get current directory") | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| flag.StringVar(&serverType, "type", serverType, "Specify the server type (http or grpc)") | ||||||||||||||||
| flag.StringVar(&serverType, "type", serverType, "Specify the server type (http, https or grpc)") | ||||||||||||||||
| flag.StringVar(&repoPath, "chdir", repoPath, "Repository path where feature store yaml file is stored") | ||||||||||||||||
|
|
||||||||||||||||
| flag.StringVar(&host, "host", host, "Specify a host for the server") | ||||||||||||||||
| flag.IntVar(&port, "port", port, "Specify a port for the server") | ||||||||||||||||
| flag.IntVar(&metricsPort, "metrics-port", metricsPort, "Specify a port for the metrics server") | ||||||||||||||||
| flag.StringVar(&certFile, "tls-cert-file", "", "Path to the TLS certificate file") | ||||||||||||||||
| flag.StringVar(&keyFile, "tls-key-file", "", "Path to the TLS key file") | ||||||||||||||||
| flag.Parse() | ||||||||||||||||
|
|
||||||||||||||||
| // Initialize tracer | ||||||||||||||||
|
|
@@ -119,8 +128,10 @@ func main() { | |||||||||||||||
| err = server.StartHttpServer(fs, host, port, metricsPort, nil, loggingOptions) | ||||||||||||||||
| } else if serverType == "grpc" { | ||||||||||||||||
| err = server.StartGrpcServer(fs, host, port, metricsPort, nil, loggingOptions) | ||||||||||||||||
| } else if serverType == "https" { | ||||||||||||||||
| err = server.StartHttpsServer(fs, host, port, metricsPort, certFile, keyFile, nil, loggingOptions) | ||||||||||||||||
| } else { | ||||||||||||||||
| fmt.Println("Unknown server type. Please specify 'http' or 'grpc'.") | ||||||||||||||||
| fmt.Println("Unknown server type. Please specify 'http' or 'grpc' or 'https'.") | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| if err != nil { | ||||||||||||||||
|
|
@@ -234,20 +245,24 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort | |||||||||||||||
| } | ||||||||||||||||
| ser := server.NewHttpServer(fs, loggingService) | ||||||||||||||||
| log.Info().Msgf("Starting a HTTP server on host %s, port %d", host, port) | ||||||||||||||||
|
|
||||||||||||||||
| // Start metrics server | ||||||||||||||||
| metricsServer := &http.Server{Addr: fmt.Sprintf(":%d", metricsPort)} | ||||||||||||||||
| mux := http.NewServeMux() | ||||||||||||||||
| mux.Handle("/metrics", promhttp.Handler()) | ||||||||||||||||
| metricsServer := &http.Server{ | ||||||||||||||||
| Addr: fmt.Sprintf(":%d", metricsPort), | ||||||||||||||||
| Handler: mux, | ||||||||||||||||
| } | ||||||||||||||||
| go func() { | ||||||||||||||||
| log.Info().Msgf("Starting metrics server on port %d", metricsPort) | ||||||||||||||||
| mux := http.NewServeMux() | ||||||||||||||||
| mux.Handle("/metrics", promhttp.Handler()) | ||||||||||||||||
| metricsServer.Handler = mux | ||||||||||||||||
| if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { | ||||||||||||||||
| log.Error().Err(err).Msg("Failed to start metrics server") | ||||||||||||||||
| } | ||||||||||||||||
| }() | ||||||||||||||||
|
|
||||||||||||||||
| stop := make(chan os.Signal, 1) | ||||||||||||||||
| signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) | ||||||||||||||||
| defer signal.Stop(stop) | ||||||||||||||||
|
|
||||||||||||||||
| var wg sync.WaitGroup | ||||||||||||||||
| wg.Add(1) | ||||||||||||||||
|
|
@@ -263,7 +278,9 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort | |||||||||||||||
| log.Error().Err(err).Msg("Error when stopping the HTTP server") | ||||||||||||||||
| } | ||||||||||||||||
| log.Info().Msg("Stopping metrics server...") | ||||||||||||||||
| if err := metricsServer.Shutdown(context.Background()); err != nil { | ||||||||||||||||
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||||||||||||||||
| defer cancel() | ||||||||||||||||
| if err := metricsServer.Shutdown(ctx); err != nil { | ||||||||||||||||
| log.Error().Err(err).Msg("Error stopping metrics server") | ||||||||||||||||
| } | ||||||||||||||||
| if loggingService != nil { | ||||||||||||||||
|
|
@@ -320,3 +337,72 @@ func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, err | |||||||||||||||
| sdktrace.WithResource(r), | ||||||||||||||||
| ), nil | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // StartHttpsServer starts HTTP server with TLS. Requires TLS_CERT_FILE and TLS_KEY_FILE env vars. | ||||||||||||||||
| func StartHttpsServer(fs *feast.FeatureStore, host string, port int, metricsPort int, certFile string, keyFile string, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { | ||||||||||||||||
| if certFile == "" || keyFile == "" { | ||||||||||||||||
| return fmt.Errorf("TLS_CERT_FILE and TLS_KEY_FILE must be set") | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
|
Comment on lines
+343
to
+346
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Error message and comment incorrectly reference environment variables instead of CLI flags The comment at
Suggested change
Was this helpful? React with 👍 or 👎 to provide feedback. |
||||||||||||||||
| loggingService, err := constructLoggingService(fs, writeLoggedFeaturesCallback, loggingOpts) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return err | ||||||||||||||||
| } | ||||||||||||||||
| ser := server.NewHttpServer(fs, loggingService) | ||||||||||||||||
| log.Info().Msgf("Starting a HTTPS server on host %s, port %d", host, port) | ||||||||||||||||
|
|
||||||||||||||||
| // Start metrics server | ||||||||||||||||
| mux := http.NewServeMux() | ||||||||||||||||
| mux.Handle("/metrics", promhttp.Handler()) | ||||||||||||||||
| metricsServer := &http.Server{ | ||||||||||||||||
| Addr: fmt.Sprintf(":%d", metricsPort), | ||||||||||||||||
| Handler: mux, | ||||||||||||||||
| } | ||||||||||||||||
| go func() { | ||||||||||||||||
| log.Info().Msgf("Starting metrics server on port %d", metricsPort) | ||||||||||||||||
| if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { | ||||||||||||||||
| log.Error().Err(err).Msg("Failed to start metrics server") | ||||||||||||||||
| } | ||||||||||||||||
| }() | ||||||||||||||||
|
|
||||||||||||||||
| stop := make(chan os.Signal, 1) | ||||||||||||||||
| signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) | ||||||||||||||||
| defer signal.Stop(stop) | ||||||||||||||||
|
|
||||||||||||||||
| var wg sync.WaitGroup | ||||||||||||||||
| wg.Add(1) | ||||||||||||||||
| serverExited := make(chan struct{}) | ||||||||||||||||
| go func() { | ||||||||||||||||
| defer wg.Done() | ||||||||||||||||
| select { | ||||||||||||||||
| case <-stop: | ||||||||||||||||
| // Received SIGINT/SIGTERM. Perform graceful shutdown. | ||||||||||||||||
| log.Info().Msg("Stopping the HTTP server...") | ||||||||||||||||
| err := ser.Stop() | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| log.Error().Err(err).Msg("Error when stopping the HTTP server") | ||||||||||||||||
| } | ||||||||||||||||
| log.Info().Msg("Stopping metrics server...") | ||||||||||||||||
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||||||||||||||||
| defer cancel() | ||||||||||||||||
| if err := metricsServer.Shutdown(ctx); err != nil { | ||||||||||||||||
| log.Error().Err(err).Msg("Error stopping metrics server") | ||||||||||||||||
| } | ||||||||||||||||
| if loggingService != nil { | ||||||||||||||||
| loggingService.Stop() | ||||||||||||||||
| } | ||||||||||||||||
| log.Info().Msg("HTTP server terminated") | ||||||||||||||||
| case <-serverExited: | ||||||||||||||||
| // Server exited (e.g. startup error), ensure metrics server is stopped | ||||||||||||||||
| metricsServer.Shutdown(context.Background()) | ||||||||||||||||
| if loggingService != nil { | ||||||||||||||||
| loggingService.Stop() | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| }() | ||||||||||||||||
|
|
||||||||||||||||
| err = ser.ServeTLS(host, port, certFile, keyFile) | ||||||||||||||||
| close(serverExited) | ||||||||||||||||
| wg.Wait() | ||||||||||||||||
| return err | ||||||||||||||||
| } | ||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡
ServerStarterinterface not updated withStartHttpsServer, breaking the established abstraction patternThe
ServerStarterinterface atgo/main.go:40-43definesStartHttpServerandStartGrpcServerbut the newStartHttpsServermethod was not added to it. The method was added toRealServerStarteratgo/main.go:51-53but not to the interface. This breaks the established pattern where the interface enumerates all server start methods, and meansMockServerStarteringo/main_test.go:13cannot be used to test the HTTPS code path. While the concrete type is used inmain()so the code runs correctly, the interface contract is incomplete.(Refers to lines 40-43)
Was this helpful? React with 👍 or 👎 to provide feedback.