feat: add matched events metric to watchdog and telemetry (#55)
* feat: add matched events metric to watchdog and telemetry

* docs: add description for matched_events and adjust as needed

* chore: bump release

* test: incl matched_events metric
mmta authored Mar 21, 2024
1 parent 79701d7 commit 647382b
Showing 6 changed files with 86 additions and 51 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -9,7 +9,7 @@ default-members = [
]

[workspace.package]
-version = "1.3.0"
+version = "1.3.1"
authors = ["Dsiem Authors"]
description = "OSSIM-style event correlation engine for ELK stack"
documentation = "https://github.com/defenxor/dsiem/blob/master/docs/README.md"
11 changes: 7 additions & 4 deletions docs/telemetry.md
@@ -64,15 +64,18 @@ Measurement is done for the following metrics:

- `eps` : The rate of events/second processed by Dsiem.

-- `ttl_directives`: Total number of directives that are loaded and will receive events.
+- `ttl_directives`: Total number of directives assigned to the node. This is only checked at startup and never changes afterwards.

- `active_directives`: Total number of directives that are actively having a backlog.

-- `backlogs`: The number of backlogs active on the system.
+- `backlogs`: The number of backlogs currently active on the system.

-- `avg_proc_time_ms`: Average backlog's processing time for a single event, in milliseconds.
-- `queue_length`: Total current events in queue, waiting for directive manager to pickup.

+- `matched_events`: The number of events matching any directive _since the last measurement_.

+- `avg_proc_time_ms`: Average backlog processing time for a single event, in milliseconds. This resets to 0 together with `matched_events` if there are no matching events since the previous measurement.

+- `queue_length`: Total events in queue, waiting for the directive manager to pick up.

Here's an example of the Thanos frontend displaying the graph of `avg_proc_time_ms`. The prefix `dsiem_` is added to all metrics for an easier search experience. Prometheus supports advanced queries and can be set to send alerts when a certain baseline/threshold is breached.

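A note on the semantics documented above: `matched_events` is a per-interval delta, not a cumulative counter, since the watchdog resets it after every report. A consumer that wants a running total has to sum the samples itself (in PromQL, something like `sum_over_time(dsiem_matched_events[1h])`, assuming the scrape interval matches the report interval). Below is a minimal consumer-side sketch of that accumulation; the type and its methods are hypothetical, not part of Dsiem:

```rust
/// Running total reconstructed from per-interval `matched_events` deltas.
struct MatchedEventsTotal {
    total: u64,
}

impl MatchedEventsTotal {
    fn new() -> Self {
        Self { total: 0 }
    }

    /// Each sample covers exactly one measurement interval, so summing
    /// the deltas yields the cumulative count of matched events.
    fn observe(&mut self, delta: u64) -> u64 {
        self.total += delta;
        self.total
    }
}

fn main() {
    let mut totals = MatchedEventsTotal::new();
    // Three consecutive scrapes: 5, 0, and 12 matched events per interval.
    for delta in [5, 0, 12] {
        println!("running total = {}", totals.observe(delta));
    }
}
```
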
10 changes: 9 additions & 1 deletion server/src/backlog/manager.rs
@@ -27,6 +27,7 @@ pub struct ManagerReport {
pub id: u64,
pub active_backlogs: usize,
pub timedout_backlogs: usize,
+pub matched_events: usize,
}

#[derive(Clone)]
@@ -221,7 +222,9 @@ impl BacklogManager {
id: self.directive.id,
active_backlogs: 0,
timedout_backlogs: 0,
+matched_events: 0,
};
+let mut prev_matched_events = mgr_report.matched_events;

let clean_deleted = || async {
let mut backlogs = self.backlogs.write().await;
@@ -341,9 +344,13 @@
let prev = mgr_report.active_backlogs;
mgr_report.active_backlogs = length;

-if mgr_report.active_backlogs != prev {
+// send only when there's a change
+if mgr_report.active_backlogs != prev || mgr_report.matched_events != prev_matched_events {
let _ = report_sender.try_send(mgr_report.clone());
}
+// save prev value then reset
+prev_matched_events = mgr_report.matched_events;
+mgr_report.matched_events = 0;
},
Some(mut event) = upstream_rx.recv() => {

@@ -352,6 +359,7 @@

let _ = backlog_mgr_span.enter();
debug!(directive.id = self.directive.id, event.id, "received event");
+mgr_report.matched_events += 1;

let mut match_found = false;
// keep this lock for the entire event recv() loop so the next event will get updated backlogs
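The hunk above implements a send-on-change report channel: the tick arm compares the fresh counters against the previously sent values, calls `try_send` only when something changed, then resets the per-interval `matched_events` delta. A stripped-down sketch of that pattern with tokio follows; the `Report` type and the ten-second interval are illustrative, not the actual Dsiem types:

```rust
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

#[derive(Clone, PartialEq)]
struct Report {
    active_backlogs: usize,
    matched_events: usize,
}

async fn report_loop(tx: mpsc::Sender<Report>, mut report: Report) {
    let mut ticker = interval(Duration::from_secs(10));
    let mut prev = report.clone();
    loop {
        ticker.tick().await;
        // Send only when something changed since the last tick, so idle
        // directives don't flood the watchdog with identical reports.
        if report != prev {
            let _ = tx.try_send(report.clone());
        }
        // Save the just-reported value, then reset the per-interval delta.
        // (In the real select! loop, the event arm increments matched_events
        // between ticks.)
        prev = report.clone();
        report.matched_events = 0;
    }
}
```

Using `try_send` rather than `send` keeps the backlog manager from blocking on a slow or full watchdog channel; a dropped report is presumably acceptable because the next tick with a change sends a fresh one.
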
6 changes: 4 additions & 2 deletions server/src/manager.rs
@@ -176,7 +176,7 @@ impl Manager {
// backlogs saved on disk

if !preload_directives && self.option.reload_backlogs {
-debug!("preload_directives is false, listing saved backlogs if any");
+debug!("preload_directives is false, listing saved backlogs for reactivation");
// backlogs dir may not exist
let ids = crate::backlog::manager::storage::list(self.option.test_env).unwrap_or_default();
debug!("found {} saved backlogs", ids.len());
@@ -244,7 +244,7 @@ impl Manager {
// here we just need to find the directive(s) that match the event
let matched_dirs: Vec<&FilterTarget> =
c.iter().filter(|p| matched_with_event(p, &event)).collect();
-info!(
+debug!(
event.id,
"event matched rules in {} directive(s)",
matched_dirs.len()
@@ -552,11 +552,13 @@ mod test {
id: 1,
active_backlogs: 1,
timedout_backlogs: 0,
+matched_events: 0,
};
let mut rpt2 = ManagerReport {
id: 1,
active_backlogs: 1,
timedout_backlogs: 0,
+matched_events: 0,
};
assert!(rpt1 == rpt2);
rpt2.active_backlogs = 2;
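The `info!` → `debug!` change above demotes a line that fires once per matched event; at high EPS that would dominate the output at the default `info` level. A small sketch of the level split using the `tracing` crate (the helper functions are hypothetical, the real code logs inline inside the manager loop):

```rust
use tracing::{debug, info};

/// Per-event records are high-volume, so they stay at `debug`.
fn log_match_result(event_id: u64, matched: usize) {
    debug!(event.id = event_id, "event matched rules in {} directive(s)", matched);
}

/// One-off lifecycle lines remain at `info`.
fn log_startup(ttl_directives: usize) {
    info!(ttl_directives, "manager started");
}
```
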
104 changes: 63 additions & 41 deletions server/src/watchdog.rs
@@ -49,7 +49,7 @@ impl Watchdog {
let mut cancel_rx = opt.cancel_tx.subscribe();
let mut resp_histo = HdrHistogram::with_bound(60 * 60 * 1000 * UNIT_MULTIPLIER as u64); // max 1 hour
let max_proc_time_ms = 1000.0 / opt.max_eps as f64;
-let mut report_map = HashMap::<u64, (usize, usize)>::new();
+let mut report_map = HashMap::<u64, (usize, usize, usize)>::new();
let mut resptime_rx = opt.resptime_rx;
let mut report_rx = opt.report_rx;

@@ -64,6 +64,7 @@
"dsiem_ttl_directives",
"dsiem_active_directives",
"dsiem_backlogs",
+"dsiem_matched_events",
] {
meter.upsert_u64(s, None)?;
}
@@ -81,48 +82,58 @@
resp_histo.record(v as u64);
}
Some(v) = report_rx.recv() => {
-report_map.insert(v.id, (v.active_backlogs, v.timedout_backlogs));
+report_map.insert(v.id, (v.active_backlogs, v.timedout_backlogs, v.matched_events));
}
_ = report.tick() => {

-let eps = round(opt.eps.metrics.count.throughput.histogram().mean(), 2);
-let queue_length = opt.event_tx.len();
-let avg_proc_time_ms = resp_histo.mean()/UNIT_MULTIPLIER;
-
-// irrelevant for non preload_directives mode
-// let ttl_directives = report_map.len();
-
-let active_directives = report_map.iter().filter(|&(_, (x, _))|*x > 0).count();
-let (backlogs, timedout_backlogs) = report_map.values().fold((0, 0), |acc, x| (acc.0 + x.0, acc.1 + x.1));
-
-if let Some(ref mut meter) = meter {
-meter.upsert_f64("dsiem_eps", Some(eps))?;
-meter.upsert_f64("dsiem_avg_proc_time_ms", Some(avg_proc_time_ms))?;
-meter.upsert_u64("dsiem_queue_length", Some(queue_length as u64))?;
-meter.upsert_u64("dsiem_ttl_directives", Some(opt.ttl_directives as u64))?;
-meter.upsert_u64("dsiem_active_directives", Some(active_directives as u64))?;
-meter.upsert_u64("dsiem_backlogs", Some(backlogs as u64))?;
-}
-
-let rounded_avg_proc_time_ms = (avg_proc_time_ms * 1000.0).round() / 1000.0;
-
-info!(
-eps,
-queue_length,
-avg_proc_time_ms = rounded_avg_proc_time_ms,
-ttl_directives = opt.ttl_directives,
-active_directives,
-backlogs,
-timedout_backlogs,
-"watchdog report"
-);
-
-if queue_length != 0 && avg_proc_time_ms > max_proc_time_ms {
-warn!(avg_proc_time_ms = rounded_avg_proc_time_ms, "avg. processing time maybe too long to sustain the target {} event/sec (or {:.3} ms/event)", opt.max_eps, max_proc_time_ms );
-// reset so next it excludes any previous outliers
-resp_histo.clear();
+let eps = round(opt.eps.metrics.count.throughput.histogram().mean(), 2);
+let queue_length = opt.event_tx.len();
+
+// irrelevant for non preload_directives mode
+// let ttl_directives = report_map.len();
+
+let active_directives = report_map.iter().filter(|&(_, (x, _, _))|*x > 0).count();
+let (backlogs, timedout_backlogs, matched_events) = report_map.values().fold((0, 0, 0), |acc, x| (acc.0 + x.0, acc.1 + x.1, acc.2 + x.2));
+
+// reset this if there's no processed events since last report
+let avg_proc_time_ms = match matched_events {
+0 => {
+resp_histo.clear();
+0.0
+},
+_ => resp_histo.mean()/UNIT_MULTIPLIER
+};
+
+if let Some(ref mut meter) = meter {
+meter.upsert_f64("dsiem_eps", Some(eps))?;
+meter.upsert_f64("dsiem_avg_proc_time_ms", Some(avg_proc_time_ms))?;
+meter.upsert_u64("dsiem_queue_length", Some(queue_length as u64))?;
+meter.upsert_u64("dsiem_ttl_directives", Some(opt.ttl_directives as u64))?;
+meter.upsert_u64("dsiem_active_directives", Some(active_directives as u64))?;
+meter.upsert_u64("dsiem_backlogs", Some(backlogs as u64))?;
+meter.upsert_u64("dsiem_matched_events", Some(matched_events as u64))?;
+}
+
+let rounded_avg_proc_time_ms = (avg_proc_time_ms * 1000.0).round() / 1000.0;
+
+info!(
+eps,
+queue_length,
+avg_proc_time_ms = rounded_avg_proc_time_ms,
+ttl_directives = opt.ttl_directives,
+active_directives,
+matched_events,
+backlogs,
+timedout_backlogs,
+"watchdog report"
+);
+
+if queue_length != 0 && avg_proc_time_ms > max_proc_time_ms {
+warn!(avg_proc_time_ms = rounded_avg_proc_time_ms, "avg. processing time maybe too long to sustain the target {} event/sec (or {:.3} ms/event)", opt.max_eps, max_proc_time_ms );
+// reset so next it excludes any previous outliers
+resp_histo.clear();
+}
}
}
}
}
Ok(())
@@ -180,6 +191,16 @@ mod test {
_ = w.start(opt).instrument(span).await;
});

+let rpt = ManagerReport {
+id: 1,
+active_backlogs: 100,
+timedout_backlogs: 0,
+matched_events: 9001,
+};
+_ = report_tx.send(rpt).await;
+sleep(Duration::from_millis(2000)).await;
+assert!(logs_contain("backlogs=100"));
+
_ = resptime_tx.send(100.0 * UNIT_MULTIPLIER).await;
_ = resptime_tx.send(100.0 * UNIT_MULTIPLIER).await;
_ = resptime_tx.send(25.0 * UNIT_MULTIPLIER).await;
@@ -207,10 +228,11 @@
id: 1,
active_backlogs: 100,
timedout_backlogs: 0,
+matched_events: 0,
};
_ = report_tx.send(rpt).await;
-sleep(Duration::from_millis(2000)).await;
-assert!(logs_contain("backlogs=100"));
+sleep(Duration::from_millis(1000)).await;
+assert!(logs_contain("avg_proc_time_ms=0.0"));

cancel_tx.send(()).unwrap();
sleep(Duration::from_millis(2000)).await;
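Condensed from the hunks above: the watchdog now folds each directive's `(active_backlogs, timedout_backlogs, matched_events)` report into node-wide totals, and forces `avg_proc_time_ms` to 0 when nothing matched during the interval, so a quiet period reads as 0 instead of echoing a stale histogram mean. A standalone sketch of those two steps; the function names and the `main` driver are illustrative, not the actual Dsiem code:

```rust
use std::collections::HashMap;

// Per-directive report tuple: (active_backlogs, timedout_backlogs, matched_events).
type Entry = (usize, usize, usize);

fn aggregate(report_map: &HashMap<u64, Entry>) -> (usize, usize, usize, usize) {
    // A directive counts as active if it has at least one live backlog.
    let active_directives = report_map.values().filter(|(a, _, _)| *a > 0).count();
    let (backlogs, timedout, matched) = report_map
        .values()
        .fold((0, 0, 0), |acc, x| (acc.0 + x.0, acc.1 + x.1, acc.2 + x.2));
    (active_directives, backlogs, timedout, matched)
}

fn avg_proc_time_ms(matched_events: usize, histogram_mean: f64, unit_multiplier: f64) -> f64 {
    // No matched events since the last report means the histogram holds only
    // stale samples; report 0 instead of a carried-over mean.
    if matched_events == 0 {
        0.0
    } else {
        histogram_mean / unit_multiplier
    }
}

fn main() {
    let mut m: HashMap<u64, Entry> = HashMap::new();
    m.insert(1, (2, 0, 5));
    m.insert(2, (0, 1, 0));
    let (active, backlogs, timedout, matched) = aggregate(&m);
    println!("active={active} backlogs={backlogs} timedout={timedout} matched={matched}");
    println!("avg_proc_time_ms={}", avg_proc_time_ms(matched, 12.5, 1.0));
}
```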
