//! Load-test harness: a simple axum web server (`axum_main`) plus an HTTP
//! workload generator (`workload_main`) used to measure tokio thread-contention
//! impacts.
use {
    axum::{routing::get, Router},
    hyper::{Body, Request},
    log::info,
    std::{
        future::IntoFuture,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        time::{Duration, Instant},
    },
    tokio::{net::TcpStream, sync::oneshot::Sender, task::JoinSet, time::timeout},
    tower::ServiceExt,
};
/// Length of the measurement window, in seconds. Ten seconds is just enough to
/// get a reasonably accurate measurement under our crude methodology.
const TEST_SECONDS: u64 = 10;
  19. // A simple web server that puts load on tokio to measure thread contention impacts
  20. pub async fn axum_main(port: u16, ready: Sender<()>) {
  21. // basic handler that responds with a static string
  22. async fn root() -> &'static str {
  23. tokio::time::sleep(Duration::from_millis(1)).await;
  24. "Hello, World!"
  25. }
  26. // build our application with a route
  27. let app = Router::new().route("/", get(root));
  28. // run our app with hyper, listening globally on port 3000
  29. let listener =
  30. tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))
  31. .await
  32. .unwrap();
  33. info!("Server on port {port} ready");
  34. ready.send(()).unwrap();
  35. let timeout = tokio::time::timeout(
  36. Duration::from_secs(TEST_SECONDS + 1),
  37. axum::serve(listener, app).into_future(),
  38. )
  39. .await;
  40. match timeout {
  41. Ok(v) => {
  42. v.unwrap();
  43. }
  44. Err(_) => {
  45. info!("Terminating server on port {port}");
  46. }
  47. }
  48. }
#[allow(dead_code)]
#[derive(Debug)]
/// Aggregate results of one load-generation run (see `workload_main`).
pub struct Stats {
    /// Mean per-request latency over the whole run, in seconds.
    pub latency_s: f32,
    /// Completed requests divided by the test duration (`TEST_SECONDS`).
    pub requests_per_second: f32,
}
  55. // Generates a bunch of HTTP load on the ports provided. Will spawn `tasks` worth
  56. // of connections for each of the ports given.
  57. pub async fn workload_main(ports: &[u16], tasks: usize) -> anyhow::Result<Stats> {
  58. struct ControlBlock {
  59. start_time: std::time::Instant,
  60. requests: AtomicUsize,
  61. cumulative_latency_us: AtomicUsize,
  62. }
  63. let control_block = Arc::new(ControlBlock {
  64. start_time: std::time::Instant::now(),
  65. requests: AtomicUsize::new(0),
  66. cumulative_latency_us: AtomicUsize::new(0),
  67. });
  68. async fn connection(port: u16, control_block: Arc<ControlBlock>) -> anyhow::Result<()> {
  69. let sa = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
  70. let stream = TcpStream::connect(sa).await?;
  71. let (mut request_sender, connection) = hyper::client::conn::handshake(stream).await?;
  72. // spawn a task to poll the connection and drive the HTTP state
  73. tokio::spawn(async move {
  74. // Technically, this can error but this only happens when server is killed
  75. let _ = connection.await;
  76. });
  77. let path = "/";
  78. while control_block.start_time.elapsed() < Duration::from_secs(TEST_SECONDS) {
  79. let req = Request::builder()
  80. .uri(path)
  81. .method("GET")
  82. .body(Body::from(""))?;
  83. let start = Instant::now();
  84. let res = timeout(Duration::from_millis(100), request_sender.send_request(req)).await;
  85. let res = match res {
  86. Ok(res) => res?,
  87. Err(_) => {
  88. anyhow::bail!("Timeout on request!")
  89. }
  90. };
  91. let _ = res.body();
  92. if res.status() != 200 {
  93. anyhow::bail!("Got error from server");
  94. }
  95. control_block
  96. .cumulative_latency_us
  97. .fetch_add(start.elapsed().as_micros() as usize, Ordering::Relaxed);
  98. control_block.requests.fetch_add(1, Ordering::Relaxed);
  99. // To send via the same connection again, it may not work as it may not be ready,
  100. // so we have to wait until the request_sender becomes ready.
  101. request_sender.ready().await?;
  102. }
  103. Ok(())
  104. }
  105. let mut join_set = JoinSet::new();
  106. for port in ports {
  107. info!("Starting load generation on port {port}");
  108. for _t in 0..tasks {
  109. join_set.spawn(connection(*port, control_block.clone()));
  110. }
  111. }
  112. while let Some(jr) = join_set.join_next().await {
  113. jr??;
  114. }
  115. let requests = control_block.requests.load(Ordering::Relaxed);
  116. let latency_accumulator_us = control_block.cumulative_latency_us.load(Ordering::Relaxed);
  117. Ok(Stats {
  118. requests_per_second: requests as f32 / TEST_SECONDS as f32,
  119. #[allow(clippy::arithmetic_side_effects)]
  120. latency_s: (latency_accumulator_us / requests) as f32 / 1e6,
  121. })
  122. }