accounts_db.rs 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558
  1. //! Persistent accounts are stored in below path location:
  2. //! <path>/<pid>/data/
  3. //!
  4. //! The persistent store would allow for this mode of operation:
  5. //! - Concurrent single thread append with many concurrent readers.
  6. //!
  7. //! The underlying memory is memory mapped to a file. The accounts would be
  8. //! stored across multiple files and the mappings of file and offset of a
  9. //! particular account would be stored in a shared index. This will allow for
  10. //! concurrent commits without blocking reads, which will sequentially write
  11. //! to memory, ssd or disk, and should be as fast as the hardware allows.
  12. //! The only required in memory data structure with a write lock is the index,
  13. //! which should be fast to update.
  14. //!
  15. //! AppendVecs only store accounts for a single fork. To bootstrap the
  16. //! index from a persistent store of AppendVecs, the entries include
  17. //! a "write_version". A single global atomic `AccountsDB::write_version`
  18. //! tracks the number of commits to the entire data store. So the latest
  19. //! commit for each fork entry would be indexed.
  20. use crate::accounts_index::{AccountsIndex, Fork};
  21. use crate::append_vec::{AppendVec, StorageMeta, StoredAccount};
  22. use bincode::{deserialize_from, serialize_into};
  23. use fs_extra::dir::CopyOptions;
  24. use log::*;
  25. use rand::{thread_rng, Rng};
  26. use rayon::prelude::*;
  27. use rayon::ThreadPool;
  28. use serde::de::{MapAccess, Visitor};
  29. use serde::ser::{SerializeMap, Serializer};
  30. use serde::{Deserialize, Serialize};
  31. use solana_measure::measure::Measure;
  32. use solana_rayon_threadlimit::get_thread_count;
  33. use solana_sdk::account::Account;
  34. use solana_sdk::pubkey::Pubkey;
  35. use std::collections::{HashMap, HashSet};
  36. use std::fmt;
  37. use std::io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult};
  38. use std::path::Path;
  39. use std::path::PathBuf;
  40. use std::sync::atomic::{AtomicUsize, Ordering};
  41. use std::sync::{Arc, RwLock};
  42. use tempfile::TempDir;
/// Default size in bytes (4 MiB) used as the starting file size of an append-vec
/// (see `AccountsDB::file_size`).
pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
/// NOTE(review): not referenced in the visible part of this file; presumably a
/// default worker-thread count for callers — confirm.
pub const DEFAULT_NUM_THREADS: u32 = 8;
/// Number of temporary account directories created by `AccountsDB::new` when it
/// is not given any paths.
pub const DEFAULT_NUM_DIRS: u32 = 4;
/// A set of `usize` tallies, one per kind of error.
///
/// NOTE(review): nothing in the visible part of this file reads or updates these
/// fields; judging by the names they count transaction loading/processing
/// failures — confirm against the callers.
#[derive(Debug, Default)]
pub struct ErrorCounters {
    pub account_not_found: usize,
    pub account_in_use: usize,
    pub account_loaded_twice: usize,
    pub blockhash_not_found: usize,
    pub blockhash_too_old: usize,
    pub reserve_blockhash: usize,
    pub invalid_account_for_fee: usize,
    pub insufficient_funds: usize,
    pub invalid_account_index: usize,
    pub duplicate_signature: usize,
    pub call_chain_too_deep: usize,
    pub missing_signature_for_fee: usize,
}
/// Index entry locating one stored account: which append storage holds it and
/// at what offset. Produced by `AccountsDB::store_accounts` for every account
/// it appends.
#[derive(Deserialize, Serialize, Default, Debug, PartialEq, Clone)]
pub struct AccountInfo {
    /// id of the append storage (`AccountStorageEntry::id`) holding the account
    id: AppendVecId,
    /// offset into the storage, as returned by the append
    offset: usize,
    /// lamports in the account at the time it was stored; kept for optimization
    /// purposes when squashing, to remove accounts with zero balance.
    lamports: u64,
}
  71. /// An offset into the AccountsDB::storage vector
  72. pub type AppendVecId = usize;
  73. // Each fork has a set of storage entries.
  74. type ForkStores = HashMap<usize, Arc<AccountStorageEntry>>;
  75. #[derive(Clone, Default, Debug)]
  76. pub struct AccountStorage(pub HashMap<Fork, ForkStores>);
  77. pub struct AccountStorageSerialize<'a> {
  78. account_storage: &'a AccountStorage,
  79. slot: u64,
  80. }
  81. impl<'a> AccountStorageSerialize<'a> {
  82. pub fn new(account_storage: &'a AccountStorage, slot: u64) -> Self {
  83. Self {
  84. account_storage,
  85. slot,
  86. }
  87. }
  88. }
  89. struct AccountStorageVisitor;
  90. impl<'de> Visitor<'de> for AccountStorageVisitor {
  91. type Value = AccountStorage;
  92. fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
  93. formatter.write_str("Expecting AccountStorage")
  94. }
  95. #[allow(clippy::mutex_atomic)]
  96. fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
  97. where
  98. M: MapAccess<'de>,
  99. {
  100. let mut map = HashMap::new();
  101. while let Some((storage_id, storage_entry)) = access.next_entry()? {
  102. let storage_entry: AccountStorageEntry = storage_entry;
  103. let storage_fork_map = map
  104. .entry(storage_entry.fork_id)
  105. .or_insert_with(HashMap::new);
  106. storage_fork_map.insert(storage_id, Arc::new(storage_entry));
  107. }
  108. Ok(AccountStorage(map))
  109. }
  110. }
  111. impl<'a> Serialize for AccountStorageSerialize<'a> {
  112. fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
  113. where
  114. S: Serializer,
  115. {
  116. let mut len: usize = 0;
  117. for (fork_id, storage) in &self.account_storage.0 {
  118. if *fork_id <= self.slot {
  119. len += storage.len();
  120. }
  121. }
  122. let mut map = serializer.serialize_map(Some(len))?;
  123. let mut count = 0;
  124. let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms");
  125. for fork_storage in self.account_storage.0.values() {
  126. for (storage_id, account_storage_entry) in fork_storage {
  127. if account_storage_entry.fork_id <= self.slot {
  128. map.serialize_entry(storage_id, &**account_storage_entry)?;
  129. count += 1;
  130. }
  131. }
  132. }
  133. serialize_account_storage_timer.stop();
  134. datapoint_info!(
  135. "serialize_account_storage_ms",
  136. ("duration", serialize_account_storage_timer.as_ms(), i64),
  137. ("num_entries", count, i64),
  138. );
  139. map.end()
  140. }
  141. }
  142. impl<'de> Deserialize<'de> for AccountStorage {
  143. fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
  144. where
  145. D: serde::Deserializer<'de>,
  146. {
  147. deserializer.deserialize_map(AccountStorageVisitor)
  148. }
  149. }
