Skip to content

Commit

Permalink
Fix theFix the payload size hint.
Browse files Browse the repository at this point in the history
The incoming buffer may be much larger than the actual contained data. The decoder knows where the payload starts and ends, but the sidecar send_trace_v04 method does not.

This ensures proper trace coalescing, leading to less individual http requests.

Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
  • Loading branch information
bwoebi committed Oct 3, 2024
1 parent d01cf11 commit fc18fe7
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
14 changes: 7 additions & 7 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,17 @@ impl SidecarServer {
}
};

let size = data.len();

match tracer_payload::TracerPayloadParams::new(
let mut size = 0;
let mut processor = tracer_payload::DefaultTraceChunkProcessor;
let mut payload_params = tracer_payload::TracerPayloadParams::new(
data,
&headers,
&mut tracer_payload::DefaultTraceChunkProcessor,
&mut processor,
target.api_key.is_some(),
TraceEncoding::V04,
)
.try_into()
{
);
payload_params.measure_size(&mut size);
match payload_params.try_into() {
Ok(payload) => {
let data = SendData::new(size, payload, headers, target, None);
self.trace_flusher.enqueue(data);
Expand Down
10 changes: 6 additions & 4 deletions trace-utils/src/msgpack_decoder/v04/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,22 @@ use tinybytes::{Bytes, BytesString};
/// };
/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
/// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data);
/// let decoded_traces = from_slice(encoded_data_as_tinybytes).expect("Decoding failed");
/// let (decoded_traces, _) = from_slice(encoded_data_as_tinybytes).expect("Decoding failed");
///
/// assert_eq!(1, decoded_traces.len());
/// assert_eq!(1, decoded_traces[0].len());
/// let decoded_span = &decoded_traces[0][0];
/// assert_eq!("test-span", decoded_span.name.as_str());
/// ```
pub fn from_slice(mut data: tinybytes::Bytes) -> Result<Vec<Vec<Span>>, DecodeError> {
pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec<Vec<Span>>, usize), DecodeError> {
let trace_count =
rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| {
DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned())
})?;

let start_len = data.len();

(0..trace_count).try_fold(
Ok(((0..trace_count).try_fold(
Vec::with_capacity(
trace_count
.try_into()
Expand Down Expand Up @@ -89,7 +91,7 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<Vec<Vec<Span>>, DecodeEr

Ok(traces)
},
)
)?, start_len - data.len()))
}

#[inline]
Expand Down
24 changes: 17 additions & 7 deletions trace-utils/src/tracer_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ pub struct TracerPayloadParams<'a, T: TraceChunkProcessor + 'a> {
data: tinybytes::Bytes,
/// Reference to `TracerHeaderTags` containing metadata for the trace.
tracer_header_tags: &'a TracerHeaderTags<'a>,
/// Amount of data consumed from buffer
size: Option<&'a mut usize>,
/// A mutable reference to an implementation of `TraceChunkProcessor` that processes each
/// `TraceChunk` after it is constructed but before it is added to the TracerPayloadCollection.
/// TraceChunks are only available for v07 traces.
Expand All @@ -194,11 +196,16 @@ impl<'a, T: TraceChunkProcessor + 'a> TracerPayloadParams<'a, T> {
TracerPayloadParams {
data,
tracer_header_tags,
size: None,
chunk_processor,
is_agentless,
encoding_type,
}
}

pub fn measure_size(&mut self, size: &'a mut usize) {
self.size = Some(size);
}
}
// TODO: APMSP-1282 - Implement TryInto for other encoding types. Supporting TraceChunkProcessor but
// not supporting v07 is a bit pointless for now.
Expand Down Expand Up @@ -253,13 +260,16 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto<TracerPayloadCollection>
fn try_into(self) -> Result<TracerPayloadCollection, Self::Error> {
match self.encoding_type {
TraceEncoding::V04 => {
let traces: Vec<Vec<Span>> =
match msgpack_decoder::v04::decoder::from_slice(self.data) {
Ok(res) => res,
Err(e) => {
anyhow::bail!("Error deserializing trace from request body: {e}")
}
};
let (traces, size) = match msgpack_decoder::v04::decoder::from_slice(self.data) {
Ok(res) => res,
Err(e) => {
anyhow::bail!("Error deserializing trace from request body: {e}")
}
};

if let Some(size_ref) = self.size {
*size_ref = size;
}

if traces.is_empty() {
anyhow::bail!("No traces deserialized from the request body.");
Expand Down

0 comments on commit fc18fe7

Please sign in to comment.