Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/pipeline-manager/src/api/endpoints/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Configuration {
#[get("/config")]
pub(crate) async fn get_config(
state: WebData<ServerState>,
_client: WebData<awc::Client>,
_client: WebData<reqwest::Client>,
_tenant_id: ReqData<TenantId>,
) -> Result<HttpResponse, ManagerError> {
let config = Configuration::gather(&state).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/pipeline-manager/src/api/endpoints/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use feldera_types::runtime_status::RuntimeStatus;
#[get("/metrics")]
pub(crate) async fn get_metrics(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
) -> Result<HttpResponse, ManagerError> {
let pipelines = state
Expand Down
55 changes: 28 additions & 27 deletions crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub mod support_bundle;
#[post("/pipelines/{pipeline_name}/ingress/{table_name}")]
pub(crate) async fn http_input(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<(String, String)>,
req: HttpRequest,
Expand Down Expand Up @@ -170,7 +170,7 @@ pub(crate) async fn http_input(
#[post("/pipelines/{pipeline_name}/egress/{table_name}")]
pub(crate) async fn http_output(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<(String, String)>,
req: HttpRequest,
Expand Down Expand Up @@ -256,7 +256,7 @@ pub(crate) async fn http_output(
#[post("/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/{action}")]
pub(crate) async fn post_pipeline_input_connector_action(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<(String, String, String, String)>,
) -> Result<HttpResponse, ManagerError> {
Expand Down Expand Up @@ -343,7 +343,7 @@ pub(crate) async fn post_pipeline_input_connector_action(
#[get("/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/stats")]
pub(crate) async fn get_pipeline_input_connector_status(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<(String, String, String)>,
) -> Result<HttpResponse, ManagerError> {
Expand Down Expand Up @@ -414,7 +414,7 @@ pub(crate) async fn get_pipeline_input_connector_status(
#[get("/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/stats")]
pub(crate) async fn get_pipeline_output_connector_status(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<(String, String, String)>,
) -> Result<HttpResponse, ManagerError> {
Expand Down Expand Up @@ -479,7 +479,7 @@ pub(crate) async fn get_pipeline_output_connector_status(
#[get("/pipelines/{pipeline_name}/stats")]
pub(crate) async fn get_pipeline_stats(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -531,7 +531,7 @@ pub(crate) async fn get_pipeline_stats(
#[get("/pipelines/{pipeline_name}/metrics")]
pub(crate) async fn get_pipeline_metrics(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
_query: web::Query<MetricsParameters>,
Expand Down Expand Up @@ -583,7 +583,7 @@ pub(crate) async fn get_pipeline_metrics(
#[get("/pipelines/{pipeline_name}/time_series")]
pub(crate) async fn get_pipeline_time_series(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -640,7 +640,7 @@ pub(crate) async fn get_pipeline_time_series(
#[get("/pipelines/{pipeline_name}/time_series_stream")]
pub(crate) async fn get_pipeline_time_series_stream(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -693,7 +693,7 @@ pub(crate) async fn get_pipeline_time_series_stream(
#[get("/pipelines/{pipeline_name}/circuit_profile")]
pub(crate) async fn get_pipeline_circuit_profile(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -745,7 +745,7 @@ pub(crate) async fn get_pipeline_circuit_profile(
#[get("/pipelines/{pipeline_name}/circuit_json_profile")]
pub(crate) async fn get_pipeline_circuit_json_profile(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -796,7 +796,7 @@ pub(crate) async fn get_pipeline_circuit_json_profile(
#[post("/pipelines/{pipeline_name}/checkpoint/sync")]
pub(crate) async fn sync_checkpoint(
state: WebData<ServerState>,
_client: WebData<awc::Client>,
_client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -862,7 +862,7 @@ pub(crate) async fn sync_checkpoint(
#[post("/pipelines/{pipeline_name}/checkpoint")]
pub(crate) async fn checkpoint_pipeline(
state: WebData<ServerState>,
_client: WebData<awc::Client>,
_client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -926,7 +926,7 @@ pub(crate) async fn checkpoint_pipeline(
#[get("/pipelines/{pipeline_name}/checkpoint_status")]
pub(crate) async fn get_checkpoint_status(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -978,7 +978,7 @@ pub(crate) async fn get_checkpoint_status(
#[get("/pipelines/{pipeline_name}/checkpoint/sync_status")]
pub(crate) async fn get_checkpoint_sync_status(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -1033,7 +1033,7 @@ pub(crate) async fn get_checkpoint_sync_status(
#[get("/pipelines/{pipeline_name}/heap_profile")]
pub(crate) async fn get_pipeline_heap_profile(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -1080,7 +1080,7 @@ pub(crate) async fn get_pipeline_heap_profile(
#[post("/pipelines/{pipeline_name}/pause")]
pub(crate) async fn post_pipeline_pause(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
) -> Result<HttpResponse, ManagerError> {
Expand Down Expand Up @@ -1140,7 +1140,7 @@ pub(crate) async fn post_pipeline_pause(
#[post("/pipelines/{pipeline_name}/resume")]
pub(crate) async fn post_pipeline_resume(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
) -> Result<HttpResponse, ManagerError> {
Expand Down Expand Up @@ -1211,7 +1211,7 @@ pub(crate) async fn post_pipeline_resume(
#[post("/pipelines/{pipeline_name}/activate")]
pub(crate) async fn post_pipeline_activate(
state: WebData<ServerState>,
_client: WebData<awc::Client>,
_client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
_query: web::Query<ActivateParams>,
Expand Down Expand Up @@ -1295,7 +1295,7 @@ pub(crate) async fn post_pipeline_activate(
#[post("/pipelines/{pipeline_name}/approve")]
pub(crate) async fn post_pipeline_approve(
state: WebData<ServerState>,
_client: WebData<awc::Client>,
_client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -1384,7 +1384,8 @@ fn request_is_websocket(request: &HttpRequest) -> bool {
#[get("/pipelines/{pipeline_name}/query")]
pub(crate) async fn pipeline_adhoc_sql(
state: WebData<ServerState>,
client: WebData<awc::Client>,
reqwest_client: WebData<reqwest::Client>,
awc_client: WebData<awc::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand All @@ -1395,7 +1396,7 @@ pub(crate) async fn pipeline_adhoc_sql(
state
.runner
.forward_websocket_request_to_pipeline_by_name(
client.as_ref(),
awc_client.as_ref(),
*tenant_id,
&pipeline_name,
"query",
Expand All @@ -1407,7 +1408,7 @@ pub(crate) async fn pipeline_adhoc_sql(
state
.runner
.forward_streaming_http_request_to_pipeline_by_name(
client.as_ref(),
reqwest_client.as_ref(),
*tenant_id,
&pipeline_name,
"query",
Expand Down Expand Up @@ -1460,7 +1461,7 @@ pub(crate) async fn pipeline_adhoc_sql(
)]
pub(crate) async fn completion_token(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<(String, String, String)>,
) -> Result<HttpResponse, ManagerError> {
Expand Down Expand Up @@ -1534,7 +1535,7 @@ pub(crate) async fn completion_token(
#[get("/pipelines/{pipeline_name}/completion_status")]
pub(crate) async fn completion_status(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -1589,7 +1590,7 @@ pub(crate) async fn completion_status(
#[post("/pipelines/{pipeline_name}/start_transaction")]
pub(crate) async fn start_transaction(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down Expand Up @@ -1643,7 +1644,7 @@ pub(crate) async fn start_transaction(
#[post("/pipelines/{pipeline_name}/commit_transaction")]
pub(crate) async fn commit_transaction(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
request: HttpRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl SupportBundleZip {
#[get("/pipelines/{pipeline_name}/support_bundle")]
pub(crate) async fn get_pipeline_support_bundle(
state: WebData<ServerState>,
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
query: web::Query<SupportBundleParameters>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,7 @@ pub(crate) async fn post_pipeline_start(
#[post("/pipelines/{pipeline_name}/stop")]
pub(crate) async fn post_pipeline_stop(
state: WebData<ServerState>,
_client: WebData<awc::Client>,
_client: WebData<reqwest::Client>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
query: web::Query<PostStopPipelineParameters>,
Expand Down Expand Up @@ -1395,7 +1395,7 @@ pub(crate) async fn post_pipeline_clear(
)]
#[get("/pipelines/{pipeline_name}/logs")]
pub(crate) async fn get_pipeline_logs(
client: WebData<awc::Client>,
client: WebData<reqwest::Client>,
state: WebData<ServerState>,
tenant_id: ReqData<TenantId>,
path: web::Path<String>,
Expand Down
52 changes: 29 additions & 23 deletions crates/pipeline-manager/src/api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,21 +692,28 @@ pub async fn run(
}
}
};
// We instantiate reqwest::Client that can be used if the api-server needs to
// make outgoing HTTP calls. For websocket connections, awc::Client is created
// per-worker (since it's not Send) using common_config.awc_client().
// reqwest::Client is cloneable (internally uses Arc), so it can be shared across workers.
let reqwest_client = common_config.reqwest_client().await;

let server = match auth_configuration {
// We instantiate an awc::Client that can be used if the api-server needs to
// make outgoing calls. This object is not meant to have more than one instance
// per thread (otherwise, it causes high resource pressure on both CPU and fds).
Some(auth_configuration) => {
let common_config_cloned = common_config.clone();
let api_config = api_config.clone();
let state = state.clone();
let reqwest_client = reqwest_client.clone();
// awc::Client is not Send, so we need to create it in each worker
let common_config_for_awc = common_config.clone();
let server = HttpServer::new(move || {
let auth_middleware = HttpAuthentication::with_fn(crate::auth::auth_validator);
let client = WebData::new(common_config_cloned.awc_client());
let reqwest_client = WebData::new(reqwest_client.clone());
let awc_client = WebData::new(common_config_for_awc.awc_client());
App::new()
.app_data(state.clone())
.app_data(auth_configuration.clone())
.app_data(client)
.app_data(reqwest_client)
.app_data(awc_client)
.wrap_fn(|req, srv| {
let log_level = if req.method() == Method::GET && req.path() == "/healthz" {
Level::Trace
Expand Down Expand Up @@ -736,14 +743,18 @@ pub async fn run(
}
}
None => {
let common_config_cloned = common_config.clone();
let api_config = api_config.clone();
let state = state.clone();
let reqwest_client = reqwest_client.clone();
// awc::Client is not Send, so we need to create it in each worker
let common_config_for_awc = common_config.clone();
let server = HttpServer::new(move || {
let client = WebData::new(common_config_cloned.awc_client());
let reqwest_client = WebData::new(reqwest_client.clone());
let awc_client = WebData::new(common_config_for_awc.awc_client());
App::new()
.app_data(state.clone())
.app_data(client)
.app_data(reqwest_client)
.app_data(awc_client)
.wrap_fn(|req, srv| {
trace!("Request: {} {}", req.method(), req.path());
srv.call(req).map(log_response)
Expand Down Expand Up @@ -853,20 +864,15 @@ Version: {} v{}{}
.unwrap();

rt.block_on(async {
let local = tokio::task::LocalSet::new();
local
.run_until(async move {
let client = common_config.awc_client();
let support_collector = SupportDataCollector::new(
state.clone().into_inner(),
client,
api_config.support_data_collection_frequency,
api_config.support_data_retention,
shutdown_rx,
);
support_collector.run().await;
})
.await;
let client = common_config.reqwest_client().await;
let support_collector = SupportDataCollector::new(
state.clone().into_inner(),
client,
api_config.support_data_collection_frequency,
api_config.support_data_retention,
shutdown_rx,
);
support_collector.run().await;
});
})
.unwrap(),
Expand Down
Loading
Loading