bigtable.rs 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124
  1. // Primitives for reading/writing BigTable tables
  2. use {
  3. crate::{
  4. access_token::{AccessToken, Scope},
  5. compression::{compress_best, decompress},
  6. root_ca_certificate, CredentialType,
  7. },
  8. backoff::{future::retry, Error as BackoffError, ExponentialBackoff},
  9. log::*,
  10. std::{
  11. str::FromStr,
  12. time::{Duration, Instant},
  13. },
  14. thiserror::Error,
  15. tonic::{codegen::InterceptedService, transport::ClientTlsConfig, Request, Status},
  16. };
  17. #[allow(clippy::all)]
  18. mod google {
  19. mod rpc {
  20. include!(concat!(
  21. env!("CARGO_MANIFEST_DIR"),
  22. concat!("/proto/google.rpc.rs")
  23. ));
  24. }
  25. pub mod bigtable {
  26. pub mod v2 {
  27. include!(concat!(
  28. env!("CARGO_MANIFEST_DIR"),
  29. concat!("/proto/google.bigtable.v2.rs")
  30. ));
  31. }
  32. }
  33. }
  34. use google::bigtable::v2::*;
  /// Column qualifier of a single cell; must be valid UTF-8 to be returned by reads.
  pub type CellName = String;
  /// Raw bytes stored in a single cell.
  pub type CellValue = Vec<u8>;
  /// Key identifying a row; must be valid UTF-8 to be returned by reads.
  pub type RowKey = String;
  /// All cells of one row as `(name, value)` pairs.
  pub type RowData = Vec<(CellName, CellValue)>;
  /// Borrowed view of a [`RowData`].
  pub type RowDataSlice<'a> = &'a [(CellName, CellValue)];

  /// A cell value decoded from either the `"bin"` (bincode) or `"proto"` (protobuf) column.
  pub enum CellData<B, P> {
      /// Value decoded from the bincode-encoded `"bin"` cell.
      Bincode(B),
      /// Value decoded from the protobuf-encoded `"proto"` cell.
      Protobuf(P),
  }
  44. #[derive(Debug, Error)]
  45. pub enum Error {
  46. #[error("AccessToken: {0}")]
  47. AccessToken(String),
  48. #[error("Certificate: {0}")]
  49. Certificate(String),
  50. #[error("I/O: {0}")]
  51. Io(std::io::Error),
  52. #[error("Transport: {0}")]
  53. Transport(tonic::transport::Error),
  54. #[error("Invalid URI {0}: {1}")]
  55. InvalidUri(String, String),
  56. #[error("Row not found")]
  57. RowNotFound,
  58. #[error("Row write failed")]
  59. RowWriteFailed,
  60. #[error("Row delete failed")]
  61. RowDeleteFailed,
  62. #[error("Object not found: {0}")]
  63. ObjectNotFound(String),
  64. #[error("Object is corrupt: {0}")]
  65. ObjectCorrupt(String),
  66. #[error("RPC: {0}")]
  67. Rpc(Box<tonic::Status>),
  68. #[error("Timeout")]
  69. Timeout,
  70. }
  71. fn to_backoff_err(err: Error) -> BackoffError<Error> {
  72. if let Error::Rpc(ref status) = err {
  73. if status.code() == tonic::Code::NotFound && status.message().starts_with("table") {
  74. return BackoffError::Permanent(err);
  75. }
  76. }
  77. err.into()
  78. }
  79. impl std::convert::From<std::io::Error> for Error {
  80. fn from(err: std::io::Error) -> Self {
  81. Self::Io(err)
  82. }
  83. }
  84. impl std::convert::From<tonic::transport::Error> for Error {
  85. fn from(err: tonic::transport::Error) -> Self {
  86. Self::Transport(err)
  87. }
  88. }
  89. impl std::convert::From<tonic::Status> for Error {
  90. fn from(err: tonic::Status) -> Self {
  91. Self::Rpc(Box::new(err))
  92. }
  93. }
  94. pub type Result<T> = std::result::Result<T, Error>;
  95. type InterceptedRequestResult = std::result::Result<Request<()>, Status>;
  96. #[derive(Clone)]
  97. pub struct BigTableConnection {
  98. access_token: Option<AccessToken>,
  99. channel: tonic::transport::Channel,
  100. table_prefix: String,
  101. app_profile_id: String,
  102. timeout: Option<Duration>,
  103. max_message_size: usize,
  104. }
  105. impl BigTableConnection {
  106. /// Establish a connection to the BigTable instance named `instance_name`. If read-only access
  107. /// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
  108. ///
  109. /// The GOOGLE_APPLICATION_CREDENTIALS environment variable will be used to determine the
  110. /// program name that contains the BigTable instance in addition to access credentials.
  111. ///
  112. /// The BIGTABLE_EMULATOR_HOST environment variable is also respected.
  113. ///
  114. /// The BIGTABLE_PROXY environment variable is used to configure the gRPC connection through a
  115. /// forward proxy (see HTTP_PROXY).
  116. ///
  117. pub async fn new(
  118. instance_name: &str,
  119. app_profile_id: &str,
  120. read_only: bool,
  121. timeout: Option<Duration>,
  122. credential_type: CredentialType,
  123. max_message_size: usize,
  124. ) -> Result<Self> {
  125. match std::env::var("BIGTABLE_EMULATOR_HOST") {
  126. Ok(endpoint) => {
  127. info!("Connecting to bigtable emulator at {endpoint}");
  128. Self::new_for_emulator(
  129. instance_name,
  130. app_profile_id,
  131. &endpoint,
  132. timeout,
  133. max_message_size,
  134. )
  135. }
  136. Err(_) => {
  137. let access_token = AccessToken::new(
  138. if read_only {
  139. Scope::BigTableDataReadOnly
  140. } else {
  141. Scope::BigTableData
  142. },
  143. credential_type,
  144. )
  145. .await
  146. .map_err(Error::AccessToken)?;
  147. let table_prefix = format!(
  148. "projects/{}/instances/{}/tables/",
  149. access_token.project(),
  150. instance_name
  151. );
  152. let endpoint = {
  153. let endpoint =
  154. tonic::transport::Channel::from_static("https://bigtable.googleapis.com")
  155. .tls_config(
  156. ClientTlsConfig::new()
  157. .ca_certificate(
  158. root_ca_certificate::load().map_err(Error::Certificate)?,
  159. )
  160. .domain_name("bigtable.googleapis.com"),
  161. )?;
  162. if let Some(timeout) = timeout {
  163. endpoint.timeout(timeout)
  164. } else {
  165. endpoint
  166. }
  167. };
  168. let mut http = hyper::client::HttpConnector::new();
  169. http.enforce_http(false);
  170. http.set_nodelay(true);
  171. let channel = match std::env::var("BIGTABLE_PROXY") {
  172. Ok(proxy_uri) => {
  173. let proxy = hyper_proxy::Proxy::new(
  174. hyper_proxy::Intercept::All,
  175. proxy_uri
  176. .parse::<http::Uri>()
  177. .map_err(|err| Error::InvalidUri(proxy_uri, err.to_string()))?,
  178. );
  179. let mut proxy_connector =
  180. hyper_proxy::ProxyConnector::from_proxy(http, proxy)?;
  181. // tonic handles TLS as a separate layer
  182. proxy_connector.set_tls(None);
  183. endpoint.connect_with_connector_lazy(proxy_connector)
  184. }
  185. _ => endpoint.connect_with_connector_lazy(http),
  186. };
  187. Ok(Self {
  188. access_token: Some(access_token),
  189. channel,
  190. table_prefix,
  191. app_profile_id: app_profile_id.to_string(),
  192. timeout,
  193. max_message_size,
  194. })
  195. }
  196. }
  197. }
  198. pub fn new_for_emulator(
  199. instance_name: &str,
  200. app_profile_id: &str,
  201. endpoint: &str,
  202. timeout: Option<Duration>,
  203. max_message_size: usize,
  204. ) -> Result<Self> {
  205. Ok(Self {
  206. access_token: None,
  207. channel: tonic::transport::Channel::from_shared(format!("http://{endpoint}"))
  208. .map_err(|err| Error::InvalidUri(String::from(endpoint), err.to_string()))?
  209. .connect_lazy(),
  210. table_prefix: format!("projects/emulator/instances/{instance_name}/tables/"),
  211. app_profile_id: app_profile_id.to_string(),
  212. timeout,
  213. max_message_size,
  214. })
  215. }
  216. /// Create a new BigTable client.
  217. ///
  218. /// Clients require `&mut self`, due to `Tonic::transport::Channel` limitations, however
  219. /// creating new clients is cheap and thus can be used as a work around for ease of use.
  220. pub fn client(&self) -> BigTable<impl FnMut(Request<()>) -> InterceptedRequestResult> {
  221. let access_token = self.access_token.clone();
  222. let client = bigtable_client::BigtableClient::with_interceptor(
  223. self.channel.clone(),
  224. move |mut req: Request<()>| {
  225. if let Some(access_token) = &access_token {
  226. match FromStr::from_str(&access_token.get()) {
  227. Ok(authorization_header) => {
  228. req.metadata_mut()
  229. .insert("authorization", authorization_header);
  230. }
  231. Err(err) => {
  232. warn!("Failed to set authorization header: {err}");
  233. }
  234. }
  235. }
  236. Ok(req)
  237. },
  238. )
  239. .max_decoding_message_size(self.max_message_size)
  240. .max_encoding_message_size(self.max_message_size);
  241. BigTable {
  242. access_token: self.access_token.clone(),
  243. client,
  244. table_prefix: self.table_prefix.clone(),
  245. app_profile_id: self.app_profile_id.clone(),
  246. timeout: self.timeout,
  247. }
  248. }
  249. pub async fn put_bincode_cells_with_retry<T>(
  250. &self,
  251. table: &str,
  252. cells: &[(RowKey, T)],
  253. ) -> Result<usize>
  254. where
  255. T: serde::ser::Serialize,
  256. {
  257. retry(ExponentialBackoff::default(), || async {
  258. let mut client = self.client();
  259. let result = client.put_bincode_cells(table, cells).await;
  260. result.map_err(to_backoff_err)
  261. })
  262. .await
  263. }
  264. pub async fn delete_rows_with_retry(&self, table: &str, row_keys: &[RowKey]) -> Result<()> {
  265. retry(ExponentialBackoff::default(), || async {
  266. let mut client = self.client();
  267. Ok(client.delete_rows(table, row_keys).await?)
  268. })
  269. .await
  270. }
  271. pub async fn get_bincode_cells_with_retry<T>(
  272. &self,
  273. table: &str,
  274. row_keys: &[RowKey],
  275. ) -> Result<Vec<(RowKey, Result<T>)>>
  276. where
  277. T: serde::de::DeserializeOwned,
  278. {
  279. retry(ExponentialBackoff::default(), || async {
  280. let mut client = self.client();
  281. Ok(client.get_bincode_cells(table, row_keys).await?)
  282. })
  283. .await
  284. }
  285. pub async fn put_protobuf_cells_with_retry<T>(
  286. &self,
  287. table: &str,
  288. cells: &[(RowKey, T)],
  289. ) -> Result<usize>
  290. where
  291. T: prost::Message,
  292. {
  293. retry(ExponentialBackoff::default(), || async {
  294. let mut client = self.client();
  295. let result = client.put_protobuf_cells(table, cells).await;
  296. result.map_err(to_backoff_err)
  297. })
  298. .await
  299. }
  300. }
  301. pub struct BigTable<F: FnMut(Request<()>) -> InterceptedRequestResult> {
  302. access_token: Option<AccessToken>,
  303. client: bigtable_client::BigtableClient<InterceptedService<tonic::transport::Channel, F>>,
  304. table_prefix: String,
  305. app_profile_id: String,
  306. timeout: Option<Duration>,
  307. }
  308. impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
  309. async fn decode_read_rows_response(
  310. &self,
  311. mut rrr: tonic::codec::Streaming<ReadRowsResponse>,
  312. ) -> Result<Vec<(RowKey, RowData)>> {
  313. let mut rows: Vec<(RowKey, RowData)> = vec![];
  314. let mut row_key = None;
  315. let mut row_data = vec![];
  316. let mut cell_name = None;
  317. let mut cell_timestamp = 0;
  318. let mut cell_value = vec![];
  319. let mut cell_version_ok = true;
  320. let started = Instant::now();
  321. while let Some(res) = rrr.message().await? {
  322. if let Some(timeout) = self.timeout {
  323. if Instant::now().duration_since(started) > timeout {
  324. return Err(Error::Timeout);
  325. }
  326. }
  327. for (i, mut chunk) in res.chunks.into_iter().enumerate() {
  328. // The comments for `read_rows_response::CellChunk` provide essential details for
  329. // understanding how the below decoding works...
  330. trace!("chunk {i}: {chunk:?}");
  331. // Starting a new row?
  332. if !chunk.row_key.is_empty() {
  333. row_key = String::from_utf8(chunk.row_key).ok(); // Require UTF-8 for row keys
  334. }
  335. // Starting a new cell?
  336. if let Some(qualifier) = chunk.qualifier {
  337. if let Some(cell_name) = cell_name {
  338. row_data.push((cell_name, cell_value));
  339. cell_value = vec![];
  340. }
  341. cell_name = String::from_utf8(qualifier).ok(); // Require UTF-8 for cell names
  342. cell_timestamp = chunk.timestamp_micros;
  343. cell_version_ok = true;
  344. } else {
  345. // Continuing the existing cell. Check if this is the start of another version of the cell
  346. if chunk.timestamp_micros != 0 {
  347. if chunk.timestamp_micros < cell_timestamp {
  348. cell_version_ok = false; // ignore older versions of the cell
  349. } else {
  350. // newer version of the cell, remove the older cell
  351. cell_version_ok = true;
  352. cell_value = vec![];
  353. cell_timestamp = chunk.timestamp_micros;
  354. }
  355. }
  356. }
  357. if cell_version_ok {
  358. cell_value.append(&mut chunk.value);
  359. }
  360. // End of a row?
  361. if chunk.row_status.is_some() {
  362. if let Some(read_rows_response::cell_chunk::RowStatus::CommitRow(_)) =
  363. chunk.row_status
  364. {
  365. if let Some(cell_name) = cell_name {
  366. row_data.push((cell_name, cell_value));
  367. }
  368. if let Some(row_key) = row_key {
  369. rows.push((row_key, row_data))
  370. }
  371. }
  372. row_key = None;
  373. row_data = vec![];
  374. cell_value = vec![];
  375. cell_name = None;
  376. }
  377. }
  378. }
  379. Ok(rows)
  380. }
  381. fn refresh_access_token(&self) {
  382. if let Some(ref access_token) = self.access_token {
  383. access_token.refresh();
  384. }
  385. }
  386. /// Get `table` row keys in lexical order.
  387. ///
  388. /// If `start_at` is provided, the row key listing will start with key.
  389. /// Otherwise the listing will start from the start of the table.
  390. ///
  391. /// If `end_at` is provided, the row key listing will end at the key. Otherwise it will
  392. /// continue until the `rows_limit` is reached or the end of the table, whichever comes first.
  393. /// If `rows_limit` is zero, this method will return an empty array.
  394. pub async fn get_row_keys(
  395. &mut self,
  396. table_name: &str,
  397. start_at: Option<RowKey>,
  398. end_at: Option<RowKey>,
  399. rows_limit: i64,
  400. ) -> Result<Vec<RowKey>> {
  401. if rows_limit == 0 {
  402. return Ok(vec![]);
  403. }
  404. self.refresh_access_token();
  405. let response = self
  406. .read_rows(
  407. table_name,
  408. ReadRowsRequest {
  409. table_name: format!("{}{}", self.table_prefix, table_name),
  410. app_profile_id: self.app_profile_id.clone(),
  411. rows_limit,
  412. rows: Some(RowSet {
  413. row_keys: vec![],
  414. row_ranges: vec![RowRange {
  415. start_key: start_at.map(|row_key| {
  416. row_range::StartKey::StartKeyClosed(row_key.into_bytes())
  417. }),
  418. end_key: end_at.map(|row_key| {
  419. row_range::EndKey::EndKeyClosed(row_key.into_bytes())
  420. }),
  421. }],
  422. }),
  423. filter: Some(RowFilter {
  424. filter: Some(row_filter::Filter::Chain(row_filter::Chain {
  425. filters: vec![
  426. RowFilter {
  427. // Return minimal number of cells
  428. filter: Some(row_filter::Filter::CellsPerRowLimitFilter(1)),
  429. },
  430. RowFilter {
  431. // Only return the latest version of each cell
  432. filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
  433. },
  434. RowFilter {
  435. // Strip the cell values
  436. filter: Some(row_filter::Filter::StripValueTransformer(true)),
  437. },
  438. ],
  439. })),
  440. }),
  441. request_stats_view: 0,
  442. reversed: false,
  443. },
  444. )
  445. .await?
  446. .into_inner();
  447. let rows = self.decode_read_rows_response(response).await?;
  448. Ok(rows.into_iter().map(|r| r.0).collect())
  449. }
  450. /// Check whether a row key exists in a `table`
  451. pub async fn row_key_exists(&mut self, table_name: &str, row_key: RowKey) -> Result<bool> {
  452. self.refresh_access_token();
  453. let response = self
  454. .read_rows(
  455. table_name,
  456. ReadRowsRequest {
  457. table_name: format!("{}{}", self.table_prefix, table_name),
  458. app_profile_id: self.app_profile_id.clone(),
  459. rows_limit: 1,
  460. rows: Some(RowSet {
  461. row_keys: vec![row_key.into_bytes()],
  462. row_ranges: vec![],
  463. }),
  464. filter: Some(RowFilter {
  465. filter: Some(row_filter::Filter::StripValueTransformer(true)),
  466. }),
  467. request_stats_view: 0,
  468. reversed: false,
  469. },
  470. )
  471. .await?
  472. .into_inner();
  473. let rows = self.decode_read_rows_response(response).await?;
  474. Ok(!rows.is_empty())
  475. }
  476. /// Get latest data from `table`.
  477. ///
  478. /// All column families are accepted, and only the latest version of each column cell will be
  479. /// returned.
  480. ///
  481. /// If `start_at` is provided, the row key listing will start with key, or the next key in the
  482. /// table if the explicit key does not exist. Otherwise the listing will start from the start
  483. /// of the table.
  484. ///
  485. /// If `end_at` is provided, the row key listing will end at the key. Otherwise it will
  486. /// continue until the `rows_limit` is reached or the end of the table, whichever comes first.
  487. /// If `rows_limit` is zero, this method will return an empty array.
  488. pub async fn get_row_data(
  489. &mut self,
  490. table_name: &str,
  491. start_at: Option<RowKey>,
  492. end_at: Option<RowKey>,
  493. rows_limit: i64,
  494. ) -> Result<Vec<(RowKey, RowData)>> {
  495. if rows_limit == 0 {
  496. return Ok(vec![]);
  497. }
  498. self.refresh_access_token();
  499. let response = self
  500. .read_rows(
  501. table_name,
  502. ReadRowsRequest {
  503. table_name: format!("{}{}", self.table_prefix, table_name),
  504. app_profile_id: self.app_profile_id.clone(),
  505. rows_limit,
  506. rows: Some(RowSet {
  507. row_keys: vec![],
  508. row_ranges: vec![RowRange {
  509. start_key: start_at.map(|row_key| {
  510. row_range::StartKey::StartKeyClosed(row_key.into_bytes())
  511. }),
  512. end_key: end_at.map(|row_key| {
  513. row_range::EndKey::EndKeyClosed(row_key.into_bytes())
  514. }),
  515. }],
  516. }),
  517. filter: Some(RowFilter {
  518. // Only return the latest version of each cell
  519. filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
  520. }),
  521. request_stats_view: 0,
  522. reversed: false,
  523. },
  524. )
  525. .await?
  526. .into_inner();
  527. self.decode_read_rows_response(response).await
  528. }
  529. /// Get latest data from multiple rows of `table`, if those rows exist.
  530. pub async fn get_multi_row_data(
  531. &mut self,
  532. table_name: &str,
  533. row_keys: &[RowKey],
  534. ) -> Result<Vec<(RowKey, RowData)>> {
  535. self.refresh_access_token();
  536. let response = self
  537. .read_rows(
  538. table_name,
  539. ReadRowsRequest {
  540. table_name: format!("{}{}", self.table_prefix, table_name),
  541. app_profile_id: self.app_profile_id.clone(),
  542. rows_limit: 0, // return all existing rows
  543. rows: Some(RowSet {
  544. row_keys: row_keys
  545. .iter()
  546. .map(|k| k.as_bytes().to_vec())
  547. .collect::<Vec<_>>(),
  548. row_ranges: vec![],
  549. }),
  550. filter: Some(RowFilter {
  551. // Only return the latest version of each cell
  552. filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
  553. }),
  554. request_stats_view: 0,
  555. reversed: false,
  556. },
  557. )
  558. .await?
  559. .into_inner();
  560. self.decode_read_rows_response(response).await
  561. }
  562. /// Get latest data from a single row of `table`, if that row exists. Returns an error if that
  563. /// row does not exist.
  564. ///
  565. /// All column families are accepted, and only the latest version of each column cell will be
  566. /// returned.
  567. pub async fn get_single_row_data(
  568. &mut self,
  569. table_name: &str,
  570. row_key: RowKey,
  571. ) -> Result<RowData> {
  572. self.refresh_access_token();
  573. let response = self
  574. .read_rows(
  575. table_name,
  576. ReadRowsRequest {
  577. table_name: format!("{}{}", self.table_prefix, table_name),
  578. app_profile_id: self.app_profile_id.clone(),
  579. rows_limit: 1,
  580. rows: Some(RowSet {
  581. row_keys: vec![row_key.into_bytes()],
  582. row_ranges: vec![],
  583. }),
  584. filter: Some(RowFilter {
  585. // Only return the latest version of each cell
  586. filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
  587. }),
  588. request_stats_view: 0,
  589. reversed: false,
  590. },
  591. )
  592. .await?
  593. .into_inner();
  594. let rows = self.decode_read_rows_response(response).await?;
  595. rows.into_iter()
  596. .next()
  597. .map(|r| r.1)
  598. .ok_or(Error::RowNotFound)
  599. }
  600. /// Delete one or more `table` rows
  601. async fn delete_rows(&mut self, table_name: &str, row_keys: &[RowKey]) -> Result<()> {
  602. self.refresh_access_token();
  603. let mut entries = vec![];
  604. for row_key in row_keys {
  605. entries.push(mutate_rows_request::Entry {
  606. row_key: row_key.as_bytes().to_vec(),
  607. mutations: vec![Mutation {
  608. mutation: Some(mutation::Mutation::DeleteFromRow(
  609. mutation::DeleteFromRow {},
  610. )),
  611. }],
  612. });
  613. }
  614. let mut response = self
  615. .client
  616. .mutate_rows(MutateRowsRequest {
  617. table_name: format!("{}{}", self.table_prefix, table_name),
  618. app_profile_id: self.app_profile_id.clone(),
  619. entries,
  620. })
  621. .await?
  622. .into_inner();
  623. while let Some(res) = response.message().await? {
  624. for entry in res.entries {
  625. if let Some(status) = entry.status {
  626. if status.code != 0 {
  627. eprintln!("delete_rows error {}: {}", status.code, status.message);
  628. warn!("delete_rows error {}: {}", status.code, status.message);
  629. return Err(Error::RowDeleteFailed);
  630. }
  631. }
  632. }
  633. }
  634. Ok(())
  635. }
  636. /// Store data for one or more `table` rows in the `family_name` Column family
  637. async fn put_row_data(
  638. &mut self,
  639. table_name: &str,
  640. family_name: &str,
  641. row_data: &[(&RowKey, RowData)],
  642. ) -> Result<()> {
  643. self.refresh_access_token();
  644. let mut entries = vec![];
  645. for (row_key, row_data) in row_data {
  646. let mutations = row_data
  647. .iter()
  648. .map(|(column_key, column_value)| Mutation {
  649. mutation: Some(mutation::Mutation::SetCell(mutation::SetCell {
  650. family_name: family_name.to_string(),
  651. column_qualifier: column_key.clone().into_bytes(),
  652. timestamp_micros: -1, // server assigned
  653. value: column_value.to_vec(),
  654. })),
  655. })
  656. .collect();
  657. entries.push(mutate_rows_request::Entry {
  658. row_key: (*row_key).clone().into_bytes(),
  659. mutations,
  660. });
  661. }
  662. let mut response = self
  663. .client
  664. .mutate_rows(MutateRowsRequest {
  665. table_name: format!("{}{}", self.table_prefix, table_name),
  666. app_profile_id: self.app_profile_id.clone(),
  667. entries,
  668. })
  669. .await?
  670. .into_inner();
  671. while let Some(res) = response.message().await? {
  672. for entry in res.entries {
  673. if let Some(status) = entry.status {
  674. if status.code != 0 {
  675. eprintln!("put_row_data error {}: {}", status.code, status.message);
  676. warn!("put_row_data error {}: {}", status.code, status.message);
  677. return Err(Error::RowWriteFailed);
  678. }
  679. }
  680. }
  681. }
  682. Ok(())
  683. }
  684. pub async fn get_bincode_cell<T>(&mut self, table: &str, key: RowKey) -> Result<T>
  685. where
  686. T: serde::de::DeserializeOwned,
  687. {
  688. let row_data = self.get_single_row_data(table, key.clone()).await?;
  689. deserialize_bincode_cell_data(&row_data, table, key.to_string())
  690. }
  691. pub async fn get_bincode_cells<T>(
  692. &mut self,
  693. table: &str,
  694. keys: &[RowKey],
  695. ) -> Result<Vec<(RowKey, Result<T>)>>
  696. where
  697. T: serde::de::DeserializeOwned,
  698. {
  699. Ok(self
  700. .get_multi_row_data(table, keys)
  701. .await?
  702. .into_iter()
  703. .map(|(key, row_data)| {
  704. let key_str = key.to_string();
  705. (
  706. key,
  707. deserialize_bincode_cell_data(&row_data, table, key_str),
  708. )
  709. })
  710. .collect())
  711. }
  712. pub async fn get_protobuf_cell<P>(&mut self, table: &str, key: RowKey) -> Result<P>
  713. where
  714. P: prost::Message + Default,
  715. {
  716. let row_data = self.get_single_row_data(table, key.clone()).await?;
  717. deserialize_protobuf_cell_data(&row_data, table, key.to_string())
  718. }
  719. pub async fn get_protobuf_or_bincode_cell<B, P>(
  720. &mut self,
  721. table: &str,
  722. key: RowKey,
  723. ) -> Result<CellData<B, P>>
  724. where
  725. B: serde::de::DeserializeOwned,
  726. P: prost::Message + Default,
  727. {
  728. let row_data = self.get_single_row_data(table, key.clone()).await?;
  729. deserialize_protobuf_or_bincode_cell_data(&row_data, table, key)
  730. }
  731. pub async fn get_protobuf_or_bincode_cells<'a, B, P>(
  732. &mut self,
  733. table: &'a str,
  734. row_keys: impl IntoIterator<Item = RowKey>,
  735. ) -> Result<impl Iterator<Item = (RowKey, CellData<B, P>)> + 'a>
  736. where
  737. B: serde::de::DeserializeOwned,
  738. P: prost::Message + Default,
  739. {
  740. Ok(self
  741. .get_multi_row_data(
  742. table,
  743. row_keys.into_iter().collect::<Vec<RowKey>>().as_slice(),
  744. )
  745. .await?
  746. .into_iter()
  747. .map(|(key, row_data)| {
  748. let key_str = key.to_string();
  749. (
  750. key,
  751. deserialize_protobuf_or_bincode_cell_data(&row_data, table, key_str).unwrap(),
  752. )
  753. }))
  754. }
  755. pub async fn put_bincode_cells<T>(
  756. &mut self,
  757. table: &str,
  758. cells: &[(RowKey, T)],
  759. ) -> Result<usize>
  760. where
  761. T: serde::ser::Serialize,
  762. {
  763. let mut bytes_written = 0;
  764. let mut new_row_data = vec![];
  765. for (row_key, data) in cells {
  766. let data = compress_best(&bincode::serialize(&data).unwrap())?;
  767. bytes_written += data.len();
  768. new_row_data.push((row_key, vec![("bin".to_string(), data)]));
  769. }
  770. self.put_row_data(table, "x", &new_row_data).await?;
  771. Ok(bytes_written)
  772. }
  773. pub async fn put_protobuf_cells<T>(
  774. &mut self,
  775. table: &str,
  776. cells: &[(RowKey, T)],
  777. ) -> Result<usize>
  778. where
  779. T: prost::Message,
  780. {
  781. let mut bytes_written = 0;
  782. let mut new_row_data = vec![];
  783. for (row_key, data) in cells {
  784. let mut buf = Vec::with_capacity(data.encoded_len());
  785. data.encode(&mut buf).unwrap();
  786. let data = compress_best(&buf)?;
  787. bytes_written += data.len();
  788. new_row_data.push((row_key, vec![("proto".to_string(), data)]));
  789. }
  790. self.put_row_data(table, "x", &new_row_data).await?;
  791. Ok(bytes_written)
  792. }
  793. async fn read_rows(
  794. &mut self,
  795. table_name: &str,
  796. request: ReadRowsRequest,
  797. ) -> Result<tonic::Response<tonic::Streaming<ReadRowsResponse>>> {
  798. let datapoint_bigtable = if table_name == "blocks" {
  799. "bigtable_blocks"
  800. } else if table_name == "tx" {
  801. "bigtable_tx"
  802. } else if table_name == "tx-by-addr" {
  803. "bigtable_tx-by-addr"
  804. } else if table_name == "entries" {
  805. "bigtable_entries"
  806. } else {
  807. "bigtable_unknown"
  808. };
  809. datapoint_info!(datapoint_bigtable, ("read_rows", 1, i64));
  810. tokio::time::timeout(
  811. self.timeout.unwrap_or(Duration::from_secs(30)),
  812. self.client.read_rows(request),
  813. )
  814. .await
  815. .map_err(|_| {
  816. datapoint_error!(datapoint_bigtable, ("timeout", 1, i64));
  817. Error::Timeout
  818. })?
  819. .map_err(Error::from)
  820. }
  821. }
  822. pub(crate) fn deserialize_protobuf_or_bincode_cell_data<B, P>(
  823. row_data: RowDataSlice,
  824. table: &str,
  825. key: RowKey,
  826. ) -> Result<CellData<B, P>>
  827. where
  828. B: serde::de::DeserializeOwned,
  829. P: prost::Message + Default,
  830. {
  831. match deserialize_protobuf_cell_data(row_data, table, key.to_string()) {
  832. Ok(result) => return Ok(CellData::Protobuf(result)),
  833. Err(err) => match err {
  834. Error::ObjectNotFound(_) => {}
  835. _ => return Err(err),
  836. },
  837. }
  838. deserialize_bincode_cell_data(row_data, table, key).map(CellData::Bincode)
  839. }
  840. pub(crate) fn deserialize_protobuf_cell_data<T>(
  841. row_data: RowDataSlice,
  842. table: &str,
  843. key: RowKey,
  844. ) -> Result<T>
  845. where
  846. T: prost::Message + Default,
  847. {
  848. let value = &row_data
  849. .iter()
  850. .find(|(name, _)| name == "proto")
  851. .ok_or_else(|| Error::ObjectNotFound(format!("{table}/{key}")))?
  852. .1;
  853. let data = decompress(value)?;
  854. T::decode(&data[..]).map_err(|err| {
  855. warn!("Failed to deserialize {table}/{key}: {err}");
  856. Error::ObjectCorrupt(format!("{table}/{key}"))
  857. })
  858. }
  859. pub(crate) fn deserialize_bincode_cell_data<T>(
  860. row_data: RowDataSlice,
  861. table: &str,
  862. key: RowKey,
  863. ) -> Result<T>
  864. where
  865. T: serde::de::DeserializeOwned,
  866. {
  867. let value = &row_data
  868. .iter()
  869. .find(|(name, _)| name == "bin")
  870. .ok_or_else(|| Error::ObjectNotFound(format!("{table}/{key}")))?
  871. .1;
  872. let data = decompress(value)?;
  873. bincode::deserialize(&data).map_err(|err| {
  874. warn!("Failed to deserialize {table}/{key}: {err}");
  875. Error::ObjectCorrupt(format!("{table}/{key}"))
  876. })
  877. }
  878. #[cfg(test)]
  879. mod tests {
  880. use {
  881. super::*,
  882. crate::StoredConfirmedBlock,
  883. prost::Message,
  884. solana_hash::Hash,
  885. solana_keypair::Keypair,
  886. solana_message::v0::LoadedAddresses,
  887. solana_storage_proto::convert::generated,
  888. solana_system_transaction as system_transaction,
  889. solana_transaction::versioned::VersionedTransaction,
  890. solana_transaction_context::TransactionReturnData,
  891. solana_transaction_status::{
  892. ConfirmedBlock, TransactionStatusMeta, TransactionWithStatusMeta,
  893. VersionedTransactionWithStatusMeta,
  894. },
  895. std::convert::TryInto,
  896. };
  897. fn confirmed_block_into_protobuf(confirmed_block: ConfirmedBlock) -> generated::ConfirmedBlock {
  898. let ConfirmedBlock {
  899. previous_blockhash,
  900. blockhash,
  901. parent_slot,
  902. transactions,
  903. rewards,
  904. num_partitions,
  905. block_time,
  906. block_height,
  907. } = confirmed_block;
  908. generated::ConfirmedBlock {
  909. previous_blockhash,
  910. blockhash,
  911. parent_slot,
  912. transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
  913. rewards: rewards.into_iter().map(|r| r.into()).collect(),
  914. num_partitions: num_partitions
  915. .map(|num_partitions| generated::NumPartitions { num_partitions }),
  916. block_time: block_time.map(|timestamp| generated::UnixTimestamp { timestamp }),
  917. block_height: block_height.map(|block_height| generated::BlockHeight { block_height }),
  918. }
  919. }
  920. #[test]
  921. fn test_deserialize_protobuf_or_bincode_cell_data() {
  922. let from = Keypair::new();
  923. let recipient = solana_pubkey::new_rand();
  924. let transaction = system_transaction::transfer(&from, &recipient, 42, Hash::default());
  925. let with_meta = TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta {
  926. transaction: VersionedTransaction::from(transaction),
  927. meta: TransactionStatusMeta {
  928. status: Ok(()),
  929. fee: 1,
  930. pre_balances: vec![43, 0, 1],
  931. post_balances: vec![0, 42, 1],
  932. inner_instructions: Some(vec![]),
  933. log_messages: Some(vec![]),
  934. pre_token_balances: Some(vec![]),
  935. post_token_balances: Some(vec![]),
  936. rewards: Some(vec![]),
  937. loaded_addresses: LoadedAddresses::default(),
  938. return_data: Some(TransactionReturnData::default()),
  939. compute_units_consumed: Some(1234),
  940. cost_units: Some(5678),
  941. },
  942. });
  943. let expected_block = ConfirmedBlock {
  944. transactions: vec![with_meta],
  945. parent_slot: 1,
  946. blockhash: Hash::default().to_string(),
  947. previous_blockhash: Hash::default().to_string(),
  948. rewards: vec![],
  949. num_partitions: None,
  950. block_time: Some(1_234_567_890),
  951. block_height: Some(1),
  952. };
  953. let bincode_block = compress_best(
  954. &bincode::serialize::<StoredConfirmedBlock>(&expected_block.clone().into()).unwrap(),
  955. )
  956. .unwrap();
  957. let protobuf_block = confirmed_block_into_protobuf(expected_block.clone());
  958. let mut buf = Vec::with_capacity(protobuf_block.encoded_len());
  959. protobuf_block.encode(&mut buf).unwrap();
  960. let protobuf_block = compress_best(&buf).unwrap();
  961. let deserialized = deserialize_protobuf_or_bincode_cell_data::<
  962. StoredConfirmedBlock,
  963. generated::ConfirmedBlock,
  964. >(
  965. &[("proto".to_string(), protobuf_block.clone())],
  966. "",
  967. "".to_string(),
  968. )
  969. .unwrap();
  970. if let CellData::Protobuf(protobuf_block) = deserialized {
  971. assert_eq!(expected_block, protobuf_block.try_into().unwrap());
  972. } else {
  973. panic!("deserialization should produce CellData::Protobuf");
  974. }
  975. let deserialized = deserialize_protobuf_or_bincode_cell_data::<
  976. StoredConfirmedBlock,
  977. generated::ConfirmedBlock,
  978. >(
  979. &[("bin".to_string(), bincode_block.clone())],
  980. "",
  981. "".to_string(),
  982. )
  983. .unwrap();
  984. if let CellData::Bincode(bincode_block) = deserialized {
  985. let mut block = expected_block;
  986. if let TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta {
  987. meta,
  988. ..
  989. }) = &mut block.transactions[0]
  990. {
  991. meta.inner_instructions = None; // Legacy bincode implementation does not support inner_instructions
  992. meta.log_messages = None; // Legacy bincode implementation does not support log_messages
  993. meta.pre_token_balances = None; // Legacy bincode implementation does not support token balances
  994. meta.post_token_balances = None; // Legacy bincode implementation does not support token balances
  995. meta.rewards = None; // Legacy bincode implementation does not support rewards
  996. meta.return_data = None; // Legacy bincode implementation does not support return data
  997. meta.compute_units_consumed = None; // Legacy bincode implementation does not support CU consumed
  998. meta.cost_units = None; // Legacy bincode implementation does not support CU
  999. }
  1000. assert_eq!(block, bincode_block.into());
  1001. } else {
  1002. panic!("deserialization should produce CellData::Bincode");
  1003. }
  1004. let result = deserialize_protobuf_or_bincode_cell_data::<
  1005. StoredConfirmedBlock,
  1006. generated::ConfirmedBlock,
  1007. >(&[("proto".to_string(), bincode_block)], "", "".to_string());
  1008. assert!(result.is_err());
  1009. let result = deserialize_protobuf_or_bincode_cell_data::<
  1010. StoredConfirmedBlock,
  1011. generated::ConfirmedBlock,
  1012. >(
  1013. &[("proto".to_string(), vec![1, 2, 3, 4])],
  1014. "",
  1015. "".to_string(),
  1016. );
  1017. assert!(result.is_err());
  1018. let result = deserialize_protobuf_or_bincode_cell_data::<
  1019. StoredConfirmedBlock,
  1020. generated::ConfirmedBlock,
  1021. >(&[("bin".to_string(), protobuf_block)], "", "".to_string());
  1022. assert!(result.is_err());
  1023. let result = deserialize_protobuf_or_bincode_cell_data::<
  1024. StoredConfirmedBlock,
  1025. generated::ConfirmedBlock,
  1026. >(&[("bin".to_string(), vec![1, 2, 3, 4])], "", "".to_string());
  1027. assert!(result.is_err());
  1028. }
  1029. }