Skip to content

Commit

Permalink
[WIP] refactor proxy handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoolioh committed Oct 8, 2024
1 parent 6e8dbae commit 22bf687
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 75 deletions.
2 changes: 1 addition & 1 deletion data-pipeline/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ impl TraceExporter {
url: self.output_format.add_path(&self.endpoint.url),
..self.endpoint.clone()
};
let send_data = SendData::new(size, tracer_payload, header_tags, &endpoint, None);
let send_data = SendData::new(size, tracer_payload, header_tags, &endpoint);
self.runtime.block_on(async {
match send_data.send().await.last_result {
Ok(response) => match hyper::body::to_bytes(response.into_body()).await {
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl SidecarServer {
.try_into()
{
Ok(payload) => {
let data = SendData::new(size, payload, headers, target, None);
let data = SendData::new(size, payload, headers, target);
self.trace_flusher.enqueue(data);
}
Err(e) => {
Expand Down
12 changes: 8 additions & 4 deletions trace-mini-agent/src/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub trait TraceFlusher {
/// implementing flushing logic that calls flush_traces.
async fn start_trace_flusher(&self, config: Arc<Config>, mut rx: Receiver<SendData>);
/// Flushes traces to the Datadog trace intake.
async fn flush_traces(&self, traces: Vec<SendData>);
async fn flush_traces(&self, traces: Vec<SendData>, config: Arc<Config>);
}

#[derive(Clone)]
Expand All @@ -43,20 +43,24 @@ impl TraceFlusher for ServerlessTraceFlusher {

let mut buffer = buffer_consumer.lock().await;
if !buffer.is_empty() {
self.flush_traces(buffer.to_vec()).await;
self.flush_traces(buffer.to_vec(), config.clone()).await;
buffer.clear();
}
}
}

async fn flush_traces(&self, traces: Vec<SendData>) {
async fn flush_traces(&self, traces: Vec<SendData>, config: Arc<Config>) {
if traces.is_empty() {
return;
}
info!("Flushing {} traces", traces.len());

for traces in trace_utils::coalesce_send_data(traces) {
match traces.send().await.last_result {
match traces
.send_proxy(config.proxy_url.as_deref())
.await
.last_result
{
Ok(_) => info!("Successfully flushed traces"),
Err(e) => {
error!("Error sending trace: {e:?}")
Expand Down
8 changes: 1 addition & 7 deletions trace-mini-agent/src/trace_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
true, // In mini agent, we always send agentless
);

let send_data = SendData::new(
body_size,
payload,
tracer_header_tags,
&config.trace_intake,
config.proxy_url.clone(),
);
let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake);

// send trace payload to our trace flusher
match tx.send(send_data).await {
Expand Down
1 change: 1 addition & 0 deletions trace-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,6 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
datadog-trace-utils = { path = ".", features = ["test-utils"] }

[features]
default = ["proxy"]
test-utils = ["httpmock", "testcontainers", "cargo_metadata", "cargo-platform"]
proxy = ["hyper-proxy"]
97 changes: 39 additions & 58 deletions trace-utils/src/send_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ pub mod retry_strategy;
pub mod send_data_result;

pub use crate::send_data::retry_strategy::{RetryBackoffType, RetryStrategy};
#[cfg(feature = "proxy")]
use ddcommon::connector::Connector;

use crate::trace_utils::{SendDataResult, TracerHeaderTags};
use crate::tracer_payload::TracerPayloadCollection;
Expand All @@ -18,7 +16,6 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hyper::header::HeaderValue;
use hyper::{Body, Client, HeaderMap, Method, Response};
#[cfg(feature = "proxy")]
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use std::collections::HashMap;
use std::time::Duration;
Expand Down Expand Up @@ -70,13 +67,6 @@ pub(crate) enum RequestResult {
BuildError((Attempts, ChunksDropped)),
}

#[derive(Debug, Clone)]
#[cfg(feature = "proxy")]
pub enum ClientWrapper {
Direct(Client<Connector>),
Proxy(Client<ProxyConnector<Connector>>),
}

#[derive(Debug, Clone)]
/// `SendData` is a structure that holds the data to be sent to a target endpoint.
/// It includes the payloads to be sent, the size of the data, the target endpoint,
Expand All @@ -102,7 +92,7 @@ pub enum ClientWrapper {
/// let tracer_header_tags = TracerHeaderTags::default(); // Replace with actual header tags
/// let target = Endpoint::default(); // Replace with actual endpoint
///
/// let mut send_data = SendData::new(size, tracer_payload, tracer_header_tags, &target, None);
/// let mut send_data = SendData::new(size, tracer_payload, tracer_header_tags, &target);
///
/// // Set a custom retry strategy
/// let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5));
Expand All @@ -119,22 +109,6 @@ pub struct SendData {
target: Endpoint,
headers: HashMap<&'static str, String>,
retry_strategy: RetryStrategy,
#[cfg(feature = "proxy")]
client: ClientWrapper,
}

#[cfg(feature = "proxy")]
pub fn build_client(http_proxy: Option<String>) -> ClientWrapper {
let builder = Client::builder();

if let Some(proxy) = http_proxy {
let proxy = Proxy::new(Intercept::Https, proxy.parse().unwrap());
let proxy_connector =
ProxyConnector::from_proxy(connector::Connector::default(), proxy).unwrap();
ClientWrapper::Proxy(builder.build(proxy_connector))
} else {
ClientWrapper::Direct(builder.build(connector::Connector::default()))
}
}

impl SendData {
Expand All @@ -156,7 +130,6 @@ impl SendData {
tracer_payload: TracerPayloadCollection,
tracer_header_tags: TracerHeaderTags,
target: &Endpoint,
http_proxy: Option<String>,
) -> SendData {
let mut headers = if let Some(api_key) = &target.api_key {
HashMap::from([(DD_API_KEY, api_key.as_ref().to_string())])
Expand All @@ -167,17 +140,12 @@ impl SendData {
headers.insert("x-datadog-test-session-token", token.to_string());
}

#[cfg(feature = "proxy")]
let client = build_client(http_proxy);

SendData {
tracer_payloads: tracer_payload,
size,
target: target.clone(),
headers,
retry_strategy: RetryStrategy::default(),
#[cfg(feature = "proxy")]
client,
}
}

Expand Down Expand Up @@ -232,17 +200,31 @@ impl SendData {
///
/// A `SendDataResult` instance containing the result of the operation.
pub async fn send(&self) -> SendDataResult {
self.send_internal(None).await
}

/// Sends the data to the target endpoint.
///
/// # Returns
///
/// A `SendDataResult` instance containing the result of the operation.
pub async fn send_proxy(&self, http_proxy: Option<&str>) -> SendDataResult {
self.send_internal(http_proxy).await
}

async fn send_internal(&self, http_proxy: Option<&str>) -> SendDataResult {
if self.use_protobuf() {
self.send_with_protobuf().await
self.send_with_protobuf(http_proxy).await
} else {
self.send_with_msgpack().await
self.send_with_msgpack(http_proxy).await
}
}

async fn send_request(
&self,
req: HttpRequestBuilder,
payload: Bytes,
http_proxy: Option<&str>,
) -> Result<Response<Body>, RequestError> {
let req = match req.body(Body::from(payload)) {
Ok(req) => req,
Expand All @@ -251,15 +233,16 @@ impl SendData {

match tokio::time::timeout(
Duration::from_millis(self.target.timeout_ms),
#[cfg(feature = "proxy")]
match &self.client {
ClientWrapper::Direct(client) => client.request(req),
ClientWrapper::Proxy(client) => client.request(req),
if let Some(proxy) = http_proxy {
let proxy = Proxy::new(Intercept::Https, proxy.parse().unwrap());
let proxy_connector =
ProxyConnector::from_proxy(connector::Connector::default(), proxy).unwrap();
Client::builder().build(proxy_connector).request(req)
} else {
Client::builder()
.build(connector::Connector::default())
.request(req)
},
#[cfg(not(feature = "proxy"))]
Client::builder()
.build(connector::Connector::default())
.request(req),
)
.await
{
Expand Down Expand Up @@ -292,6 +275,7 @@ impl SendData {
payload_chunks: u64,
// For payload specific headers that need to be added to the request like trace count.
additional_payload_headers: Option<HashMap<&'static str, String>>,
http_proxy: Option<&str>,
) -> RequestResult {
let mut request_attempt = 0;
let payload = Bytes::from(payload);
Expand All @@ -312,7 +296,7 @@ impl SendData {
.expect("HttpRequestBuilder unable to get headers for request")
.extend(headers.clone());

match self.send_request(req, payload.clone()).await {
match self.send_request(req, payload.clone(), http_proxy).await {
// An Ok response doesn't necessarily mean the request was successful, we need to
// check the status code and if it's not a 2xx or 3xx we treat it as an error
Ok(response) => {
Expand Down Expand Up @@ -396,7 +380,7 @@ impl SendData {
req
}

async fn send_with_protobuf(&self) -> SendDataResult {
async fn send_with_protobuf(&self, http_proxy: Option<&str>) -> SendDataResult {
let mut result = SendDataResult::default();
let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();

Expand All @@ -417,6 +401,7 @@ impl SendData {
serialized_trace_payload,
chunks,
None,
http_proxy,
)
.await,
)
Expand All @@ -428,7 +413,7 @@ impl SendData {
}
}

async fn send_with_msgpack(&self) -> SendDataResult {
async fn send_with_msgpack(&self, http_proxy: Option<&str>) -> SendDataResult {
let mut result = SendDataResult::default();
let mut futures = FuturesUnordered::new();

Expand All @@ -448,6 +433,7 @@ impl SendData {
payload,
chunks,
additional_payload_headers,
http_proxy,
));
}
}
Expand All @@ -460,7 +446,13 @@ impl SendData {
Err(e) => return result.error(anyhow!(e)),
};

futures.push(self.send_payload(HEADER_CTYPE_MSGPACK, payload, chunks, headers));
futures.push(self.send_payload(
HEADER_CTYPE_MSGPACK,
payload,
chunks,
headers,
http_proxy,
));
}
}

Expand Down Expand Up @@ -609,7 +601,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

assert_eq!(data.size, 100);
Expand All @@ -635,7 +626,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

assert_eq!(data.size, 100);
Expand Down Expand Up @@ -674,7 +664,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

let data_payload_len = compute_payload_len(&data.tracer_payloads);
Expand Down Expand Up @@ -719,7 +708,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

let data_payload_len = compute_payload_len(&data.tracer_payloads);
Expand Down Expand Up @@ -775,7 +763,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
Expand Down Expand Up @@ -831,7 +818,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
Expand Down Expand Up @@ -876,7 +862,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
Expand Down Expand Up @@ -919,7 +904,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

let res = data.send().await;
Expand Down Expand Up @@ -951,7 +935,6 @@ mod tests {
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
None,
);

let res = data.send().await;
Expand Down Expand Up @@ -1016,7 +999,6 @@ mod tests {
timeout_ms: 200,
..Endpoint::default()
},
None,
);

let res = data.send().await;
Expand Down Expand Up @@ -1059,7 +1041,6 @@ mod tests {
timeout_ms: 200,
..Endpoint::default()
},
None,
);

let res = data.send().await;
Expand Down
1 change: 0 additions & 1 deletion trace-utils/src/test_utils/datadog_test_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ impl DatadogTestAgentContainer {
/// TracerPayloadCollection::V04(vec![trace.clone()]),
/// TracerHeaderTags::default(),
/// &endpoint,
/// None,
/// );
///
/// let _result = data.send().await;
Expand Down
1 change: 0 additions & 1 deletion trace-utils/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,5 @@ pub fn create_send_data(size: usize, target_endpoint: &Endpoint) -> SendData {
TracerPayloadCollection::V07(vec![tracer_payload]),
tracer_header_tags,
target_endpoint,
None,
)
}
Loading

0 comments on commit 22bf687

Please sign in to comment.