/// Availability state of an `AccountStorageEntry`.
#[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize)]
pub enum AccountStorageStatus {
    /// Free to be picked as a write target; `try_available` only succeeds in
    /// this state.
    Available = 0,
    /// Set by `store_accounts` after an append into the storage returned no
    /// offsets.
    Full = 1,
    /// Claimed by `try_available` as the target of a store; `store_accounts`
    /// puts the storage back to `Available` after a successful append.
    Candidate = 2,
}
/// Persistent storage structure holding the accounts
#[derive(Debug, Deserialize, Serialize)]
pub struct AccountStorageEntry {
    /// id of this storage; the key it is stored under in its fork's map
    id: AppendVecId,
    /// fork this storage was created for
    fork_id: Fork,
    /// storage holding the accounts
    accounts: AppendVec,
    /// `.0`: keeps track of the number of accounts stored in this specific
    /// AppendVec. This is periodically checked to reuse the stores that do not
    /// have any accounts in them.
    ///
    /// `.1`: status corresponding to the storage; lets us know that the
    /// append_vec, once maxed out and then emptied, can be reclaimed.
    ///
    /// Both values sit behind one lock so they are always read and updated
    /// together.
    count_and_status: RwLock<(usize, AccountStorageStatus)>,
}
  170. impl AccountStorageEntry {
  171. pub fn new(path: &Path, fork_id: Fork, id: usize, file_size: u64) -> Self {
  172. let tail = AppendVec::new_relative_path(fork_id, id);
  173. let path = Path::new(path).join(&tail);
  174. let accounts = AppendVec::new(&path, true, file_size as usize);
  175. AccountStorageEntry {
  176. id,
  177. fork_id,
  178. accounts,
  179. count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
  180. }
  181. }
  182. pub fn set_status(&self, mut status: AccountStorageStatus) {
  183. let mut count_and_status = self.count_and_status.write().unwrap();
  184. let count = count_and_status.0;
  185. if status == AccountStorageStatus::Full && count == 0 {
  186. // this case arises when the append_vec is full (store_ptrs fails),
  187. // but all accounts have already been removed from the storage
  188. //
  189. // the only time it's safe to call reset() on an append_vec is when
  190. // every account has been removed
  191. // **and**
  192. // the append_vec has previously been completely full
  193. //
  194. self.accounts.reset();
  195. status = AccountStorageStatus::Available;
  196. }
  197. *count_and_status = (count, status);
  198. }
  199. pub fn status(&self) -> AccountStorageStatus {
  200. self.count_and_status.read().unwrap().1
  201. }
  202. pub fn count(&self) -> usize {
  203. self.count_and_status.read().unwrap().0
  204. }
  205. pub fn fork_id(&self) -> Fork {
  206. self.fork_id
  207. }
  208. pub fn append_vec_id(&self) -> AppendVecId {
  209. self.id
  210. }
  211. fn add_account(&self) {
  212. let mut count_and_status = self.count_and_status.write().unwrap();
  213. *count_and_status = (count_and_status.0 + 1, count_and_status.1);
  214. }
  215. fn try_available(&self) -> bool {
  216. let mut count_and_status = self.count_and_status.write().unwrap();
  217. let (count, status) = *count_and_status;
  218. if status == AccountStorageStatus::Available {
  219. *count_and_status = (count, AccountStorageStatus::Candidate);
  220. true
  221. } else {
  222. false
  223. }
  224. }
  225. fn remove_account(&self) -> usize {
  226. let mut count_and_status = self.count_and_status.write().unwrap();
  227. let (count, mut status) = *count_and_status;
  228. if count == 1 && status == AccountStorageStatus::Full {
  229. // this case arises when we remove the last account from the
  230. // storage, but we've learned from previous write attempts that
  231. // the storage is full
  232. //
  233. // the only time it's safe to call reset() on an append_vec is when
  234. // every account has been removed
  235. // **and**
  236. // the append_vec has previously been completely full
  237. //
  238. // otherwise, the storage may be in flight with a store()
  239. // call
  240. self.accounts.reset();
  241. status = AccountStorageStatus::Available;
  242. }
  243. if count > 0 {
  244. *count_and_status = (count - 1, status);
  245. } else {
  246. warn!("count value 0 for fork {}", self.fork_id);
  247. }
  248. count_and_status.0
  249. }
  250. pub fn set_file<P: AsRef<Path>>(&mut self, path: P) -> IOResult<()> {
  251. self.accounts.set_file(path)
  252. }
  253. pub fn get_relative_path(&self) -> Option<PathBuf> {
  254. AppendVec::get_relative_path(self.accounts.get_path())
  255. }
  256. pub fn get_path(&self) -> PathBuf {
  257. self.accounts.get_path()
  258. }
  259. }
  260. pub fn get_paths_vec(paths: &str) -> Vec<PathBuf> {
  261. paths.split(',').map(PathBuf::from).collect()
  262. }
  263. pub fn get_temp_accounts_paths(count: u32) -> IOResult<(Vec<TempDir>, String)> {
  264. let temp_dirs: IOResult<Vec<TempDir>> = (0..count).map(|_| TempDir::new()).collect();
  265. let temp_dirs = temp_dirs?;
  266. let paths: Vec<String> = temp_dirs
  267. .iter()
  268. .map(|t| t.path().to_str().unwrap().to_owned())
  269. .collect();
  270. Ok((temp_dirs, paths.join(",")))
  271. }
  272. pub struct AccountsDBSerialize<'a> {
  273. accounts_db: &'a AccountsDB,
  274. slot: u64,
  275. }
  276. impl<'a> AccountsDBSerialize<'a> {
  277. pub fn new(accounts_db: &'a AccountsDB, slot: u64) -> Self {
  278. Self { accounts_db, slot }
  279. }
  280. }
  281. impl<'a> Serialize for AccountsDBSerialize<'a> {
  282. fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
  283. where
  284. S: serde::ser::Serializer,
  285. {
  286. use serde::ser::Error;
  287. let storage = self.accounts_db.storage.read().unwrap();
  288. let mut wr = Cursor::new(vec![]);
  289. let version: u64 = self.accounts_db.write_version.load(Ordering::Relaxed) as u64;
  290. let account_storage_serialize = AccountStorageSerialize::new(&*storage, self.slot);
  291. serialize_into(&mut wr, &account_storage_serialize).map_err(Error::custom)?;
  292. serialize_into(&mut wr, &version).map_err(Error::custom)?;
  293. let len = wr.position() as usize;
  294. serializer.serialize_bytes(&wr.into_inner()[..len])
  295. }
  296. }
