Skip to content

Commit 4f5f651

Browse files
committed
add coonection config for Elastcisearch sink
1 parent 2763639 commit 4f5f651

File tree

3 files changed

+18
-3
lines changed

3 files changed

+18
-3
lines changed

src/sinks/elasticsearch/common.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::{
2323
tls::TlsSettings,
2424
transforms::metric_to_log::MetricToLog,
2525
};
26+
use crate::sinks::util::http::ConnectionConfig;
2627

2728
#[derive(Debug, Clone)]
2829
pub struct ElasticsearchCommon {
@@ -190,6 +191,7 @@ impl ElasticsearchCommon {
190191
&request,
191192
&tls_settings,
192193
proxy_config,
194+
config.connection
193195
)
194196
.await
195197
{
@@ -341,6 +343,7 @@ async fn get_version(
341343
request: &RequestConfig,
342344
tls_settings: &TlsSettings,
343345
proxy_config: &ProxyConfig,
346+
connection_config: Option<ConnectionConfig>,
344347
) -> crate::Result<usize> {
345348
#[derive(Deserialize)]
346349
struct Version {
@@ -351,7 +354,7 @@ async fn get_version(
351354
version: Option<Version>,
352355
}
353356

354-
let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
357+
let client = HttpClient::new_with_connection_config(tls_settings.clone(), proxy_config, connection_config)?;
355358
let response = get(
356359
base_url,
357360
auth,

src/sinks/elasticsearch/config.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use vector_lib::lookup::event_path;
3535
use vector_lib::lookup::lookup_v2::ConfigValuePath;
3636
use vector_lib::schema::Requirement;
3737
use vrl::value::Kind;
38+
use crate::sinks::util::http::ConnectionConfig;
3839

3940
/// The field name for the timestamp required by data stream mode
4041
pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";
@@ -218,6 +219,16 @@ pub struct ElasticsearchConfig {
218219
)]
219220
#[configurable(derived)]
220221
pub acknowledgements: AcknowledgementsConfig,
222+
223+
224+
/// Connection-level settings for the underlying HTTP client.
225+
///
226+
/// This allows configuring parameters like connection idle timeout and
227+
/// maximum idle connections per host. Useful when running behind load
228+
/// balancers with strict idle policies.
229+
#[configurable(derived)]
230+
#[serde(default)]
231+
pub connection: Option<ConnectionConfig>,
221232
}
222233

223234
fn default_doc_type() -> String {
@@ -255,6 +266,7 @@ impl Default for ElasticsearchConfig {
255266
data_stream: None,
256267
metrics: None,
257268
acknowledgements: Default::default(),
269+
connection: None,
258270
}
259271
}
260272
}
@@ -541,7 +553,7 @@ impl SinkConfig for ElasticsearchConfig {
541553
let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
542554
let common = commons[0].clone();
543555

544-
let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
556+
let client = HttpClient::new_with_connection_config(common.tls_settings.clone(), cx.proxy(), self.connection)?;
545557

546558
let request_limits = self.request.tower.into_settings();
547559

src/sinks/http/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl From<HttpMethod> for Method {
168168
impl HttpSinkConfig {
169169
fn build_http_client(&self, cx: &SinkContext) -> crate::Result<HttpClient> {
170170
let tls = TlsSettings::from_options(self.tls.as_ref())?;
171-
Ok(HttpClient::new(tls, cx.proxy())?)
171+
Ok(HttpClient::new_with_connection_config(tls, cx.proxy(), self.connection.clone())?)
172172
}
173173

174174
pub(super) fn build_encoder(&self) -> crate::Result<Encoder<Framer>> {

0 commit comments

Comments
 (0)