memory.rs

use {
    agave_io_uring::{Ring, RingOp},
    std::{
        io,
        ops::{Deref, DerefMut},
        ptr::{self, NonNull},
        slice,
    },
};

// We use fixed buffers to save the cost of mapping/unmapping them at each operation.
//
// Instead of doing many large allocations and registering those, we do a single large one
// and chunk it in slices of up to 1G each.
const FIXED_BUFFER_LEN: usize = 1024 * 1024 * 1024;
pub enum LargeBuffer {
    Vec(Vec<u8>),
    HugeTable(PageAlignedMemory),
}

impl Deref for LargeBuffer {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        match self {
            Self::Vec(buf) => buf.as_slice(),
            Self::HugeTable(mem) => mem.deref(),
        }
    }
}

impl DerefMut for LargeBuffer {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::Vec(buf) => buf.as_mut_slice(),
            Self::HugeTable(ref mut mem) => mem.deref_mut(),
        }
    }
}

impl AsMut<[u8]> for LargeBuffer {
    fn as_mut(&mut self) -> &mut [u8] {
        match self {
            Self::Vec(vec) => vec.as_mut_slice(),
            LargeBuffer::HugeTable(ref mut mem) => mem,
        }
    }
}
impl LargeBuffer {
    /// Allocate a memory buffer optimized for io_uring operations, i.e. backed by a
    /// huge-page (HugeTable) mapping when one is available on the host.
    pub fn new(size: usize) -> Self {
        if size > PageAlignedMemory::page_size() {
            let size = size.next_power_of_two();
            if let Ok(alloc) = PageAlignedMemory::alloc_huge_table(size) {
                log::info!("obtained hugetable io_uring buffer (len={size})");
                return Self::HugeTable(alloc);
            }
        }
        Self::Vec(vec![0; size])
    }
}
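
// Hedged usage sketch, not part of the original file: `LargeBuffer::new` transparently falls
// back to a zeroed `Vec<u8>` when a HugeTLB mapping cannot be obtained, so callers can treat
// the result as a plain mutable byte slice either way. The 8 MiB size is illustrative.
#[cfg(test)]
mod large_buffer_usage {
    use super::*;

    #[test]
    fn allocate_and_write() {
        let mut buf = LargeBuffer::new(8 * 1024 * 1024);
        assert!(buf.len() >= 8 * 1024 * 1024);
        buf.as_mut()[0] = 0xAB;
        assert_eq!(buf[0], 0xAB);
    }
}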
#[derive(Debug)]
struct AllocError;

pub struct PageAlignedMemory {
    ptr: NonNull<u8>,
    len: usize,
}

impl PageAlignedMemory {
    fn alloc_huge_table(memory_size: usize) -> Result<Self, AllocError> {
        let page_size = Self::page_size();
        debug_assert!(memory_size.is_power_of_two());
        debug_assert!(page_size.is_power_of_two());
        let aligned_size = memory_size.next_multiple_of(page_size);

        // Safety:
        // doing an ANONYMOUS alloc. addr=NULL is ok, fd is not used.
        let ptr = unsafe {
            libc::mmap(
                ptr::null_mut(),
                aligned_size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_HUGETLB,
                -1,
                0,
            )
        };
        if std::ptr::eq(ptr, libc::MAP_FAILED) {
            return Err(AllocError);
        }

        Ok(Self {
            ptr: NonNull::new(ptr as *mut u8).ok_or(AllocError)?,
            len: aligned_size,
        })
    }

    fn page_size() -> usize {
        // Safety: just a libc wrapper
        unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize }
    }
}
impl Drop for PageAlignedMemory {
    fn drop(&mut self) {
        // Safety:
        // ptr is a valid pointer returned by mmap
        unsafe {
            libc::munmap(self.ptr.as_ptr() as *mut libc::c_void, self.len);
        }
    }
}

impl Deref for PageAlignedMemory {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
    }
}

impl DerefMut for PageAlignedMemory {
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
    }
}
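
// Hedged note, not in the original source: mmap with MAP_HUGETLB generally succeeds only when
// the kernel has huge pages reserved (e.g. a non-zero vm.nr_hugepages pool), so this probe
// accepts either outcome. It only demonstrates that `alloc_huge_table` is best-effort and that
// the `Err` path is the one `LargeBuffer::new` falls back from.
#[cfg(test)]
mod page_aligned_memory_probe {
    use super::*;

    #[test]
    fn huge_table_alloc_is_best_effort() {
        // 2 MiB is a common huge page size; whether this succeeds depends on host configuration.
        match PageAlignedMemory::alloc_huge_table(2 * 1024 * 1024) {
            Ok(mem) => assert!(mem.len() >= 2 * 1024 * 1024),
            Err(AllocError) => {} // no huge pages available; callers fall back to a Vec
        }
    }
}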
/// Fixed mutable view into an externally allocated IO byte buffer registered in
/// `io_uring` for access in scheduled IO operations.
///
/// It is used as an unsafe (no lifetime tracking) equivalent of `&mut [u8]`.
#[derive(Debug)]
pub(super) struct FixedIoBuffer {
    ptr: *mut u8,
    size: usize,
    io_buf_index: Option<u16>,
}

impl FixedIoBuffer {
    pub const fn empty() -> Self {
        Self {
            ptr: std::ptr::null_mut(),
            size: 0,
            io_buf_index: None,
        }
    }
    /// Split `buffer` into `chunk_size` sized `FixedIoBuffer` buffers for use as registered
    /// buffers in io_uring operations.
    pub unsafe fn split_buffer_chunks(
        buffer: &mut [u8],
        chunk_size: usize,
    ) -> impl Iterator<Item = Self> + use<'_> {
        assert!(
            buffer.len() / FIXED_BUFFER_LEN <= u16::MAX as usize,
            "buffer too large to register in io_uring"
        );
        let buf_start = buffer.as_ptr() as usize;
        buffer.chunks_exact_mut(chunk_size).map(move |buf| {
            let io_buf_index = (buf.as_ptr() as usize - buf_start) / FIXED_BUFFER_LEN;
            Self {
                ptr: buf.as_mut_ptr(),
                size: buf.len(),
                io_buf_index: Some(io_buf_index as u16),
            }
        })
    }
    pub fn len(&self) -> usize {
        self.size
    }

    /// Safety: while just returning a pointer without dereferencing it is safe, this is marked
    /// unsafe so that callers are encouraged to reason about the lifetime of the buffer.
    pub unsafe fn as_mut_ptr(&self) -> *mut u8 {
        self.ptr
    }

    /// The index of the fixed buffer in the ring. See `register_buffers()`.
    pub fn io_buf_index(&self) -> Option<u16> {
        self.io_buf_index
    }

    /// Return a copy of `self` reduced to the specified `size`.
    pub fn into_shrinked(self, size: usize) -> Self {
        assert!(size <= self.size);
        Self {
            ptr: self.ptr,
            size,
            io_buf_index: self.io_buf_index,
        }
    }
    /// Register the provided buffer as a fixed buffer in `io_uring`.
    pub unsafe fn register<S, E: RingOp<S>>(
        buffer: &mut [u8],
        ring: &Ring<S, E>,
    ) -> io::Result<()> {
        let iovecs = buffer
            .chunks(FIXED_BUFFER_LEN)
            .map(|buf| libc::iovec {
                iov_base: buf.as_ptr() as _,
                iov_len: buf.len(),
            })
            .collect::<Vec<_>>();
        unsafe { ring.register_buffers(&iovecs) }
    }
}
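
// Hedged usage sketch, not part of the original file: chunks carved out of the same 1 GiB
// region of the backing buffer share a single registered-buffer index, which is what
// `io_buf_index` reports. The backing and chunk sizes are illustrative.
#[cfg(test)]
mod fixed_io_buffer_usage {
    use super::*;

    #[test]
    fn chunks_within_first_gib_share_index_zero() {
        let mut backing = vec![0u8; 64 * 1024];
        // Safety: `backing` outlives the chunks used within this test.
        let chunks: Vec<FixedIoBuffer> =
            unsafe { FixedIoBuffer::split_buffer_chunks(&mut backing, 16 * 1024) }.collect();
        assert_eq!(chunks.len(), 4);
        assert!(chunks.iter().all(|chunk| chunk.len() == 16 * 1024));
        assert!(chunks.iter().all(|chunk| chunk.io_buf_index() == Some(0)));
    }
}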
impl AsRef<[u8]> for FixedIoBuffer {
    fn as_ref(&self) -> &[u8] {
        unsafe { slice::from_raw_parts(self.ptr, self.size) }
    }
}
/// Check the kernel memory lock limit and increase it if necessary.
///
/// Returns `Err` when the current limit is below `min_required` and cannot be increased.
pub fn adjust_ulimit_memlock(min_required: usize) -> io::Result<()> {
    fn get_memlock() -> libc::rlimit {
        let mut memlock = libc::rlimit {
            rlim_cur: 0,
            rlim_max: 0,
        };
        if unsafe { libc::getrlimit(libc::RLIMIT_MEMLOCK, &mut memlock) } != 0 {
            log::warn!("getrlimit(RLIMIT_MEMLOCK) failed");
        }
        memlock
    }

    let mut memlock = get_memlock();
    let current = memlock.rlim_cur as usize;
    if current < min_required {
        memlock.rlim_cur = min_required as u64;
        memlock.rlim_max = min_required as u64;
        if unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &memlock) } != 0 {
            log::error!(
                "Unable to increase the maximum memory lock limit to {min_required} from {current}"
            );
            if cfg!(target_os = "macos") {
                log::error!(
                    "On macOS you may need to run |sudo launchctl limit memlock {min_required} \
                     {min_required}| first"
                );
            }
            return Err(io::Error::new(
                io::ErrorKind::OutOfMemory,
                "unable to set memory lock limit",
            ));
        }

        memlock = get_memlock();
        log::info!("Bumped maximum memory lock limit: {}", memlock.rlim_cur);
    }
    Ok(())
}
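
// Hedged sketch, not in the original source, of the intended call order: raise RLIMIT_MEMLOCK
// before allocating and registering fixed buffers, since registered buffers may be accounted
// against the locked-memory limit. How the `Ring` is constructed is up to the caller.
#[allow(dead_code)]
fn setup_fixed_buffers_sketch<S, E: RingOp<S>>(
    ring: &Ring<S, E>,
    buffer_size: usize,
) -> io::Result<LargeBuffer> {
    adjust_ulimit_memlock(buffer_size)?;
    let mut buffer = LargeBuffer::new(buffer_size);
    // Safety: the caller must keep `buffer` alive for as long as the ring may reference the
    // registered memory.
    unsafe { FixedIoBuffer::register(&mut buffer, ring)? };
    Ok(buffer)
}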