From cc894a24e1e1008d3439e67e307bdf96511cd661 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Mon, 3 Nov 2025 16:59:12 -0600 Subject: [PATCH] [manager] Switch from awc to reqwest. This patch replaces `awc` with `reqwest` as the HTTP client for talking to the pipeline. The main reason for this is the various hard-to-explain HTTP errors we've observed in CI. On top of this, awc doesn't support a clean way to disable connection caching. It can only be achieved by setting the `connection: Close` header, which we suspect confuses the server. In contrast, reqwest supports setting the connection pool size to 0. This commit was mostly generated by cursor. There are a couple of subtle points here (which cursor missed): - In awc, ClientRequest::timeout() controls the timeout from the start of the request until the HTTP response header is returned. reqwest doesn't have a similar method, so we use tokio::time::timeout instead. - reqwest seems to handle disconnects differently from awc: in awc, when the pipeline drops its end of a streaming response, the byte stream simply ends (without an error); in reqwest, the stream contains an error, which gets propagated to the API client. This may be the right thing to do, but currently the Python SDK doesn't expect that. We simulate the old behavior to avoid a breaking change. 
Signed-off-by: Leonid Ryzhyk --- .../src/api/endpoints/config.rs | 2 +- .../src/api/endpoints/metrics.rs | 2 +- .../src/api/endpoints/pipeline_interaction.rs | 55 +-- .../pipeline_interaction/support_bundle.rs | 2 +- .../src/api/endpoints/pipeline_management.rs | 4 +- crates/pipeline-manager/src/api/main.rs | 52 +-- .../src/api/support_data_collector.rs | 40 +-- crates/pipeline-manager/src/config.rs | 12 +- .../src/runner/interaction.rs | 316 ++++++++++++------ 9 files changed, 315 insertions(+), 170 deletions(-) diff --git a/crates/pipeline-manager/src/api/endpoints/config.rs b/crates/pipeline-manager/src/api/endpoints/config.rs index 7f91bf3be6..a1b8f8d69f 100644 --- a/crates/pipeline-manager/src/api/endpoints/config.rs +++ b/crates/pipeline-manager/src/api/endpoints/config.rs @@ -146,7 +146,7 @@ impl Configuration { #[get("/config")] pub(crate) async fn get_config( state: WebData, - _client: WebData, + _client: WebData, _tenant_id: ReqData, ) -> Result { let config = Configuration::gather(&state).await; diff --git a/crates/pipeline-manager/src/api/endpoints/metrics.rs b/crates/pipeline-manager/src/api/endpoints/metrics.rs index 8e5d0d5eea..6709355572 100644 --- a/crates/pipeline-manager/src/api/endpoints/metrics.rs +++ b/crates/pipeline-manager/src/api/endpoints/metrics.rs @@ -29,7 +29,7 @@ use feldera_types::runtime_status::RuntimeStatus; #[get("/metrics")] pub(crate) async fn get_metrics( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, ) -> Result { let pipelines = state diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs index be5556f57c..a1a9f8972c 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs @@ -89,7 +89,7 @@ pub mod support_bundle; #[post("/pipelines/{pipeline_name}/ingress/{table_name}")] pub(crate) async fn http_input( state: 
WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String)>, req: HttpRequest, @@ -170,7 +170,7 @@ pub(crate) async fn http_input( #[post("/pipelines/{pipeline_name}/egress/{table_name}")] pub(crate) async fn http_output( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String)>, req: HttpRequest, @@ -256,7 +256,7 @@ pub(crate) async fn http_output( #[post("/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/{action}")] pub(crate) async fn post_pipeline_input_connector_action( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String, String, String)>, ) -> Result { @@ -343,7 +343,7 @@ pub(crate) async fn post_pipeline_input_connector_action( #[get("/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/stats")] pub(crate) async fn get_pipeline_input_connector_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String, String)>, ) -> Result { @@ -414,7 +414,7 @@ pub(crate) async fn get_pipeline_input_connector_status( #[get("/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/stats")] pub(crate) async fn get_pipeline_output_connector_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String, String)>, ) -> Result { @@ -479,7 +479,7 @@ pub(crate) async fn get_pipeline_output_connector_status( #[get("/pipelines/{pipeline_name}/stats")] pub(crate) async fn get_pipeline_stats( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -531,7 +531,7 @@ pub(crate) async fn get_pipeline_stats( #[get("/pipelines/{pipeline_name}/metrics")] pub(crate) async fn get_pipeline_metrics( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, _query: web::Query, @@ -583,7 +583,7 @@ 
pub(crate) async fn get_pipeline_metrics( #[get("/pipelines/{pipeline_name}/time_series")] pub(crate) async fn get_pipeline_time_series( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -640,7 +640,7 @@ pub(crate) async fn get_pipeline_time_series( #[get("/pipelines/{pipeline_name}/time_series_stream")] pub(crate) async fn get_pipeline_time_series_stream( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -693,7 +693,7 @@ pub(crate) async fn get_pipeline_time_series_stream( #[get("/pipelines/{pipeline_name}/circuit_profile")] pub(crate) async fn get_pipeline_circuit_profile( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -745,7 +745,7 @@ pub(crate) async fn get_pipeline_circuit_profile( #[get("/pipelines/{pipeline_name}/circuit_json_profile")] pub(crate) async fn get_pipeline_circuit_json_profile( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -796,7 +796,7 @@ pub(crate) async fn get_pipeline_circuit_json_profile( #[post("/pipelines/{pipeline_name}/checkpoint/sync")] pub(crate) async fn sync_checkpoint( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -862,7 +862,7 @@ pub(crate) async fn sync_checkpoint( #[post("/pipelines/{pipeline_name}/checkpoint")] pub(crate) async fn checkpoint_pipeline( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -926,7 +926,7 @@ pub(crate) async fn checkpoint_pipeline( #[get("/pipelines/{pipeline_name}/checkpoint_status")] pub(crate) async fn get_checkpoint_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -978,7 +978,7 @@ pub(crate) async fn 
get_checkpoint_status( #[get("/pipelines/{pipeline_name}/checkpoint/sync_status")] pub(crate) async fn get_checkpoint_sync_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1033,7 +1033,7 @@ pub(crate) async fn get_checkpoint_sync_status( #[get("/pipelines/{pipeline_name}/heap_profile")] pub(crate) async fn get_pipeline_heap_profile( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1080,7 +1080,7 @@ pub(crate) async fn get_pipeline_heap_profile( #[post("/pipelines/{pipeline_name}/pause")] pub(crate) async fn post_pipeline_pause( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, ) -> Result { @@ -1140,7 +1140,7 @@ pub(crate) async fn post_pipeline_pause( #[post("/pipelines/{pipeline_name}/resume")] pub(crate) async fn post_pipeline_resume( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, ) -> Result { @@ -1211,7 +1211,7 @@ pub(crate) async fn post_pipeline_resume( #[post("/pipelines/{pipeline_name}/activate")] pub(crate) async fn post_pipeline_activate( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, _query: web::Query, @@ -1295,7 +1295,7 @@ pub(crate) async fn post_pipeline_activate( #[post("/pipelines/{pipeline_name}/approve")] pub(crate) async fn post_pipeline_approve( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1384,7 +1384,8 @@ fn request_is_websocket(request: &HttpRequest) -> bool { #[get("/pipelines/{pipeline_name}/query")] pub(crate) async fn pipeline_adhoc_sql( state: WebData, - client: WebData, + reqwest_client: WebData, + awc_client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1395,7 +1396,7 @@ pub(crate) async fn pipeline_adhoc_sql( state .runner 
.forward_websocket_request_to_pipeline_by_name( - client.as_ref(), + awc_client.as_ref(), *tenant_id, &pipeline_name, "query", @@ -1407,7 +1408,7 @@ pub(crate) async fn pipeline_adhoc_sql( state .runner .forward_streaming_http_request_to_pipeline_by_name( - client.as_ref(), + reqwest_client.as_ref(), *tenant_id, &pipeline_name, "query", @@ -1460,7 +1461,7 @@ pub(crate) async fn pipeline_adhoc_sql( )] pub(crate) async fn completion_token( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path<(String, String, String)>, ) -> Result { @@ -1534,7 +1535,7 @@ pub(crate) async fn completion_token( #[get("/pipelines/{pipeline_name}/completion_status")] pub(crate) async fn completion_status( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1589,7 +1590,7 @@ pub(crate) async fn completion_status( #[post("/pipelines/{pipeline_name}/start_transaction")] pub(crate) async fn start_transaction( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, @@ -1643,7 +1644,7 @@ pub(crate) async fn start_transaction( #[post("/pipelines/{pipeline_name}/commit_transaction")] pub(crate) async fn commit_transaction( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: web::Path, request: HttpRequest, diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs index 76d48d87cc..d9513f97cd 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction/support_bundle.rs @@ -152,7 +152,7 @@ impl SupportBundleZip { #[get("/pipelines/{pipeline_name}/support_bundle")] pub(crate) async fn get_pipeline_support_bundle( state: WebData, - client: WebData, + client: WebData, tenant_id: ReqData, path: 
web::Path, query: web::Query, diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs index 946a949df1..2b224909d5 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs @@ -1226,7 +1226,7 @@ pub(crate) async fn post_pipeline_start( #[post("/pipelines/{pipeline_name}/stop")] pub(crate) async fn post_pipeline_stop( state: WebData, - _client: WebData, + _client: WebData, tenant_id: ReqData, path: web::Path, query: web::Query, @@ -1395,7 +1395,7 @@ pub(crate) async fn post_pipeline_clear( )] #[get("/pipelines/{pipeline_name}/logs")] pub(crate) async fn get_pipeline_logs( - client: WebData, + client: WebData, state: WebData, tenant_id: ReqData, path: web::Path, diff --git a/crates/pipeline-manager/src/api/main.rs b/crates/pipeline-manager/src/api/main.rs index 6521ae6ed9..1db1a4d8a0 100644 --- a/crates/pipeline-manager/src/api/main.rs +++ b/crates/pipeline-manager/src/api/main.rs @@ -692,21 +692,28 @@ pub async fn run( } } }; + // We instantiate reqwest::Client that can be used if the api-server needs to + // make outgoing HTTP calls. For websocket connections, awc::Client is created + // per-worker (since it's not Send) using common_config.awc_client(). + // reqwest::Client is cloneable (internally uses Arc), so it can be shared across workers. + let reqwest_client = common_config.reqwest_client().await; + let server = match auth_configuration { - // We instantiate an awc::Client that can be used if the api-server needs to - // make outgoing calls. This object is not meant to have more than one instance - // per thread (otherwise, it causes high resource pressure on both CPU and fds). 
Some(auth_configuration) => { - let common_config_cloned = common_config.clone(); let api_config = api_config.clone(); let state = state.clone(); + let reqwest_client = reqwest_client.clone(); + // awc::Client is not Send, so we need to create it in each worker + let common_config_for_awc = common_config.clone(); let server = HttpServer::new(move || { let auth_middleware = HttpAuthentication::with_fn(crate::auth::auth_validator); - let client = WebData::new(common_config_cloned.awc_client()); + let reqwest_client = WebData::new(reqwest_client.clone()); + let awc_client = WebData::new(common_config_for_awc.awc_client()); App::new() .app_data(state.clone()) .app_data(auth_configuration.clone()) - .app_data(client) + .app_data(reqwest_client) + .app_data(awc_client) .wrap_fn(|req, srv| { let log_level = if req.method() == Method::GET && req.path() == "/healthz" { Level::Trace @@ -736,14 +743,18 @@ pub async fn run( } } None => { - let common_config_cloned = common_config.clone(); let api_config = api_config.clone(); let state = state.clone(); + let reqwest_client = reqwest_client.clone(); + // awc::Client is not Send, so we need to create it in each worker + let common_config_for_awc = common_config.clone(); let server = HttpServer::new(move || { - let client = WebData::new(common_config_cloned.awc_client()); + let reqwest_client = WebData::new(reqwest_client.clone()); + let awc_client = WebData::new(common_config_for_awc.awc_client()); App::new() .app_data(state.clone()) - .app_data(client) + .app_data(reqwest_client) + .app_data(awc_client) .wrap_fn(|req, srv| { trace!("Request: {} {}", req.method(), req.path()); srv.call(req).map(log_response) @@ -853,20 +864,15 @@ Version: {} v{}{} .unwrap(); rt.block_on(async { - let local = tokio::task::LocalSet::new(); - local - .run_until(async move { - let client = common_config.awc_client(); - let support_collector = SupportDataCollector::new( - state.clone().into_inner(), - client, - 
api_config.support_data_collection_frequency, - api_config.support_data_retention, - shutdown_rx, - ); - support_collector.run().await; - }) - .await; + let client = common_config.reqwest_client().await; + let support_collector = SupportDataCollector::new( + state.clone().into_inner(), + client, + api_config.support_data_collection_frequency, + api_config.support_data_retention, + shutdown_rx, + ); + support_collector.run().await; }); }) .unwrap(), diff --git a/crates/pipeline-manager/src/api/support_data_collector.rs b/crates/pipeline-manager/src/api/support_data_collector.rs index b628541c2b..4a05cdddf1 100644 --- a/crates/pipeline-manager/src/api/support_data_collector.rs +++ b/crates/pipeline-manager/src/api/support_data_collector.rs @@ -13,10 +13,8 @@ use crate::error::ManagerError; use actix_web::http::Method; use actix_web::rt::time::timeout; use actix_web::HttpResponse; -use awc::Client; use chrono::{DateTime, Utc}; use feldera_types::error::ErrorResponse; -use futures_util::StreamExt; use log::{debug, error, info}; use serde::Deserialize; use std::cmp::min; @@ -84,7 +82,7 @@ impl Default for SupportBundleParameters { /// Fetch data from a pipeline endpoint async fn fetch_pipeline_data( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, endpoint: &str, @@ -107,10 +105,12 @@ async fn fetch_pipeline_data( /// Stream logs from the pipeline with timeout-based termination async fn collect_pipeline_logs( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> Result { + use futures_util::StreamExt; + let mut first_line = true; let next_line_timeout = Duration::from_millis(500); let mut logs = String::with_capacity(4096); @@ -120,18 +120,18 @@ async fn collect_pipeline_logs( .get_logs_from_pipeline(client, tenant_id, pipeline_name) .await?; - let mut response = response; - while let Ok(Some(chunk)) = timeout( + let mut stream = 
response.bytes_stream(); + while let Ok(Some(chunk_result)) = timeout( if first_line { COLLECTION_TIMEOUT } else { next_line_timeout }, - response.next(), + stream.next(), ) .await { - match chunk { + match chunk_result { Ok(chunk) => { let text = String::from_utf8_lossy(&chunk); logs.push_str(&text); @@ -290,7 +290,7 @@ impl<'de> serde::Deserialize<'de> for SupportBundleData { impl SupportBundleData { pub(crate) async fn collect( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> Result { @@ -330,7 +330,7 @@ impl SupportBundleData { async fn collect_circuit_profile( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -349,7 +349,7 @@ impl SupportBundleData { async fn collect_json_circuit_profile( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -368,7 +368,7 @@ impl SupportBundleData { async fn collect_heap_profile( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -387,7 +387,7 @@ impl SupportBundleData { async fn collect_metrics( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -406,7 +406,7 @@ impl SupportBundleData { async fn collect_logs( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult { @@ -417,7 +417,7 @@ impl SupportBundleData { async fn collect_stats( state: &ServerState, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> BundleResult> { @@ -640,7 +640,7 @@ pub struct SupportDataCollector { /// Collection schedule ordered by next collection time schedule: BTreeMap, /// HTTP client for making requests to 
pipelines - http_client: Client, + http_client: reqwest::Client, /// Shutdown signal receiver shutdown_rx: watch::Receiver, } @@ -654,7 +654,7 @@ impl SupportDataCollector { /// Create a new support data collector pub(crate) fn new( state: Arc, - client: Client, + client: reqwest::Client, collection_frequency: u64, retention_count: u64, shutdown_rx: watch::Receiver, @@ -1234,7 +1234,7 @@ mod tests { // Create support data collector let (shutdown_tx, shutdown_rx) = watch::channel(false); - let http_client = awc::Client::default(); + let http_client = reqwest::Client::new(); let mut collector = SupportDataCollector::new(state.clone(), http_client, 1, 2, shutdown_rx); @@ -1431,7 +1431,7 @@ mod tests { // Create support data collector let (shutdown_tx, shutdown_rx) = watch::channel(false); - let http_client = awc::Client::default(); + let http_client = reqwest::Client::new(); let retention_count = 3; let collector = SupportDataCollector::new(state.clone(), http_client, 1, retention_count, shutdown_rx); @@ -1544,7 +1544,7 @@ mod tests { // Create support data collector let (shutdown_tx, shutdown_rx) = watch::channel(false); - let http_client = awc::Client::default(); + let http_client = reqwest::Client::new(); let mut collector = SupportDataCollector::new(state.clone(), http_client, 1, 2, shutdown_rx); diff --git a/crates/pipeline-manager/src/config.rs b/crates/pipeline-manager/src/config.rs index ab16fb3621..27ddfc6650 100644 --- a/crates/pipeline-manager/src/config.rs +++ b/crates/pipeline-manager/src/config.rs @@ -396,11 +396,19 @@ impl CommonConfig { reqwest::ClientBuilder::new() .https_only(true) // Only connect to HTTPS .add_root_certificate(root_cert) // Add our own TLS certificate which is used - .tls_built_in_root_certs(false) // Other TLS certificates are not used + .tls_built_in_root_certs(false) // Other + // Disable connection caching. 
Normally, the HTTP client would cache HTTP connections + // to pipelines, but when the pipeline pod gets evicted and restarts on a different + // host or just stops and restarts, this connection is no longer valid, causing + // HTTP errors. This setting disables connection caching. + .pool_max_idle_per_host(0) .build() .expect("HTTPS client should be built") } else { - reqwest::Client::new() + reqwest::ClientBuilder::new() + .pool_max_idle_per_host(0) + .build() + .expect("HTTP client should be built") } } diff --git a/crates/pipeline-manager/src/runner/interaction.rs b/crates/pipeline-manager/src/runner/interaction.rs index 16cff4f60a..6378fdeb14 100644 --- a/crates/pipeline-manager/src/runner/interaction.rs +++ b/crates/pipeline-manager/src/runner/interaction.rs @@ -8,8 +8,6 @@ use crate::error::ManagerError; use crate::runner::error::RunnerError; use actix_web::{http::Method, web::Payload, HttpRequest, HttpResponse, HttpResponseBuilder}; use actix_ws::{CloseCode, CloseReason}; -use awc::error::{ConnectError, SendRequestError}; -use awc::{ClientRequest, ClientResponse}; use crossbeam::sync::ShardedLock; use feldera_types::query::MAX_WS_FRAME_SIZE; use log::{error, info}; @@ -20,12 +18,10 @@ use tokio::time::Instant; use crate::db::listen_table::PIPELINE_NOTIFY_CHANNEL_CAPACITY; use crate::db::types::resources_status::ResourcesStatus; -use actix_http::encoding::Decoder; use feldera_types::runtime_status::RuntimeStatus; /// Max non-streaming HTTP response body returned by the pipeline. -/// The awc default is 2MiB, which is not enough to, for example, retrieve -/// a large circuit profile. +/// 20 MiB limit is used, which is enough to retrieve large circuit profiles. 
const RESPONSE_SIZE_LIMIT: usize = 20 * 1024 * 1024; pub(crate) struct CachedPipelineDescr { @@ -203,7 +199,7 @@ impl RunnerInteraction { #[allow(clippy::too_many_arguments)] pub async fn forward_http_request_to_pipeline( common_config: &CommonConfig, - client: &awc::Client, + client: &reqwest::Client, pipeline_name: &str, location: &str, method: Method, @@ -223,36 +219,56 @@ impl RunnerInteraction { query_string, ); let timeout = timeout.unwrap_or(Self::PIPELINE_HTTP_REQUEST_TIMEOUT); - let request = client.request(method, &url).timeout(timeout).force_close(); - let request_str = Self::format_request(&request); - let mut original_response = request.send().await.map_err(|e| match e { - SendRequestError::Timeout => RunnerError::PipelineInteractionUnreachable { - pipeline_name: pipeline_name.to_string(), - request: request_str.clone(), - error: format_timeout_error_message(timeout, e), - }, - SendRequestError::Connect(ConnectError::Disconnected) => { + // Convert actix Method to reqwest Method + let reqwest_method = match method { + Method::GET => reqwest::Method::GET, + Method::POST => reqwest::Method::POST, + Method::PUT => reqwest::Method::PUT, + Method::DELETE => reqwest::Method::DELETE, + Method::PATCH => reqwest::Method::PATCH, + Method::OPTIONS => reqwest::Method::OPTIONS, + Method::HEAD => reqwest::Method::HEAD, + _ => reqwest::Method::GET, + }; + + let request = client.request(reqwest_method, &url).timeout(timeout); + let request_str = format!("{} {}", method, url); + + let original_response = request.send().await.map_err(|e| { + if e.is_timeout() { + RunnerError::PipelineInteractionUnreachable { + pipeline_name: pipeline_name.to_string(), + request: request_str.clone(), + error: format_timeout_error_message(timeout, e), + } + } else if e.is_connect() { RunnerError::PipelineInteractionUnreachable { pipeline_name: pipeline_name.to_string(), request: request_str.clone(), error: format_disconnected_error_message(e), } + } else { + 
RunnerError::PipelineInteractionUnreachable { + pipeline_name: pipeline_name.to_string(), + request: request_str.clone(), + error: format!("unable to send request due to: {e}"), + } } - _ => RunnerError::PipelineInteractionUnreachable { - pipeline_name: pipeline_name.to_string(), - request: request_str.clone(), - error: format!("unable to send request due to: {e}"), - }, })?; + let status = original_response.status(); if !status.is_success() { info!("HTTP request to pipeline '{pipeline_name}' returned status code {status}. Failed request: {request_str}"); } + // Convert reqwest::StatusCode to actix_http::StatusCode + let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16()) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); + // Build the HTTP response with the original status - let mut response_builder = HttpResponse::build(status); + let mut response_builder = HttpResponse::build(actix_status); // Add all the same headers as the original response, // excluding `Connection` as this is proxy, as per: @@ -260,21 +276,29 @@ impl RunnerInteraction { for (header_name, header_value) in original_response .headers() .iter() - .filter(|(h, _)| *h != "connection") + .filter(|(h, _)| h.as_str() != "connection") { - response_builder.insert_header((header_name.clone(), header_value.clone())); + response_builder.insert_header((header_name.as_str(), header_value.as_bytes())); } // Copy over the original response body - let response_body = original_response - .body() - .limit(RESPONSE_SIZE_LIMIT) - .await - .map_err(|e| RunnerError::PipelineInteractionInvalidResponse { + let response_bytes = original_response.bytes().await.map_err(|e| { + RunnerError::PipelineInteractionInvalidResponse { pipeline_name: pipeline_name.to_string(), - error: format!("unable to reconstruct response body due to: {e}"), - })?; - Ok(response_builder.body(response_body)) + error: format!("unable to read response body due to: {e}"), + } + })?; + + // Check size limit + if 
response_bytes.len() > RESPONSE_SIZE_LIMIT { + return Err(RunnerError::PipelineInteractionInvalidResponse { + pipeline_name: pipeline_name.to_string(), + error: format!("response body too large: {} bytes", response_bytes.len()), + } + .into()); + } + + Ok(response_builder.body(response_bytes.to_vec())) } /// Makes a new HTTP request without body to the pipeline. @@ -285,7 +309,7 @@ impl RunnerInteraction { #[allow(clippy::too_many_arguments)] pub(crate) async fn forward_http_request_to_pipeline_by_name( &self, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, method: Method, @@ -464,7 +488,7 @@ impl RunnerInteraction { #[allow(clippy::too_many_arguments)] pub(crate) async fn forward_streaming_http_request_to_pipeline_by_name( &self, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, endpoint: &str, @@ -472,6 +496,8 @@ impl RunnerInteraction { body: Payload, timeout: Option, // If no timeout is specified, a default timeout is used ) -> Result { + use futures_util::StreamExt; + let (location, _cache_hit) = self.check_pipeline(tenant_id, pipeline_name).await?; // Build new request to pipeline @@ -486,42 +512,85 @@ impl RunnerInteraction { request.query_string(), ); let timeout = timeout.unwrap_or(Self::PIPELINE_HTTP_REQUEST_TIMEOUT); - let mut new_request = client - .request(request.method().clone(), &url) - .timeout(timeout) - .force_close(); + + // Convert actix Method to reqwest Method + let reqwest_method = match *request.method() { + Method::GET => reqwest::Method::GET, + Method::POST => reqwest::Method::POST, + Method::PUT => reqwest::Method::PUT, + Method::DELETE => reqwest::Method::DELETE, + Method::PATCH => reqwest::Method::PATCH, + Method::OPTIONS => reqwest::Method::OPTIONS, + Method::HEAD => reqwest::Method::HEAD, + _ => reqwest::Method::GET, + }; + + let mut new_request = client.request(reqwest_method, &url); // Add headers of the original request - for header in 
request + for (header_name, header_value) in request .headers() .into_iter() .filter(|(h, _)| *h != "connection") { - new_request = new_request.append_header(header); + new_request = new_request.header(header_name.as_str(), header_value.as_bytes()); } - let request_str = Self::format_request(&new_request); + // Convert the actix Payload stream to a Send stream for reqwest + // We need to use a channel because Payload contains non-Send types (Rc) + let (tx, rx) = + tokio::sync::mpsc::channel::>(16); - // Perform request to the pipeline - let response = new_request.send_stream(body).await.map_err(|e| match e { - SendRequestError::Timeout => RunnerError::PipelineInteractionUnreachable { - pipeline_name: pipeline_name.to_string(), - request: request_str.to_string(), - error: format_timeout_error_message(timeout, e), - }, - SendRequestError::Connect(ConnectError::Disconnected) => { - RunnerError::PipelineInteractionUnreachable { - pipeline_name: pipeline_name.to_string(), - request: request_str.to_string(), - error: format_disconnected_error_message(e), + // Spawn a task to read from the Payload and send to the channel + actix_web::rt::spawn(async move { + let mut payload = body; + while let Some(chunk) = payload.next().await { + match chunk { + Ok(bytes) => { + if tx.send(Ok(bytes)).await.is_err() { + break; // Receiver dropped + } + } + Err(e) => { + let _ = tx.send(Err(std::io::Error::other(e))).await; + break; + } } } - _ => RunnerError::PipelineInteractionUnreachable { + }); + + // Create a stream from the receiver + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let new_request = new_request.body(reqwest::Body::wrap_stream(body_stream)); + + let request_str = format!("{} {}", request.method(), url); + + // Perform request to the pipeline with timeout only for receiving response status/headers + let response = tokio::time::timeout(timeout, new_request.send()) + .await + .map_err(|_| RunnerError::PipelineInteractionUnreachable { pipeline_name: 
pipeline_name.to_string(), - request: request_str.to_string(), - error: format!("unable to send request due to: {e}"), - }, - })?; + request: request_str.clone(), + error: format_timeout_error_message( + timeout, + "timed out waiting for response status", + ), + })? + .map_err(|e| { + if e.is_connect() { + RunnerError::PipelineInteractionUnreachable { + pipeline_name: pipeline_name.to_string(), + request: request_str.clone(), + error: format_disconnected_error_message(e), + } + } else { + RunnerError::PipelineInteractionUnreachable { + pipeline_name: pipeline_name.to_string(), + request: request_str.clone(), + error: format!("unable to send request due to: {e}"), + } + } + })?; let status = response.status(); @@ -529,20 +598,58 @@ impl RunnerInteraction { info!("HTTP request to pipeline '{pipeline_name}' returned status code {status}. Failed request: {request_str}"); } + // Convert reqwest::StatusCode to actix_http::StatusCode + let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16()) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); + // Build the new HTTP response with the same status, headers and streaming body - let mut builder = HttpResponseBuilder::new(status); - for header in response.headers().into_iter() { - builder.append_header(header); + let mut builder = HttpResponseBuilder::new(actix_status); + for (header_name, header_value) in response.headers().into_iter() { + builder.insert_header((header_name.as_str(), header_value.as_bytes())); } - Ok(builder.streaming(response)) + + // let request_str_clone = request_str.clone(); + + // Convert reqwest streaming response to actix streaming response + // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed + // Handle errors gracefully to avoid "response ended prematurely" on client disconnects + let stream = response + .bytes_stream() + // .inspect(move |result| { + // match result { + // Ok(bytes) => println!("Stream {request_str_clone} 
chunk: {} bytes - {:?}", bytes.len(), bytes), + // Err(e) => println!("Stream {request_str_clone} error: {:?}", e), + // } + // }) + + // When connection to the pipeline is lost, e.g., because the pipeline was killed, + // this will cleanly terminate the HTTP response. + // I am not sure this is the correct behavior, but it appears that this is how this worked + // when we used awc, and this is what Python SDK expects; otherwise it throws + // "response ended prematurely" and similar exceptions. + .take_while(move |result| { + let should_continue = result.is_ok(); + if let Err(e) = result { + // Log the error but don't propagate it to avoid client-side errors + info!( + "Stream ({}) ended due to error (likely client disconnect or network issue): {:?}", + request_str, + e + ); + } + futures_util::future::ready(should_continue) + }) + .map(|result| result.map_err(std::io::Error::other)); + + Ok(builder.streaming(stream)) } pub(crate) async fn get_logs_from_pipeline( &self, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, - ) -> Result>, ManagerError> { + ) -> Result { // Retrieve pipeline let pipeline = self .db @@ -564,23 +671,29 @@ impl RunnerInteraction { pipeline.id ); - // Perform request to the runner - let response = client - .request(Method::GET, &url) - .timeout(Self::RUNNER_HTTP_REQUEST_TIMEOUT) - .send() - .await - .map_err(|e| match e { - SendRequestError::Timeout => RunnerError::RunnerInteractionUnreachable { - error: format_runner_timeout_error_message( - Self::RUNNER_HTTP_REQUEST_TIMEOUT, - e, - ), - }, - _ => RunnerError::RunnerInteractionUnreachable { + // Perform request to the runner with timeout only for receiving response status/headers + let response = tokio::time::timeout( + Self::RUNNER_HTTP_REQUEST_TIMEOUT, + client.request(reqwest::Method::GET, &url).send(), + ) + .await + .map_err(|_| RunnerError::RunnerInteractionUnreachable { + error: format_runner_timeout_error_message( + 
Self::RUNNER_HTTP_REQUEST_TIMEOUT, + "timed out waiting for response status", + ), + })? + .map_err(|e| { + if e.is_connect() { + RunnerError::RunnerInteractionUnreachable { + error: format_disconnected_error_message(e), + } + } else { + RunnerError::RunnerInteractionUnreachable { error: format!("unable to send request due to: {e}"), - }, - })?; + } + } + })?; Ok(response) } @@ -591,30 +704,47 @@ impl RunnerInteraction { /// provided tenant identifier and pipeline name. pub(crate) async fn http_streaming_logs_from_pipeline_by_name( &self, - client: &awc::Client, + client: &reqwest::Client, tenant_id: TenantId, pipeline_name: &str, ) -> Result { + use futures_util::StreamExt; + // Perform request to the runner let response = self .get_logs_from_pipeline(client, tenant_id, pipeline_name) .await?; + let status = response.status(); + + // Convert reqwest::StatusCode to actix_http::StatusCode + let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16()) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); + // Build the HTTP response with the same status, headers and streaming body - let mut builder = HttpResponseBuilder::new(response.status()); - for header in response.headers().into_iter() { - builder.append_header(header); + let mut builder = HttpResponseBuilder::new(actix_status); + for (header_name, header_value) in response.headers().into_iter() { + builder.insert_header((header_name.as_str(), header_value.as_bytes())); } - Ok(builder.streaming(response)) - } - /// Format HTTP request for logging. 
- fn format_request(request: &ClientRequest) -> String { - format!( - "{:?} {} {}", - request.get_version(), - request.get_method(), - request.get_uri() - ) + // Convert reqwest streaming response to actix streaming response + // Both reqwest and actix use the same Bytes type from the bytes crate, so no conversion needed + // Handle errors gracefully to avoid "response ended prematurely" on client disconnects + let stream = response + .bytes_stream() + .take_while(|result| { + let should_continue = result.is_ok(); + if let Err(e) = result { + // Log the error but don't propagate it to avoid client-side errors + info!( + "Log stream ended due to error (likely client disconnect or network issue): {}", + e + ); + } + futures_util::future::ready(should_continue) + }) + .map(|result| result.map_err(std::io::Error::other)); + + Ok(builder.streaming(stream)) } }