Эх сурвалжийг харах

feat(fortuna): Explorer apis (#2649)

A set of APIs to expose recent history of processed events in fortuna
Amin Moghaddam 6 сар өмнө
parent
commit
58a72833f2
34 өөрчлөгдсөн 1968 нэмэгдсэн , 241 устгасан
  1. 1 0
      .github/workflows/ci-fortuna.yml
  2. 6 0
      .pre-commit-config.yaml
  3. 12 0
      apps/fortuna/.sqlx/query-16635b3d9c6f9b743614e0e08bfa2b26d7ec6346f0323d9f16b98c32fd9a91f6.json
  4. 98 0
      apps/fortuna/.sqlx/query-795b81369e5b039cfa38df06bd6c8da8610d84f19a294fb8d3a8370a47a3f241.json
  5. 98 0
      apps/fortuna/.sqlx/query-7d4365a9cb7c9ec16fd4ca60e1d558419954a0326b29180fa9943605813f04e6.json
  6. 98 0
      apps/fortuna/.sqlx/query-8cd10cd5839b81bd9538aeb10fdfd27c6e36baf5d90a4fb9e61718f021812710.json
  7. 98 0
      apps/fortuna/.sqlx/query-905dbc91cd5319537c5c194277d531689ac5c1338396414467496d0f50ddc3f0.json
  8. 12 0
      apps/fortuna/.sqlx/query-9d7448c9bbad50d6242dfc0ba7d5ad4837201a1585bd56cc9a65fe75d0fa5952.json
  9. 98 0
      apps/fortuna/.sqlx/query-a62e094cee65ae58bd12ce7d3e7df44f5aca31520d1ceced83f492945e850764.json
  10. 12 0
      apps/fortuna/.sqlx/query-b2baa9f9d46f873a3a7117c38ecab09f56082c5267dbf5180f39c608b6262f5a.json
  11. 98 0
      apps/fortuna/.sqlx/query-b848d03ffc893e1719d364beb32976ef879e79727c660c973bdad670082f5c36.json
  12. 98 0
      apps/fortuna/.sqlx/query-ba011bb5690ad6821689bec939c5303c8619b6302ef33145db3bf62259492783.json
  13. 331 79
      apps/fortuna/Cargo.lock
  14. 4 1
      apps/fortuna/Cargo.toml
  15. 22 0
      apps/fortuna/README.md
  16. 3 0
      apps/fortuna/check-sqlx.sh
  17. 1 0
      apps/fortuna/migrations/20250502164500_init.down.sql
  18. 25 0
      apps/fortuna/migrations/20250502164500_init.up.sql
  19. 26 4
      apps/fortuna/src/api.rs
  20. 95 0
      apps/fortuna/src/api/explorer.rs
  21. 4 3
      apps/fortuna/src/api/revelation.rs
  22. 18 6
      apps/fortuna/src/chain/ethereum.rs
  23. 19 1
      apps/fortuna/src/chain/reader.rs
  24. 19 2
      apps/fortuna/src/command/run.rs
  25. 1 1
      apps/fortuna/src/eth_utils/nonce_manager.rs
  26. 1 1
      apps/fortuna/src/eth_utils/utils.rs
  27. 549 0
      apps/fortuna/src/history.rs
  28. 23 22
      apps/fortuna/src/keeper.rs
  29. 49 102
      apps/fortuna/src/keeper/block.rs
  30. 4 3
      apps/fortuna/src/keeper/fee.rs
  31. 1 2
      apps/fortuna/src/keeper/keeper_metrics.rs
  32. 42 12
      apps/fortuna/src/keeper/process_event.rs
  33. 1 2
      apps/fortuna/src/keeper/track.rs
  34. 1 0
      apps/fortuna/src/lib.rs

+ 1 - 0
.github/workflows/ci-fortuna.yml

@@ -24,6 +24,7 @@ jobs:
           profile: minimal
           toolchain: 1.82.0
           override: true
+          components: rustfmt, clippy
       - name: Format check
         run: cargo fmt --all -- --check
         if: success() || failure()

+ 6 - 0
.pre-commit-config.yaml

@@ -76,6 +76,12 @@ repos:
         entry: cargo +1.82.0 fmt --manifest-path ./apps/fortuna/Cargo.toml --all
         pass_filenames: false
         files: apps/fortuna
+      - id: cargo-sqlx-fortuna
+        name: Cargo sqlx prepare check for Fortuna
+        language: "script"
+        entry: ./apps/fortuna/check-sqlx.sh
+        pass_filenames: false
+        files: apps/fortuna
       - id: cargo-clippy-fortuna
         name: Cargo clippy for Fortuna
         language: "rust"

+ 12 - 0
apps/fortuna/.sqlx/query-16635b3d9c6f9b743614e0e08bfa2b26d7ec6346f0323d9f16b98c32fd9a91f6.json

@@ -0,0 +1,12 @@
+{
+  "db_name": "SQLite",
+  "query": "INSERT INTO request(chain_id, provider, sequence, created_at, last_updated_at, state, request_block_number, request_tx_hash, user_random_number, sender) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Right": 10
+    },
+    "nullable": []
+  },
+  "hash": "16635b3d9c6f9b743614e0e08bfa2b26d7ec6346f0323d9f16b98c32fd9a91f6"
+}

+ 98 - 0
apps/fortuna/.sqlx/query-795b81369e5b039cfa38df06bd6c8da8610d84f19a294fb8d3a8370a47a3f241.json