/// This structure handles the load/store of the accounts
#[derive(Debug)]
pub struct AccountsDB {
    /// Keeps tracks of index into AppendVec on a per fork basis
    pub accounts_index: RwLock<AccountsIndex<AccountInfo>>,
    /// Account storage
    pub storage: RwLock<AccountStorage>,
    /// Id handed to the next storage entry that gets created; bumped once per
    /// new entry, and set past the largest loaded id by `accounts_from_stream`
    pub next_id: AtomicUsize,
    /// write version; incremented once for every account passed to
    /// `store_accounts`
    write_version: AtomicUsize,
    /// Set of storage paths to pick from
    paths: RwLock<Vec<PathBuf>>,
    /// Directory of paths this accounts_db needs to hold/remove; only `Some`
    /// when `new` had to create temporary directories itself
    temp_paths: Option<Vec<TempDir>>,
    /// Starting file size of appendvecs
    file_size: u64,
    /// Thread pool used for par_iter
    pub thread_pool: ThreadPool,
    /// While a fork has at most this many stores, `find_storage_candidate` may
    /// create an extra one so that storage scans have enough parallel work
    min_num_stores: usize,
}
  318. impl Default for AccountsDB {
  319. fn default() -> Self {
  320. let num_threads = get_thread_count();
  321. AccountsDB {
  322. accounts_index: RwLock::new(AccountsIndex::default()),
  323. storage: RwLock::new(AccountStorage(HashMap::new())),
  324. next_id: AtomicUsize::new(0),
  325. write_version: AtomicUsize::new(0),
  326. paths: RwLock::new(vec![]),
  327. temp_paths: None,
  328. file_size: DEFAULT_FILE_SIZE,
  329. thread_pool: rayon::ThreadPoolBuilder::new()
  330. .num_threads(num_threads)
  331. .build()
  332. .unwrap(),
  333. min_num_stores: num_threads,
  334. }
  335. }
  336. }
  337. impl AccountsDB {
  338. pub fn new(paths: Option<String>) -> Self {
  339. if let Some(paths) = paths {
  340. Self {
  341. paths: RwLock::new(get_paths_vec(&paths)),
  342. temp_paths: None,
  343. ..Self::default()
  344. }
  345. } else {
  346. // Create a temprorary set of accounts directories, used primarily
  347. // for testing
  348. let (temp_dirs, paths) = get_temp_accounts_paths(DEFAULT_NUM_DIRS).unwrap();
  349. Self {
  350. paths: RwLock::new(get_paths_vec(&paths)),
  351. temp_paths: Some(temp_dirs),
  352. ..Self::default()
  353. }
  354. }
  355. }
  356. #[cfg(test)]
  357. pub fn new_single() -> Self {
  358. AccountsDB {
  359. min_num_stores: 0,
  360. ..AccountsDB::new(None)
  361. }
  362. }
  363. #[cfg(test)]
  364. pub fn new_sized(paths: Option<String>, file_size: u64) -> Self {
  365. AccountsDB {
  366. file_size,
  367. ..AccountsDB::new(paths)
  368. }
  369. }
  370. pub fn format_paths<P: AsRef<Path>>(paths: Vec<P>) -> String {
  371. let paths: Vec<String> = paths
  372. .iter()
  373. .map(|p| p.as_ref().to_str().unwrap().to_owned())
  374. .collect();
  375. paths.join(",")
  376. }
  377. pub fn accounts_from_stream<R: Read, P: AsRef<Path>>(
  378. &self,
  379. mut stream: &mut BufReader<R>,
  380. local_account_paths: String,
  381. append_vecs_path: P,
  382. ) -> Result<(), IOError> {
  383. let _len: usize =
  384. deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
  385. let storage: AccountStorage =
  386. deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
  387. // Remap the deserialized AppendVec paths to point to correct local paths
  388. let local_account_paths = get_paths_vec(&local_account_paths);
  389. let new_storage_map: Result<HashMap<Fork, ForkStores>, IOError> = storage
  390. .0
  391. .into_iter()
  392. .map(|(fork_id, mut fork_storage)| {
  393. let mut new_fork_storage = HashMap::new();
  394. for (id, storage_entry) in fork_storage.drain() {
  395. let path_index = thread_rng().gen_range(0, local_account_paths.len());
  396. let local_dir = &local_account_paths[path_index];
  397. std::fs::create_dir_all(local_dir).expect("Create directory failed");
  398. // Move the corresponding AppendVec from the snapshot into the directory pointed
  399. // at by `local_dir`
  400. let append_vec_relative_path =
  401. AppendVec::new_relative_path(fork_id, storage_entry.id);
  402. let append_vec_abs_path =
  403. append_vecs_path.as_ref().join(&append_vec_relative_path);
  404. let mut copy_options = CopyOptions::new();
  405. copy_options.overwrite = true;
  406. fs_extra::move_items(&vec![&append_vec_abs_path], &local_dir, &copy_options)
  407. .map_err(|e| {
  408. AccountsDB::get_io_error(&format!(
  409. "Unable to move {:?} to {:?}: {}",
  410. append_vec_abs_path, local_dir, e
  411. ))
  412. })?;
  413. // Notify the AppendVec of the new file location
  414. let local_path = local_dir.join(append_vec_relative_path);
  415. let mut u_storage_entry = Arc::try_unwrap(storage_entry).unwrap();
  416. u_storage_entry
  417. .set_file(local_path)
  418. .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
  419. new_fork_storage.insert(id, Arc::new(u_storage_entry));
  420. }
  421. Ok((fork_id, new_fork_storage))
  422. })
  423. .collect();
  424. let new_storage_map = new_storage_map?;
  425. let storage = AccountStorage(new_storage_map);
  426. let version: u64 = deserialize_from(&mut stream)
  427. .map_err(|_| AccountsDB::get_io_error("write version deserialize error"))?;
  428. // Process deserialized data, set necessary fields in self
  429. *self.paths.write().unwrap() = local_account_paths;
  430. let max_id: usize = *storage
  431. .0
  432. .values()
  433. .flat_map(HashMap::keys)
  434. .max()
  435. .expect("At least one storage entry must exist from deserializing stream");
  436. {
  437. let mut stores = self.storage.write().unwrap();
  438. /*if let Some((_, store0)) = storage.0.remove_entry(&0) {
  439. let fork_storage0 = stores.0.entry(0).or_insert_with(HashMap::new);
  440. for (id, store) in store0.iter() {
  441. fork_storage0.insert(*id, store.clone());
  442. }
  443. }*/
  444. stores.0.extend(storage.0);
  445. }
  446. self.next_id.store(max_id + 1, Ordering::Relaxed);
  447. self.write_version
  448. .fetch_add(version as usize, Ordering::Relaxed);
  449. self.generate_index();
  450. Ok(())
  451. }
  452. fn new_storage_entry(&self, fork_id: Fork, path: &Path, size: u64) -> AccountStorageEntry {
  453. AccountStorageEntry::new(
  454. path,
  455. fork_id,
  456. self.next_id.fetch_add(1, Ordering::Relaxed),
  457. size,
  458. )
  459. }
  460. pub fn has_accounts(&self, fork: Fork) -> bool {
  461. if let Some(storage_forks) = self.storage.read().unwrap().0.get(&fork) {
  462. for x in storage_forks.values() {
  463. if x.count() > 0 {
  464. return true;
  465. }
  466. }
  467. }
  468. false
  469. }
  470. pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Fork, usize>, scan_func: F) -> A
  471. where
  472. F: Fn(&mut A, Option<(&Pubkey, Account, Fork)>) -> (),
  473. A: Default,
  474. {
  475. let mut collector = A::default();
  476. let accounts_index = self.accounts_index.read().unwrap();
  477. let storage = self.storage.read().unwrap();
  478. accounts_index.scan_accounts(ancestors, |pubkey, (account_info, fork)| {
  479. scan_func(
  480. &mut collector,
  481. storage
  482. .0
  483. .get(&fork)
  484. .and_then(|storage_map| storage_map.get(&account_info.id))
  485. .and_then(|store| {
  486. Some(
  487. store
  488. .accounts
  489. .get_account(account_info.offset)?
  490. .0
  491. .clone_account(),
  492. )
  493. })
  494. .map(|account| (pubkey, account, fork)),
  495. )
  496. });
  497. collector
  498. }
  499. /// Scan a specific fork through all the account storage in parallel with sequential read
  500. // PERF: Sequentially read each storage entry in parallel
  501. pub fn scan_account_storage<F, B>(&self, fork_id: Fork, scan_func: F) -> Vec<B>
  502. where
  503. F: Fn(&StoredAccount, AppendVecId, &mut B) -> (),
  504. F: Send + Sync,
  505. B: Send + Default,
  506. {
  507. let storage_maps: Vec<Arc<AccountStorageEntry>> = self
  508. .storage
  509. .read()
  510. .unwrap()
  511. .0
  512. .get(&fork_id)
  513. .unwrap_or(&HashMap::new())
  514. .values()
  515. .cloned()
  516. .collect();
  517. self.thread_pool.install(|| {
  518. storage_maps
  519. .into_par_iter()
  520. .map(|storage| {
  521. let accounts = storage.accounts.accounts(0);
  522. let mut retval = B::default();
  523. accounts.iter().for_each(|stored_account| {
  524. scan_func(stored_account, storage.id, &mut retval)
  525. });
  526. retval
  527. })
  528. .collect()
  529. })
  530. }
  531. pub fn load(
  532. storage: &AccountStorage,
  533. ancestors: &HashMap<Fork, usize>,
  534. accounts_index: &AccountsIndex<AccountInfo>,
  535. pubkey: &Pubkey,
  536. ) -> Option<(Account, Fork)> {
  537. let (lock, index) = accounts_index.get(pubkey, ancestors)?;
  538. let fork = lock[index].0;
  539. //TODO: thread this as a ref
  540. if let Some(fork_storage) = storage.0.get(&fork) {
  541. let info = &lock[index].1;
  542. fork_storage
  543. .get(&info.id)
  544. .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
  545. .map(|account| (account, fork))
  546. } else {
  547. None
  548. }
  549. }
  550. pub fn load_slow(
  551. &self,
  552. ancestors: &HashMap<Fork, usize>,
  553. pubkey: &Pubkey,
  554. ) -> Option<(Account, Fork)> {
  555. let accounts_index = self.accounts_index.read().unwrap();
  556. let storage = self.storage.read().unwrap();
  557. Self::load(&storage, ancestors, &accounts_index, pubkey)
  558. }
  559. fn find_storage_candidate(&self, fork_id: Fork) -> Arc<AccountStorageEntry> {
  560. let mut create_extra = false;
  561. let stores = self.storage.read().unwrap();
  562. if let Some(fork_stores) = stores.0.get(&fork_id) {
  563. if !fork_stores.is_empty() {
  564. if fork_stores.len() <= self.min_num_stores {
  565. let mut total_accounts = 0;
  566. for store in fork_stores.values() {
  567. total_accounts += store.count_and_status.read().unwrap().0;
  568. }
  569. // Create more stores so that when scanning the storage all CPUs have work
  570. if (total_accounts / 16) >= fork_stores.len() {
  571. create_extra = true;
  572. }
  573. }
  574. // pick an available store at random by iterating from a random point
  575. let to_skip = thread_rng().gen_range(0, fork_stores.len());
  576. for (i, store) in fork_stores.values().cycle().skip(to_skip).enumerate() {
  577. if store.try_available() {
  578. let ret = store.clone();
  579. drop(stores);
  580. if create_extra {
  581. self.create_and_insert_store(fork_id, self.file_size);
  582. }
  583. return ret;
  584. }
  585. // looked at every store, bail...
  586. if i == fork_stores.len() {
  587. break;
  588. }
  589. }
  590. }
  591. }
  592. drop(stores);
  593. let store = self.create_and_insert_store(fork_id, self.file_size);
  594. store.try_available();
  595. store
  596. }
  597. fn create_and_insert_store(&self, fork_id: Fork, size: u64) -> Arc<AccountStorageEntry> {
  598. let mut stores = self.storage.write().unwrap();
  599. let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new);
  600. self.create_store(fork_id, fork_storage, size)
  601. }
  602. fn create_store(
  603. &self,
  604. fork_id: Fork,
  605. fork_storage: &mut ForkStores,
  606. size: u64,
  607. ) -> Arc<AccountStorageEntry> {
  608. let paths = self.paths.read().unwrap();
  609. let path_index = thread_rng().gen_range(0, paths.len());
  610. let store = Arc::new(self.new_storage_entry(fork_id, &Path::new(&paths[path_index]), size));
  611. fork_storage.insert(store.id, store.clone());
  612. store
  613. }
  614. pub fn purge_fork(&self, fork: Fork) {
  615. //add_root should be called first
  616. let is_root = self.accounts_index.read().unwrap().is_root(fork);
  617. if !is_root {
  618. self.storage.write().unwrap().0.remove(&fork);
  619. }
  620. }
  621. fn store_accounts(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) -> Vec<AccountInfo> {
  622. let with_meta: Vec<(StorageMeta, &Account)> = accounts
  623. .iter()
  624. .map(|(pubkey, account)| {
  625. let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64;
  626. let data_len = if account.lamports == 0 {
  627. 0
  628. } else {
  629. account.data.len() as u64
  630. };
  631. let meta = StorageMeta {
  632. write_version,
  633. pubkey: **pubkey,
  634. data_len,
  635. };
  636. (meta, *account)
  637. })
  638. .collect();
  639. let mut infos: Vec<AccountInfo> = vec![];
  640. while infos.len() < with_meta.len() {
  641. let storage = self.find_storage_candidate(fork_id);
  642. let rvs = storage.accounts.append_accounts(&with_meta[infos.len()..]);
  643. if rvs.is_empty() {
  644. storage.set_status(AccountStorageStatus::Full);
  645. // See if an account overflows the default append vec size.
  646. let data_len = (with_meta[infos.len()].1.data.len() + 4096) as u64;
  647. if data_len > self.file_size {
  648. self.create_and_insert_store(fork_id, data_len * 2);
  649. }
  650. continue;
  651. }
  652. for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) {
  653. storage.add_account();
  654. infos.push(AccountInfo {
  655. id: storage.id,
  656. offset: *offset,
  657. lamports: account.lamports,
  658. });
  659. }
  660. // restore the state to available
  661. storage.set_status(AccountStorageStatus::Available);
  662. }
  663. infos
  664. }
  665. fn update_index(
  666. &self,
  667. fork_id: Fork,
  668. infos: Vec<AccountInfo>,
  669. accounts: &[(&Pubkey, &Account)],
  670. ) -> (Vec<(Fork, AccountInfo)>, u64) {
  671. let mut reclaims: Vec<(Fork, AccountInfo)> = Vec::with_capacity(infos.len() * 2);
  672. let mut inserts = vec![];
  673. let index = self.accounts_index.read().unwrap();
  674. let mut update_index_work = Measure::start("update_index_work");
  675. for (info, pubkey_account) in infos.into_iter().zip(accounts.iter()) {
  676. let pubkey = pubkey_account.0;
  677. if let Some(info) = index.update(fork_id, pubkey, info, &mut reclaims) {
  678. inserts.push((pubkey, info));
  679. }
  680. }
  681. let last_root = index.last_root;
  682. drop(index);
  683. if !inserts.is_empty() {
  684. let mut index = self.accounts_index.write().unwrap();
  685. for (pubkey, info) in inserts {
  686. index.insert(fork_id, pubkey, info, &mut reclaims);
  687. }
  688. }
  689. update_index_work.stop();
  690. (reclaims, last_root)
  691. }
  692. fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet<Fork> {
  693. let storage = self.storage.read().unwrap();
  694. let mut dead_forks = HashSet::new();
  695. for (fork_id, account_info) in reclaims {
  696. if let Some(fork_storage) = storage.0.get(&fork_id) {
  697. if let Some(store) = fork_storage.get(&account_info.id) {
  698. assert_eq!(
  699. fork_id, store.fork_id,
  700. "AccountDB::accounts_index corrupted. Storage should only point to one fork"
  701. );
  702. let count = store.remove_account();
  703. if count == 0 {
  704. dead_forks.insert(fork_id);
  705. }
  706. }
  707. }
  708. }
  709. dead_forks.retain(|fork| {
  710. if let Some(fork_storage) = storage.0.get(&fork) {
  711. for x in fork_storage.values() {
  712. if x.count() != 0 {
  713. return false;
  714. }
  715. }
  716. }
  717. true
  718. });
  719. dead_forks
  720. }
  721. fn cleanup_dead_forks(&self, dead_forks: &mut HashSet<Fork>, last_root: u64) {
  722. // a fork is not totally dead until it is older than the root
  723. dead_forks.retain(|fork| *fork < last_root);
  724. if !dead_forks.is_empty() {
  725. let mut index = self.accounts_index.write().unwrap();
  726. for fork in dead_forks.iter() {
  727. index.cleanup_dead_fork(*fork);
  728. }
  729. }
  730. }
  731. /// Store the account update.
  732. pub fn store(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) {
  733. let mut store_accounts = Measure::start("store::store_accounts");
  734. let infos = self.store_accounts(fork_id, accounts);
  735. store_accounts.stop();
  736. let mut update_index = Measure::start("store::update_index");
  737. let (reclaims, last_root) = self.update_index(fork_id, infos, accounts);
  738. update_index.stop();
  739. trace!("reclaim: {}", reclaims.len());
  740. let mut remove_dead_accounts = Measure::start("store::remove_dead");
  741. let mut dead_forks = self.remove_dead_accounts(reclaims);
  742. remove_dead_accounts.stop();
  743. trace!("dead_forks: {}", dead_forks.len());
  744. let mut cleanup_dead_forks = Measure::start("store::cleanup_dead_forks");
  745. self.cleanup_dead_forks(&mut dead_forks, last_root);
  746. cleanup_dead_forks.stop();
  747. trace!("purge_forks: {}", dead_forks.len());
  748. let mut purge_forks = Measure::start("store::purge_forks");
  749. for fork in dead_forks {
  750. self.purge_fork(fork);
  751. }
  752. purge_forks.stop();
  753. }
  754. pub fn add_root(&self, fork: Fork) {
  755. self.accounts_index.write().unwrap().add_root(fork)
  756. }
  757. pub fn get_storage_entries(&self) -> Vec<Arc<AccountStorageEntry>> {
  758. let r_storage = self.storage.read().unwrap();
  759. r_storage
  760. .0
  761. .values()
  762. .flat_map(|fork_store| fork_store.values().cloned())
  763. .collect()
  764. }
  765. fn merge(
  766. dest: &mut HashMap<Pubkey, (u64, AccountInfo)>,
  767. source: &HashMap<Pubkey, (u64, AccountInfo)>,
  768. ) {
  769. for (key, (source_version, source_info)) in source.iter() {
  770. if let Some((dest_version, _)) = dest.get(key) {
  771. if dest_version > source_version {
  772. continue;
  773. }
  774. }
  775. dest.insert(*key, (*source_version, source_info.clone()));
  776. }
  777. }
  778. fn get_io_error(error: &str) -> IOError {
  779. warn!("AccountsDB error: {:?}", error);
  780. IOError::new(ErrorKind::Other, error)
  781. }
  782. fn generate_index(&self) {
  783. let storage = self.storage.read().unwrap();
  784. let mut forks: Vec<Fork> = storage.0.keys().cloned().collect();
  785. forks.sort();
  786. let mut accounts_index = self.accounts_index.write().unwrap();
  787. for fork_id in forks.iter() {
  788. let mut accumulator: Vec<HashMap<Pubkey, (u64, AccountInfo)>> = self
  789. .scan_account_storage(
  790. *fork_id,
  791. |stored_account: &StoredAccount,
  792. id: AppendVecId,
  793. accum: &mut HashMap<Pubkey, (u64, AccountInfo)>| {
  794. let account_info = AccountInfo {
  795. id,
  796. offset: stored_account.offset,
  797. lamports: stored_account.balance.lamports,
  798. };
  799. accum.insert(
  800. stored_account.meta.pubkey,
  801. (stored_account.meta.write_version, account_info),
  802. );
  803. },
  804. );
  805. let mut account_maps = accumulator.pop().unwrap();
  806. while let Some(maps) = accumulator.pop() {
  807. AccountsDB::merge(&mut account_maps, &maps);
  808. }
  809. if !account_maps.is_empty() {
  810. accounts_index.roots.insert(*fork_id);
  811. let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
  812. for (pubkey, (_, account_info)) in account_maps.iter() {
  813. accounts_index.insert(*fork_id, pubkey, account_info.clone(), &mut _reclaims);
  814. }
  815. }
  816. }
  817. }
  818. }
  819. #[cfg(test)]
  820. pub mod tests {
  821. // TODO: all the bank tests are bank specific, issue: 2194
  822. use super::*;
  823. use bincode::serialize_into;
  824. use rand::{thread_rng, Rng};
  825. use solana_sdk::account::Account;
  826. use std::fs;
  827. use tempfile::TempDir;
  828. #[test]
  829. fn test_accountsdb_add_root() {
  830. solana_logger::setup();
  831. let db = AccountsDB::new(None);
  832. let key = Pubkey::default();
  833. let account0 = Account::new(1, 0, &key);
  834. db.store(0, &[(&key, &account0)]);
  835. db.add_root(0);
  836. let ancestors = vec![(1, 1)].into_iter().collect();
  837. assert_eq!(db.load_slow(&ancestors, &key), Some((account0, 0)));
  838. }
  839. #[test]
  840. fn test_accountsdb_latest_ancestor() {
  841. solana_logger::setup();
  842. let db = AccountsDB::new(None);
  843. let key = Pubkey::default();
  844. let account0 = Account::new(1, 0, &key);
  845. db.store(0, &[(&key, &account0)]);
  846. let account1 = Account::new(0, 0, &key);
  847. db.store(1, &[(&key, &account1)]);
  848. let ancestors = vec![(1, 1)].into_iter().collect();
  849. assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
  850. let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
  851. assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
  852. let accounts: Vec<Account> =
  853. db.scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
  854. if let Some(data) = option {
  855. accounts.push(data.1);
  856. }
  857. });
  858. assert_eq!(accounts, vec![account1]);
  859. }
  860. #[test]
  861. fn test_accountsdb_latest_ancestor_with_root() {
  862. solana_logger::setup();
  863. let db = AccountsDB::new(None);
  864. let key = Pubkey::default();
  865. let account0 = Account::new(1, 0, &key);
  866. db.store(0, &[(&key, &account0)]);
  867. let account1 = Account::new(0, 0, &key);
  868. db.store(1, &[(&key, &account1)]);
  869. db.add_root(0);
  870. let ancestors = vec![(1, 1)].into_iter().collect();
  871. assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
  872. let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
  873. assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
  874. }
  875. #[test]
  876. fn test_accountsdb_root_one_fork() {
  877. solana_logger::setup();
  878. let db = AccountsDB::new(None);
  879. let key = Pubkey::default();
  880. let account0 = Account::new(1, 0, &key);
  881. // store value 1 in the "root", i.e. db zero
  882. db.store(0, &[(&key, &account0)]);
  883. // now we have:
  884. //
  885. // root0 -> key.lamports==1
  886. // / \
  887. // / \
  888. // key.lamports==0 <- fork1 \
  889. // fork2 -> key.lamports==1
  890. // (via root0)
  891. // store value 0 in one child
  892. let account1 = Account::new(0, 0, &key);
  893. db.store(1, &[(&key, &account1)]);
  894. // masking accounts is done at the Accounts level, at accountsDB we see
  895. // original account (but could also accept "None", which is implemented
  896. // at the Accounts level)
  897. let ancestors = vec![(0, 0), (1, 1)].into_iter().collect();
  898. assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
  899. // we should see 1 token in fork 2
  900. let ancestors = vec![(0, 0), (2, 2)].into_iter().collect();
  901. assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account0);
  902. db.add_root(0);
  903. let ancestors = vec![(1, 1)].into_iter().collect();
  904. assert_eq!(db.load_slow(&ancestors, &key), Some((account1, 1)));
  905. let ancestors = vec![(2, 2)].into_iter().collect();
  906. assert_eq!(db.load_slow(&ancestors, &key), Some((account0, 0))); // original value
  907. }
  908. #[test]
  909. fn test_accountsdb_add_root_many() {
  910. let db = AccountsDB::new(None);
  911. let mut pubkeys: Vec<Pubkey> = vec![];
  912. create_account(&db, &mut pubkeys, 0, 100, 0, 0);
  913. for _ in 1..100 {
  914. let idx = thread_rng().gen_range(0, 99);
  915. let ancestors = vec![(0, 0)].into_iter().collect();
  916. let account = db.load_slow(&ancestors, &pubkeys[idx]).unwrap();
  917. let mut default_account = Account::default();
  918. default_account.lamports = (idx + 1) as u64;
  919. assert_eq!((default_account, 0), account);
  920. }
  921. db.add_root(0);
  922. // check that all the accounts appear with a new root
  923. for _ in 1..100 {
  924. let idx = thread_rng().gen_range(0, 99);
  925. let ancestors = vec![(0, 0)].into_iter().collect();
  926. let account0 = db.load_slow(&ancestors, &pubkeys[idx]).unwrap();
  927. let ancestors = vec![(1, 1)].into_iter().collect();
  928. let account1 = db.load_slow(&ancestors, &pubkeys[idx]).unwrap();
  929. let mut default_account = Account::default();
  930. default_account.lamports = (idx + 1) as u64;
  931. assert_eq!(&default_account, &account0.0);
  932. assert_eq!(&default_account, &account1.0);
  933. }
  934. }
  935. #[test]
  936. fn test_accountsdb_count_stores() {
  937. solana_logger::setup();
  938. let db = AccountsDB::new_single();
  939. let mut pubkeys: Vec<Pubkey> = vec![];
  940. create_account(&db, &mut pubkeys, 0, 2, DEFAULT_FILE_SIZE as usize / 3, 0);
  941. assert!(check_storage(&db, 0, 2));
  942. let pubkey = Pubkey::new_rand();
  943. let account = Account::new(1, DEFAULT_FILE_SIZE as usize / 3, &pubkey);
  944. db.store(1, &[(&pubkey, &account)]);
  945. db.store(1, &[(&pubkeys[0], &account)]);
  946. {
  947. let stores = db.storage.read().unwrap();
  948. let fork_0_stores = &stores.0.get(&0).unwrap();
  949. let fork_1_stores = &stores.0.get(&1).unwrap();
  950. assert_eq!(fork_0_stores.len(), 1);
  951. assert_eq!(fork_1_stores.len(), 1);
  952. assert_eq!(fork_0_stores[&0].count(), 2);
  953. assert_eq!(fork_1_stores[&1].count(), 2);
  954. }
  955. db.add_root(1);
  956. {
  957. let stores = db.storage.read().unwrap();
  958. let fork_0_stores = &stores.0.get(&0).unwrap();
  959. let fork_1_stores = &stores.0.get(&1).unwrap();
  960. assert_eq!(fork_0_stores.len(), 1);
  961. assert_eq!(fork_1_stores.len(), 1);
  962. assert_eq!(fork_0_stores[&0].count(), 2);
  963. assert_eq!(fork_1_stores[&1].count(), 2);
  964. }
  965. }
  966. #[test]
  967. fn test_accounts_unsquashed() {
  968. let key = Pubkey::default();
  969. // 1 token in the "root", i.e. db zero
  970. let db0 = AccountsDB::new(None);
  971. let account0 = Account::new(1, 0, &key);
  972. db0.store(0, &[(&key, &account0)]);
  973. // 0 lamports in the child
  974. let account1 = Account::new(0, 0, &key);
  975. db0.store(1, &[(&key, &account1)]);
  976. // masking accounts is done at the Accounts level, at accountsDB we see
  977. // original account
  978. let ancestors = vec![(0, 0), (1, 1)].into_iter().collect();
  979. assert_eq!(db0.load_slow(&ancestors, &key), Some((account1, 1)));
  980. let ancestors = vec![(0, 0)].into_iter().collect();
  981. assert_eq!(db0.load_slow(&ancestors, &key), Some((account0, 0)));
  982. }
  983. fn create_account(
  984. accounts: &AccountsDB,
  985. pubkeys: &mut Vec<Pubkey>,
  986. fork: Fork,
  987. num: usize,
  988. space: usize,
  989. num_vote: usize,
  990. ) {
  991. let ancestors = vec![(fork, 0)].into_iter().collect();
  992. for t in 0..num {
  993. let pubkey = Pubkey::new_rand();
  994. let account = Account::new((t + 1) as u64, space, &Account::default().owner);
  995. pubkeys.push(pubkey.clone());
  996. assert!(accounts.load_slow(&ancestors, &pubkey).is_none());
  997. accounts.store(fork, &[(&pubkey, &account)]);
  998. }
  999. for t in 0..num_vote {
  1000. let pubkey = Pubkey::new_rand();
  1001. let account = Account::new((num + t + 1) as u64, space, &solana_vote_api::id());
  1002. pubkeys.push(pubkey.clone());
  1003. let ancestors = vec![(fork, 0)].into_iter().collect();
  1004. assert!(accounts.load_slow(&ancestors, &pubkey).is_none());
  1005. accounts.store(fork, &[(&pubkey, &account)]);
  1006. }
  1007. }
  1008. fn update_accounts(accounts: &AccountsDB, pubkeys: &Vec<Pubkey>, fork: Fork, range: usize) {
  1009. for _ in 1..1000 {
  1010. let idx = thread_rng().gen_range(0, range);
  1011. let ancestors = vec![(fork, 0)].into_iter().collect();
  1012. if let Some((mut account, _)) = accounts.load_slow(&ancestors, &pubkeys[idx]) {
  1013. account.lamports = account.lamports + 1;
  1014. accounts.store(fork, &[(&pubkeys[idx], &account)]);
  1015. if account.lamports == 0 {
  1016. let ancestors = vec![(fork, 0)].into_iter().collect();
  1017. assert!(accounts.load_slow(&ancestors, &pubkeys[idx]).is_none());
  1018. } else {
  1019. let mut default_account = Account::default();
  1020. default_account.lamports = account.lamports;
  1021. assert_eq!(default_account, account);
  1022. }
  1023. }
  1024. }
  1025. }
  1026. fn check_storage(accounts: &AccountsDB, fork: Fork, count: usize) -> bool {
  1027. let storage = accounts.storage.read().unwrap();
  1028. assert_eq!(storage.0[&fork].len(), 1);
  1029. let fork_storage = storage.0.get(&fork).unwrap();
  1030. let mut total_count: usize = 0;
  1031. for store in fork_storage.values() {
  1032. assert_eq!(store.status(), AccountStorageStatus::Available);
  1033. total_count += store.count();
  1034. }
  1035. assert_eq!(total_count, count);
  1036. total_count == count
  1037. }
  1038. fn check_accounts(
  1039. accounts: &AccountsDB,
  1040. pubkeys: &Vec<Pubkey>,
  1041. fork: Fork,
  1042. num: usize,
  1043. count: usize,
  1044. ) {
  1045. let ancestors = vec![(fork, 0)].into_iter().collect();
  1046. for _ in 0..num {
  1047. let idx = thread_rng().gen_range(0, num);
  1048. let account = accounts.load_slow(&ancestors, &pubkeys[idx]);
  1049. let account1 = Some((
  1050. Account::new((idx + count) as u64, 0, &Account::default().owner),
  1051. fork,
  1052. ));
  1053. assert_eq!(account, account1);
  1054. }
  1055. }
  1056. fn modify_accounts(
  1057. accounts: &AccountsDB,
  1058. pubkeys: &Vec<Pubkey>,
  1059. fork: Fork,
  1060. num: usize,
  1061. count: usize,
  1062. ) {
  1063. for idx in 0..num {
  1064. let account = Account::new((idx + count) as u64, 0, &Account::default().owner);
  1065. accounts.store(fork, &[(&pubkeys[idx], &account)]);
  1066. }
  1067. }
  1068. #[test]
  1069. fn test_account_one() {
  1070. let (_accounts_dirs, paths) = get_temp_accounts_paths(1).unwrap();
  1071. let db = AccountsDB::new(Some(paths));
  1072. let mut pubkeys: Vec<Pubkey> = vec![];
  1073. create_account(&db, &mut pubkeys, 0, 1, 0, 0);
  1074. let ancestors = vec![(0, 0)].into_iter().collect();
  1075. let account = db.load_slow(&ancestors, &pubkeys[0]).unwrap();
  1076. let mut default_account = Account::default();
  1077. default_account.lamports = 1;
  1078. assert_eq!((default_account, 0), account);
  1079. }
  1080. #[test]
  1081. fn test_account_many() {
  1082. let (_accounts_dirs, paths) = get_temp_accounts_paths(2).unwrap();
  1083. let db = AccountsDB::new(Some(paths));
  1084. let mut pubkeys: Vec<Pubkey> = vec![];
  1085. create_account(&db, &mut pubkeys, 0, 100, 0, 0);
  1086. check_accounts(&db, &pubkeys, 0, 100, 1);
  1087. }
  1088. #[test]
  1089. fn test_account_update() {
  1090. let accounts = AccountsDB::new_single();
  1091. let mut pubkeys: Vec<Pubkey> = vec![];
  1092. create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
  1093. update_accounts(&accounts, &pubkeys, 0, 99);
  1094. assert_eq!(check_storage(&accounts, 0, 100), true);
  1095. }
  1096. #[test]
  1097. fn test_account_grow_many() {
  1098. let (_accounts_dir, paths) = get_temp_accounts_paths(2).unwrap();
  1099. let size = 4096;
  1100. let accounts = AccountsDB::new_sized(Some(paths), size);
  1101. let mut keys = vec![];
  1102. for i in 0..9 {
  1103. let key = Pubkey::new_rand();
  1104. let account = Account::new(i + 1, size as usize / 4, &key);
  1105. accounts.store(0, &[(&key, &account)]);
  1106. keys.push(key);
  1107. }
  1108. for (i, key) in keys.iter().enumerate() {
  1109. let ancestors = vec![(0, 0)].into_iter().collect();
  1110. assert_eq!(
  1111. accounts.load_slow(&ancestors, &key).unwrap().0.lamports,
  1112. (i as u64) + 1
  1113. );
  1114. }
  1115. let mut append_vec_histogram = HashMap::new();
  1116. for storage in accounts
  1117. .storage
  1118. .read()
  1119. .unwrap()
  1120. .0
  1121. .values()
  1122. .flat_map(|x| x.values())
  1123. {
  1124. *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1;
  1125. }
  1126. for count in append_vec_histogram.values() {
  1127. assert!(*count >= 2);
  1128. }
  1129. }
  1130. #[test]
  1131. fn test_account_grow() {
  1132. let accounts = AccountsDB::new_single();
  1133. let count = [0, 1];
  1134. let status = [AccountStorageStatus::Available, AccountStorageStatus::Full];
  1135. let pubkey1 = Pubkey::new_rand();
  1136. let account1 = Account::new(1, DEFAULT_FILE_SIZE as usize / 2, &pubkey1);
  1137. accounts.store(0, &[(&pubkey1, &account1)]);
  1138. {
  1139. let stores = accounts.storage.read().unwrap();
  1140. assert_eq!(stores.0.len(), 1);
  1141. assert_eq!(stores.0[&0][&0].count(), 1);
  1142. assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available);
  1143. }
  1144. let pubkey2 = Pubkey::new_rand();
  1145. let account2 = Account::new(1, DEFAULT_FILE_SIZE as usize / 2, &pubkey2);
  1146. accounts.store(0, &[(&pubkey2, &account2)]);
  1147. {
  1148. let stores = accounts.storage.read().unwrap();
  1149. assert_eq!(stores.0.len(), 1);
  1150. assert_eq!(stores.0[&0].len(), 2);
  1151. assert_eq!(stores.0[&0][&0].count(), 1);
  1152. assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Full);
  1153. assert_eq!(stores.0[&0][&1].count(), 1);
  1154. assert_eq!(stores.0[&0][&1].status(), AccountStorageStatus::Available);
  1155. }
  1156. let ancestors = vec![(0, 0)].into_iter().collect();
  1157. assert_eq!(
  1158. accounts.load_slow(&ancestors, &pubkey1).unwrap().0,
  1159. account1
  1160. );
  1161. assert_eq!(
  1162. accounts.load_slow(&ancestors, &pubkey2).unwrap().0,
  1163. account2
  1164. );
  1165. // lots of stores, but 3 storages should be enough for everything
  1166. for i in 0..25 {
  1167. let index = i % 2;
  1168. accounts.store(0, &[(&pubkey1, &account1)]);
  1169. {
  1170. let stores = accounts.storage.read().unwrap();
  1171. assert_eq!(stores.0.len(), 1);
  1172. assert_eq!(stores.0[&0].len(), 3);
  1173. assert_eq!(stores.0[&0][&0].count(), count[index]);
  1174. assert_eq!(stores.0[&0][&0].status(), status[0]);
  1175. assert_eq!(stores.0[&0][&1].count(), 1);
  1176. assert_eq!(stores.0[&0][&1].status(), status[1]);
  1177. assert_eq!(stores.0[&0][&2].count(), count[index ^ 1]);
  1178. assert_eq!(stores.0[&0][&2].status(), status[0]);
  1179. }
  1180. let ancestors = vec![(0, 0)].into_iter().collect();
  1181. assert_eq!(
  1182. accounts.load_slow(&ancestors, &pubkey1).unwrap().0,
  1183. account1
  1184. );
  1185. assert_eq!(
  1186. accounts.load_slow(&ancestors, &pubkey2).unwrap().0,
  1187. account2
  1188. );
  1189. }
  1190. }
  1191. #[test]
  1192. fn test_purge_fork_not_root() {
  1193. let accounts = AccountsDB::new(None);
  1194. let mut pubkeys: Vec<Pubkey> = vec![];
  1195. create_account(&accounts, &mut pubkeys, 0, 1, 0, 0);
  1196. let ancestors = vec![(0, 0)].into_iter().collect();
  1197. assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some());
  1198. accounts.purge_fork(0);
  1199. assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_none());
  1200. }
  1201. #[test]
  1202. fn test_purge_fork_after_root() {
  1203. let accounts = AccountsDB::new(None);
  1204. let mut pubkeys: Vec<Pubkey> = vec![];
  1205. create_account(&accounts, &mut pubkeys, 0, 1, 0, 0);
  1206. let ancestors = vec![(0, 0)].into_iter().collect();
  1207. accounts.add_root(0);
  1208. accounts.purge_fork(0);
  1209. assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some());
  1210. }
  1211. #[test]
  1212. fn test_lazy_gc_fork() {
  1213. //This test is pedantic
  1214. //A fork is purged when a non root bank is cleaned up. If a fork is behind root but it is
  1215. //not root, it means we are retaining dead banks.
  1216. let accounts = AccountsDB::new(None);
  1217. let pubkey = Pubkey::new_rand();
  1218. let account = Account::new(1, 0, &Account::default().owner);
  1219. //store an account
  1220. accounts.store(0, &[(&pubkey, &account)]);
  1221. let ancestors = vec![(0, 0)].into_iter().collect();
  1222. let id = {
  1223. let index = accounts.accounts_index.read().unwrap();
  1224. let (list, idx) = index.get(&pubkey, &ancestors).unwrap();
  1225. list[idx].1.id
  1226. };
  1227. //fork 0 is behind root, but it is not root, therefore it is purged
  1228. accounts.add_root(1);
  1229. assert!(accounts.accounts_index.read().unwrap().is_purged(0));
  1230. //fork is still there, since gc is lazy
  1231. assert!(accounts.storage.read().unwrap().0[&0].get(&id).is_some());
  1232. //store causes cleanup
  1233. accounts.store(1, &[(&pubkey, &account)]);
  1234. //fork is gone
  1235. assert!(accounts.storage.read().unwrap().0.get(&0).is_none());
  1236. //new value is there
  1237. let ancestors = vec![(1, 1)].into_iter().collect();
  1238. assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
  1239. }
  1240. #[test]
  1241. fn test_accounts_db_serialize() {
  1242. solana_logger::setup();
  1243. let accounts = AccountsDB::new_single();
  1244. let mut pubkeys: Vec<Pubkey> = vec![];
  1245. create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
  1246. assert_eq!(check_storage(&accounts, 0, 100), true);
  1247. check_accounts(&accounts, &pubkeys, 0, 100, 1);
  1248. modify_accounts(&accounts, &pubkeys, 0, 100, 2);
  1249. check_accounts(&accounts, &pubkeys, 0, 100, 2);
  1250. accounts.add_root(0);
  1251. let mut pubkeys1: Vec<Pubkey> = vec![];
  1252. create_account(&accounts, &mut pubkeys1, 1, 10, 0, 0);
  1253. let mut writer = Cursor::new(vec![]);
  1254. serialize_into(&mut writer, &AccountsDBSerialize::new(&accounts, 1)).unwrap();
  1255. assert!(check_storage(&accounts, 0, 100));
  1256. assert!(check_storage(&accounts, 1, 10));
  1257. let buf = writer.into_inner();
  1258. let mut reader = BufReader::new(&buf[..]);
  1259. let daccounts = AccountsDB::new(None);
  1260. let local_paths = {
  1261. let paths = daccounts.paths.read().unwrap();
  1262. AccountsDB::format_paths(paths.to_vec())
  1263. };
  1264. let copied_accounts = TempDir::new().unwrap();
  1265. // Simulate obtaining a copy of the AppendVecs from a tarball
  1266. copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
  1267. daccounts
  1268. .accounts_from_stream(&mut reader, local_paths, copied_accounts.path())
  1269. .unwrap();
  1270. assert_eq!(
  1271. daccounts.write_version.load(Ordering::Relaxed),
  1272. accounts.write_version.load(Ordering::Relaxed)
  1273. );
  1274. assert_eq!(
  1275. daccounts.next_id.load(Ordering::Relaxed),
  1276. accounts.next_id.load(Ordering::Relaxed)
  1277. );
  1278. check_accounts(&daccounts, &pubkeys, 0, 100, 2);
  1279. check_accounts(&daccounts, &pubkeys1, 1, 10, 1);
  1280. assert!(check_storage(&daccounts, 0, 100));
  1281. assert!(check_storage(&daccounts, 1, 10));
  1282. }
  1283. #[test]
  1284. #[ignore]
  1285. fn test_store_account_stress() {
  1286. let fork_id = 42;
  1287. let num_threads = 2;
  1288. let min_file_bytes = std::mem::size_of::<StorageMeta>()
  1289. + std::mem::size_of::<crate::append_vec::AccountBalance>();
  1290. let db = Arc::new(AccountsDB::new_sized(None, min_file_bytes as u64));
  1291. db.add_root(fork_id);
  1292. let thread_hdls: Vec<_> = (0..num_threads)
  1293. .into_iter()
  1294. .map(|_| {
  1295. let db = db.clone();
  1296. std::thread::Builder::new()
  1297. .name("account-writers".to_string())
  1298. .spawn(move || {
  1299. let pubkey = Pubkey::new_rand();
  1300. let mut account = Account::new(1, 0, &pubkey);
  1301. let mut i = 0;
  1302. loop {
  1303. let account_bal = thread_rng().gen_range(1, 99);
  1304. account.lamports = account_bal;
  1305. db.store(fork_id, &[(&pubkey, &account)]);
  1306. let (account, fork) = db.load_slow(&HashMap::new(), &pubkey).expect(
  1307. &format!("Could not fetch stored account {}, iter {}", pubkey, i),
  1308. );
  1309. assert_eq!(fork, fork_id);
  1310. assert_eq!(account.lamports, account_bal);
  1311. i += 1;
  1312. }
  1313. })
  1314. .unwrap()
  1315. })
  1316. .collect();
  1317. for t in thread_hdls {
  1318. t.join().unwrap();
  1319. }
  1320. }
  1321. #[test]
  1322. fn test_accountsdb_scan_accounts() {
  1323. solana_logger::setup();
  1324. let db = AccountsDB::new(None);
  1325. let key = Pubkey::default();
  1326. let key0 = Pubkey::new_rand();
  1327. let account0 = Account::new(1, 0, &key);
  1328. db.store(0, &[(&key0, &account0)]);
  1329. let key1 = Pubkey::new_rand();
  1330. let account1 = Account::new(2, 0, &key);
  1331. db.store(1, &[(&key1, &account1)]);
  1332. let ancestors = vec![(0, 0)].into_iter().collect();
  1333. let accounts: Vec<Account> =
  1334. db.scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
  1335. if let Some(data) = option {
  1336. accounts.push(data.1);
  1337. }
  1338. });
  1339. assert_eq!(accounts, vec![account0]);
  1340. let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
  1341. let accounts: Vec<Account> =
  1342. db.scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
  1343. if let Some(data) = option {
  1344. accounts.push(data.1);
  1345. }
  1346. });
  1347. assert_eq!(accounts.len(), 2);
  1348. }
  1349. #[test]
  1350. fn test_store_large_account() {
  1351. solana_logger::setup();
  1352. let db = AccountsDB::new(None);
  1353. let key = Pubkey::default();
  1354. let data_len = DEFAULT_FILE_SIZE as usize + 7;
  1355. let account = Account::new(1, data_len, &key);
  1356. db.store(0, &[(&key, &account)]);
  1357. let ancestors = vec![(0, 0)].into_iter().collect();
  1358. let ret = db.load_slow(&ancestors, &key).unwrap();
  1359. assert_eq!(ret.0.data.len(), data_len);
  1360. }
  1361. pub fn copy_append_vecs<P: AsRef<Path>>(
  1362. accounts_db: &AccountsDB,
  1363. output_dir: P,
  1364. ) -> IOResult<()> {
  1365. let storage_entries = accounts_db.get_storage_entries();
  1366. for storage in storage_entries {
  1367. let storage_path = storage.get_path();
  1368. let output_path = output_dir.as_ref().join(
  1369. storage_path
  1370. .file_name()
  1371. .expect("Invalid AppendVec file path"),
  1372. );
  1373. fs::copy(storage_path, output_path)?;
  1374. }
  1375. Ok(())
  1376. }
  1377. }