Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dogstatsd aggregator #654

Merged
merged 6 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions dogstatsd/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use ustr::Ustr;
impl MetricValue {
fn aggregate(&mut self, metric: Metric) {
// safe because we know there's at least one value when we parse
// TODO aggregating different types should return error
bantonsson marked this conversation as resolved.
Show resolved Hide resolved
match self {
MetricValue::Count(count) => *count += metric.value.get_value().unwrap_or_default(),
MetricValue::Gauge(gauge) => *gauge = metric.value.get_value().unwrap_or_default(),
Expand Down Expand Up @@ -127,7 +128,7 @@ impl Aggregator {
self.map
.iter()
.filter_map(|entry| match entry.value {
MetricValue::Distribution(_) => build_sketch(now, entry, &self.tags),
MetricValue::Distribution(_) => build_sketch(now, entry, self.tags.clone()),
_ => None,
})
.for_each(|sketch| sketch_payload.sketches.push(sketch));
Expand All @@ -153,7 +154,7 @@ impl Aggregator {
}
false
})
.filter_map(|entry| build_sketch(now, &entry, &self.tags))
.filter_map(|entry| build_sketch(now, &entry, self.tags.clone()))
{
let next_chunk_size = sketch.compute_size();

Expand Down Expand Up @@ -187,7 +188,7 @@ impl Aggregator {
.iter()
.filter_map(|entry| match entry.value {
MetricValue::Distribution(_) => None,
_ => build_metric(entry, &self.tags),
_ => build_metric(entry, self.tags.clone()),
})
.for_each(|metric| series_payload.series.push(metric));
series_payload
Expand All @@ -208,7 +209,7 @@ impl Aggregator {
}
true
})
.filter_map(|entry| build_metric(&entry, &self.tags))
.filter_map(|entry| build_metric(&entry, self.tags.clone()))
{
// TODO: serialization is done twice for each point. If we return a Vec<u8> we can avoid
// that
Expand Down Expand Up @@ -246,13 +247,13 @@ impl Aggregator {
batched_payloads
}

pub fn get_entry_by_id(&self, name: Ustr, tags: &SortedTags) -> Option<&Metric> {
pub fn get_entry_by_id(&self, name: Ustr, tags: &Option<SortedTags>) -> Option<&Metric> {
let id = metric::id(name, tags);
self.map.find(id, |m| m.id == id)
}
}

fn build_sketch(now: i64, entry: &Metric, base_tag_vec: &SortedTags) -> Option<Sketch> {
fn build_sketch(now: i64, entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch> {
let sketch = entry.value.get_sketch()?;
let mut dogsketch = Dogsketch::default();
sketch.merge_to_dogsketch(&mut dogsketch);
Expand All @@ -262,14 +263,20 @@ fn build_sketch(now: i64, entry: &Metric, base_tag_vec: &SortedTags) -> Option<S
sketch.set_dogsketches(vec![dogsketch]);
let name = entry.name.to_string();
sketch.set_metric(name.clone().into());
let mut tags = entry.tags.clone();
tags.extend(base_tag_vec);
sketch.set_tags(tags.to_chars());
if let Some(tags) = entry.tags.clone() {
base_tag_vec.extend(&tags);
}
sketch.set_tags(base_tag_vec.to_chars());
Some(sketch)
}

fn build_metric(entry: &Metric, base_tag_vec: &SortedTags) -> Option<MetricToShip> {
let resources = entry.tags.to_resources();
fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<MetricToShip> {
let resources;
if let Some(tags) = entry.tags.clone() {
resources = tags.to_resources();
} else {
resources = Vec::new();
}
let kind = match entry.value {
MetricValue::Count(_) => datadog::DdMetricKind::Count,
MetricValue::Gauge(_) => datadog::DdMetricKind::Gauge,
Expand All @@ -284,15 +291,16 @@ fn build_metric(entry: &Metric, base_tag_vec: &SortedTags) -> Option<MetricToShi
.as_secs(),
};

let mut tags = entry.tags.clone();
tags.extend(base_tag_vec);
if let Some(tags) = entry.tags.clone() {
base_tag_vec.extend(&tags);
}

Some(MetricToShip {
metric: entry.name.as_str(),
resources,
kind,
points: [point; 1],
tags: tags.to_strings(),
tags: base_tag_vec.to_strings(),
})
}

Expand Down Expand Up @@ -322,7 +330,7 @@ pub mod tests {
) {
let aggregator = aggregator_mutex.lock().unwrap();
if let Some(e) =
aggregator.get_entry_by_id(metric_id.into(), &SortedTags::parse(tags).unwrap())
aggregator.get_entry_by_id(metric_id.into(), &Some(SortedTags::parse(tags).unwrap()))
{
let metric = e.value.get_value().unwrap();
assert!((metric - value).abs() < PRECISION);
Expand All @@ -333,7 +341,7 @@ pub mod tests {

pub fn assert_sketch(aggregator_mutex: &Mutex<Aggregator>, metric_id: &str, value: f64) {
let aggregator = aggregator_mutex.lock().unwrap();
if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &EMPTY_TAGS) {
if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None) {
let metric = e.value.get_sketch().unwrap();
assert!((metric.max().unwrap() - value).abs() < PRECISION);
assert!((metric.min().unwrap() - value).abs() < PRECISION);
Expand Down Expand Up @@ -413,15 +421,15 @@ pub mod tests {

assert_eq!(aggregator.map.len(), 2);
if let Some(v) =
aggregator.get_entry_by_id("foo".into(), &SortedTags::parse("k2:v2").unwrap())
aggregator.get_entry_by_id("foo".into(), &Some(SortedTags::parse("k2:v2").unwrap()))
{
assert_eq!(v.value.get_value().unwrap(), 5f64);
} else {
panic!("failed to get value by id");
}

if let Some(v) =
aggregator.get_entry_by_id("test".into(), &SortedTags::parse("k1:v1").unwrap())
aggregator.get_entry_by_id("test".into(), &Some(SortedTags::parse("k1:v1").unwrap()))
{
assert_eq!(v.value.get_value().unwrap(), 3f64);
} else {
Expand Down
54 changes: 42 additions & 12 deletions dogstatsd/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ pub enum MetricValue {
Distribution(DDSketch),
}

impl MetricValue {
    /// Wraps `v` in the [`MetricValue::Count`] variant.
    pub fn count(v: f64) -> MetricValue {
        MetricValue::Count(v)
    }

    /// Wraps `v` in the [`MetricValue::Gauge`] variant.
    pub fn gauge(v: f64) -> MetricValue {
        MetricValue::Gauge(v)
    }

    /// Builds a [`MetricValue::Distribution`] whose sketch starts from
    /// `DDSketch::default()` and has the single sample `v` inserted.
    pub fn distribution(v: f64) -> MetricValue {
        // Own the sketch and move it into the variant; going through a
        // `&mut` temporary and `to_owned()` would clone the whole sketch
        // only to drop the original right after.
        let mut sketch = DDSketch::default();
        sketch.insert(v);
        MetricValue::Distribution(sketch)
    }
}

#[derive(Clone, Debug)]
pub struct SortedTags {
// We sort tags. This is in feature parity with DogStatsD and also means
Expand Down Expand Up @@ -121,10 +137,22 @@ pub struct Metric {
/// the parser. We assume here that tags are not sent in random order by the
/// client or that, if they are, the API will tidy that up. That is, `a:1,b:2`
/// is a different tagset from `b:2,a:1`.
pub tags: SortedTags,
pub tags: Option<SortedTags>,

/// ID given a name and tagset.
pub(crate) id: u64,
pub id: u64,
}

impl Metric {
    /// Creates a metric from its name, value and optional tagset.
    ///
    /// The `id` field is computed by calling `id` on the given name and
    /// tags, so callers do not supply it themselves.
    pub fn new(name: Ustr, value: MetricValue, tags: Option<SortedTags>) -> Metric {
        // Compute the identifier while `tags` is still only borrowed; it is
        // moved into the struct right below.
        let metric_id = id(name, &tags);
        Self {
            name,
            value,
            tags,
            id: metric_id,
        }
    }
}

/// Parse a metric from given input.
Expand All @@ -148,9 +176,9 @@ pub fn parse(input: &str) -> Result<Metric, ParseError> {

let tags;
if let Some(tags_section) = caps.name("tags") {
tags = SortedTags::parse(tags_section.as_str())?;
tags = Some(SortedTags::parse(tags_section.as_str())?);
} else {
tags = EMPTY_TAGS;
tags = None;
alexgallotta marked this conversation as resolved.
Show resolved Hide resolved
}
let val = first_value(caps.name("values").unwrap().as_str())?;
let metric_value = match caps.name("type").unwrap().as_str() {
Expand Down Expand Up @@ -202,13 +230,15 @@ fn first_value(values: &str) -> Result<f64, ParseError> {
/// from the point of view of this function.
#[inline]
#[must_use]
pub fn id(name: Ustr, tags: &SortedTags) -> u64 {
pub fn id(name: Ustr, tags: &Option<SortedTags>) -> u64 {
let mut hasher = FnvHasher::default();

name.hash(&mut hasher);
for kv in tags.values.iter() {
kv.0.as_bytes().hash(&mut hasher);
kv.1.as_bytes().hash(&mut hasher);
if let Some(tags_present) = tags {
for kv in tags_present.values.iter() {
kv.0.as_bytes().hash(&mut hasher);
kv.1.as_bytes().hash(&mut hasher);
}
}
hasher.finish()
}
Expand Down Expand Up @@ -273,7 +303,7 @@ mod tests {
assert_eq!(name, metric.name.as_str());

if let Some(tags) = tagset {
let parsed_metric_tags : SortedTags= metric.tags.clone();
let parsed_metric_tags : SortedTags = metric.tags.unwrap();
assert_eq!(tags.split(',').count(), parsed_metric_tags.values.len());
tags.split(',').for_each(|kv| {
let (original_key, original_value) = kv.split_once(':').unwrap();
Expand All @@ -287,7 +317,7 @@ mod tests {
assert!(found);
});
} else {
assert!(metric.tags.is_empty());
assert!(metric.tags.is_none());
}

match mtype.as_str() {
Expand Down Expand Up @@ -404,8 +434,8 @@ mod tests {
tagset2.pop();
}

let id1 = id(Ustr::from(&name), &SortedTags::parse(&tagset1).unwrap());
let id2 = id(Ustr::from(&name), &SortedTags::parse(&tagset2).unwrap());
let id1 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset1).unwrap()));
let id2 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset2).unwrap()));

assert_eq!(id1, id2);
}
Expand Down
Loading