@@ -0,0 +1,98 @@
+{
+  "db_name": "SQLite",
+  "query": "SELECT * FROM request WHERE sender = ? AND chain_id = ?",
+  "describe": {
+    "columns": [
+      {
+        "name": "chain_id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "sequence",
+        "ordinal": 2,
+        "type_info": "Integer"
+      },
+      {
+        "name": "created_at",
+        "ordinal": 3,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "last_updated_at",
+        "ordinal": 4,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "state",
+        "ordinal": 5,
+        "type_info": "Text"
+      },
+      {
+        "name": "request_block_number",
+        "ordinal": 6,
+        "type_info": "Integer"
+      },
+      {
+        "name": "request_tx_hash",
+        "ordinal": 7,
+        "type_info": "Text"
+      },
+      {
+        "name": "user_random_number",
+        "ordinal": 8,
+        "type_info": "Text"
+      },
+      {
+        "name": "sender",
+        "ordinal": 9,
+        "type_info": "Text"
+      },
+      {
+        "name": "reveal_block_number",
+        "ordinal": 10,
+        "type_info": "Integer"
+      },
+      {
+        "name": "reveal_tx_hash",
+        "ordinal": 11,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider_random_number",
+        "ordinal": 12,
+        "type_info": "Text"
+      },
+      {
+        "name": "info",
+        "ordinal": 13,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 2
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      true,
+      true,
+      true,
+      true
+    ]
+  },
+  "hash": "795b81369e5b039cfa38df06bd6c8da8610d84f19a294fb8d3a8370a47a3f241"
+}

+ 98 - 0
apps/fortuna/.sqlx/query-7d4365a9cb7c9ec16fd4ca60e1d558419954a0326b29180fa9943605813f04e6.json

@@ -0,0 +1,98 @@
+{
+  "db_name": "SQLite",
+  "query": "SELECT * FROM request WHERE sequence = ? AND chain_id = ?",
+  "describe": {
+    "columns": [
+      {
+        "name": "chain_id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "sequence",
+        "ordinal": 2,
+        "type_info": "Integer"
+      },
+      {
+        "name": "created_at",
+        "ordinal": 3,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "last_updated_at",
+        "ordinal": 4,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "state",
+        "ordinal": 5,
+        "type_info": "Text"
+      },
+      {
+        "name": "request_block_number",
+        "ordinal": 6,
+        "type_info": "Integer"
+      },
+      {
+        "name": "request_tx_hash",
+        "ordinal": 7,
+        "type_info": "Text"
+      },
+      {
+        "name": "user_random_number",
+        "ordinal": 8,
+        "type_info": "Text"
+      },
+      {
+        "name": "sender",
+        "ordinal": 9,
+        "type_info": "Text"
+      },
+      {
+        "name": "reveal_block_number",
+        "ordinal": 10,
+        "type_info": "Integer"
+      },
+      {
+        "name": "reveal_tx_hash",
+        "ordinal": 11,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider_random_number",
+        "ordinal": 12,
+        "type_info": "Text"
+      },
+      {
+        "name": "info",
+        "ordinal": 13,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 2
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      true,
+      true,
+      true,
+      true
+    ]
+  },
+  "hash": "7d4365a9cb7c9ec16fd4ca60e1d558419954a0326b29180fa9943605813f04e6"
+}

+ 98 - 0
apps/fortuna/.sqlx/query-8cd10cd5839b81bd9538aeb10fdfd27c6e36baf5d90a4fb9e61718f021812710.json

@@ -0,0 +1,98 @@
+{
+  "db_name": "SQLite",
+  "query": "SELECT * FROM request WHERE sender = ?",
+  "describe": {
+    "columns": [
+      {
+        "name": "chain_id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "sequence",
+        "ordinal": 2,
+        "type_info": "Integer"
+      },
+      {
+        "name": "created_at",
+        "ordinal": 3,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "last_updated_at",
+        "ordinal": 4,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "state",
+        "ordinal": 5,
+        "type_info": "Text"
+      },
+      {
+        "name": "request_block_number",
+        "ordinal": 6,
+        "type_info": "Integer"
+      },
+      {
+        "name": "request_tx_hash",
+        "ordinal": 7,
+        "type_info": "Text"
+      },
+      {
+        "name": "user_random_number",
+        "ordinal": 8,
+        "type_info": "Text"
+      },
+      {
+        "name": "sender",
+        "ordinal": 9,
+        "type_info": "Text"
+      },
+      {
+        "name": "reveal_block_number",
+        "ordinal": 10,
+        "type_info": "Integer"
+      },
+      {
+        "name": "reveal_tx_hash",
+        "ordinal": 11,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider_random_number",
+        "ordinal": 12,
+        "type_info": "Text"
+      },
+      {
+        "name": "info",
+        "ordinal": 13,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 1
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      true,
+      true,
+      true,
+      true
+    ]
+  },
+  "hash": "8cd10cd5839b81bd9538aeb10fdfd27c6e36baf5d90a4fb9e61718f021812710"
+}

+ 98 - 0
apps/fortuna/.sqlx/query-905dbc91cd5319537c5c194277d531689ac5c1338396414467496d0f50ddc3f0.json

@@ -0,0 +1,98 @@
+{
+  "db_name": "SQLite",
+  "query": "SELECT * FROM request WHERE sequence = ?",
+  "describe": {
+    "columns": [
+      {
+        "name": "chain_id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "sequence",
+        "ordinal": 2,
+        "type_info": "Integer"
+      },
+      {
+        "name": "created_at",
+        "ordinal": 3,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "last_updated_at",
+        "ordinal": 4,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "state",
+        "ordinal": 5,
+        "type_info": "Text"
+      },
+      {
+        "name": "request_block_number",
+        "ordinal": 6,
+        "type_info": "Integer"
+      },
+      {
+        "name": "request_tx_hash",
+        "ordinal": 7,
+        "type_info": "Text"
+      },
+      {
+        "name": "user_random_number",
+        "ordinal": 8,
+        "type_info": "Text"
+      },
+      {
+        "name": "sender",
+        "ordinal": 9,
+        "type_info": "Text"
+      },
+      {
+        "name": "reveal_block_number",
+        "ordinal": 10,
+        "type_info": "Integer"
+      },
+      {
+        "name": "reveal_tx_hash",
+        "ordinal": 11,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider_random_number",
+        "ordinal": 12,
+        "type_info": "Text"
+      },
+      {
+        "name": "info",
+        "ordinal": 13,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 1
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      true,
+      true,
+      true,
+      true
+    ]
+  },
+  "hash": "905dbc91cd5319537c5c194277d531689ac5c1338396414467496d0f50ddc3f0"
+}

+ 12 - 0
apps/fortuna/.sqlx/query-9d7448c9bbad50d6242dfc0ba7d5ad4837201a1585bd56cc9a65fe75d0fa5952.json

@@ -0,0 +1,12 @@
+{
+  "db_name": "SQLite",
+  "query": "UPDATE request SET state = ?, last_updated_at = ?, reveal_block_number = ?, reveal_tx_hash = ?, provider_random_number = ? WHERE chain_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ?",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Right": 9
+    },
+    "nullable": []
+  },
+  "hash": "9d7448c9bbad50d6242dfc0ba7d5ad4837201a1585bd56cc9a65fe75d0fa5952"
+}

+ 98 - 0
apps/fortuna/.sqlx/query-a62e094cee65ae58bd12ce7d3e7df44f5aca31520d1ceced83f492945e850764.json

@@ -0,0 +1,98 @@
+{
+  "db_name": "SQLite",
+  "query": "SELECT * FROM request WHERE request_tx_hash = ? OR reveal_tx_hash = ?",
+  "describe": {
+    "columns": [
+      {
+        "name": "chain_id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "sequence",
+        "ordinal": 2,
+        "type_info": "Integer"
+      },
+      {
+        "name": "created_at",
+        "ordinal": 3,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "last_updated_at",
+        "ordinal": 4,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "state",
+        "ordinal": 5,
+        "type_info": "Text"
+      },
+      {
+        "name": "request_block_number",
+        "ordinal": 6,
+        "type_info": "Integer"
+      },
+      {
+        "name": "request_tx_hash",
+        "ordinal": 7,
+        "type_info": "Text"
+      },
+      {
+        "name": "user_random_number",
+        "ordinal": 8,
+        "type_info": "Text"
+      },
+      {
+        "name": "sender",
+        "ordinal": 9,
+        "type_info": "Text"
+      },
+      {
+        "name": "reveal_block_number",
+        "ordinal": 10,
+        "type_info": "Integer"
+      },
+      {
+        "name": "reveal_tx_hash",
+        "ordinal": 11,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider_random_number",
+        "ordinal": 12,
+        "type_info": "Text"
+      },
+      {
+        "name": "info",
+        "ordinal": 13,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 2
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      true,
+      true,
+      true,
+      true
+    ]
+  },
+  "hash": "a62e094cee65ae58bd12ce7d3e7df44f5aca31520d1ceced83f492945e850764"
+}

+ 12 - 0
apps/fortuna/.sqlx/query-b2baa9f9d46f873a3a7117c38ecab09f56082c5267dbf5180f39c608b6262f5a.json

@@ -0,0 +1,12 @@
+{
+  "db_name": "SQLite",
+  "query": "UPDATE request SET state = ?, last_updated_at = ?, info = ? WHERE chain_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ? AND state = 'Pending'",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Right": 7
+    },
+    "nullable": []
+  },
+  "hash": "b2baa9f9d46f873a3a7117c38ecab09f56082c5267dbf5180f39c608b6262f5a"
+}

+ 98 - 0
apps/fortuna/.sqlx/query-b848d03ffc893e1719d364beb32976ef879e79727c660c973bdad670082f5c36.json

@@ -0,0 +1,98 @@
+{
+  "db_name": "SQLite",
+  "query": "SELECT * FROM request WHERE created_at >= ? AND created_at <= ? ORDER BY created_at DESC LIMIT ?",
+  "describe": {
+    "columns": [
+      {
+        "name": "chain_id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "sequence",
+        "ordinal": 2,
+        "type_info": "Integer"
+      },
+      {
+        "name": "created_at",
+        "ordinal": 3,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "last_updated_at",
+        "ordinal": 4,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "state",
+        "ordinal": 5,
+        "type_info": "Text"
+      },
+      {
+        "name": "request_block_number",
+        "ordinal": 6,
+        "type_info": "Integer"
+      },
+      {
+        "name": "request_tx_hash",
+        "ordinal": 7,
+        "type_info": "Text"
+      },
+      {
+        "name": "user_random_number",
+        "ordinal": 8,
+        "type_info": "Text"
+      },
+      {
+        "name": "sender",
+        "ordinal": 9,
+        "type_info": "Text"
+      },
+      {
+        "name": "reveal_block_number",
+        "ordinal": 10,
+        "type_info": "Integer"
+      },
+      {
+        "name": "reveal_tx_hash",
+        "ordinal": 11,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider_random_number",
+        "ordinal": 12,
+        "type_info": "Text"
+      },
+      {
+        "name": "info",
+        "ordinal": 13,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 3
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      true,
+      true,
+      true,
+      true
+    ]
+  },
+  "hash": "b848d03ffc893e1719d364beb32976ef879e79727c660c973bdad670082f5c36"
+}

+ 98 - 0
apps/fortuna/.sqlx/query-ba011bb5690ad6821689bec939c5303c8619b6302ef33145db3bf62259492783.json

@@ -0,0 +1,98 @@
+{
+  "db_name": "SQLite",
+  "query": "SELECT * FROM request WHERE chain_id = ? AND created_at >= ? AND created_at <= ? ORDER BY created_at DESC LIMIT ?",
+  "describe": {
+    "columns": [
+      {
+        "name": "chain_id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "sequence",
+        "ordinal": 2,
+        "type_info": "Integer"
+      },
+      {
+        "name": "created_at",
+        "ordinal": 3,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "last_updated_at",
+        "ordinal": 4,
+        "type_info": "Datetime"
+      },
+      {
+        "name": "state",
+        "ordinal": 5,
+        "type_info": "Text"
+      },
+      {
+        "name": "request_block_number",
+        "ordinal": 6,
+        "type_info": "Integer"
+      },
+      {
+        "name": "request_tx_hash",
+        "ordinal": 7,
+        "type_info": "Text"
+      },
+      {
+        "name": "user_random_number",
+        "ordinal": 8,
+        "type_info": "Text"
+      },
+      {
+        "name": "sender",
+        "ordinal": 9,
+        "type_info": "Text"
+      },
+      {
+        "name": "reveal_block_number",
+        "ordinal": 10,
+        "type_info": "Integer"
+      },
+      {
+        "name": "reveal_tx_hash",
+        "ordinal": 11,
+        "type_info": "Text"
+      },
+      {
+        "name": "provider_random_number",
+        "ordinal": 12,
+        "type_info": "Text"
+      },
+      {
+        "name": "info",
+        "ordinal": 13,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 4
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      false,
+      true,
+      true,
+      true,
+      true
+    ]
+  },
+  "hash": "ba011bb5690ad6821689bec939c5303c8619b6302ef33145db3bf62259492783"
+}

Файлын зөрүү хэтэрхий том тул дарагдсан байна
+ 331 - 79
apps/fortuna/Cargo.lock


+ 4 - 1
apps/fortuna/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "7.5.3"
+version = "7.6.0"
 edition = "2021"
 
 [lib]
@@ -41,10 +41,13 @@ url = "2.5.0"
 chrono = { version = "0.4.38", features = [
   "clock",
   "std",
+  "serde"
 ], default-features = false }
 backoff = { version = "0.4.0", features = ["futures", "tokio"] }
 thiserror = "1.0.61"
 futures-locks = "0.7.1"
+sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono" ] }
+
 
 
 [dev-dependencies]

+ 22 - 0
apps/fortuna/README.md

@@ -10,9 +10,31 @@ Each blockchain is configured in `config.yaml`.
 
 ## Build & Test
 
+We use sqlx query macros to check the SQL queries at compile time. This requires
+a database to be available at build time. Create a `.env` file in the root of the project with the following content:
+
+```
+DATABASE_URL="sqlite:fortuna.db?mode=rwc"
+```
+
+Next, you need to create the database and apply the schema migrations. You can do this by running:
+
+```bash
+cargo sqlx migrate run # automatically picks up the .env file
+```
+This will create a SQLite database file called `fortuna.db` in the root of the project and apply the schema migrations to it.
+This will allow `cargo check` to check the queries against the existing database.
+
 Fortuna uses Cargo for building and dependency management.
 Simply run `cargo build` and `cargo test` to build and test the project.
 
+If you have changed any queries in the code, you need to update the `.sqlx` folder with the new queries:
+
+```bash
+cargo sqlx prepare
+```
+Please add the changed files in the `.sqlx` folder to your git commit.
+
 ## Command-Line Interface
 
 The Fortuna binary has a command-line interface to perform useful operations on the contract, such as

+ 3 - 0
apps/fortuna/check-sqlx.sh

@@ -0,0 +1,3 @@
+#!/bin/bash
+cd apps/fortuna || exit 1
+cargo sqlx prepare --check

+ 1 - 0
apps/fortuna/migrations/20250502164500_init.down.sql

@@ -0,0 +1 @@
+DROP TABLE request;

+ 25 - 0
apps/fortuna/migrations/20250502164500_init.up.sql

@@ -0,0 +1,25 @@
+-- we use VARCHAR(40) for addresses and VARCHAR(64) for tx_hashes and 32 byte numbers
+CREATE TABLE request(
+                    chain_id VARCHAR(20) NOT NULL,
+                    provider VARCHAR(40) NOT NULL,
+                    sequence INTEGER NOT NULL,
+                    created_at DATETIME NOT NULL,
+                    last_updated_at DATETIME NOT NULL,
+                    state VARCHAR(10) NOT NULL,
+                    request_block_number INT NOT NULL,
+                    request_tx_hash VARCHAR(64) NOT NULL,
+                    user_random_number VARCHAR(64) NOT NULL,
+                    sender VARCHAR(40) NOT NULL,
+                    reveal_block_number INT,
+                    reveal_tx_hash VARCHAR(64),
+                    provider_random_number VARCHAR(64),
+                    info TEXT,
+                    PRIMARY KEY (chain_id, sequence, provider, request_tx_hash)
+);
+
+CREATE INDEX idx_request_sequence ON request (sequence);
+CREATE INDEX idx_request_chain_id_created_at ON request (chain_id, created_at);
+CREATE INDEX idx_request_created_at ON request (created_at);
+CREATE INDEX idx_request_request_tx_hash ON request (request_tx_hash) WHERE request_tx_hash IS NOT NULL;
+CREATE INDEX idx_request_reveal_tx_hash ON request (reveal_tx_hash) WHERE reveal_tx_hash IS NOT NULL;
+CREATE INDEX idx_request_sender ON request (sender) WHERE sender IS NOT NULL;

+ 26 - 4
apps/fortuna/src/api.rs

@@ -1,6 +1,7 @@
 use {
     crate::{
         chain::reader::{BlockNumber, BlockStatus, EntropyReader},
+        history::History,
         state::HashChainState,
     },
     anyhow::Result,
@@ -21,9 +22,10 @@ use {
     tokio::sync::RwLock,
     url::Url,
 };
-pub use {chain_ids::*, index::*, live::*, metrics::*, ready::*, revelation::*};
+pub use {chain_ids::*, explorer::*, index::*, live::*, metrics::*, ready::*, revelation::*};
 
 mod chain_ids;
+mod explorer;
 mod index;
 mod live;
 mod metrics;
@@ -45,6 +47,8 @@ pub struct ApiMetrics {
 pub struct ApiState {
     pub chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
 
+    pub history: Arc<History>,
+
     pub metrics_registry: Arc<RwLock<Registry>>,
 
     /// Prometheus metrics
@@ -55,6 +59,7 @@ impl ApiState {
     pub async fn new(
         chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
         metrics_registry: Arc<RwLock<Registry>>,
+        history: Arc<History>,
     ) -> ApiState {
         let metrics = ApiMetrics {
             http_requests: Family::default(),
@@ -70,6 +75,7 @@ impl ApiState {
         ApiState {
             chains,
             metrics: Arc::new(metrics),
+            history,
             metrics_registry,
         }
     }
@@ -105,6 +111,8 @@ pub enum RestError {
     InvalidSequenceNumber,
     /// The caller passed an unsupported chain id
     InvalidChainId,
+    /// The query is not parsable to a transaction hash, address, or sequence number
+    InvalidQueryString,
     /// The caller requested a random value that can't currently be revealed (because it
     /// hasn't been committed to on-chain)
     NoPendingRequest,
@@ -132,6 +140,11 @@ impl IntoResponse for RestError {
             RestError::InvalidChainId => {
                 (StatusCode::BAD_REQUEST, "The chain id is not supported").into_response()
             }
+            RestError::InvalidQueryString => (
+                StatusCode::BAD_REQUEST,
+                "The query string is not parsable to a transaction hash, address, or sequence number",
+            )
+                .into_response(),
             RestError::NoPendingRequest => (
                 StatusCode::FORBIDDEN,
                 "The request with the given sequence number has not been made yet, or the random value has already been revealed on chain.",
@@ -167,6 +180,7 @@ pub fn routes(state: ApiState) -> Router<(), Body> {
         .route("/metrics", get(metrics))
         .route("/ready", get(ready))
         .route("/v1/chains", get(chain_ids))
+        .route("/v1/logs", get(explorer))
         .route(
             "/v1/chains/:chain_id/revelations/:sequence",
             get(revelation),
@@ -186,11 +200,14 @@ pub fn get_register_uri(base_uri: &str, chain_id: &str) -> Result<String> {
 
 #[cfg(test)]
 mod test {
-    use crate::api::ApiBlockChainState;
     use {
         crate::{
-            api::{self, ApiState, BinaryEncoding, Blob, BlockchainState, GetRandomValueResponse},
+            api::{
+                self, ApiBlockChainState, ApiState, BinaryEncoding, Blob, BlockchainState,
+                GetRandomValueResponse,
+            },
             chain::reader::{mock::MockEntropyReader, BlockStatus},
+            history::History,
             state::{HashChainState, PebbleHashChain},
         },
         axum::http::StatusCode,
@@ -252,7 +269,12 @@ mod test {
             ApiBlockChainState::Initialized(avax_state),
         );
 
-        let api_state = ApiState::new(Arc::new(RwLock::new(chains)), metrics_registry).await;
+        let api_state = ApiState::new(
+            Arc::new(RwLock::new(chains)),
+            metrics_registry,
+            Arc::new(History::new().await.unwrap()),
+        )
+        .await;
 
         let app = api::routes(api_state);
         (TestServer::new(app).unwrap(), eth_read, avax_read)

+ 95 - 0
apps/fortuna/src/api/explorer.rs

@@ -0,0 +1,95 @@
+use {
+    crate::{
+        api::{ChainId, RestError},
+        history::RequestStatus,
+    },
+    axum::{
+        extract::{Query, State},
+        Json,
+    },
+    chrono::{DateTime, Utc},
+    ethers::types::{Address, TxHash},
+    std::str::FromStr,
+    utoipa::IntoParams,
+};
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, IntoParams)]
+#[into_params(parameter_in=Query)]
+pub struct ExplorerQueryParams {
+    /// Only return logs that are newer or equal to this timestamp.
+    #[param(value_type = Option<String>, example = "2023-10-01T00:00:00Z")]
+    pub min_timestamp: Option<DateTime<Utc>>,
+    /// Only return logs that are older or equal to this timestamp.
+    #[param(value_type = Option<String>, example = "2033-10-01T00:00:00Z")]
+    pub max_timestamp: Option<DateTime<Utc>>,
+    /// The query string to search for. This can be a transaction hash, sender address, or sequence number.
+    pub query: Option<String>,
+    #[param(value_type = Option<String>)]
+    /// The chain ID to filter the results by.
+    pub chain_id: Option<ChainId>,
+}
+
+const LOG_RETURN_LIMIT: u64 = 1000;
+
+/// Returns the logs of all requests captured by the keeper.
+///
+/// This endpoint allows you to filter the logs by a specific chain ID, a query string (which can be a transaction hash, sender address, or sequence number), and a time range.
+/// This is useful for debugging and monitoring the requests made to the Entropy contracts on various chains.
+#[utoipa::path(
+    get,
+    path = "/v1/logs",
+    responses((status = 200, description = "A list of Entropy request logs", body = Vec<RequestStatus>)),
+    params(ExplorerQueryParams)
+)]
+pub async fn explorer(
+    State(state): State<crate::api::ApiState>,
+    Query(query_params): Query<ExplorerQueryParams>,
+) -> anyhow::Result<Json<Vec<RequestStatus>>, RestError> {
+    if let Some(chain_id) = &query_params.chain_id {
+        if !state.chains.read().await.contains_key(chain_id) {
+            return Err(RestError::InvalidChainId);
+        }
+    }
+    if let Some(query) = query_params.query {
+        if let Ok(tx_hash) = TxHash::from_str(&query) {
+            return Ok(Json(
+                state
+                    .history
+                    .get_requests_by_tx_hash(tx_hash)
+                    .await
+                    .map_err(|_| RestError::TemporarilyUnavailable)?,
+            ));
+        }
+        if let Ok(sender) = Address::from_str(&query) {
+            return Ok(Json(
+                state
+                    .history
+                    .get_requests_by_sender(sender, query_params.chain_id)
+                    .await
+                    .map_err(|_| RestError::TemporarilyUnavailable)?,
+            ));
+        }
+        if let Ok(sequence_number) = u64::from_str(&query) {
+            return Ok(Json(
+                state
+                    .history
+                    .get_requests_by_sequence(sequence_number, query_params.chain_id)
+                    .await
+                    .map_err(|_| RestError::TemporarilyUnavailable)?,
+            ));
+        }
+        return Err(RestError::InvalidQueryString);
+    }
+    Ok(Json(
+        state
+            .history
+            .get_requests_by_time(
+                query_params.chain_id,
+                LOG_RETURN_LIMIT,
+                query_params.min_timestamp,
+                query_params.max_timestamp,
+            )
+            .await
+            .map_err(|_| RestError::TemporarilyUnavailable)?,
+    ))
+}

+ 4 - 3
apps/fortuna/src/api/revelation.rs

@@ -1,7 +1,8 @@
-use crate::api::ApiBlockChainState;
-use crate::chain::reader::BlockNumber;
 use {
-    crate::api::{ChainId, RequestLabel, RestError},
+    crate::{
+        api::{ApiBlockChainState, ChainId, RequestLabel, RestError},
+        chain::reader::BlockNumber,
+    },
     anyhow::Result,
     axum::{
         extract::{Path, Query, State},

+ 18 - 6
apps/fortuna/src/chain/ethereum.rs

@@ -2,7 +2,8 @@ use {
     crate::{
         api::ChainId,
         chain::reader::{
-            self, BlockNumber, BlockStatus, EntropyReader, RequestedWithCallbackEvent,
+            self, BlockNumber, BlockStatus, EntropyReader, EntropyRequestInfo,
+            RequestedWithCallbackEvent,
         },
         config::EthereumConfig,
         eth_utils::{
@@ -16,7 +17,7 @@ use {
     axum::async_trait,
     ethers::{
         abi::RawLog,
-        contract::{abigen, EthLogDecode},
+        contract::{abigen, EthLogDecode, LogMeta},
         core::types::Address,
         middleware::{gas_oracle::GasOracleMiddleware, SignerMiddleware},
         prelude::JsonRpcClient,
@@ -285,14 +286,25 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
             .to_block(to_block)
             .topic1(provider);
 
-        let res: Vec<RequestedWithCallbackFilter> = event.query().await?;
-
+        let res: Vec<(RequestedWithCallbackFilter, LogMeta)> = event.query_with_meta().await?;
         Ok(res
-            .iter()
-            .map(|r| RequestedWithCallbackEvent {
+            .into_iter()
+            .map(|(r, meta)| RequestedWithCallbackEvent {
                 sequence_number: r.sequence_number,
                 user_random_number: r.user_random_number,
                 provider_address: r.request.provider,
+                requestor: r.requestor,
+                request: EntropyRequestInfo {
+                    provider: r.request.provider,
+                    sequence_number: r.request.sequence_number,
+                    num_hashes: r.request.num_hashes,
+                    commitment: r.request.commitment,
+                    block_number: r.request.block_number,
+                    requester: r.request.requester,
+                    use_blockhash: r.request.use_blockhash,
+                    is_request_with_callback: r.request.is_request_with_callback,
+                },
+                log_meta: meta,
             })
             .filter(|r| r.provider_address == provider)
             .collect())

+ 19 - 1
apps/fortuna/src/chain/reader.rs

@@ -1,7 +1,10 @@
 use {
     anyhow::Result,
     axum::async_trait,
-    ethers::types::{Address, BlockNumber as EthersBlockNumber, U256},
+    ethers::{
+        prelude::LogMeta,
+        types::{Address, BlockNumber as EthersBlockNumber, U256},
+    },
 };
 
 pub type BlockNumber = u64;
@@ -29,11 +32,26 @@ impl From<BlockStatus> for EthersBlockNumber {
     }
 }
 
+#[derive(Clone, Debug, PartialEq)]
+pub struct EntropyRequestInfo {
+    pub provider: Address,
+    pub sequence_number: u64,
+    pub num_hashes: u32,
+    pub commitment: [u8; 32],
+    pub block_number: u64,
+    pub requester: Address,
+    pub use_blockhash: bool,
+    pub is_request_with_callback: bool,
+}
+
 #[derive(Clone)]
 pub struct RequestedWithCallbackEvent {
     pub sequence_number: u64,
     pub user_random_number: [u8; 32],
     pub provider_address: Address,
+    pub requestor: Address,
+    pub request: EntropyRequestInfo,
+    pub log_meta: LogMeta,
 }
 
 /// EntropyReader is the read-only interface of the Entropy contract.

+ 19 - 2
apps/fortuna/src/command/run.rs

@@ -5,6 +5,7 @@ use {
         command::register_provider::CommitmentMetadata,
         config::{Commitment, Config, EthereumConfig, ProviderConfig, RunOptions},
         eth_utils::traced_client::RpcMetrics,
+        history::History,
         keeper::{self, keeper_metrics::KeeperMetrics},
         state::{HashChainState, PebbleHashChain},
     },
@@ -26,6 +27,7 @@ pub async fn run_api(
     socket_addr: SocketAddr,
     chains: Arc<RwLock<HashMap<String, ApiBlockChainState>>>,
     metrics_registry: Arc<RwLock<Registry>>,
+    history: Arc<History>,
     mut rx_exit: watch::Receiver<bool>,
 ) -> Result<()> {
     #[derive(OpenApi)]
@@ -33,10 +35,13 @@ pub async fn run_api(
     paths(
     crate::api::revelation,
     crate::api::chain_ids,
+    crate::api::explorer,
     ),
     components(
     schemas(
     crate::api::GetRandomValueResponse,
+    crate::history::RequestStatus,
+    crate::history::RequestEntryState,
     crate::api::Blob,
     crate::api::BinaryEncoding,
     )
@@ -47,7 +52,7 @@ pub async fn run_api(
     )]
     struct ApiDoc;
 
-    let api_state = api::ApiState::new(chains, metrics_registry).await;
+    let api_state = api::ApiState::new(chains, metrics_registry, history).await;
 
     // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
     // `with_state` method which replaces `Body` with `State` in the type signature.
@@ -98,6 +103,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
             .map(|chain_id| (chain_id.clone(), ApiBlockChainState::Uninitialized))
             .collect(),
     ));
+    let history = Arc::new(History::new().await?);
     for (chain_id, chain_config) in config.chains.clone() {
         keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
         let keeper_metrics = keeper_metrics.clone();
@@ -106,6 +112,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         let secret_copy = secret.clone();
         let rpc_metrics = rpc_metrics.clone();
         let provider_config = config.provider.clone();
+        let history = history.clone();
         spawn(async move {
             loop {
                 let setup_result = setup_chain_and_run_keeper(
@@ -116,6 +123,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
                     keeper_private_key_option.clone(),
                     chains.clone(),
                     &secret_copy,
+                    history.clone(),
                     rpc_metrics.clone(),
                 )
                 .await;
@@ -145,7 +153,14 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         Ok::<(), Error>(())
     });
 
-    run_api(opts.addr, chains.clone(), metrics_registry.clone(), rx_exit).await?;
+    run_api(
+        opts.addr,
+        chains.clone(),
+        metrics_registry.clone(),
+        history,
+        rx_exit,
+    )
+    .await?;
     Ok(())
 }
 
@@ -158,6 +173,7 @@ async fn setup_chain_and_run_keeper(
     keeper_private_key_option: Option<String>,
     chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
     secret_copy: &str,
+    history: Arc<History>,
     rpc_metrics: Arc<RpcMetrics>,
 ) -> Result<()> {
     let state = setup_chain_state(
@@ -179,6 +195,7 @@ async fn setup_chain_and_run_keeper(
             chain_config,
             state,
             keeper_metrics.clone(),
+            history,
             rpc_metrics.clone(),
         )
         .await?;

+ 1 - 1
apps/fortuna/src/eth_utils/nonce_manager.rs

@@ -4,9 +4,9 @@
 use {
     super::legacy_tx_middleware::LegacyTxMiddleware,
     axum::async_trait,
-    ethers::prelude::GasOracle,
     ethers::{
         middleware::gas_oracle::GasOracleMiddleware,
+        prelude::GasOracle,
         providers::{Middleware, MiddlewareError, PendingTransaction},
         types::{transaction::eip2718::TypedTransaction, *},
     },

+ 1 - 1
apps/fortuna/src/eth_utils/utils.rs

@@ -1,8 +1,8 @@
-use ethabi::ethereum_types::U64;
 use {
     crate::eth_utils::nonce_manager::NonceManaged,
     anyhow::{anyhow, Result},
     backoff::ExponentialBackoff,
+    ethabi::ethereum_types::U64,
     ethers::{
         contract::ContractCall,
         middleware::Middleware,

+ 549 - 0
apps/fortuna/src/history.rs

@@ -0,0 +1,549 @@
+use {
+    crate::api::ChainId,
+    anyhow::Result,
+    chrono::{DateTime, NaiveDateTime},
+    ethers::{core::utils::hex::ToHex, prelude::TxHash, types::Address},
+    serde::Serialize,
+    serde_with::serde_as,
+    sqlx::{migrate, Pool, Sqlite, SqlitePool},
+    std::sync::Arc,
+    tokio::{spawn, sync::mpsc},
+    utoipa::ToSchema,
+};
+
+#[serde_as]
+#[derive(Clone, Debug, Serialize, ToSchema, PartialEq)]
+#[serde(tag = "state", rename_all = "kebab-case")]
+pub enum RequestEntryState {
+    Pending,
+    Completed {
+        reveal_block_number: u64,
+        /// The transaction hash of the reveal transaction.
+        #[schema(example = "0xfe5f880ac10c0aae43f910b5a17f98a93cdd2eb2dce3a5ae34e5827a3a071a32", value_type = String)]
+        reveal_tx_hash: TxHash,
+        /// The provider's contribution to the random number.
+        #[schema(example = "a905ab56567d31a7fda38ed819d97bc257f3ebe385fc5c72ce226d3bb855f0fe")]
+        #[serde_as(as = "serde_with::hex::Hex")]
+        provider_random_number: [u8; 32],
+    },
+    Failed {
+        reason: String,
+    },
+}
+
+#[serde_as]
+#[derive(Clone, Debug, Serialize, ToSchema, PartialEq)]
+pub struct RequestStatus {
+    /// The chain ID of the request.
+    #[schema(example = "ethereum", value_type = String)]
+    pub chain_id: ChainId,
+    #[schema(example = "0x6cc14824ea2918f5de5c2f75a9da968ad4bd6344", value_type = String)]
+    pub provider: Address,
+    pub sequence: u64,
+    #[schema(example = "2023-10-01T00:00:00Z", value_type = String)]
+    pub created_at: DateTime<chrono::Utc>,
+    #[schema(example = "2023-10-01T00:00:05Z", value_type = String)]
+    pub last_updated_at: DateTime<chrono::Utc>,
+    pub request_block_number: u64,
+    /// The transaction hash of the request transaction.
+    #[schema(example = "0x5a3a984f41bb5443f5efa6070ed59ccb25edd8dbe6ce7f9294cf5caa64ed00ae", value_type = String)]
+    pub request_tx_hash: TxHash,
+    /// The user's contribution to the random number.
+    #[schema(example = "a905ab56567d31a7fda38ed819d97bc257f3ebe385fc5c72ce226d3bb855f0fe")]
+    #[serde_as(as = "serde_with::hex::Hex")]
+    pub user_random_number: [u8; 32],
+    /// This is the address that initiated the request.
+    #[schema(example = "0x78357316239040e19fc823372cc179ca75e64b81", value_type = String)]
+    pub sender: Address,
+    pub state: RequestEntryState,
+}
+
+#[derive(Clone, Debug, Serialize, ToSchema, PartialEq)]
+struct RequestRow {
+    chain_id: String,
+    provider: String,
+    sequence: i64,
+    created_at: NaiveDateTime,
+    last_updated_at: NaiveDateTime,
+    state: String,
+    request_block_number: i64,
+    request_tx_hash: String,
+    user_random_number: String,
+    sender: String,
+    reveal_block_number: Option<i64>,
+    reveal_tx_hash: Option<String>,
+    provider_random_number: Option<String>,
+    info: Option<String>,
+}
+
+impl TryFrom<RequestRow> for RequestStatus {
+    type Error = anyhow::Error;
+
+    fn try_from(row: RequestRow) -> Result<Self, Self::Error> {
+        let chain_id = row.chain_id;
+        let provider = row.provider.parse()?;
+        let sequence = row.sequence as u64;
+        let created_at = row.created_at.and_utc();
+        let last_updated_at = row.last_updated_at.and_utc();
+        let request_block_number = row.request_block_number as u64;
+        let user_random_number = hex::FromHex::from_hex(row.user_random_number)?;
+        let request_tx_hash = row.request_tx_hash.parse()?;
+        let sender = row.sender.parse()?;
+
+        let state = match row.state.as_str() {
+            "Pending" => RequestEntryState::Pending,
+            "Completed" => {
+                let reveal_block_number = row.reveal_block_number.ok_or(anyhow::anyhow!(
+                    "Reveal block number is missing for completed request"
+                ))? as u64;
+                let reveal_tx_hash = row
+                    .reveal_tx_hash
+                    .ok_or(anyhow::anyhow!(
+                        "Reveal transaction hash is missing for completed request"
+                    ))?
+                    .parse()?;
+                let provider_random_number = row.provider_random_number.ok_or(anyhow::anyhow!(
+                    "Provider random number is missing for completed request"
+                ))?;
+                let provider_random_number: [u8; 32] =
+                    hex::FromHex::from_hex(provider_random_number)?;
+                RequestEntryState::Completed {
+                    reveal_block_number,
+                    reveal_tx_hash,
+                    provider_random_number,
+                }
+            }
+            "Failed" => RequestEntryState::Failed {
+                reason: row.info.unwrap_or_default(),
+            },
+            _ => return Err(anyhow::anyhow!("Unknown request state: {}", row.state)),
+        };
+        Ok(Self {
+            chain_id,
+            provider,
+            sequence,
+            created_at,
+            last_updated_at,
+            state,
+            request_block_number,
+            request_tx_hash,
+            user_random_number,
+            sender,
+        })
+    }
+}
+
+impl From<RequestRow> for Option<RequestStatus> {
+    fn from(row: RequestRow) -> Self {
+        match RequestStatus::try_from(row) {
+            Ok(status) => Some(status),
+            Err(e) => {
+                tracing::error!("Failed to convert RequestRow to RequestStatus: {}", e);
+                None
+            }
+        }
+    }
+}
+
+pub struct History {
+    pool: Pool<Sqlite>,
+    write_queue: mpsc::Sender<RequestStatus>,
+    _writer_thread: Arc<tokio::task::JoinHandle<()>>,
+}
+
+impl History {
+    const MAX_WRITE_QUEUE: usize = 1_000;
+    pub async fn new() -> Result<Self> {
+        Self::new_with_url("sqlite:fortuna.db?mode=rwc").await
+    }
+
+    pub async fn new_in_memory() -> Result<Self> {
+        Self::new_with_url("sqlite::memory:").await
+    }
+
+    pub async fn new_with_url(url: &str) -> Result<Self> {
+        let pool = SqlitePool::connect(url).await?;
+        let migrator = migrate!("./migrations");
+        migrator.run(&pool).await?;
+        Self::new_with_pool(pool).await
+    }
+    pub async fn new_with_pool(pool: Pool<Sqlite>) -> Result<Self> {
+        let (sender, mut receiver) = mpsc::channel(Self::MAX_WRITE_QUEUE);
+        let pool_write_connection = pool.clone();
+        let writer_thread = spawn(async move {
+            while let Some(log) = receiver.recv().await {
+                Self::update_request_status(&pool_write_connection, log).await;
+            }
+        });
+        Ok(Self {
+            pool,
+            write_queue: sender,
+            _writer_thread: Arc::new(writer_thread),
+        })
+    }
+
+    async fn update_request_status(pool: &Pool<Sqlite>, new_status: RequestStatus) {
+        let sequence = new_status.sequence as i64;
+        let chain_id = new_status.chain_id;
+        let request_tx_hash: String = new_status.request_tx_hash.encode_hex();
+        let provider: String = new_status.provider.encode_hex();
+        let result = match new_status.state {
+            RequestEntryState::Pending => {
+                let block_number = new_status.request_block_number as i64;
+                let sender: String = new_status.sender.encode_hex();
+                let user_random_number: String = new_status.user_random_number.encode_hex();
+                sqlx::query!("INSERT INTO request(chain_id, provider, sequence, created_at, last_updated_at, state, request_block_number, request_tx_hash, user_random_number, sender) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                    chain_id,
+                    provider,
+                    sequence,
+                    new_status.created_at,
+                    new_status.last_updated_at,
+                    "Pending",
+                    block_number,
+                    request_tx_hash,
+                    user_random_number,
+                    sender)
+                    .execute(pool)
+                    .await
+            }
+            RequestEntryState::Completed {
+                reveal_block_number,
+                reveal_tx_hash,
+                provider_random_number,
+            } => {
+                let reveal_block_number = reveal_block_number as i64;
+                let reveal_tx_hash: String = reveal_tx_hash.encode_hex();
+                let provider_random_number: String = provider_random_number.encode_hex();
+                sqlx::query!("UPDATE request SET state = ?, last_updated_at = ?, reveal_block_number = ?, reveal_tx_hash = ?, provider_random_number = ? WHERE chain_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ?",
+                    "Completed",
+                    new_status.last_updated_at,
+                    reveal_block_number,
+                    reveal_tx_hash,
+                    provider_random_number,
+                    chain_id,
+                    sequence,
+                    provider,
+                    request_tx_hash)
+                    .execute(pool)
+                    .await
+            }
+            RequestEntryState::Failed { reason } => {
+                sqlx::query!("UPDATE request SET state = ?, last_updated_at = ?, info = ? WHERE chain_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ? AND state = 'Pending'",
+                    "Failed",
+                    new_status.last_updated_at,
+                    reason,
+                    chain_id,
+                    sequence,
+                    provider,
+                    request_tx_hash)
+                    .execute(pool)
+                    .await
+            }
+        };
+        if let Err(e) = result {
+            tracing::error!("Failed to update request status: {}", e);
+        }
+    }
+
+    pub fn add(&self, log: &RequestStatus) {
+        if let Err(e) = self.write_queue.try_send(log.clone()) {
+            tracing::error!("Failed to send log to write queue: {}", e);
+        }
+    }
+
+    pub async fn get_requests_by_tx_hash(&self, tx_hash: TxHash) -> Result<Vec<RequestStatus>> {
+        let tx_hash: String = tx_hash.encode_hex();
+        let rows = sqlx::query_as!(
+            RequestRow,
+            "SELECT * FROM request WHERE request_tx_hash = ? OR reveal_tx_hash = ?",
+            tx_hash,
+            tx_hash
+        )
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|e| {
+            tracing::error!("Failed to fetch request by tx hash: {}", e);
+            e
+        })?;
+        Ok(rows.into_iter().filter_map(|row| row.into()).collect())
+    }
+
+    pub async fn get_requests_by_sender(
+        &self,
+        sender: Address,
+        chain_id: Option<ChainId>,
+    ) -> Result<Vec<RequestStatus>> {
+        let sender: String = sender.encode_hex();
+        let rows = match chain_id {
+            Some(chain_id) => {
+                sqlx::query_as!(
+                    RequestRow,
+                    "SELECT * FROM request WHERE sender = ? AND chain_id = ?",
+                    sender,
+                    chain_id,
+                )
+                .fetch_all(&self.pool)
+                .await
+            }
+            None => {
+                sqlx::query_as!(RequestRow, "SELECT * FROM request WHERE sender = ?", sender,)
+                    .fetch_all(&self.pool)
+                    .await
+            }
+        }
+        .map_err(|e| {
+            tracing::error!("Failed to fetch request by sender: {}", e);
+            e
+        })?;
+        Ok(rows.into_iter().filter_map(|row| row.into()).collect())
+    }
+
+    pub async fn get_requests_by_sequence(
+        &self,
+        sequence: u64,
+        chain_id: Option<ChainId>,
+    ) -> Result<Vec<RequestStatus>> {
+        let sequence = sequence as i64;
+        let rows = match chain_id {
+            Some(chain_id) => {
+                sqlx::query_as!(
+                    RequestRow,
+                    "SELECT * FROM request WHERE sequence = ? AND chain_id = ?",
+                    sequence,
+                    chain_id,
+                )
+                .fetch_all(&self.pool)
+                .await
+            }
+            None => {
+                sqlx::query_as!(
+                    RequestRow,
+                    "SELECT * FROM request WHERE sequence = ?",
+                    sequence,
+                )
+                .fetch_all(&self.pool)
+                .await
+            }
+        }
+        .map_err(|e| {
+            tracing::error!("Failed to fetch request by sequence: {}", e);
+            e
+        })?;
+        Ok(rows.into_iter().filter_map(|row| row.into()).collect())
+    }
+
+    pub async fn get_requests_by_time(
+        &self,
+        chain_id: Option<ChainId>,
+        limit: u64,
+        min_timestamp: Option<DateTime<chrono::Utc>>,
+        max_timestamp: Option<DateTime<chrono::Utc>>,
+    ) -> Result<Vec<RequestStatus>> {
+        // UTC_MIN and UTC_MAX are not valid timestamps in SQLite,
+        // so we substitute sufficiently small and large placeholder timestamps instead.
+        let min_timestamp = min_timestamp.unwrap_or(
+            "2012-12-12T12:12:12Z"
+                .parse::<DateTime<chrono::Utc>>()
+                .unwrap(),
+        );
+        let max_timestamp = max_timestamp.unwrap_or(
+            "2050-12-12T12:12:12Z"
+                .parse::<DateTime<chrono::Utc>>()
+                .unwrap(),
+        );
+        let limit = limit as i64;
+        let rows = match chain_id {
+            Some(chain_id) => {
+                let chain_id = chain_id.to_string();
+                sqlx::query_as!(RequestRow, "SELECT * FROM request WHERE chain_id = ? AND created_at >= ? AND created_at <= ? ORDER BY created_at DESC LIMIT ?",
+                    chain_id,
+                    min_timestamp,
+                    max_timestamp,
+                    limit).fetch_all(&self.pool).await
+            }
+            None => {
+                sqlx::query_as!(RequestRow, "SELECT * FROM request WHERE created_at >= ? AND created_at <= ? ORDER BY created_at DESC LIMIT ?",
+                    min_timestamp,
+                    max_timestamp,
+                    limit).fetch_all(&self.pool).await
+            }
+        }.map_err(|e| {
+            tracing::error!("Failed to fetch request by time: {}", e);
+            e
+        })?;
+        Ok(rows.into_iter().filter_map(|row| row.into()).collect())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use {super::*, chrono::Duration, tokio::time::sleep};
+
+    fn get_random_request_status() -> RequestStatus {
+        RequestStatus {
+            chain_id: "ethereum".to_string(),
+            provider: Address::random(),
+            sequence: 1,
+            created_at: chrono::Utc::now(),
+            last_updated_at: chrono::Utc::now(),
+            request_block_number: 1,
+            request_tx_hash: TxHash::random(),
+            user_random_number: [20; 32],
+            sender: Address::random(),
+            state: RequestEntryState::Pending,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_history_return_correct_logs() {
+        let history = History::new_in_memory().await.unwrap();
+        let reveal_tx_hash = TxHash::random();
+        let mut status = get_random_request_status();
+        History::update_request_status(&history.pool, status.clone()).await;
+        status.state = RequestEntryState::Completed {
+            reveal_block_number: 1,
+            reveal_tx_hash,
+            provider_random_number: [40; 32],
+        };
+        History::update_request_status(&history.pool, status.clone()).await;
+
+        let logs = history
+            .get_requests_by_sequence(status.sequence, Some(status.chain_id.clone()))
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![status.clone()]);
+
+        let logs = history
+            .get_requests_by_sequence(status.sequence, None)
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![status.clone()]);
+
+        let logs = history
+            .get_requests_by_tx_hash(status.request_tx_hash)
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![status.clone()]);
+
+        let logs = history
+            .get_requests_by_tx_hash(reveal_tx_hash)
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![status.clone()]);
+
+        let logs = history
+            .get_requests_by_sender(status.sender, Some(status.chain_id.clone()))
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![status.clone()]);
+
+        let logs = history
+            .get_requests_by_sender(status.sender, None)
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![status.clone()]);
+    }
+
+    #[tokio::test]
+
+    async fn test_history_filter_irrelevant_logs() {
+        let history = History::new_in_memory().await.unwrap();
+        let status = get_random_request_status();
+        History::update_request_status(&history.pool, status.clone()).await;
+
+        let logs = history
+            .get_requests_by_sequence(status.sequence, Some("not-ethereum".to_string()))
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![]);
+
+        let logs = history
+            .get_requests_by_sequence(status.sequence + 1, None)
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![]);
+
+        let logs = history
+            .get_requests_by_tx_hash(TxHash::zero())
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![]);
+
+        let logs = history
+            .get_requests_by_sender(Address::zero(), Some(status.chain_id.clone()))
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![]);
+
+        let logs = history
+            .get_requests_by_sender(Address::zero(), None)
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![]);
+    }
+
+    #[tokio::test]
+    async fn test_history_time_filters() {
+        let history = History::new_in_memory().await.unwrap();
+        let status = get_random_request_status();
+        History::update_request_status(&history.pool, status.clone()).await;
+        for chain_id in [None, Some("ethereum".to_string())] {
+            // min = created_at = max
+            let logs = history
+                .get_requests_by_time(
+                    chain_id.clone(),
+                    10,
+                    Some(status.created_at),
+                    Some(status.created_at),
+                )
+                .await
+                .unwrap();
+            assert_eq!(logs, vec![status.clone()]);
+
+            // min = created_at + 1
+            let logs = history
+                .get_requests_by_time(
+                    chain_id.clone(),
+                    10,
+                    Some(status.created_at + Duration::seconds(1)),
+                    None,
+                )
+                .await
+                .unwrap();
+            assert_eq!(logs, vec![]);
+
+            // max = created_at - 1
+            let logs = history
+                .get_requests_by_time(
+                    chain_id.clone(),
+                    10,
+                    None,
+                    Some(status.created_at - Duration::seconds(1)),
+                )
+                .await
+                .unwrap();
+            assert_eq!(logs, vec![]);
+
+            // no min or max
+            let logs = history
+                .get_requests_by_time(chain_id.clone(), 10, None, None)
+                .await
+                .unwrap();
+            assert_eq!(logs, vec![status.clone()]);
+        }
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_writer_thread() {
+        let history = History::new_in_memory().await.unwrap();
+        let status = get_random_request_status();
+        history.add(&status);
+        // wait for the writer thread to write to the db
+        sleep(std::time::Duration::from_secs(1)).await;
+        let logs = history
+            .get_requests_by_sequence(1, Some("ethereum".to_string()))
+            .await
+            .unwrap();
+        assert_eq!(logs, vec![status]);
+    }
+}

+ 23 - 22
apps/fortuna/src/keeper.rs

@@ -1,20 +1,21 @@
-use crate::keeper::track::track_block_timestamp_lag;
 use {
     crate::{
         api::{BlockchainState, ChainId},
         chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
         config::EthereumConfig,
         eth_utils::traced_client::RpcMetrics,
-        keeper::block::{
-            get_latest_safe_block, process_backlog, process_new_blocks, watch_blocks_wrapper,
-            BlockRange,
+        history::History,
+        keeper::{
+            block::{
+                get_latest_safe_block, process_backlog, process_new_blocks, watch_blocks_wrapper,
+                BlockRange, ProcessParams,
+            },
+            commitment::update_commitments_loop,
+            fee::{adjust_fee_wrapper, withdraw_fees_wrapper},
+            track::{
+                track_accrued_pyth_fees, track_balance, track_block_timestamp_lag, track_provider,
+            },
         },
-        keeper::commitment::update_commitments_loop,
-        keeper::fee::adjust_fee_wrapper,
-        keeper::fee::withdraw_fees_wrapper,
-        keeper::track::track_accrued_pyth_fees,
-        keeper::track::track_balance,
-        keeper::track::track_provider,
     },
     ethers::{signers::Signer, types::U256},
     keeper_metrics::{AccountLabel, KeeperMetrics},
@@ -59,6 +60,7 @@ pub async fn run_keeper_threads(
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
     metrics: Arc<KeeperMetrics>,
+    history: Arc<History>,
     rpc_metrics: Arc<RpcMetrics>,
 ) -> anyhow::Result<()> {
     tracing::info!("Starting keeper");
@@ -80,18 +82,22 @@ pub async fn run_keeper_threads(
 
     // Spawn a thread to handle the events from last backlog_range blocks.
     let gas_limit: U256 = chain_eth_config.gas_limit.into();
+    let process_params = ProcessParams {
+        chain_state: chain_state.clone(),
+        contract: contract.clone(),
+        gas_limit,
+        escalation_policy: chain_eth_config.escalation_policy.to_policy(),
+        metrics: metrics.clone(),
+        fulfilled_requests_cache,
+        history,
+    };
     spawn(
         process_backlog(
+            process_params.clone(),
             BlockRange {
                 from: latest_safe_block.saturating_sub(chain_eth_config.backlog_range),
                 to: latest_safe_block,
             },
-            contract.clone(),
-            gas_limit,
-            chain_eth_config.escalation_policy.to_policy(),
-            chain_state.clone(),
-            metrics.clone(),
-            fulfilled_requests_cache.clone(),
             chain_eth_config.block_delays.clone(),
         )
         .in_current_span(),
@@ -104,13 +110,8 @@ pub async fn run_keeper_threads(
     // Spawn a thread for block processing with configured delays
     spawn(
         process_new_blocks(
-            chain_state.clone(),
+            process_params.clone(),
             rx,
-            Arc::clone(&contract),
-            gas_limit,
-            chain_eth_config.escalation_policy.to_policy(),
-            metrics.clone(),
-            fulfilled_requests_cache.clone(),
             chain_eth_config.block_delays.clone(),
         )
         .in_current_span(),

+ 49 - 102
apps/fortuna/src/keeper/block.rs

@@ -1,10 +1,13 @@
 use {
     crate::{
-        api::{self, BlockchainState},
+        api::BlockchainState,
         chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
         eth_utils::utils::EscalationPolicy,
-        keeper::keeper_metrics::{ChainIdLabel, KeeperMetrics},
-        keeper::process_event::process_event_with_backoff,
+        history::History,
+        keeper::{
+            keeper_metrics::{ChainIdLabel, KeeperMetrics},
+            process_event::process_event_with_backoff,
+        },
     },
     anyhow::Result,
     ethers::types::U256,
@@ -36,6 +39,17 @@ pub struct BlockRange {
     pub to: BlockNumber,
 }
 
+#[derive(Clone)]
+pub struct ProcessParams {
+    pub contract: Arc<InstrumentedSignablePythContract>,
+    pub gas_limit: U256,
+    pub escalation_policy: EscalationPolicy,
+    pub chain_state: BlockchainState,
+    pub metrics: Arc<KeeperMetrics>,
+    pub history: Arc<History>,
+    pub fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
+}
+
 /// Get the latest safe block number for the chain. Retry internally if there is an error.
 pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
     loop {
@@ -63,15 +77,7 @@ pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber
 #[tracing::instrument(skip_all, fields(
     range_from_block = block_range.from, range_to_block = block_range.to
 ))]
-pub async fn process_block_range(
-    block_range: BlockRange,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    chain_state: api::BlockchainState,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
-) {
+pub async fn process_block_range(block_range: BlockRange, process_params: ProcessParams) {
     let BlockRange {
         from: first_block,
         to: last_block,
@@ -89,12 +95,7 @@ pub async fn process_block_range(
                 from: current_block,
                 to: to_block,
             },
-            contract.clone(),
-            gas_limit,
-            escalation_policy.clone(),
-            chain_state.clone(),
-            metrics.clone(),
-            fulfilled_requests_cache.clone(),
+            process_params.clone(),
         )
         .in_current_span()
         .await;
@@ -110,26 +111,19 @@ pub async fn process_block_range(
 #[tracing::instrument(name = "batch", skip_all, fields(
     batch_from_block = block_range.from, batch_to_block = block_range.to
 ))]
-pub async fn process_single_block_batch(
-    block_range: BlockRange,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    chain_state: api::BlockchainState,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
-) {
+
+pub async fn process_single_block_batch(block_range: BlockRange, process_params: ProcessParams) {
     let label = ChainIdLabel {
-        chain_id: chain_state.id.clone(),
+        chain_id: process_params.chain_state.id.clone(),
     };
-
     loop {
-        let events_res = chain_state
+        let events_res = process_params
+            .chain_state
             .contract
             .get_request_with_callback_events(
                 block_range.from,
                 block_range.to,
-                chain_state.provider_address,
+                process_params.chain_state.provider_address,
             )
             .await;
 
@@ -141,17 +135,20 @@ pub async fn process_single_block_batch(
                 .duration_since(UNIX_EPOCH)
                 .map(|duration| duration.as_secs() as i64)
                 .unwrap_or(0);
-            metrics
+            process_params
+                .metrics
                 .process_event_timestamp
                 .get_or_create(&label)
                 .set(server_timestamp);
 
-            let current_block = metrics
+            let current_block = process_params
+                .metrics
                 .process_event_block_number
                 .get_or_create(&label)
                 .get();
             if block_range.to > current_block as u64 {
-                metrics
+                process_params
+                    .metrics
                     .process_event_block_number
                     .get_or_create(&label)
                     .set(block_range.to as i64);
@@ -163,21 +160,15 @@ pub async fn process_single_block_batch(
                 tracing::info!(num_of_events = &events.len(), "Processing",);
                 for event in &events {
                     // the write lock guarantees we spawn only one task per sequence number
-                    let newly_inserted = fulfilled_requests_cache
+                    let newly_inserted = process_params
+                        .fulfilled_requests_cache
                         .write()
                         .await
                         .insert(event.sequence_number);
                     if newly_inserted {
                         spawn(
-                            process_event_with_backoff(
-                                event.clone(),
-                                chain_state.clone(),
-                                contract.clone(),
-                                gas_limit,
-                                escalation_policy.clone(),
-                                metrics.clone(),
-                            )
-                            .in_current_span(),
+                            process_event_with_backoff(event.clone(), process_params.clone())
+                                .in_current_span(),
                         );
                     }
                 }
@@ -279,32 +270,18 @@ pub async fn watch_blocks(
 /// It waits on rx channel to receive block ranges and then calls process_block_range to process them
 /// for each configured block delay.
 #[tracing::instrument(skip_all)]
-#[allow(clippy::too_many_arguments)]
 pub async fn process_new_blocks(
-    chain_state: BlockchainState,
+    process_params: ProcessParams,
     mut rx: mpsc::Receiver<BlockRange>,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
     block_delays: Vec<u64>,
 ) {
     tracing::info!("Waiting for new block ranges to process");
     loop {
         if let Some(block_range) = rx.recv().await {
             // Process blocks immediately first
-            process_block_range(
-                block_range.clone(),
-                Arc::clone(&contract),
-                gas_limit,
-                escalation_policy.clone(),
-                chain_state.clone(),
-                metrics.clone(),
-                fulfilled_requests_cache.clone(),
-            )
-            .in_current_span()
-            .await;
+            process_block_range(block_range.clone(), process_params.clone())
+                .in_current_span()
+                .await;
 
             // Then process with each configured delay
             for delay in &block_delays {
@@ -312,17 +289,9 @@ pub async fn process_new_blocks(
                     from: block_range.from.saturating_sub(*delay),
                     to: block_range.to.saturating_sub(*delay),
                 };
-                process_block_range(
-                    adjusted_range,
-                    Arc::clone(&contract),
-                    gas_limit,
-                    escalation_policy.clone(),
-                    chain_state.clone(),
-                    metrics.clone(),
-                    fulfilled_requests_cache.clone(),
-                )
-                .in_current_span()
-                .await;
+                process_block_range(adjusted_range, process_params.clone())
+                    .in_current_span()
+                    .await;
             }
         }
     }
@@ -330,31 +299,17 @@ pub async fn process_new_blocks(
 
 /// Processes the backlog_range for a chain.
 /// It processes the backlog range for each configured block delay.
-#[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip_all)]
 pub async fn process_backlog(
+    process_params: ProcessParams,
     backlog_range: BlockRange,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    chain_state: BlockchainState,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
     block_delays: Vec<u64>,
 ) {
     tracing::info!("Processing backlog");
     // Process blocks immediately first
-    process_block_range(
-        backlog_range.clone(),
-        Arc::clone(&contract),
-        gas_limit,
-        escalation_policy.clone(),
-        chain_state.clone(),
-        metrics.clone(),
-        fulfilled_requests_cache.clone(),
-    )
-    .in_current_span()
-    .await;
+    process_block_range(backlog_range.clone(), process_params.clone())
+        .in_current_span()
+        .await;
 
     // Then process with each configured delay
     for delay in &block_delays {
@@ -362,17 +317,9 @@ pub async fn process_backlog(
             from: backlog_range.from.saturating_sub(*delay),
             to: backlog_range.to.saturating_sub(*delay),
         };
-        process_block_range(
-            adjusted_range,
-            Arc::clone(&contract),
-            gas_limit,
-            escalation_policy.clone(),
-            chain_state.clone(),
-            metrics.clone(),
-            fulfilled_requests_cache.clone(),
-        )
-        .in_current_span()
-        .await;
+        process_block_range(adjusted_range, process_params.clone())
+            .in_current_span()
+            .await;
     }
     tracing::info!("Backlog processed");
 }

+ 4 - 3
apps/fortuna/src/keeper/fee.rs

@@ -1,8 +1,9 @@
 use {
     crate::{
-        api::BlockchainState, chain::ethereum::InstrumentedSignablePythContract,
-        eth_utils::utils::estimate_tx_cost, eth_utils::utils::send_and_confirm,
-        keeper::AccountLabel, keeper::ChainId, keeper::KeeperMetrics,
+        api::BlockchainState,
+        chain::ethereum::InstrumentedSignablePythContract,
+        eth_utils::utils::{estimate_tx_cost, send_and_confirm},
+        keeper::{AccountLabel, ChainId, KeeperMetrics},
     },
     anyhow::{anyhow, Result},
     ethers::{

+ 1 - 2
apps/fortuna/src/keeper/keeper_metrics.rs

@@ -5,8 +5,7 @@ use {
         metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram},
         registry::Registry,
     },
-    std::sync::atomic::AtomicU64,
-    std::sync::Arc,
+    std::sync::{atomic::AtomicU64, Arc},
     tokio::sync::RwLock,
 };
 

+ 42 - 12
apps/fortuna/src/keeper/process_event.rs

@@ -1,13 +1,12 @@
 use {
-    super::keeper_metrics::{AccountLabel, KeeperMetrics},
+    super::keeper_metrics::AccountLabel,
     crate::{
-        api::BlockchainState,
-        chain::{ethereum::InstrumentedSignablePythContract, reader::RequestedWithCallbackEvent},
-        eth_utils::utils::{submit_tx_with_backoff, EscalationPolicy},
+        chain::reader::RequestedWithCallbackEvent,
+        eth_utils::utils::submit_tx_with_backoff,
+        history::{RequestEntryState, RequestStatus},
+        keeper::block::ProcessParams,
     },
     anyhow::{anyhow, Result},
-    ethers::types::U256,
-    std::sync::Arc,
     tracing,
 };
 
@@ -17,12 +16,18 @@ use {
 ))]
 pub async fn process_event_with_backoff(
     event: RequestedWithCallbackEvent,
-    chain_state: BlockchainState,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    metrics: Arc<KeeperMetrics>,
+    process_param: ProcessParams,
 ) -> Result<()> {
+    let ProcessParams {
+        chain_state,
+        contract,
+        gas_limit,
+        escalation_policy,
+        metrics,
+        history,
+        ..
+    } = process_param;
+
     // ignore requests that are not for the configured provider
     if chain_state.provider_address != event.provider_address {
         return Ok(());
@@ -35,11 +40,30 @@ pub async fn process_event_with_backoff(
 
     metrics.requests.get_or_create(&account_label).inc();
     tracing::info!("Started processing event");
+    let mut status = RequestStatus {
+        chain_id: chain_state.id.clone(),
+        provider: event.provider_address,
+        sequence: event.sequence_number,
+        created_at: chrono::Utc::now(),
+        last_updated_at: chrono::Utc::now(),
+        request_block_number: event.log_meta.block_number.as_u64(),
+        request_tx_hash: event.log_meta.transaction_hash,
+        sender: event.requestor,
+        user_random_number: event.user_random_number,
+        state: RequestEntryState::Pending,
+    };
+    history.add(&status);
 
     let provider_revelation = chain_state
         .state
         .reveal(event.sequence_number)
-        .map_err(|e| anyhow!("Error revealing: {:?}", e))?;
+        .map_err(|e| {
+            status.state = RequestEntryState::Failed {
+                reason: format!("Error revealing: {:?}", e),
+            };
+            history.add(&status);
+            anyhow!("Error revealing: {:?}", e)
+        })?;
 
     let contract_call = contract.reveal_with_callback(
         event.provider_address,
@@ -63,6 +87,12 @@ pub async fn process_event_with_backoff(
 
     match success {
         Ok(result) => {
+            status.state = RequestEntryState::Completed {
+                reveal_block_number: result.receipt.block_number.unwrap_or_default().as_u64(),
+                reveal_tx_hash: result.receipt.transaction_hash,
+                provider_random_number: provider_revelation,
+            };
+            history.add(&status);
             tracing::info!(
                 "Processed event successfully in {:?} after {} retries. Receipt: {:?}",
                 result.duration,

+ 1 - 2
apps/fortuna/src/keeper/track.rs

@@ -5,8 +5,7 @@ use {
         eth_utils::traced_client::TracedClient,
     },
     anyhow::{anyhow, Result},
-    ethers::middleware::Middleware,
-    ethers::{prelude::BlockNumber, providers::Provider, types::Address},
+    ethers::{middleware::Middleware, prelude::BlockNumber, providers::Provider, types::Address},
     std::{
         sync::Arc,
         time::{SystemTime, UNIX_EPOCH},

+ 1 - 0
apps/fortuna/src/lib.rs

@@ -3,5 +3,6 @@ pub mod chain;
 pub mod command;
 pub mod config;
 pub mod eth_utils;
+pub mod history;
 pub mod keeper;
 pub mod state;

Энэ ялгаанд хэт олон файл өөрчлөгдсөн тул зарим файлыг харуулаагүй болно