diff --git a/crates/pipeline-manager/src/api/endpoints/config.rs b/crates/pipeline-manager/src/api/endpoints/config.rs index 7f91bf3be6..a1b8f8d69f 100644 --- a/crates/pipeline-manager/src/api/endpoints/config.rs +++ b/crates/pipeline-manager/src/api/endpoints/config.rs @@ -146,7 +146,7 @@ impl Configuration { #[get("/config")] pub(crate) async fn get_config( state: WebData, - _client: WebData, + _client: WebData, _tenant_id: ReqData, ) -> Result { let config = Configuration::gather(&state).await; diff --git a/crates/pipeline-manager/src/api/endpoints/metrics.rs b/crates/pipeline-manager/src/api/endpoints/metrics.rs index 8e5d0d5eea..6709355572 100644 --- a/crates/pipeline-manager/src/api/endpoints/metrics.rs +++ b/crates/pipeline-manager/src/api/endpoints/metrics.rs @@ -29,7 +29,7 @@ use feldera_types::runtime_status::RuntimeStatus; #[get("/metrics")] pub(crate) async fn get_metrics( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, ) -> Result { let pipelines = state diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs index be5556f57c..a1a9f8972c 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs @@ -89,7 +89,7 @@ pub mod support_bundle; #[post("/pipelines/{pipeline_name}/ingress/{table_name}")] pub(crate) async fn http_input( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String)>, req: HttpRequest, @@ -170,7 +170,7 @@ pub(crate) async fn http_input( #[post("/pipelines/{pipeline_name}/egress/{table_name}")] pub(crate) async fn http_output( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String)>, req: HttpRequest, @@ -256,7 +256,7 @@ pub(crate) async fn http_output( 
#[post("/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/{action}")] pub(crate) async fn post_pipeline_input_connector_action( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String, String, String)>, ) -> Result { @@ -343,7 +343,7 @@ pub(crate) async fn post_pipeline_input_connector_action( #[get("/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/stats")] pub(crate) async fn get_pipeline_input_connector_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String, String)>, ) -> Result { @@ -414,7 +414,7 @@ pub(crate) async fn get_pipeline_input_connector_status( #[get("/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/stats")] pub(crate) async fn get_pipeline_output_connector_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String, String)>, ) -> Result { @@ -479,7 +479,7 @@ pub(crate) async fn get_pipeline_output_connector_status( #[get("/pipelines/{pipeline_name}/stats")] pub(crate) async fn get_pipeline_stats( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -531,7 +531,7 @@ pub(crate) async fn get_pipeline_stats( #[get("/pipelines/{pipeline_name}/metrics")] pub(crate) async fn get_pipeline_metrics( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, _query: web::Query, @@ -583,7 +583,7 @@ pub(crate) async fn get_pipeline_metrics( #[get("/pipelines/{pipeline_name}/time_series")] pub(crate) async fn get_pipeline_time_series( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -640,7 +640,7 @@ pub(crate) async fn get_pipeline_time_series( #[get("/pipelines/{pipeline_name}/time_series_stream")] pub(crate) async fn get_pipeline_time_series_stream( state: WebData, - 
client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -693,7 +693,7 @@ pub(crate) async fn get_pipeline_time_series_stream( #[get("/pipelines/{pipeline_name}/circuit_profile")] pub(crate) async fn get_pipeline_circuit_profile( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -745,7 +745,7 @@ pub(crate) async fn get_pipeline_circuit_profile( #[get("/pipelines/{pipeline_name}/circuit_json_profile")] pub(crate) async fn get_pipeline_circuit_json_profile( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -796,7 +796,7 @@ pub(crate) async fn get_pipeline_circuit_json_profile( #[post("/pipelines/{pipeline_name}/checkpoint/sync")] pub(crate) async fn sync_checkpoint( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -862,7 +862,7 @@ pub(crate) async fn sync_checkpoint( #[post("/pipelines/{pipeline_name}/checkpoint")] pub(crate) async fn checkpoint_pipeline( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -926,7 +926,7 @@ pub(crate) async fn checkpoint_pipeline( #[get("/pipelines/{pipeline_name}/checkpoint_status")] pub(crate) async fn get_checkpoint_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -978,7 +978,7 @@ pub(crate) async fn get_checkpoint_status( #[get("/pipelines/{pipeline_name}/checkpoint/sync_status")] pub(crate) async fn get_checkpoint_sync_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1033,7 +1033,7 @@ pub(crate) async fn get_checkpoint_sync_status( #[get("/pipelines/{pipeline_name}/heap_profile")] pub(crate) async fn get_pipeline_heap_profile( state: WebData, - client: WebData, + client: WebData, 
tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1080,7 +1080,7 @@ pub(crate) async fn get_pipeline_heap_profile( #[post("/pipelines/{pipeline_name}/pause")] pub(crate) async fn post_pipeline_pause( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, ) -> Result { @@ -1140,7 +1140,7 @@ pub(crate) async fn post_pipeline_pause( #[post("/pipelines/{pipeline_name}/resume")] pub(crate) async fn post_pipeline_resume( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, ) -> Result { @@ -1211,7 +1211,7 @@ pub(crate) async fn post_pipeline_resume( #[post("/pipelines/{pipeline_name}/activate")] pub(crate) async fn post_pipeline_activate( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, _query: web::Query, @@ -1295,7 +1295,7 @@ pub(crate) async fn post_pipeline_activate( #[post("/pipelines/{pipeline_name}/approve")] pub(crate) async fn post_pipeline_approve( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1384,7 +1384,8 @@ fn request_is_websocket(request: &HttpRequest) -> bool { #[get("/pipelines/{pipeline_name}/query")] pub(crate) async fn pipeline_adhoc_sql( state: WebData, - client: WebData, + reqwest_client: WebData, + awc_client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1395,7 +1396,7 @@ pub(crate) async fn pipeline_adhoc_sql( state .runner .forward_websocket_request_to_pipeline_by_name( - client.as_ref(), + awc_client.as_ref(), *tenant_id, &pipeline_name, "query", @@ -1407,7 +1408,7 @@ pub(crate) async fn pipeline_adhoc_sql( state .runner .forward_streaming_http_request_to_pipeline_by_name( - client.as_ref(), + reqwest_client.as_ref(), *tenant_id, &pipeline_name, "query", @@ -1460,7 +1461,7 @@ pub(crate) async fn pipeline_adhoc_sql( )] pub(crate) async fn completion_token( state: WebData, - client: WebData, + client: WebData, tenant_id: 
ReqData, path: web::Path<(String, String, String)>, ) -> Result { @@ -1534,7 +1535,7 @@ pub(crate) async fn completion_token( #[get("/pipelines/{pipeline_name}/completion_status")] pub(crate) async fn completion_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1589,7 +1590,7 @@ pub(crate) async fn completion_status( #[post("/pipelines/{pipeline_name}/start_transaction")] pub(crate) async fn start_transaction( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1643,7 +1644,7 @@ pub(crate) async fn start_transaction( #[post("/pipelines/{pipeline_name}/commit_transaction")] pub(crate) async fn commit_transaction( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs index 76d48d87cc..d9513f97cd 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs @@ -152,7 +152,7 @@ impl SupportBundleZip { #[get("/pipelines/{pipeline_name}/support_bundle")] pub(crate) async fn get_pipeline_support_bundle( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, query: web::Query, diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs index 946a949df1..2b224909d5 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs @@ -1226,7 +1226,7 @@ pub(crate) async fn post_pipeline_start( #[post("/pipelines/{pipeline_name}/stop")] pub(crate) async fn post_pipeline_stop( state: WebData, 
- _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, query: web::Query, @@ -1395,7 +1395,7 @@ pub(crate) async fn post_pipeline_clear( )] #[get("/pipelines/{pipeline_name}/logs")] pub(crate) async fn get_pipeline_logs( - client: WebData, + client: WebData, state: WebData, tenant_id: ReqData, path: web::Path, diff --git a/crates/pipeline-manager/src/api/main.rs b/crates/pipeline-manager/src/api/main.rs index 6521ae6ed9..1db1a4d8a0 100644 --- a/crates/pipeline-manager/src/api/main.rs +++ b/crates/pipeline-manager/src/api/main.rs @@ -692,21 +692,28 @@ pub async fn run( } } }; + // We instantiate reqwest::Client that can be used if the api-server needs to + // make outgoing HTTP calls. For websocket connections, awc::Client is created + // per-worker (since it's not Send) using common_config.awc_client(). + // reqwest::Client is cloneable (internally uses Arc), so it can be shared across workers. + let reqwest_client = common_config.reqwest_client().await; + let server = match auth_configuration { - // We instantiate an awc::Client that can be used if the api-server needs to - // make outgoing calls. This object is not meant to have more than one instance - // per thread (otherwise, it causes high resource pressure on both CPU and fds). 
Some(auth_configuration) => { - let common_config_cloned = common_config.clone(); let api_config = api_config.clone(); let state = state.clone(); + let reqwest_client = reqwest_client.clone(); + // awc::Client is not Send, so we need to create it in each worker + let common_config_for_awc = common_config.clone(); let server = HttpServer::new(move || { let auth_middleware = HttpAuthentication::with_fn(crate::auth::auth_validator); - let client = WebData::new(common_config_cloned.awc_client()); + let reqwest_client = WebData::new(reqwest_client.clone()); + let awc_client = WebData::new(common_config_for_awc.awc_client()); App::new() .app_data(state.clone()) .app_data(auth_configuration.clone()) - .app_data(client) + .app_data(reqwest_client) + .app_data(awc_client) .wrap_fn(|req, srv| { let log_level = if req.method() == Method::GET && req.path() == "/healthz" { Level::Trace @@ -736,14 +743,18 @@ pub async fn run( } } None => { - let common_config_cloned = common_config.clone(); let api_config = api_config.clone(); let state = state.clone(); + let reqwest_client = reqwest_client.clone(); + // awc::Client is not Send, so we need to create it in each worker + let common_config_for_awc = common_config.clone(); let server = HttpServer::new(move || { - let client = WebData::new(common_config_cloned.awc_client()); + let reqwest_client = WebData::new(reqwest_client.clone()); + let awc_client = WebData::new(common_config_for_awc.awc_client()); App::new() .app_data(state.clone()) - .app_data(client) + .app_data(reqwest_client) + .app_data(awc_client) .wrap_fn(|req, srv| { trace!("Request: {} {}", req.method(), req.path()); srv.call(req).map(log_response) @@ -853,20 +864,15 @@ Version: {} v{}{} .unwrap(); rt.block_on(async { - let local = tokio::task::LocalSet::new(); - local - .run_until(async move { - let client = common_config.awc_client(); - let support_collector = SupportDataCollector::new( - state.clone().into_inner(), - client, - 
api_config.support_data_collection_frequency, - api_config.support_data_retention, - shutdown_rx, - ); - support_collector.run().await; - }) - .await; + let client = common_config.reqwest_client().await; + let support_collector = SupportDataCollector::new( + state.clone().into_inner(), + client, + api_config.support_data_collection_frequency, + api_config.support_data_retention, + shutdown_rx, + ); + support_collector.run().await; }); }) .unwrap(), diff --git a/crates/pipeline-manager/src/api/support_data_collector.rs b/crates/pipeline-manager/src/api/support_data_collector.rs index b628541c2b..4a05cdddf1 100644 --- a/crates/pipeline-manager/src/api/support_data_collector.rs +++ b/crates/pipeline-manager/src/api/support_data_collector.rs @@ -13,10 +13,8 @@ use crate::error::ManagerError; use actix_web::http::Method; use actix_web::rt::time::timeout; use actix_web::HttpResponse; -use awc::Client; use chrono::{DateTime, Utc}; use feldera_types::error::ErrorResponse; -use futures_util::StreamExt; use log::{debug, error, info}; use serde::Deserialize; use std::cmp::min; @@ -84,7 +82,7 @@ impl Default for SupportBundleParameters { /// Fetch data from a pipeline endpoint async fn fetch_pipeline_data( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, endpoint: &str, @@ -107,10 +105,12 @@ async fn fetch_pipeline_data( /// Stream logs from the pipeline with timeout-based termination async fn collect_pipeline_logs( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> Result { + use futures_util::StreamExt; + let mut first_line = true; let next_line_timeout = Duration::from_millis(500); let mut logs = String::with_capacity(4096); @@ -120,18 +120,18 @@ async fn collect_pipeline_logs( .get_logs_from_pipeline(client, tenant_id, pipeline_name) .await?; - let mut response = response; - while let Ok(Some(chunk)) = timeout( + let mut stream = 
response.bytes_stream(); + while let Ok(Some(chunk_result)) = timeout( if first_line { COLLECTION_TIMEOUT } else { next_line_timeout }, - response.next(), + stream.next(), ) .await { - match chunk { + match chunk_result { Ok(chunk) => { let text = String::from_utf8_lossy(&chunk); logs.push_str(&text); @@ -290,7 +290,7 @@ impl<'de> serde::Deserialize<'de> for SupportBundleData { impl SupportBundleData { pub(crate) async fn collect( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> Result { @@ -330,7 +330,7 @@ impl SupportBundleData { async fn collect_circuit_profile( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -349,7 +349,7 @@ impl SupportBundleData { async fn collect_json_circuit_profile( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -368,7 +368,7 @@ impl SupportBundleData { async fn collect_heap_profile( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -387,7 +387,7 @@ impl SupportBundleData { async fn collect_metrics( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -406,7 +406,7 @@ impl SupportBundleData { async fn collect_logs( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult { @@ -417,7 +417,7 @@ impl SupportBundleData { async fn collect_stats( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -640,7 +640,7 @@ pub struct SupportDataCollector { /// Collection schedule ordered by next collection time schedule: BTreeMap, /// HTTP client for making requests to 
pipelines - http_client: Client, + http_client: reqwest::Client, /// Shutdown signal receiver shutdown_rx: watch::Receiver, } @@ -654,7 +654,7 @@ impl SupportDataCollector { /// Create a new support data collector pub(crate) fn new( state: Arc, - client: Client, + client: reqwest::Client, collection_frequency: u64, retention_count: u64, shutdown_rx: watch::Receiver, @@ -1234,7 +1234,7 @@ mod tests { // Create support data collector let (shutdown_tx, shutdown_rx) = watch::channel(false); - let http_client = awc::Client::default(); + let http_client = reqwest::Client::new(); let mut collector = SupportDataCollector::new(state.clone(), http_client, 1, 2, shutdown_rx); @@ -1431,7 +1431,7 @@ mod tests { // Create support data collector let (shutdown_tx, shutdown_rx) = watch::channel(false); - let http_client = awc::Client::default(); + let http_client = reqwest::Client::new(); let retention_count = 3; let collector = SupportDataCollector::new(state.clone(), http_client, 1, retention_count, shutdown_rx); @@ -1544,7 +1544,7 @@ mod tests { // Create support data collector let (shutdown_tx, shutdown_rx) = watch::channel(false); - let http_client = awc::Client::default(); + let http_client = reqwest::Client::new(); let mut collector = SupportDataCollector::new(state.clone(), http_client, 1, 2, shutdown_rx); diff --git a/crates/pipeline-manager/src/config.rs b/crates/pipeline-manager/src/config.rs index ab16fb3621..27ddfc6650 100644 --- a/crates/pipeline-manager/src/config.rs +++ b/crates/pipeline-manager/src/config.rs @@ -396,11 +396,19 @@ impl CommonConfig { reqwest::ClientBuilder::new() .https_only(true) // Only connect to HTTPS .add_root_certificate(root_cert) // Add our own TLS certificate which is used - .tls_built_in_root_certs(false) // Other TLS certificates are not used + .tls_built_in_root_certs(false) // Other TLS certificates are not used + // Disable connection caching. 
Normally, the HTTP client would cache HTTP connections + // to pipelines, but when the pipeline pod gets evicted and restarts on a different + // host or just stops and restarts, this connection is no longer valid, causing + // HTTP errors. This setting disables connection caching. + .pool_max_idle_per_host(0) .build() .expect("HTTPS client should be built") } else { - reqwest::Client::new() + reqwest::ClientBuilder::new() + .pool_max_idle_per_host(0) + .build() + .expect("HTTP client should be built") } } diff --git a/crates/pipeline-manager/src/runner/interaction.rs b/crates/pipeline-manager/src/runner/interaction.rs index 16cff4f60a..6378fdeb14 100644 --- a/crates/pipeline-manager/src/runner/interaction.rs +++ b/crates/pipeline-manager/src/runner/interaction.rs @@ -8,8 +8,6 @@ use crate::error::ManagerError; use crate::runner::error::RunnerError; use actix_web::{http::Method, web::Payload, HttpRequest, HttpResponse, HttpResponseBuilder}; use actix_ws::{CloseCode, CloseReason}; -use awc::error::{ConnectError, SendRequestError}; -use awc::{ClientRequest, ClientResponse}; use crossbeam::sync::ShardedLock; use feldera_types::query::MAX_WS_FRAME_SIZE; use log::{error, info}; @@ -20,12 +18,10 @@ use tokio::time::Instant; use crate::db::listen_table::PIPELINE_NOTIFY_CHANNEL_CAPACITY; use crate::db::types::resources_status::ResourcesStatus; -use actix_http::encoding::Decoder; use feldera_types::runtime_status::RuntimeStatus; /// Max non-streaming HTTP response body returned by the pipeline. -/// The awc default is 2MiB, which is not enough to, for example, retrieve -/// a large circuit profile. +/// 20 MiB limit is used, which is enough to retrieve large circuit profiles. 
const RESPONSE_SIZE_LIMIT: usize = 20 * 1024 * 1024; pub(crate) struct CachedPipelineDescr { @@ -203,7 +199,7 @@ impl RunnerInteraction { #[allow(clippy::too_many_arguments)] pub async fn forward_http_request_to_pipeline( common_config: &CommonConfig, - client: &awc::Client, + client: &reqwest::Client, pipeline_name: &str, location: &str, method: Method, @@ -223,36 +219,56 @@ impl RunnerInteraction { query_string, ); let timeout = timeout.unwrap_or(Self::PIPELINE_HTTP_REQUEST_TIMEOUT); - let request = client.request(method, &url).timeout(timeout).force_close(); - let request_str = Self::format_request(&request); - let mut original_response = request.send().await.map_err(|e| match e { - SendRequestError::Timeout => RunnerError::PipelineInteractionUnreachable { - pipeline_name: pipeline_name.to_string(), - request: request_str.clone(), - error: format_timeout_error_message(timeout, e), - }, - SendRequestError::Connect(ConnectError::Disconnected) => { + // Convert actix Method to reqwest Method + let reqwest_method = match method { + Method::GET => reqwest::Method::GET, + Method::POST => reqwest::Method::POST, + Method::PUT => reqwest::Method::PUT, + Method::DELETE => reqwest::Method::DELETE, + Method::PATCH => reqwest::Method::PATCH, + Method::OPTIONS => reqwest::Method::OPTIONS, + Method::HEAD => reqwest::Method::HEAD, + _ => reqwest::Method::GET, + }; + + let request = client.request(reqwest_method, &url).timeout(timeout); + let request_str = format!("{} {}", method, url); + + let original_response = request.send().await.map_err(|e| { + if e.is_timeout() { + RunnerError::PipelineInteractionUnreachable { + pipeline_name: pipeline_name.to_string(), + request: request_str.clone(), + error: format_timeout_error_message(timeout, e), + } + } else if e.is_connect() { RunnerError::PipelineInteractionUnreachable { pipeline_name: pipeline_name.to_string(), request: request_str.clone(), error: format_disconnected_error_message(e), } + } else { + 
RunnerError::PipelineInteractionUnreachable { + pipeline_name: pipeline_name.to_string(), + request: request_str.clone(), + error: format!("unable to send request due to: {e}"), + } } - _ => RunnerError::PipelineInteractionUnreachable { - pipeline_name: pipeline_name.to_string(), - request: request_str.clone(), - error: format!("unable to send request due to: {e}"), - }, })?; + let status = original_response.status(); if !status.is_success() { info!("HTTP request to pipeline '{pipeline_name}' returned status code {status}. Failed request: {request_str}"); } + // Convert reqwest::StatusCode to actix_http::StatusCode + let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16()) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); + // Build the HTTP response with the original status - let mut response_builder = HttpResponse::build(status); + let mut response_builder = HttpResponse::build(actix_status); // Add all the same headers as the original response, // excluding `Connection` as this is proxy, as per: @@ -260,21 +276,29 @@ impl RunnerInteraction { for (header_name, header_value) in original_response .headers() .iter() - .filter(|(h, _)| *h != "connection") + .filter(|(h, _)| h.as_str() != "connection") { - response_builder.insert_header((header_name.clone(), header_value.clone())); + response_builder.insert_header((header_name.as_str(), header_value.as_bytes())); } // Copy over the original response body - let response_body = original_response - .body() - .limit(RESPONSE_SIZE_LIMIT) - .await - .map_err(|e| RunnerError::PipelineInteractionInvalidResponse { + let response_bytes = original_response.bytes().await.map_err(|e| { + RunnerError::PipelineInteractionInvalidResponse { pipeline_name: pipeline_name.to_string(), - error: format!("unable to reconstruct response body due to: {e}"), - })?; - Ok(response_builder.body(response_body)) + error: format!("unable to read response body due to: {e}"), + } + })?; + + // Check size limit + if 
response_bytes.len() > RESPONSE_SIZE_LIMIT { + return Err(RunnerError::PipelineInteractionInvalidResponse { + pipeline_name: pipeline_name.to_string(), + error: format!("response body too large: {} bytes", response_bytes.len()), + } + .into()); + } + + Ok(response_builder.body(response_bytes.to_vec())) } /// Makes a new HTTP request without body to the pipeline. @@ -285,7 +309,7 @@ impl RunnerInteraction { #[allow(clippy::too_many_arguments)] pub(crate) async fn forward_http_request_to_pipeline_by_name( &self, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, method: Method, @@ -464,7 +488,7 @@ impl RunnerInteraction { #[allow(clippy::too_many_arguments)] pub(crate) async fn forward_streaming_http_request_to_pipeline_by_name( &self, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, endpoint: &str, @@ -472,6 +496,8 @@ impl RunnerInteraction { body: Payload, timeout: Option, // If no timeout is specified, a default timeout is used ) -> Result { + use futures_util::StreamExt; + let (location, _cache_hit) = self.check_pipeline(tenant_id, pipeline_name).await?; // Build new request to pipeline @@ -486,42 +512,85 @@ impl RunnerInteraction { request.query_string(), ); let timeout = timeout.unwrap_or(Self::PIPELINE_HTTP_REQUEST_TIMEOUT); - let mut new_request = client - .request(request.method().clone(), &url) - .timeout(timeout) - .force_close(); + + // Convert actix Method to reqwest Method + let reqwest_method = match *request.method() { + Method::GET => reqwest::Method::GET, + Method::POST => reqwest::Method::POST, + Method::PUT => reqwest::Method::PUT, + Method::DELETE => reqwest::Method::DELETE, + Method::PATCH => reqwest::Method::PATCH, + Method::OPTIONS => reqwest::Method::OPTIONS, + Method::HEAD => reqwest::Method::HEAD, + _ => reqwest::Method::GET, + }; + + let mut new_request = client.request(reqwest_method, &url); // Add headers of the original request - for header in 
request + for (header_name, header_value) in request .headers() .into_iter() .filter(|(h, _)| *h != "connection") { - new_request = new_request.append_header(header); + new_request = new_request.header(header_name.as_str(), header_value.as_bytes()); } - let request_str = Self::format_request(&new_request); + // Convert the actix Payload stream to a Send stream for reqwest + // We need to use a channel because Payload contains non-Send types (Rc) + let (tx, rx) = + tokio::sync::mpsc::channel::>(16); - // Perform request to the pipeline - let response = new_request.send_stream(body).await.map_err(|e| match e { - SendRequestError::Timeout => RunnerError::PipelineInteractionUnreachable { - pipeline_name: pipeline_name.to_string(), - request: request_str.to_string(), - error: format_timeout_error_message(timeout, e), - }, - SendRequestError::Connect(ConnectError::Disconnected) => { - RunnerError::PipelineInteractionUnreachable { - pipeline_name: pipeline_name.to_string(), - request: request_str.to_string(), - error: format_disconnected_error_message(e), + // Spawn a task to read from the Payload and send to the channel + actix_web::rt::spawn(async move { + let mut payload = body; + while let Some(chunk) = payload.next().await { + match chunk { + Ok(bytes) => { + if tx.send(Ok(bytes)).await.is_err() { + break; // Receiver dropped + } + } + Err(e) => { + let _ = tx.send(Err(std::io::Error::other(e))).await; + break; + } } } - _ => RunnerError::PipelineInteractionUnreachable { + }); + + // Create a stream from the receiver + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let new_request = new_request.body(reqwest::Body::wrap_stream(body_stream)); + + let request_str = format!("{} {}", request.method(), url); + + // Perform request to the pipeline with timeout only for receiving response status/headers + let response = tokio::time::timeout(timeout, new_request.send()) + .await + .map_err(|_| RunnerError::PipelineInteractionUnreachable { pipeline_name: 
pipeline_name.to_string(), - request: request_str.to_string(), - error: format!("unable to send request due to: {e}"), - }, - })?; + request: request_str.clone(), + error: format_timeout_error_message( + timeout, + "timed out waiting for response status", + ), + })? + .map_err(|e| { + if e.is_connect() { + RunnerError::PipelineInteractionUnreachable { + pipeline_name: pipeline_name.to_string(), + request: request_str.clone(), + error: format_disconnected_error_message(e), + } + } else { + RunnerError::PipelineInteractionUnreachable { + pipeline_name: pipeline_name.to_string(), + request: request_str.clone(), + error: format!("unable to send request due to: {e}"), + } + } + })?; let status = response.status(); @@ -529,20 +598,58 @@ impl RunnerInteraction { info!("HTTP request to pipeline '{pipeline_name}' returned status code {status}. Failed request: {request_str}"); } + // Convert reqwest::StatusCode to actix_http::StatusCode + let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16()) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); + // Build the new HTTP response with the same status, headers and streaming body - let mut builder = HttpResponseBuilder::new(status); - for header in response.headers().into_iter() { - builder.append_header(header); + let mut builder = HttpResponseBuilder::new(actix_status); + for (header_name, header_value) in response.headers().into_iter() { + builder.insert_header((header_name.as_str(), header_value.as_bytes())); } - Ok(builder.streaming(response)) + + // let request_str_clone = request_str.clone(); + + // Convert reqwest streaming response to actix streaming response + // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed + // Handle errors gracefully to avoid "response ended prematurely" on client disconnects + let stream = response + .bytes_stream() + // .inspect(move |result| { + // match result { + // Ok(bytes) => println!("Stream {request_str_clone} 
chunk: {} bytes - {:?}", bytes.len(), bytes), + // Err(e) => println!("Stream {request_str_clone} error: {:?}", e), + // } + // }) + + // When connection to the pipeline is lost, e.g., because the pipeline was killed, + // this will cleanly terminate the HTTP response. + // I am not sure this is the correct behavior, but it appears that this is how this worked + // when we used awc, and this is what Python SDK expects; otherwise it throws + // "response ended prematurely" and similar exceptions. + .take_while(move |result| { + let should_continue = result.is_ok(); + if let Err(e) = result { + // Log the error but don't propagate it to avoid client-side errors + info!( + "Stream ({}) ended due to error (likely client disconnect or network issue): {:?}", + request_str, + e + ); + } + futures_util::future::ready(should_continue) + }) + .map(|result| result.map_err(std::io::Error::other)); + + Ok(builder.streaming(stream)) } pub(crate) async fn get_logs_from_pipeline( &self, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, - ) -> Result>, ManagerError> { + ) -> Result { // Retrieve pipeline let pipeline = self .db @@ -564,23 +671,29 @@ impl RunnerInteraction { pipeline.id ); - // Perform request to the runner - let response = client - .request(Method::GET, &url) - .timeout(Self::RUNNER_HTTP_REQUEST_TIMEOUT) - .send() - .await - .map_err(|e| match e { - SendRequestError::Timeout => RunnerError::RunnerInteractionUnreachable { - error: format_runner_timeout_error_message( - Self::RUNNER_HTTP_REQUEST_TIMEOUT, - e, - ), - }, - _ => RunnerError::RunnerInteractionUnreachable { + // Perform request to the runner with timeout only for receiving response status/headers + let response = tokio::time::timeout( + Self::RUNNER_HTTP_REQUEST_TIMEOUT, + client.request(reqwest::Method::GET, &url).send(), + ) + .await + .map_err(|_| RunnerError::RunnerInteractionUnreachable { + error: format_runner_timeout_error_message( + 
Self::RUNNER_HTTP_REQUEST_TIMEOUT, + "timed out waiting for response status", + ), + })? + .map_err(|e| { + if e.is_connect() { + RunnerError::RunnerInteractionUnreachable { + error: format_disconnected_error_message(e), + } + } else { + RunnerError::RunnerInteractionUnreachable { error: format!("unable to send request due to: {e}"), - }, - })?; + } + } + })?; Ok(response) } @@ -591,30 +704,47 @@ impl RunnerInteraction { /// provided tenant identifier and pipeline name. pub(crate) async fn http_streaming_logs_from_pipeline_by_name( &self, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> Result { + use futures_util::StreamExt; + // Perform request to the runner let response = self .get_logs_from_pipeline(client, tenant_id, pipeline_name) .await?; + let status = response.status(); + + // Convert reqwest::StatusCode to actix_http::StatusCode + let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16()) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); + // Build the HTTP response with the same status, headers and streaming body - let mut builder = HttpResponseBuilder::new(response.status()); - for header in response.headers().into_iter() { - builder.append_header(header); + let mut builder = HttpResponseBuilder::new(actix_status); + for (header_name, header_value) in response.headers().into_iter() { + builder.insert_header((header_name.as_str(), header_value.as_bytes())); } - Ok(builder.streaming(response)) - } - /// Format HTTP request for logging. 
- fn format_request(request: &ClientRequest) -> String { - format!( - "{:?} {} {}", - request.get_version(), - request.get_method(), - request.get_uri() - ) + // Convert reqwest streaming response to actix streaming response + // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed + // Handle errors gracefully to avoid "response ended prematurely" on client disconnects + let stream = response + .bytes_stream() + .take_while(|result| { + let should_continue = result.is_ok(); + if let Err(e) = result { + // Log the error but don't propagate it to avoid client-side errors + info!( + "Log stream ended due to error (likely client disconnect or network issue): {}", + e + ); + } + futures_util::future::ready(should_continue) + }) + .map(|result| result.map_err(std::io::Error::other)); + + Ok(builder.streaming(stream)) } }