blockstore_db.rs

pub use rocksdb::Direction as IteratorDirection;
use {
    crate::{
        blockstore::{
            column::{
                columns, Column, ColumnIndexDeprecation, ColumnName, ProtobufColumn, TypedColumn,
                DEPRECATED_PROGRAM_COSTS_COLUMN_NAME,
            },
            error::{BlockstoreError, Result},
        },
        blockstore_metrics::{
            maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf,
            BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET,
            PERF_METRIC_OP_NAME_MULTI_GET, PERF_METRIC_OP_NAME_PUT,
            PERF_METRIC_OP_NAME_WRITE_BATCH,
        },
        blockstore_options::{AccessType, BlockstoreOptions, LedgerColumnOptions},
    },
    bincode::deserialize,
    log::*,
    prost::Message,
    rocksdb::{
        self,
        compaction_filter::CompactionFilter,
        compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory},
        properties as RocksProperties, ColumnFamily, ColumnFamilyDescriptor, CompactionDecision,
        DBCompressionType, DBIterator, DBPinnableSlice, DBRawIterator,
        IteratorMode as RocksIteratorMode, LiveFile, Options, WriteBatch as RWriteBatch, DB,
    },
    serde::de::DeserializeOwned,
    solana_clock::Slot,
    std::{
        collections::HashSet,
        ffi::{CStr, CString},
        fs,
        marker::PhantomData,
        num::NonZeroUsize,
        path::{Path, PathBuf},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc,
        },
    },
};
const BLOCKSTORE_METRICS_ERROR: i64 = -1;

const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB

// SST files older than this value will be picked up for compaction. This value
// was chosen to be one day to strike a balance between storage getting
// reclaimed in a timely manner and the additional I/O that compaction incurs.
// For more details on this property, see
// https://github.com/facebook/rocksdb/blob/749b179c041347d150fa6721992ae8398b7d2b39/
// include/rocksdb/advanced_options.h#L908C30-L908C30
const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;
pub enum IteratorMode<Index> {
    Start,
    End,
    From(Index, IteratorDirection),
}
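/// Tracks the oldest slot that should be retained in the blockstore; shared
/// with the compaction filter factories below so that each newly created
/// filter can snapshot it.
///
/// A minimal usage sketch (not compiled; the surrounding narrative is
/// illustrative):
///
/// ```ignore
/// let oldest_slot = OldestSlot::default();
/// oldest_slot.set(100); // cleanup decided slots < 100 can be purged
/// assert_eq!(oldest_slot.get(), 100); // a new compaction filter snapshots this
/// ```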
#[derive(Default, Clone, Debug)]
struct OldestSlot {
    slot: Arc<AtomicU64>,
    clean_slot_0: Arc<AtomicBool>,
}

impl OldestSlot {
    pub fn set(&self, oldest_slot: Slot) {
        // This value is used by the compaction filter independently, without any
        // data dependency. Also, compaction filters are created via their factory,
        // which takes a short-lived copy of this atomic value for the single job
        // of compaction. So, a Relaxed store is justified here.
        self.slot.store(oldest_slot, Ordering::Relaxed);
    }

    pub fn get(&self) -> Slot {
        // Copy out of the AtomicU64 as a general precaution so that oldest_slot
        // cannot mutate across a single run of compaction; this keeps the
        // reasoning simpler, although it isn't a strict requirement at the
        // moment. An eventually-propagated (very Relaxed) load is also fine,
        // because compaction by nature doesn't require strictly synchronized
        // semantics in this regard.
        self.slot.load(Ordering::Relaxed)
    }

    pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) {
        self.clean_slot_0.store(clean_slot_0, Ordering::Relaxed);
    }

    pub(crate) fn get_clean_slot_0(&self) -> bool {
        self.clean_slot_0.load(Ordering::Relaxed)
    }
}
#[derive(Debug)]
pub(crate) struct Rocks {
    db: rocksdb::DB,
    path: PathBuf,
    access_type: AccessType,
    oldest_slot: OldestSlot,
    column_options: Arc<LedgerColumnOptions>,
    write_batch_perf_status: PerfSamplingStatus,
}

impl Rocks {
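    /// Opens the RocksDB database at `path`, creating it (and any missing
    /// column families) if needed.
    ///
    /// A minimal sketch of a call site (not compiled; `ledger_path` is a
    /// hypothetical variable):
    ///
    /// ```ignore
    /// let rocks = Arc::new(Rocks::open(ledger_path, BlockstoreOptions::default())?);
    /// ```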
    pub(crate) fn open(path: PathBuf, options: BlockstoreOptions) -> Result<Rocks> {
        let recovery_mode = options.recovery_mode.clone();

        fs::create_dir_all(&path)?;

        // Use default database options
        let mut db_options = get_db_options(&options);
        if let Some(recovery_mode) = recovery_mode {
            db_options.set_wal_recovery_mode(recovery_mode.into());
        }
        let oldest_slot = OldestSlot::default();
        let cf_descriptors = Self::cf_descriptors(&path, &options, &oldest_slot);
        let column_options = Arc::from(options.column_options);

        // Open the database
        let mut db = match options.access_type {
            AccessType::Primary | AccessType::PrimaryForMaintenance => {
                DB::open_cf_descriptors(&db_options, &path, cf_descriptors)?
            }
            AccessType::Secondary => {
                let secondary_path = path.join("solana-secondary");
                info!(
                    "Opening Rocks with secondary (read only) access at: {secondary_path:?}. \
                     This secondary access could temporarily degrade other accesses, such as \
                     by agave-validator"
                );
                DB::open_cf_descriptors_as_secondary(
                    &db_options,
                    &path,
                    &secondary_path,
                    cf_descriptors,
                )?
            }
        };

        // Delete the now unused program_costs column if it is present
        if db.cf_handle(DEPRECATED_PROGRAM_COSTS_COLUMN_NAME).is_some() {
            db.drop_cf(DEPRECATED_PROGRAM_COSTS_COLUMN_NAME)?;
        }

        let rocks = Rocks {
            db,
            path,
            access_type: options.access_type,
            oldest_slot,
            column_options,
            write_batch_perf_status: PerfSamplingStatus::default(),
        };
        rocks.configure_compaction();

        Ok(rocks)
    }
    /// Create the column family (CF) descriptors necessary to open the database.
    ///
    /// In order to open a RocksDB database with Primary access, all columns must be opened. So,
    /// in addition to creating descriptors for all of the expected columns, also create
    /// descriptors for columns that were discovered but are otherwise unknown to the software.
    ///
    /// One case where columns could be unknown is if a RocksDB database is modified with a newer
    /// software version that adds a new column, and then also opened with an older version that
    /// did not have knowledge of that new column.
    fn cf_descriptors(
        path: &Path,
        options: &BlockstoreOptions,
        oldest_slot: &OldestSlot,
    ) -> Vec<ColumnFamilyDescriptor> {
        let mut cf_descriptors = vec![
            new_cf_descriptor::<columns::SlotMeta>(options, oldest_slot),
            new_cf_descriptor::<columns::DeadSlots>(options, oldest_slot),
            new_cf_descriptor::<columns::DuplicateSlots>(options, oldest_slot),
            new_cf_descriptor::<columns::ErasureMeta>(options, oldest_slot),
            new_cf_descriptor::<columns::Orphans>(options, oldest_slot),
            new_cf_descriptor::<columns::BankHash>(options, oldest_slot),
            new_cf_descriptor::<columns::Root>(options, oldest_slot),
            new_cf_descriptor::<columns::Index>(options, oldest_slot),
            new_cf_descriptor::<columns::ShredData>(options, oldest_slot),
            new_cf_descriptor::<columns::ShredCode>(options, oldest_slot),
            new_cf_descriptor::<columns::TransactionStatus>(options, oldest_slot),
            new_cf_descriptor::<columns::AddressSignatures>(options, oldest_slot),
            new_cf_descriptor::<columns::TransactionMemos>(options, oldest_slot),
            new_cf_descriptor::<columns::TransactionStatusIndex>(options, oldest_slot),
            new_cf_descriptor::<columns::Rewards>(options, oldest_slot),
            new_cf_descriptor::<columns::Blocktime>(options, oldest_slot),
            new_cf_descriptor::<columns::PerfSamples>(options, oldest_slot),
            new_cf_descriptor::<columns::BlockHeight>(options, oldest_slot),
            new_cf_descriptor::<columns::OptimisticSlots>(options, oldest_slot),
            new_cf_descriptor::<columns::MerkleRootMeta>(options, oldest_slot),
        ];

        // If the access type is Secondary, we don't need to open all of the
        // columns so we can just return immediately.
        match options.access_type {
            AccessType::Secondary => {
                return cf_descriptors;
            }
            AccessType::Primary | AccessType::PrimaryForMaintenance => {}
        }

        // Attempt to detect the column families that are present. It is not a
        // fatal error if we cannot, for example, if the Blockstore is brand
        // new and will be created by the call to Rocks::open().
        let detected_cfs = match DB::list_cf(&Options::default(), path) {
            Ok(detected_cfs) => detected_cfs,
            Err(err) => {
                warn!("Unable to detect Rocks columns: {err:?}");
                vec![]
            }
        };
        // The default column is handled automatically, we don't need to create
        // a descriptor for it
        const DEFAULT_COLUMN_NAME: &str = "default";
        let known_cfs: HashSet<_> = cf_descriptors
            .iter()
            .map(|cf_descriptor| cf_descriptor.name().to_string())
            .chain(std::iter::once(DEFAULT_COLUMN_NAME.to_string()))
            .collect();
        detected_cfs.iter().for_each(|cf_name| {
            if !known_cfs.contains(cf_name.as_str()) {
                info!("Detected unknown column {cf_name}, opening column with basic options");
                // This version of the software was unaware of the column, so
                // it is fair to assume that we will not attempt to read or
                // write the column. So, set some bare bones settings to avoid
                // using extra resources on this unknown column.
                let mut options = Options::default();
                // Lower the default to avoid unnecessary allocations
                options.set_write_buffer_size(1024 * 1024);
                // Disable compactions to avoid any modifications to the column
                options.set_disable_auto_compactions(true);
                cf_descriptors.push(ColumnFamilyDescriptor::new(cf_name, options));
            }
        });

        cf_descriptors
    }
    const fn columns() -> [&'static str; 20] {
        [
            columns::ErasureMeta::NAME,
            columns::DeadSlots::NAME,
            columns::DuplicateSlots::NAME,
            columns::Index::NAME,
            columns::Orphans::NAME,
            columns::BankHash::NAME,
            columns::Root::NAME,
            columns::SlotMeta::NAME,
            columns::ShredData::NAME,
            columns::ShredCode::NAME,
            columns::TransactionStatus::NAME,
            columns::AddressSignatures::NAME,
            columns::TransactionMemos::NAME,
            columns::TransactionStatusIndex::NAME,
            columns::Rewards::NAME,
            columns::Blocktime::NAME,
            columns::PerfSamples::NAME,
            columns::BlockHeight::NAME,
            columns::OptimisticSlots::NAME,
            columns::MerkleRootMeta::NAME,
        ]
    }
    // Configure compaction on a per-column basis
    fn configure_compaction(&self) {
        // If compactions are disabled altogether, no need to tune values
        if should_disable_auto_compactions(&self.access_type) {
            info!(
                "Rocks's automatic compactions are disabled due to {:?} access",
                self.access_type
            );
            return;
        }

        // Some columns make use of rocksdb's compaction to help in cleaning
        // the database. See comments in should_enable_cf_compaction() for more
        // details on why some columns need compaction and why others do not.
        //
        // More specifically, periodic (automatic) compaction is used as
        // opposed to manual compaction requests on a range.
        // - Periodic compaction operates on individual files once the file
        //   has reached a certain (configurable) age. See comments at
        //   PERIODIC_COMPACTION_SECONDS for some more detail.
        // - Manual compaction operates on a range and could end up propagating
        //   through several files and/or levels of the db.
        //
        // Given that data is inserted into the db at a somewhat steady rate,
        // the age of the individual files will be fairly evenly distributed
        // over time as well. Thus, the I/O to perform cleanup with periodic
        // compaction is also evenly distributed over time. On the other hand,
        // a manual compaction spanning a large number of files could cause
        // a sudden burst in I/O. Such a burst could potentially cause a write
        // stall in addition to negatively impacting other parts of the system.
        // Thus, the choice to use periodic compactions is fairly easy.
        for cf_name in Self::columns() {
            if should_enable_cf_compaction(cf_name) {
                let cf_handle = self.cf_handle(cf_name);
                self.db
                    .set_options_cf(
                        &cf_handle,
                        &[(
                            "periodic_compaction_seconds",
                            &PERIODIC_COMPACTION_SECONDS.to_string(),
                        )],
                    )
                    .unwrap();
            }
        }
    }
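    /// Returns a typed handle (`LedgerColumn`) for column family `C`.
    ///
    /// A minimal sketch of a call site (not compiled; `rocks` is assumed to be
    /// the `Arc<Rocks>` from `Rocks::open()`):
    ///
    /// ```ignore
    /// let slot_meta_cf: LedgerColumn<columns::SlotMeta> = rocks.column();
    /// slot_meta_cf.submit_rocksdb_cf_metrics();
    /// ```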
    pub(crate) fn column<C>(self: &Arc<Self>) -> LedgerColumn<C>
    where
        C: Column + ColumnName,
    {
        let column_options = Arc::clone(&self.column_options);
        LedgerColumn {
            backend: Arc::clone(self),
            column: PhantomData,
            column_options,
            read_perf_status: PerfSamplingStatus::default(),
            write_perf_status: PerfSamplingStatus::default(),
        }
    }

    pub(crate) fn destroy(path: &Path) -> Result<()> {
        DB::destroy(&Options::default(), path)?;
        Ok(())
    }

    pub(crate) fn cf_handle(&self, cf: &str) -> &ColumnFamily {
        self.db
            .cf_handle(cf)
            .expect("should never get an unknown column")
    }

    fn get_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<Option<Vec<u8>>> {
        let opt = self.db.get_cf(cf, key)?;
        Ok(opt)
    }

    fn get_pinned_cf(
        &self,
        cf: &ColumnFamily,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<DBPinnableSlice<'_>>> {
        let opt = self.db.get_pinned_cf(cf, key)?;
        Ok(opt)
    }

    fn put_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K, value: &[u8]) -> Result<()> {
        self.db.put_cf(cf, key, value)?;
        Ok(())
    }

    fn multi_get_cf<'a, K, I>(
        &self,
        cf: &ColumnFamily,
        keys: I,
    ) -> impl Iterator<Item = Result<Option<DBPinnableSlice<'_>>>>
    where
        K: AsRef<[u8]> + 'a + ?Sized,
        I: IntoIterator<Item = &'a K>,
    {
        self.db
            .batched_multi_get_cf(cf, keys, /*sorted_input:*/ false)
            .into_iter()
            .map(|out| out.map_err(BlockstoreError::RocksDb))
    }
    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<()> {
        self.db.delete_cf(cf, key)?;
        Ok(())
    }

    /// Delete files whose slot range is within \[`from`, `to`\].
    fn delete_file_in_range_cf<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        from_key: K,
        to_key: K,
    ) -> Result<()> {
        self.db.delete_file_in_range_cf(cf, from_key, to_key)?;
        Ok(())
    }

    pub(crate) fn iterator_cf(
        &self,
        cf: &ColumnFamily,
        iterator_mode: RocksIteratorMode,
    ) -> DBIterator<'_> {
        self.db.iterator_cf(cf, iterator_mode)
    }

    pub(crate) fn raw_iterator_cf(&self, cf: &ColumnFamily) -> Result<DBRawIterator<'_>> {
        Ok(self.db.raw_iterator_cf(cf))
    }

    pub(crate) fn batch(&self) -> Result<WriteBatch> {
        Ok(WriteBatch {
            write_batch: RWriteBatch::default(),
        })
    }

    pub(crate) fn write(&self, batch: WriteBatch) -> Result<()> {
        let op_start_instant = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.write_batch_perf_status,
        );
        let result = self.db.write(batch.write_batch);
        if let Some(op_start_instant) = op_start_instant {
            report_rocksdb_write_perf(
                PERF_METRIC_OP_NAME_WRITE_BATCH, // We use write_batch as cf_name for write batch.
                PERF_METRIC_OP_NAME_WRITE_BATCH, // op_name
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        match result {
            Ok(_) => Ok(()),
            Err(e) => Err(BlockstoreError::RocksDb(e)),
        }
    }

    pub(crate) fn is_primary_access(&self) -> bool {
        self.access_type == AccessType::Primary
            || self.access_type == AccessType::PrimaryForMaintenance
    }

    /// Retrieves the specified RocksDB integer property of the current
    /// column family.
    ///
    /// The full list of properties that return int values can be found
    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
    fn get_int_property_cf(&self, cf: &ColumnFamily, name: &'static std::ffi::CStr) -> Result<i64> {
        match self.db.property_int_value_cf(cf, name) {
            Ok(Some(value)) => Ok(value.try_into().unwrap()),
            Ok(None) => Ok(0),
            Err(e) => Err(BlockstoreError::RocksDb(e)),
        }
    }

    pub(crate) fn live_files_metadata(&self) -> Result<Vec<LiveFile>> {
        match self.db.live_files() {
            Ok(live_files) => Ok(live_files),
            Err(e) => Err(BlockstoreError::RocksDb(e)),
        }
    }

    pub(crate) fn storage_size(&self) -> Result<u64> {
        Ok(fs_extra::dir::get_size(&self.path)?)
    }

    pub(crate) fn set_oldest_slot(&self, oldest_slot: Slot) {
        self.oldest_slot.set(oldest_slot);
    }

    pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) {
        self.oldest_slot.set_clean_slot_0(clean_slot_0);
    }
}
#[derive(Debug)]
pub struct LedgerColumn<C: Column + ColumnName> {
    backend: Arc<Rocks>,
    column: PhantomData<C>,
    pub column_options: Arc<LedgerColumnOptions>,
    read_perf_status: PerfSamplingStatus,
    write_perf_status: PerfSamplingStatus,
}

impl<C: Column + ColumnName> LedgerColumn<C> {
    pub fn submit_rocksdb_cf_metrics(&self) {
        let cf_rocksdb_metrics = BlockstoreRocksDbColumnFamilyMetrics {
            total_sst_files_size: self
                .get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            size_all_mem_tables: self
                .get_int_property(RocksProperties::SIZE_ALL_MEM_TABLES)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            num_snapshots: self
                .get_int_property(RocksProperties::NUM_SNAPSHOTS)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            oldest_snapshot_time: self
                .get_int_property(RocksProperties::OLDEST_SNAPSHOT_TIME)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            actual_delayed_write_rate: self
                .get_int_property(RocksProperties::ACTUAL_DELAYED_WRITE_RATE)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            is_write_stopped: self
                .get_int_property(RocksProperties::IS_WRITE_STOPPED)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            block_cache_capacity: self
                .get_int_property(RocksProperties::BLOCK_CACHE_CAPACITY)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            block_cache_usage: self
                .get_int_property(RocksProperties::BLOCK_CACHE_USAGE)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            block_cache_pinned_usage: self
                .get_int_property(RocksProperties::BLOCK_CACHE_PINNED_USAGE)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            estimate_table_readers_mem: self
                .get_int_property(RocksProperties::ESTIMATE_TABLE_READERS_MEM)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            mem_table_flush_pending: self
                .get_int_property(RocksProperties::MEM_TABLE_FLUSH_PENDING)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            compaction_pending: self
                .get_int_property(RocksProperties::COMPACTION_PENDING)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            num_running_compactions: self
                .get_int_property(RocksProperties::NUM_RUNNING_COMPACTIONS)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            num_running_flushes: self
                .get_int_property(RocksProperties::NUM_RUNNING_FLUSHES)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            estimate_oldest_key_time: self
                .get_int_property(RocksProperties::ESTIMATE_OLDEST_KEY_TIME)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
            background_errors: self
                .get_int_property(RocksProperties::BACKGROUND_ERRORS)
                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
        };
        cf_rocksdb_metrics.report_metrics(C::NAME, &self.column_options);
    }
}
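/// A thin wrapper over rocksdb's `WriteBatch`: puts and deletes are collected
/// here and later committed atomically via `Rocks::write()`.
///
/// A minimal usage sketch (not compiled; `rocks` and `slot_meta_cf` are
/// hypothetical handles from `Rocks::open()` and `Rocks::column()`):
///
/// ```ignore
/// let mut batch = rocks.batch()?;
/// slot_meta_cf.put_in_batch(&mut batch, slot, &meta)?;
/// slot_meta_cf.delete_in_batch(&mut batch, old_slot)?;
/// rocks.write(batch)?; // applied atomically
/// ```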
pub struct WriteBatch {
    write_batch: RWriteBatch,
}

pub(crate) struct BlockstoreByteReference<'a> {
    slice: DBPinnableSlice<'a>,
}

impl<'a> From<DBPinnableSlice<'a>> for BlockstoreByteReference<'a> {
    #[inline]
    fn from(slice: DBPinnableSlice<'a>) -> Self {
        Self { slice }
    }
}

impl std::ops::Deref for BlockstoreByteReference<'_> {
    type Target = [u8];

    #[inline]
    fn deref(&self) -> &[u8] {
        &self.slice
    }
}

impl AsRef<[u8]> for BlockstoreByteReference<'_> {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self
    }
}

impl WriteBatch {
    fn put_cf<K: AsRef<[u8]>>(&mut self, cf: &ColumnFamily, key: K, value: &[u8]) -> Result<()> {
        self.write_batch.put_cf(cf, key, value);
        Ok(())
    }

    fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &ColumnFamily, key: K) -> Result<()> {
        self.write_batch.delete_cf(cf, key);
        Ok(())
    }

    fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &ColumnFamily, from: K, to: K) -> Result<()> {
        self.write_batch.delete_range_cf(cf, from, to);
        Ok(())
    }
}
impl<C> LedgerColumn<C>
where
    C: Column + ColumnName,
{
    pub fn get_bytes(&self, index: C::Index) -> Result<Option<Vec<u8>>> {
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.read_perf_status,
        );
        let key = <C as Column>::key(&index);
        let result = self.backend.get_cf(self.handle(), key);
        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_read_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_GET,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        result
    }

    /// Create a key type suitable for use with multi_get_bytes() and
    /// multi_get(). Those functions return iterators, so the keys must be
    /// created with a separate function in order to live long enough.
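    ///
    /// A minimal sketch of the intended pattern (not compiled; `column` and
    /// `indexes` are hypothetical):
    ///
    /// ```ignore
    /// // Materialize the keys first so they outlive the borrowing iterator.
    /// let keys = column.multi_get_keys(indexes);
    /// for value in column.multi_get(&keys) {
    ///     let value = value?;
    ///     // ...
    /// }
    /// ```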
    pub(crate) fn multi_get_keys<I>(&self, keys: I) -> Vec<<C as Column>::Key>
    where
        I: IntoIterator<Item = C::Index>,
    {
        keys.into_iter().map(|index| C::key(&index)).collect()
    }

    pub(crate) fn multi_get_bytes<'a, K>(
        &'a self,
        keys: impl IntoIterator<Item = &'a K> + 'a,
    ) -> impl Iterator<Item = Result<Option<BlockstoreByteReference<'a>>>> + 'a
    where
        K: AsRef<[u8]> + 'a + ?Sized,
    {
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.read_perf_status,
        );
        let result = self
            .backend
            .multi_get_cf(self.handle(), keys)
            .map(|out| Ok(out?.map(BlockstoreByteReference::from)));
        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_read_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_MULTI_GET,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        result
    }
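    /// Iterates the column, decoding each raw key back into `C::Index`.
    ///
    /// A minimal sketch (not compiled; `slot_meta_cf` and `start_index` are
    /// hypothetical):
    ///
    /// ```ignore
    /// let mode = IteratorMode::From(start_index, IteratorDirection::Forward);
    /// for (index, bytes) in slot_meta_cf.iter(mode)? {
    ///     // `bytes` is the raw serialized value stored at `index`
    /// }
    /// ```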
    pub fn iter(
        &self,
        iterator_mode: IteratorMode<C::Index>,
    ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + '_> {
        let start_key: <C as Column>::Key;
        let iterator_mode = match iterator_mode {
            IteratorMode::Start => RocksIteratorMode::Start,
            IteratorMode::End => RocksIteratorMode::End,
            IteratorMode::From(start, direction) => {
                start_key = <C as Column>::key(&start);
                RocksIteratorMode::From(start_key.as_ref(), direction)
            }
        };
        let iter = self.backend.iterator_cf(self.handle(), iterator_mode);
        Ok(iter.map(|pair| {
            let (key, value) = pair.unwrap();
            (C::index(&key), value)
        }))
    }

    #[cfg(test)]
    // The validator performs compactions asynchronously; this method is
    // provided to force a synchronous compaction to test our compaction filter
    pub fn compact(&self) {
        // compact_range_cf() optionally takes a start and end key to limit
        // compaction. Providing values will result in a different method
        // getting called in the rocksdb code, even if the specified keys span
        // the entire key range of the column.
        //
        // Internally, rocksdb will do some checks to figure out if it should
        // run a compaction. Empirically, it has been found that passing the
        // keys leads to more variability in whether rocksdb runs a compaction
        // or not. For the sake of our unit tests, we want the compaction to
        // run every time. So, set the keys to None, which will result in
        // rocksdb using the heavier method to determine if a compaction
        // should run.
        let (start, end) = (None::<&[u8]>, None::<&[u8]>);
        self.backend.db.compact_range_cf(self.handle(), start, end);
    }

    #[inline]
    pub fn handle(&self) -> &ColumnFamily {
        self.backend.cf_handle(C::NAME)
    }
    #[cfg(test)]
    pub fn is_empty(&self) -> Result<bool> {
        let mut iter = self.backend.raw_iterator_cf(self.handle())?;
        iter.seek_to_first();
        Ok(!iter.valid())
    }

    pub fn put_bytes(&self, index: C::Index, value: &[u8]) -> Result<()> {
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.write_perf_status,
        );
        let key = <C as Column>::key(&index);
        let result = self.backend.put_cf(self.handle(), key, value);
        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_write_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_PUT,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        result
    }

    pub fn put_bytes_in_batch(
        &self,
        batch: &mut WriteBatch,
        index: C::Index,
        value: &[u8],
    ) -> Result<()> {
        let key = <C as Column>::key(&index);
        batch.put_cf(self.handle(), key, value)
    }

    /// Retrieves the specified RocksDB integer property of the current
    /// column family.
    ///
    /// The full list of properties that return int values can be found
    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
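    ///
    /// A minimal sketch (not compiled), reading a property that
    /// submit_rocksdb_cf_metrics() above also uses:
    ///
    /// ```ignore
    /// let sst_bytes = column.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)?;
    /// ```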
    pub fn get_int_property(&self, name: &'static std::ffi::CStr) -> Result<i64> {
        self.backend.get_int_property_cf(self.handle(), name)
    }

    pub fn delete(&self, index: C::Index) -> Result<()> {
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.write_perf_status,
        );
        let key = <C as Column>::key(&index);
        let result = self.backend.delete_cf(self.handle(), key);
        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_write_perf(
                C::NAME,
                "delete",
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        result
    }

    pub fn delete_in_batch(&self, batch: &mut WriteBatch, index: C::Index) -> Result<()> {
        let key = <C as Column>::key(&index);
        batch.delete_cf(self.handle(), key)
    }

    /// Adds a \[`from`, `to`\] range deletion to `batch` that removes all entries
    /// between the `from` slot and the `to` slot, inclusive. If the `from` slot
    /// and `to` slot are the same, then all entries in that slot will be removed.
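    ///
    /// A minimal sketch (not compiled; `rocks` and `column` are hypothetical):
    /// purge slots 10 through 20, inclusive.
    ///
    /// ```ignore
    /// let mut batch = rocks.batch()?;
    /// column.delete_range_in_batch(&mut batch, 10, 20)?;
    /// rocks.write(batch)?;
    /// ```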
    pub fn delete_range_in_batch(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()>
    where
        C: Column + ColumnName,
    {
        // Note that the default behavior of rocksdb's delete_range_cf deletes
        // keys within [from, to), while our purge logic applies to [from, to].
        //
        // For consistency, we make our delete_range_cf work on [from, to] by
        // adjusting the `to` slot by 1.
        let from_key = <C as Column>::key(&C::as_index(from));
        let to_key = <C as Column>::key(&C::as_index(to.saturating_add(1)));
        batch.delete_range_cf(self.handle(), from_key, to_key)
    }

    /// Delete files whose slot range is within \[`from`, `to`\].
    pub fn delete_file_in_range(&self, from: Slot, to: Slot) -> Result<()>
    where
        C: Column + ColumnName,
    {
        let from_key = <C as Column>::key(&C::as_index(from));
        let to_key = <C as Column>::key(&C::as_index(to));
        self.backend
            .delete_file_in_range_cf(self.handle(), from_key, to_key)
    }
}
impl<C> LedgerColumn<C>
where
    C: TypedColumn + ColumnName,
{
    pub(crate) fn multi_get<'a, K>(
        &'a self,
        keys: impl IntoIterator<Item = &'a K> + 'a,
    ) -> impl Iterator<Item = Result<Option<C::Type>>> + 'a
    where
        K: AsRef<[u8]> + 'a + ?Sized,
    {
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.read_perf_status,
        );
        let result = self
            .backend
            .multi_get_cf(self.handle(), keys)
            .map(|out| out?.as_deref().map(C::deserialize).transpose());
        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_read_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_MULTI_GET,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        result
    }
    pub fn get(&self, index: C::Index) -> Result<Option<C::Type>> {
        let key = <C as Column>::key(&index);
        self.get_raw(key)
    }

    pub fn get_raw<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<C::Type>> {
        let mut result = Ok(None);
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.read_perf_status,
        );
        if let Some(pinnable_slice) = self.backend.get_pinned_cf(self.handle(), key)? {
            let value = C::deserialize(pinnable_slice.as_ref())?;
            result = Ok(Some(value))
        }

        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_read_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_GET,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        result
    }

    pub fn put(&self, index: C::Index, value: &C::Type) -> Result<()> {
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.write_perf_status,
        );
        let serialized_value = C::serialize(value)?;
        let key = <C as Column>::key(&index);
        let result = self.backend.put_cf(self.handle(), key, &serialized_value);

        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_write_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_PUT,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        result
    }

    pub fn put_in_batch(
        &self,
        batch: &mut WriteBatch,
        index: C::Index,
        value: &C::Type,
    ) -> Result<()> {
        let key = <C as Column>::key(&index);
        let serialized_value = C::serialize(value)?;
        batch.put_cf(self.handle(), key, &serialized_value)
    }
}
impl<C> LedgerColumn<C>
where
    C: ProtobufColumn + ColumnName,
{
    pub fn get_protobuf_or_bincode<T: DeserializeOwned + Into<C::Type>>(
        &self,
        index: C::Index,
    ) -> Result<Option<C::Type>> {
        let key = <C as Column>::key(&index);
        self.get_raw_protobuf_or_bincode::<T>(key)
    }

    pub(crate) fn get_raw_protobuf_or_bincode<T: DeserializeOwned + Into<C::Type>>(
        &self,
        key: impl AsRef<[u8]>,
    ) -> Result<Option<C::Type>> {
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.read_perf_status,
        );
        let result = self.backend.get_pinned_cf(self.handle(), key);
        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_read_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_GET,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
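        // Try protobuf decoding first; on failure, assume the value predates the
        // protobuf format and fall back to bincode deserialization via `T`.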
        if let Some(pinnable_slice) = result? {
            let value = match C::Type::decode(pinnable_slice.as_ref()) {
                Ok(value) => value,
                Err(_) => deserialize::<T>(pinnable_slice.as_ref())?.into(),
            };
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }
    pub fn get_protobuf(&self, index: C::Index) -> Result<Option<C::Type>> {
        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.read_perf_status,
        );
        let key = <C as Column>::key(&index);
        let result = self.backend.get_pinned_cf(self.handle(), key);
        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_read_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_GET,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }

        if let Some(pinnable_slice) = result? {
            Ok(Some(C::Type::decode(pinnable_slice.as_ref())?))
        } else {
            Ok(None)
        }
    }

    pub fn put_protobuf(&self, index: C::Index, value: &C::Type) -> Result<()> {
        let mut buf = Vec::with_capacity(value.encoded_len());
        value.encode(&mut buf)?;

        let is_perf_enabled = maybe_enable_rocksdb_perf(
            self.column_options.rocks_perf_sample_interval,
            &self.write_perf_status,
        );
        let key = <C as Column>::key(&index);
        let result = self.backend.put_cf(self.handle(), key, &buf);
        if let Some(op_start_instant) = is_perf_enabled {
            report_rocksdb_write_perf(
                C::NAME,
                PERF_METRIC_OP_NAME_PUT,
                &op_start_instant.elapsed(),
                &self.column_options,
            );
        }
        result
    }
}
impl<C> LedgerColumn<C>
where
    C: ColumnIndexDeprecation + ColumnName,
{
    pub(crate) fn iter_current_index_filtered(
        &self,
        iterator_mode: IteratorMode<C::Index>,
    ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + '_> {
        let start_key: <C as Column>::Key;
        let iterator_mode = match iterator_mode {
            IteratorMode::Start => RocksIteratorMode::Start,
            IteratorMode::End => RocksIteratorMode::End,
            IteratorMode::From(start, direction) => {
                start_key = <C as Column>::key(&start);
                RocksIteratorMode::From(start_key.as_ref(), direction)
            }
        };
        let iter = self.backend.iterator_cf(self.handle(), iterator_mode);
        Ok(iter.filter_map(|pair| {
            let (key, value) = pair.unwrap();
            C::try_current_index(&key).ok().map(|index| (index, value))
        }))
    }

    pub(crate) fn iter_deprecated_index_filtered(
        &self,
        iterator_mode: IteratorMode<C::DeprecatedIndex>,
    ) -> Result<impl Iterator<Item = (C::DeprecatedIndex, Box<[u8]>)> + '_> {
        let start_key: <C as ColumnIndexDeprecation>::DeprecatedKey;
        let iterator_mode = match iterator_mode {
            IteratorMode::Start => RocksIteratorMode::Start,
            IteratorMode::End => RocksIteratorMode::End,
            IteratorMode::From(start_from, direction) => {
                start_key = C::deprecated_key(start_from);
                RocksIteratorMode::From(start_key.as_ref(), direction)
            }
        };
        let iterator = self.backend.iterator_cf(self.handle(), iterator_mode);
        Ok(iterator.filter_map(|pair| {
            let (key, value) = pair.unwrap();
            C::try_deprecated_index(&key)
                .ok()
                .map(|index| (index, value))
        }))
    }

    pub(crate) fn delete_deprecated_in_batch(
        &self,
        batch: &mut WriteBatch,
        index: C::DeprecatedIndex,
    ) -> Result<()> {
        let key = C::deprecated_key(index);
        batch.delete_cf(self.handle(), &key)
    }
}
/// A CompactionFilter implementation to remove keys older than a given slot.
struct PurgedSlotFilter<C: Column + ColumnName> {
    /// The oldest slot to keep; any slot < oldest_slot will be removed
    oldest_slot: Slot,
    /// Whether to preserve keys that return slot 0, even when oldest_slot > 0.
    /// This is used to delete old column data that wasn't keyed with a Slot,
    /// and so always returns `C::slot() == 0`.
    clean_slot_0: bool,
    name: CString,
    _phantom: PhantomData<C>,
}
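// To make the filter decision concrete, suppose oldest_slot == 100 and
// clean_slot_0 == false (illustrative values, not defaults):
// - a key at slot 150 => Keep   (slot >= oldest_slot)
// - a key at slot  50 => Remove (slot < oldest_slot)
// - a key at slot   0 => Keep   (preserved until clean_slot_0 is set)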
impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
    fn filter(&mut self, _level: u32, key: &[u8], _value: &[u8]) -> CompactionDecision {
        use rocksdb::CompactionDecision::*;

        let slot_in_key = C::slot(C::index(key));
        if slot_in_key >= self.oldest_slot || (slot_in_key == 0 && !self.clean_slot_0) {
            Keep
        } else {
            Remove
        }
    }

    fn name(&self) -> &CStr {
        &self.name
    }
}
struct PurgedSlotFilterFactory<C: Column + ColumnName> {
    oldest_slot: OldestSlot,
    name: CString,
    _phantom: PhantomData<C>,
}

impl<C: Column + ColumnName> CompactionFilterFactory for PurgedSlotFilterFactory<C> {
    type Filter = PurgedSlotFilter<C>;

    fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
        let copied_oldest_slot = self.oldest_slot.get();
        let copied_clean_slot_0 = self.oldest_slot.get_clean_slot_0();
        PurgedSlotFilter::<C> {
            oldest_slot: copied_oldest_slot,
            clean_slot_0: copied_clean_slot_0,
            name: CString::new(format!(
                "purged_slot_filter({}, {:?})",
                C::NAME,
                copied_oldest_slot
            ))
            .unwrap(),
            _phantom: PhantomData,
        }
    }

    fn name(&self) -> &CStr {
        &self.name
    }
}
fn new_cf_descriptor<C: 'static + Column + ColumnName>(
    options: &BlockstoreOptions,
    oldest_slot: &OldestSlot,
) -> ColumnFamilyDescriptor {
    ColumnFamilyDescriptor::new(C::NAME, get_cf_options::<C>(options, oldest_slot))
}

fn get_cf_options<C: 'static + Column + ColumnName>(
    options: &BlockstoreOptions,
    oldest_slot: &OldestSlot,
) -> Options {
    let mut cf_options = Options::default();
    // 256MB * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
    cf_options.set_max_write_buffer_number(8);
    cf_options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
    let file_num_compaction_trigger = 4;
    // Recommend that this be around the size of level 0. Level 0 estimated size in stable state is
    // write_buffer_size * min_write_buffer_number_to_merge * level0_file_num_compaction_trigger
    // Source: https://docs.rs/rocksdb/0.6.0/rocksdb/struct.Options.html#method.set_level_zero_file_num_compaction_trigger
    let total_size_base = MAX_WRITE_BUFFER_SIZE * file_num_compaction_trigger;
    let file_size_base = total_size_base / 10;
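    // With the constants above, total_size_base = 256MB * 4 = 1GB and
    // file_size_base = 1GB / 10, roughly 100MB.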
    cf_options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
    cf_options.set_max_bytes_for_level_base(total_size_base);
    cf_options.set_target_file_size_base(file_size_base);

    let disable_auto_compactions = should_disable_auto_compactions(&options.access_type);
    if disable_auto_compactions {
        cf_options.set_disable_auto_compactions(true);
    }

    if !disable_auto_compactions && should_enable_cf_compaction(C::NAME) {
        cf_options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
            oldest_slot: oldest_slot.clone(),
            name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
            _phantom: PhantomData,
        });
    }

    process_cf_options_advanced::<C>(&mut cf_options, &options.column_options);

    cf_options
}
fn process_cf_options_advanced<C: 'static + Column + ColumnName>(
    cf_options: &mut Options,
    column_options: &LedgerColumnOptions,
) {
    // Explicitly disable compression on all columns by default
    // See https://docs.rs/rocksdb/0.21.0/rocksdb/struct.Options.html#method.set_compression_type
    cf_options.set_compression_type(DBCompressionType::None);

    if should_enable_compression::<C>() {
        cf_options.set_compression_type(
            column_options
                .compression_type
                .to_rocksdb_compression_type(),
        );
    }
}
fn get_db_options(blockstore_options: &BlockstoreOptions) -> Options {
    let mut options = Options::default();

    // Create missing items to support a clean start
    options.create_if_missing(true);
    options.create_missing_column_families(true);

    // rocksdb builds two threadpools: low and high priority. The low priority
    // pool is used for compactions whereas the high priority pool is used for
    // memtable flushes. Separate pools are created so that compactions are
    // unable to stall memtable flushes (which could stall memtable writes).
    //
    // For now, use the deprecated methods to configure the exact number of
    // threads for each pool. The newer method, set_max_background_jobs(N),
    // allots N/4 threads to the high priority pool and 3N/4 to the low
    // priority pool.
    #[allow(deprecated)]
    {
        options.set_max_background_compactions(
            blockstore_options.num_rocksdb_compaction_threads.get() as i32,
        );
        options
            .set_max_background_flushes(blockstore_options.num_rocksdb_flush_threads.get() as i32);
    }

    // Set max total wal size to 4G.
    options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);

    if should_disable_auto_compactions(&blockstore_options.access_type) {
        options.set_disable_auto_compactions(true);
    }

    // Limit to (10) 50 MB log files (500 MB total)
    // Logs grow at < 5 MB / hour, so this provides several days of logs
    options.set_max_log_file_size(50 * 1024 * 1024);
    options.set_keep_log_file_num(10);

    // Allow Rocks to open/keep open as many files as it needs for performance;
    // however, this is also explicitly required for a secondary instance.
    // See https://github.com/facebook/rocksdb/wiki/Secondary-instance
    options.set_max_open_files(-1);

    options
}
/// The default number of threads to use for rocksdb compaction in the rocksdb
/// low priority threadpool
pub fn default_num_compaction_threads() -> NonZeroUsize {
    NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero")
}

/// The default number of threads to use for rocksdb memtable flushes in the
/// rocksdb high priority threadpool
pub fn default_num_flush_threads() -> NonZeroUsize {
    NonZeroUsize::new((num_cpus::get() / 4).max(1)).expect("thread count is non-zero")
}
// Returns whether automatic compactions should be disabled for the entire
// database based upon the given access type.
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
    // Leave automatic compactions enabled (do not disable) in Primary mode;
    // disable in all other modes to prevent accidental cleaning
    !matches!(access_type, AccessType::Primary)
}

// Returns whether compactions should be enabled for the given column (name).
fn should_enable_cf_compaction(cf_name: &str) -> bool {
    // In order to keep the ledger storage footprint within a desired size,
    // LedgerCleanupService removes data in FIFO order by slot.
    //
    // Several columns do not contain a slot in their key. These columns must
    // be manually managed to avoid unbounded storage growth.
    //
    // Columns where slot is the primary index can be efficiently cleaned via
    // Database::delete_range_cf() && Database::delete_file_in_range_cf().
    //
    // Columns where a slot is part of the key but not the primary index cannot
    // be range deleted like above. Instead, the individual key/value pairs
    // must be iterated over and a decision made to keep or discard each pair.
    // The comparison logic is implemented in PurgedSlotFilter, which is
    // configured to run as part of rocksdb's automatic compactions. Storage
    // space is reclaimed on this class of columns once compaction has
    // completed on a given range or file.
    matches!(
        cf_name,
        columns::TransactionStatus::NAME
            | columns::TransactionMemos::NAME
            | columns::AddressSignatures::NAME
    )
}

// Returns true if the column family enables compression.
fn should_enable_compression<C: 'static + Column + ColumnName>() -> bool {
    C::NAME == columns::TransactionStatus::NAME
}
#[cfg(test)]
pub mod tests {
    use {
        super::*, crate::blockstore_db::columns::ShredData, std::path::PathBuf, tempfile::tempdir,
    };

    #[test]
    fn test_compaction_filter() {
        // this doesn't implement Clone...
        let dummy_compaction_filter_context = || CompactionFilterContext {
            is_full_compaction: true,
            is_manual_compaction: true,
        };
        let oldest_slot = OldestSlot::default();
        oldest_slot.set_clean_slot_0(true);

        let mut factory = PurgedSlotFilterFactory::<ShredData> {
            oldest_slot: oldest_slot.clone(),
            name: CString::new("test compaction filter").unwrap(),
            _phantom: PhantomData,
        };
        let mut compaction_filter = factory.create(dummy_compaction_filter_context());

        let dummy_level = 0;
        let key = ShredData::key(&ShredData::as_index(0));
        let dummy_value = vec![];

        // we can't use assert_matches! because CompactionDecision doesn't implement Debug
        assert!(matches!(
            compaction_filter.filter(dummy_level, &key, &dummy_value),
            CompactionDecision::Keep
        ));

        // mutating oldest_slot doesn't affect existing compaction filters...
        oldest_slot.set(1);
        assert!(matches!(
            compaction_filter.filter(dummy_level, &key, &dummy_value),
            CompactionDecision::Keep
        ));

        // recreating the compaction filter starts to expire the key
        let mut compaction_filter = factory.create(dummy_compaction_filter_context());
        assert!(matches!(
            compaction_filter.filter(dummy_level, &key, &dummy_value),
            CompactionDecision::Remove
        ));

        // a newer key shouldn't be removed
        let key = ShredData::key(&ShredData::as_index(1));
        assert!(matches!(
            compaction_filter.filter(dummy_level, &key, &dummy_value),
            CompactionDecision::Keep
        ));
    }
    #[test]
    fn test_cf_names_and_descriptors_equal_length() {
        let path = PathBuf::default();
        let options = BlockstoreOptions::default();
        let oldest_slot = OldestSlot::default();
        // The names and descriptors don't need to be in the same order for our use cases;
        // however, there should be the same number of each. For example, adding a new column
        // should update both lists.
        assert_eq!(
            Rocks::columns().len(),
            Rocks::cf_descriptors(&path, &options, &oldest_slot).len()
        );
    }

    #[test]
    fn test_should_disable_auto_compactions() {
        assert!(!should_disable_auto_compactions(&AccessType::Primary));
        assert!(should_disable_auto_compactions(
            &AccessType::PrimaryForMaintenance
        ));
        assert!(should_disable_auto_compactions(&AccessType::Secondary));
    }

    #[test]
    fn test_should_enable_cf_compaction() {
        let columns_to_compact = [
            columns::TransactionStatus::NAME,
            columns::TransactionMemos::NAME,
            columns::AddressSignatures::NAME,
        ];
        columns_to_compact.iter().for_each(|cf_name| {
            assert!(should_enable_cf_compaction(cf_name));
        });
        assert!(!should_enable_cf_compaction("something else"));
    }

    #[test]
    fn test_open_unknown_columns() {
        agave_logger::setup();

        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path();

        // Open with Primary to create the new database
        {
            let options = BlockstoreOptions {
                access_type: AccessType::Primary,
                ..BlockstoreOptions::default()
            };
            let mut rocks = Rocks::open(db_path.to_path_buf(), options).unwrap();

            // Introduce a new column that will not be known
            rocks
                .db
                .create_cf("new_column", &Options::default())
                .unwrap();
        }

        // Opening with either Secondary or Primary access should succeed,
        // even though the Rocks code is unaware of "new_column"
        {
            let options = BlockstoreOptions {
                access_type: AccessType::Secondary,
                ..BlockstoreOptions::default()
            };
            let _ = Rocks::open(db_path.to_path_buf(), options).unwrap();
        }
        {
            let options = BlockstoreOptions {
                access_type: AccessType::Primary,
                ..BlockstoreOptions::default()
            };
            let _ = Rocks::open(db_path.to_path_buf(), options).unwrap();
        }
    }
    #[test]
    fn test_remove_deprecated_program_costs_column_compat() {
        agave_logger::setup();

        fn is_program_costs_column_present(path: &Path) -> bool {
            DB::list_cf(&Options::default(), path)
                .unwrap()
                .iter()
                .any(|column_name| column_name == DEPRECATED_PROGRAM_COSTS_COLUMN_NAME)
        }

        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path();
        let options = BlockstoreOptions {
            access_type: AccessType::Primary,
            ..BlockstoreOptions::default()
        };

        // Create a new database
        {
            let _rocks = Rocks::open(db_path.to_path_buf(), options.clone()).unwrap();
        }
        // The newly created database should not contain the deprecated column
        assert!(!is_program_costs_column_present(db_path));

        // Create a program_costs column to simulate an old database that had the column
        {
            let mut rocks = Rocks::open(db_path.to_path_buf(), options.clone()).unwrap();
            rocks
                .db
                .create_cf(DEPRECATED_PROGRAM_COSTS_COLUMN_NAME, &Options::default())
                .unwrap();
        }
        // Ensure the column we just created is detected
        assert!(is_program_costs_column_present(db_path));

        // Reopen the database, which has logic to delete the program_costs column
        {
            let _rocks = Rocks::open(db_path.to_path_buf(), options.clone()).unwrap();
        }
        // The deprecated column should have been dropped by Rocks::open()
        assert!(!is_program_costs_column_present(db_path));
    }
    impl<C> LedgerColumn<C>
    where
        C: ColumnIndexDeprecation + ProtobufColumn + ColumnName,
    {
        pub fn put_deprecated_protobuf(
            &self,
            index: C::DeprecatedIndex,
            value: &C::Type,
        ) -> Result<()> {
            let mut buf = Vec::with_capacity(value.encoded_len());
            value.encode(&mut buf)?;
            self.backend
                .put_cf(self.handle(), C::deprecated_key(index), &buf)
        }
    }

    impl<C> LedgerColumn<C>
    where
        C: ColumnIndexDeprecation + TypedColumn + ColumnName,
    {
        pub fn put_deprecated(&self, index: C::DeprecatedIndex, value: &C::Type) -> Result<()> {
            let serialized_value = C::serialize(value)?;
            self.backend
                .put_cf(self.handle(), C::deprecated_key(index), &serialized_value)
        }
    }
}