core_contention_sweep.rs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. use {
  2. agave_thread_manager::*,
  3. log::info,
  4. std::{collections::HashMap, time::Duration},
  5. tokio::sync::oneshot,
  6. };
  7. mod common;
  8. use common::*;
  9. fn make_config_shared(cc: usize) -> ThreadManagerConfig {
  10. let tokio_cfg_1 = TokioConfig {
  11. core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc },
  12. worker_threads: cc,
  13. ..Default::default()
  14. };
  15. let tokio_cfg_2 = tokio_cfg_1.clone();
  16. ThreadManagerConfig {
  17. tokio_configs: HashMap::from([
  18. ("axum1".into(), tokio_cfg_1),
  19. ("axum2".into(), tokio_cfg_2),
  20. ]),
  21. ..Default::default()
  22. }
  23. }
  24. fn make_config_dedicated(core_count: usize) -> ThreadManagerConfig {
  25. let tokio_cfg_1 = TokioConfig {
  26. core_allocation: CoreAllocation::DedicatedCoreSet {
  27. min: 0,
  28. max: core_count / 2,
  29. },
  30. worker_threads: core_count / 2,
  31. ..Default::default()
  32. };
  33. let tokio_cfg_2 = TokioConfig {
  34. core_allocation: CoreAllocation::DedicatedCoreSet {
  35. min: core_count / 2,
  36. max: core_count,
  37. },
  38. worker_threads: core_count / 2,
  39. ..Default::default()
  40. };
  41. ThreadManagerConfig {
  42. tokio_configs: HashMap::from([
  43. ("axum1".into(), tokio_cfg_1),
  44. ("axum2".into(), tokio_cfg_2),
  45. ]),
  46. ..Default::default()
  47. }
  48. }
  /// Core-allocation scenario exercised by the sweep in `main`.
  #[derive(Clone, Copy, Debug, Eq, PartialEq)]
  enum Regime {
      /// Two server runtimes built from `make_config_shared`, i.e. both
      /// configured with the same core range.
      Shared,
      /// Two server runtimes built from `make_config_dedicated`, i.e. each
      /// configured with its own half of the core range.
      Dedicated,
      /// Same configuration as `Shared`, but `main` starts only the first
      /// server and points the load generator at its port twice.
      Single,
  }

  impl Regime {
      /// Every regime, in the order in which `main` runs them.
      const VALUES: [Regime; 3] = [Regime::Dedicated, Regime::Shared, Regime::Single];
  }
  58. #[derive(Debug, Default, serde::Serialize)]
  59. struct Results {
  60. latencies_s: Vec<f32>,
  61. requests_per_second: Vec<f32>,
  62. }
  63. fn main() -> anyhow::Result<()> {
  64. env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
  65. let mut all_results: HashMap<String, Results> = HashMap::new();
  66. for regime in Regime::VALUES {
  67. let mut results = Results::default();
  68. for core_count in [2, 4, 8, 16] {
  69. let manager;
  70. info!("===================");
  71. info!("Running {core_count} cores under {regime:?}");
  72. let (tokio1, tokio2) = match regime {
  73. Regime::Shared => {
  74. manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
  75. (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
  76. }
  77. Regime::Dedicated => {
  78. manager = ThreadManager::new(make_config_dedicated(core_count)).unwrap();
  79. (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
  80. }
  81. Regime::Single => {
  82. manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
  83. (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
  84. }
  85. };
  86. let workload_runtime = TokioRuntime::new(
  87. "LoadGenerator".to_owned(),
  88. TokioConfig {
  89. core_allocation: CoreAllocation::DedicatedCoreSet { min: 32, max: 64 },
  90. ..Default::default()
  91. },
  92. )?;
  93. let measurement = std::thread::scope(|s| {
  94. let (tx1, rx1) = oneshot::channel();
  95. let (tx2, rx2) = oneshot::channel();
  96. s.spawn(|| {
  97. tokio1.start_metrics_sampling(Duration::from_secs(1));
  98. tokio1.tokio.block_on(axum_main(8888, tx1));
  99. });
  100. let jh = match regime {
  101. Regime::Single => s.spawn(|| {
  102. rx1.blocking_recv().unwrap();
  103. workload_runtime.block_on(workload_main(&[8888, 8888], 3000))
  104. }),
  105. _ => {
  106. s.spawn(|| {
  107. tokio2.start_metrics_sampling(Duration::from_secs(1));
  108. tokio2.tokio.block_on(axum_main(8889, tx2));
  109. });
  110. s.spawn(|| {
  111. rx1.blocking_recv().unwrap();
  112. rx2.blocking_recv().unwrap();
  113. workload_runtime.block_on(workload_main(&[8888, 8889], 3000))
  114. })
  115. }
  116. };
  117. jh.join().expect("Some of the threads crashed!")
  118. })?;
  119. info!("Results are: {measurement:?}");
  120. results.latencies_s.push(measurement.latency_s);
  121. results
  122. .requests_per_second
  123. .push(measurement.requests_per_second);
  124. }
  125. all_results.insert(format!("{regime:?}"), results);
  126. std::thread::sleep(Duration::from_secs(3));
  127. }
  128. //print the resulting measurements so they can be e.g. plotted with matplotlib
  129. println!("{}", serde_json::to_string_pretty(&all_results)?);
  130. Ok(())
  131. }