|
|
@@ -1,7 +1,7 @@
|
|
|
use {
|
|
|
crate::api::{ChainId, NetworkId, StateTag},
|
|
|
anyhow::Result,
|
|
|
- chrono::{DateTime, NaiveDateTime},
|
|
|
+ chrono::DateTime,
|
|
|
ethers::{
|
|
|
core::utils::hex::ToHex,
|
|
|
prelude::TxHash,
|
|
|
@@ -10,13 +10,16 @@ use {
|
|
|
},
|
|
|
serde::Serialize,
|
|
|
serde_with::serde_as,
|
|
|
- sqlx::{migrate, FromRow, Pool, QueryBuilder, Sqlite, SqlitePool},
|
|
|
+ sqlx::{any::AnyPoolOptions, migrate, AnyPool, FromRow},
|
|
|
std::{str::FromStr, sync::Arc},
|
|
|
tokio::{spawn, sync::mpsc},
|
|
|
utoipa::ToSchema,
|
|
|
};
|
|
|
|
|
|
const LOG_RETURN_LIMIT: u64 = 1000;
|
|
|
+const ONE_DAY: u64 = 60 * 60 * 24;
|
|
|
+const ONE_HOUR: u64 = 60 * 60;
|
|
|
+const DEFAULT_DATABASE_URL: &str = "sqlite:fortuna.db?mode=rwc";
|
|
|
|
|
|
#[serde_as]
|
|
|
#[derive(Clone, Debug, Serialize, ToSchema, PartialEq)]
|
|
|
@@ -105,8 +108,8 @@ struct RequestRow {
|
|
|
network_id: i64,
|
|
|
provider: String,
|
|
|
sequence: i64,
|
|
|
- created_at: NaiveDateTime,
|
|
|
- last_updated_at: NaiveDateTime,
|
|
|
+ created_at: i64, // Unix timestamp
|
|
|
+ last_updated_at: i64, // Unix timestamp
|
|
|
state: String,
|
|
|
request_block_number: i64,
|
|
|
request_tx_hash: String,
|
|
|
@@ -128,8 +131,10 @@ impl TryFrom<RequestRow> for RequestStatus {
|
|
|
let network_id = row.network_id as u64;
|
|
|
let provider = row.provider.parse()?;
|
|
|
let sequence = row.sequence as u64;
|
|
|
- let created_at = row.created_at.and_utc();
|
|
|
- let last_updated_at = row.last_updated_at.and_utc();
|
|
|
+ let created_at = DateTime::from_timestamp(row.created_at, 0)
|
|
|
+ .ok_or(anyhow::anyhow!("Invalid created_at timestamp"))?;
|
|
|
+ let last_updated_at = DateTime::from_timestamp(row.last_updated_at, 0)
|
|
|
+ .ok_or(anyhow::anyhow!("Invalid last_updated_at timestamp"))?;
|
|
|
let request_block_number = row.request_block_number as u64;
|
|
|
let user_random_number = hex::FromHex::from_hex(row.user_random_number)?;
|
|
|
let request_tx_hash = row.request_tx_hash.parse()?;
|
|
|
@@ -211,7 +216,7 @@ impl From<RequestRow> for Option<RequestStatus> {
|
|
|
}
|
|
|
|
|
|
pub struct History {
|
|
|
- pool: Pool<Sqlite>,
|
|
|
+ pool: AnyPool,
|
|
|
write_queue: mpsc::Sender<RequestStatus>,
|
|
|
_writer_thread: Arc<tokio::task::JoinHandle<()>>,
|
|
|
}
|
|
|
@@ -219,20 +224,46 @@ pub struct History {
|
|
|
impl History {
|
|
|
const MAX_WRITE_QUEUE: usize = 1_000;
|
|
|
pub async fn new() -> Result<Self> {
|
|
|
- Self::new_with_url("sqlite:fortuna.db?mode=rwc").await
|
|
|
+ let database_url =
|
|
|
+ std::env::var("DATABASE_URL").unwrap_or_else(|_| DEFAULT_DATABASE_URL.to_string());
|
|
|
+ Self::new_with_url(&database_url).await
|
|
|
}
|
|
|
|
|
|
+ /// Create a History instance with an ephemeral in-memory DB.
|
|
|
+ /// Useful for testing.
|
|
|
pub async fn new_in_memory() -> Result<Self> {
|
|
|
- Self::new_with_url("sqlite::memory:").await
|
|
|
+ sqlx::any::install_default_drivers();
|
|
|
+ // Connect to an in-memory SQLite database
|
|
|
+ // Don't let the pool drop the cxn, otherwise the database will be deleted
|
|
|
+ let pool = AnyPoolOptions::new()
|
|
|
+ .min_connections(1)
|
|
|
+ .max_connections(1)
|
|
|
+ .idle_timeout(None)
|
|
|
+ .max_lifetime(None)
|
|
|
+ .connect("sqlite::memory:")
|
|
|
+ .await?;
|
|
|
+ let migrator = migrate!(); // defaults to "./migrations"
|
|
|
+ migrator.run(&pool).await?;
|
|
|
+ Self::new_with_pool(pool).await
|
|
|
}
|
|
|
|
|
|
+ /// Create a History instance with production DB parameters
|
|
|
pub async fn new_with_url(url: &str) -> Result<Self> {
|
|
|
- let pool = SqlitePool::connect(url).await?;
|
|
|
+ sqlx::any::install_default_drivers();
|
|
|
+ let pool = AnyPoolOptions::new()
|
|
|
+ .min_connections(0)
|
|
|
+ .max_connections(10)
|
|
|
+ // Allow the cloud DB to spin down after 1 hour of inactivity (cost savings)
|
|
|
+ .idle_timeout(std::time::Duration::from_secs(ONE_HOUR))
|
|
|
+ // Retire the connection after 1 day to avoid memory leaks in the DB
|
|
|
+ .max_lifetime(std::time::Duration::from_secs(ONE_DAY))
|
|
|
+ .connect(url)
|
|
|
+ .await?;
|
|
|
let migrator = migrate!("./migrations");
|
|
|
migrator.run(&pool).await?;
|
|
|
Self::new_with_pool(pool).await
|
|
|
}
|
|
|
- pub async fn new_with_pool(pool: Pool<Sqlite>) -> Result<Self> {
|
|
|
+ pub async fn new_with_pool(pool: AnyPool) -> Result<Self> {
|
|
|
let (sender, mut receiver) = mpsc::channel(Self::MAX_WRITE_QUEUE);
|
|
|
let pool_write_connection = pool.clone();
|
|
|
let writer_thread = spawn(async move {
|
|
|
@@ -247,7 +278,7 @@ impl History {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- async fn update_request_status(pool: &Pool<Sqlite>, new_status: RequestStatus) {
|
|
|
+ async fn update_request_status(pool: &AnyPool, new_status: RequestStatus) {
|
|
|
let sequence = new_status.sequence as i64;
|
|
|
let chain_id = new_status.chain_id;
|
|
|
let network_id = new_status.network_id as i64;
|
|
|
@@ -259,13 +290,13 @@ impl History {
|
|
|
let block_number = new_status.request_block_number as i64;
|
|
|
let sender: String = new_status.sender.encode_hex();
|
|
|
let user_random_number: String = new_status.user_random_number.encode_hex();
|
|
|
- sqlx::query("INSERT INTO request(chain_id, network_id, provider, sequence, created_at, last_updated_at, state, request_block_number, request_tx_hash, user_random_number, sender, gas_limit) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
|
|
|
+ sqlx::query("INSERT INTO request(chain_id, network_id, provider, sequence, created_at, last_updated_at, state, request_block_number, request_tx_hash, user_random_number, sender, gas_limit) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)")
|
|
|
.bind(chain_id.clone())
|
|
|
.bind(network_id)
|
|
|
.bind(provider.clone())
|
|
|
.bind(sequence)
|
|
|
- .bind(new_status.created_at)
|
|
|
- .bind(new_status.last_updated_at)
|
|
|
+ .bind(new_status.created_at.timestamp())
|
|
|
+ .bind(new_status.last_updated_at.timestamp())
|
|
|
.bind("Pending")
|
|
|
.bind(block_number)
|
|
|
.bind(request_tx_hash.clone())
|
|
|
@@ -286,9 +317,9 @@ impl History {
|
|
|
let reveal_tx_hash: String = reveal_tx_hash.encode_hex();
|
|
|
let provider_random_number: String = provider_random_number.encode_hex();
|
|
|
let gas_used: String = gas_used.to_string();
|
|
|
- let result = sqlx::query("UPDATE request SET state = ?, last_updated_at = ?, reveal_block_number = ?, reveal_tx_hash = ?, provider_random_number =?, gas_used = ? WHERE network_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ?")
|
|
|
+ let result = sqlx::query("UPDATE request SET state = $1, last_updated_at = $2, reveal_block_number = $3, reveal_tx_hash = $4, provider_random_number = $5, gas_used = $6 WHERE network_id = $7 AND sequence = $8 AND provider = $9 AND request_tx_hash = $10")
|
|
|
.bind("Completed")
|
|
|
- .bind(new_status.last_updated_at)
|
|
|
+ .bind(new_status.last_updated_at.timestamp())
|
|
|
.bind(reveal_block_number)
|
|
|
.bind(reveal_tx_hash)
|
|
|
.bind(provider_random_number)
|
|
|
@@ -312,9 +343,9 @@ impl History {
|
|
|
} => {
|
|
|
let provider_random_number: Option<String> = provider_random_number
|
|
|
.map(|provider_random_number| provider_random_number.encode_hex());
|
|
|
- sqlx::query("UPDATE request SET state = ?, last_updated_at = ?, info = ?, provider_random_number = ? WHERE network_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ? AND state = 'Pending'")
|
|
|
+ sqlx::query("UPDATE request SET state = $1, last_updated_at = $2, info = $3, provider_random_number = $4 WHERE network_id = $5 AND sequence = $6 AND provider = $7 AND request_tx_hash = $8 AND state = 'Pending'")
|
|
|
.bind("Failed")
|
|
|
- .bind(new_status.last_updated_at)
|
|
|
+ .bind(new_status.last_updated_at.timestamp())
|
|
|
.bind(reason)
|
|
|
.bind(provider_random_number)
|
|
|
.bind(network_id)
|
|
|
@@ -343,7 +374,7 @@ impl History {
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
pub struct RequestQueryBuilder<'a> {
|
|
|
- pool: &'a Pool<Sqlite>,
|
|
|
+ pool: &'a AnyPool,
|
|
|
pub search: Option<SearchField>,
|
|
|
pub network_id: Option<i64>,
|
|
|
pub state: Option<StateTag>,
|
|
|
@@ -354,7 +385,7 @@ pub struct RequestQueryBuilder<'a> {
|
|
|
}
|
|
|
|
|
|
impl<'a> RequestQueryBuilder<'a> {
|
|
|
- fn new(pool: &'a Pool<Sqlite>) -> Self {
|
|
|
+ fn new(pool: &'a AnyPool) -> Self {
|
|
|
Self {
|
|
|
pool,
|
|
|
search: None,
|
|
|
@@ -426,14 +457,76 @@ impl<'a> RequestQueryBuilder<'a> {
|
|
|
}
|
|
|
|
|
|
pub async fn execute(&self) -> Result<Vec<RequestStatus>> {
|
|
|
- let mut query_builder = self.build_query("*");
|
|
|
- query_builder.push(" LIMIT ");
|
|
|
- query_builder.push_bind(self.limit);
|
|
|
- query_builder.push(" OFFSET ");
|
|
|
- query_builder.push_bind(self.offset);
|
|
|
+ let mut sql = "SELECT * FROM request WHERE created_at BETWEEN $1 AND $2".to_string();
|
|
|
+ let mut param_count = 2;
|
|
|
+
|
|
|
+ // Build the SQL string with parameter placeholders
|
|
|
+ match &self.search {
|
|
|
+ Some(SearchField::TxHash(_)) => {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND (request_tx_hash = ${param_count}"));
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" OR reveal_tx_hash = ${param_count})"));
|
|
|
+ }
|
|
|
+ Some(SearchField::Sender(_)) => {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND sender = ${param_count}"));
|
|
|
+ }
|
|
|
+ Some(SearchField::SequenceNumber(_)) => {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND sequence = ${param_count}"));
|
|
|
+ }
|
|
|
+ None => (),
|
|
|
+ }
|
|
|
+
|
|
|
+ if self.network_id.is_some() {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND network_id = ${param_count}"));
|
|
|
+ }
|
|
|
+
|
|
|
+ if self.state.is_some() {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND state = ${param_count}"));
|
|
|
+ }
|
|
|
+
|
|
|
+ sql.push_str(" ORDER BY created_at DESC");
|
|
|
+
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" LIMIT ${param_count}"));
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" OFFSET ${param_count}"));
|
|
|
+
|
|
|
+ // Now bind all parameters in order
|
|
|
+ let mut query = sqlx::query_as::<_, RequestRow>(&sql)
|
|
|
+ .bind(self.min_timestamp.timestamp())
|
|
|
+ .bind(self.max_timestamp.timestamp());
|
|
|
+
|
|
|
+ match &self.search {
|
|
|
+ Some(SearchField::TxHash(tx_hash)) => {
|
|
|
+ let tx_hash: String = tx_hash.encode_hex();
|
|
|
+ query = query.bind(tx_hash.clone()).bind(tx_hash);
|
|
|
+ }
|
|
|
+ Some(SearchField::Sender(sender)) => {
|
|
|
+ let sender: String = sender.encode_hex();
|
|
|
+ query = query.bind(sender);
|
|
|
+ }
|
|
|
+ Some(SearchField::SequenceNumber(sequence_number)) => {
|
|
|
+ query = query.bind(sequence_number);
|
|
|
+ }
|
|
|
+ None => (),
|
|
|
+ }
|
|
|
+
|
|
|
+ if let Some(network_id) = &self.network_id {
|
|
|
+ query = query.bind(network_id);
|
|
|
+ }
|
|
|
+
|
|
|
+ if let Some(state) = &self.state {
|
|
|
+ query = query.bind(state.to_string());
|
|
|
+ }
|
|
|
+
|
|
|
+ query = query.bind(self.limit).bind(self.offset);
|
|
|
|
|
|
- let result: sqlx::Result<Vec<RequestRow>> =
|
|
|
- query_builder.build_query_as().fetch_all(self.pool).await;
|
|
|
+ let result: sqlx::Result<Vec<RequestRow>> = query.fetch_all(self.pool).await;
|
|
|
|
|
|
if let Err(e) = &result {
|
|
|
tracing::error!("Failed to fetch request: {}", e);
|
|
|
@@ -442,55 +535,68 @@ impl<'a> RequestQueryBuilder<'a> {
|
|
|
Ok(result?.into_iter().filter_map(|row| row.into()).collect())
|
|
|
}
|
|
|
|
|
|
- pub async fn count_results(&self) -> Result<u64> {
|
|
|
- self.build_query("COUNT(*) AS count")
|
|
|
- .build_query_scalar::<u64>()
|
|
|
- .fetch_one(self.pool)
|
|
|
- .await
|
|
|
- .map_err(|err| err.into())
|
|
|
- }
|
|
|
+ pub async fn count_results(&self) -> Result<i64> {
|
|
|
+ let mut sql = "SELECT COUNT(*) FROM request WHERE created_at BETWEEN $1 AND $2".to_string();
|
|
|
+ let mut param_count = 2;
|
|
|
+
|
|
|
+ // Build the SQL string with parameter placeholders
|
|
|
+ match &self.search {
|
|
|
+ Some(SearchField::TxHash(_)) => {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND (request_tx_hash = ${param_count}"));
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" OR reveal_tx_hash = ${param_count})"));
|
|
|
+ }
|
|
|
+ Some(SearchField::Sender(_)) => {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND sender = ${param_count}"));
|
|
|
+ }
|
|
|
+ Some(SearchField::SequenceNumber(_)) => {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND sequence = ${param_count}"));
|
|
|
+ }
|
|
|
+ None => (),
|
|
|
+ }
|
|
|
+
|
|
|
+ if self.network_id.is_some() {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND network_id = ${param_count}"));
|
|
|
+ }
|
|
|
+
|
|
|
+ if self.state.is_some() {
|
|
|
+ param_count += 1;
|
|
|
+ sql.push_str(&format!(" AND state = ${param_count}"));
|
|
|
+ }
|
|
|
|
|
|
- fn build_query(&self, columns: &str) -> QueryBuilder<Sqlite> {
|
|
|
- let mut query_builder = QueryBuilder::new(format!(
|
|
|
- "SELECT {columns} FROM request WHERE created_at BETWEEN "
|
|
|
- ));
|
|
|
- query_builder.push_bind(self.min_timestamp);
|
|
|
- query_builder.push(" AND ");
|
|
|
- query_builder.push_bind(self.max_timestamp);
|
|
|
+ // Now bind all parameters in order
|
|
|
+ let mut query = sqlx::query_scalar::<_, i64>(&sql)
|
|
|
+ .bind(self.min_timestamp.timestamp())
|
|
|
+ .bind(self.max_timestamp.timestamp());
|
|
|
|
|
|
match &self.search {
|
|
|
Some(SearchField::TxHash(tx_hash)) => {
|
|
|
let tx_hash: String = tx_hash.encode_hex();
|
|
|
- query_builder.push(" AND (request_tx_hash = ");
|
|
|
- query_builder.push_bind(tx_hash.clone());
|
|
|
- query_builder.push(" OR reveal_tx_hash = ");
|
|
|
- query_builder.push_bind(tx_hash);
|
|
|
- query_builder.push(")");
|
|
|
+ query = query.bind(tx_hash.clone()).bind(tx_hash);
|
|
|
}
|
|
|
Some(SearchField::Sender(sender)) => {
|
|
|
let sender: String = sender.encode_hex();
|
|
|
- query_builder.push(" AND sender = ");
|
|
|
- query_builder.push_bind(sender);
|
|
|
+ query = query.bind(sender);
|
|
|
}
|
|
|
Some(SearchField::SequenceNumber(sequence_number)) => {
|
|
|
- query_builder.push(" AND sequence = ");
|
|
|
- query_builder.push_bind(sequence_number);
|
|
|
+ query = query.bind(sequence_number);
|
|
|
}
|
|
|
None => (),
|
|
|
}
|
|
|
|
|
|
if let Some(network_id) = &self.network_id {
|
|
|
- query_builder.push(" AND network_id = ");
|
|
|
- query_builder.push_bind(network_id);
|
|
|
+ query = query.bind(network_id);
|
|
|
}
|
|
|
|
|
|
if let Some(state) = &self.state {
|
|
|
- query_builder.push(" AND state = ");
|
|
|
- query_builder.push_bind(state);
|
|
|
+ query = query.bind(state.to_string());
|
|
|
}
|
|
|
|
|
|
- query_builder.push(" ORDER BY created_at DESC");
|
|
|
- query_builder
|
|
|
+ query.fetch_one(self.pool).await.map_err(|err| err.into())
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -510,7 +616,11 @@ pub enum SearchField {
|
|
|
|
|
|
#[cfg(test)]
|
|
|
mod test {
|
|
|
- use {super::*, chrono::Duration, tokio::time::sleep};
|
|
|
+ use {
|
|
|
+ super::*,
|
|
|
+ chrono::{Duration, Timelike},
|
|
|
+ tokio::time::sleep,
|
|
|
+ };
|
|
|
|
|
|
fn get_random_request_status() -> RequestStatus {
|
|
|
RequestStatus {
|
|
|
@@ -518,8 +628,8 @@ mod test {
|
|
|
network_id: 121,
|
|
|
provider: Address::random(),
|
|
|
sequence: 1,
|
|
|
- created_at: chrono::Utc::now(),
|
|
|
- last_updated_at: chrono::Utc::now(),
|
|
|
+ created_at: chrono::Utc::now().with_nanosecond(0).unwrap(),
|
|
|
+ last_updated_at: chrono::Utc::now().with_nanosecond(0).unwrap(),
|
|
|
request_block_number: 1,
|
|
|
request_tx_hash: TxHash::random(),
|
|
|
user_random_number: [20; 32],
|