diff --git a/src/logging_client/mod.rs b/src/logging_client/mod.rs index 705de3e3..f4b7036b 100644 --- a/src/logging_client/mod.rs +++ b/src/logging_client/mod.rs @@ -215,7 +215,7 @@ impl BackgroundWorker { } loop { self.flush().await; - if self.request_data.lock().unwrap().is_none() { + if self.request_data.lock().unwrap().is_none() && self.events.lock().unwrap().tables.is_empty() { break; } } diff --git a/tests/ingestion_test.rs b/tests/ingestion_test.rs index e807652e..e5e3c016 100644 --- a/tests/ingestion_test.rs +++ b/tests/ingestion_test.rs @@ -311,7 +311,7 @@ async fn test_persist_meta_tables() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_many_concurrent_requests() { - let timeout_duration = Duration::from_secs(30); + let timeout_duration = Duration::from_secs(120); let _ = env_logger::builder().is_test(true).try_init(); let db_path: PathBuf = tempdir().unwrap().path().into(); @@ -324,6 +324,7 @@ async fn test_many_concurrent_requests() { let port = 8891; let (_, _handle) = create_locustdb(&opts, port); + // Some prior issued reproduced more consistently with tread_count=50 let thread_count = 20; let value_count = 20000; let sum = (0..value_count).map(|i| i as f64).sum::(); @@ -376,7 +377,7 @@ async fn test_many_concurrent_requests() { log::info!("[query {}] Query result is correct", tid); break; } else if last_log_time.elapsed() > Duration::from_secs(5) { - log::info!("[query {}] Query result is incorrect: {:?}", tid, vec); + log::info!("[query {}] Query result is incorrect: {:?}, expected [{}]", tid, vec, sum); last_log_time = Instant::now(); if last_sum != vec[0] { last_sum = vec[0];