event.ts 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. import { PublicKey } from "@solana/web3.js";
  2. import * as assert from "assert";
  3. import { IdlEvent, IdlEventField } from "../idl.js";
  4. import { Coder } from "../coder/index.js";
  5. import { DecodeType } from "./namespace/types.js";
  6. import Provider from "../provider.js";
  7. const LOG_START_INDEX = "Program log: ".length;
  8. // Deserialized event.
  9. export type Event<
  10. E extends IdlEvent = IdlEvent,
  11. Defined = Record<string, never>
  12. > = {
  13. name: E["name"];
  14. data: EventData<E["fields"][number], Defined>;
  15. };
  16. export type EventData<T extends IdlEventField, Defined> = {
  17. [N in T["name"]]: DecodeType<(T & { name: N })["type"], Defined>;
  18. };
  19. type EventCallback = (event: any, slot: number) => void;
  20. export class EventManager {
  21. /**
  22. * Program ID for event subscriptions.
  23. */
  24. private _programId: PublicKey;
  25. /**
  26. * Network and wallet provider.
  27. */
  28. private _provider: Provider;
  29. /**
  30. * Event parser to handle onLogs callbacks.
  31. */
  32. private _eventParser: EventParser;
  33. /**
  34. * Maps event listener id to [event-name, callback].
  35. */
  36. private _eventCallbacks: Map<number, [string, EventCallback]>;
  37. /**
  38. * Maps event name to all listeners for the event.
  39. */
  40. private _eventListeners: Map<string, Array<number>>;
  41. /**
  42. * The next listener id to allocate.
  43. */
  44. private _listenerIdCount: number;
  45. /**
  46. * The subscription id from the connection onLogs subscription.
  47. */
  48. private _onLogsSubscriptionId: number | undefined;
  49. constructor(programId: PublicKey, provider: Provider, coder: Coder) {
  50. this._programId = programId;
  51. this._provider = provider;
  52. this._eventParser = new EventParser(programId, coder);
  53. this._eventCallbacks = new Map();
  54. this._eventListeners = new Map();
  55. this._listenerIdCount = 0;
  56. }
  57. public addEventListener(
  58. eventName: string,
  59. callback: (event: any, slot: number) => void
  60. ): number {
  61. let listener = this._listenerIdCount;
  62. this._listenerIdCount += 1;
  63. // Store the listener into the event map.
  64. if (!(eventName in this._eventCallbacks)) {
  65. this._eventListeners.set(eventName, []);
  66. }
  67. this._eventListeners.set(
  68. eventName,
  69. (this._eventListeners.get(eventName) ?? []).concat(listener)
  70. );
  71. // Store the callback into the listener map.
  72. this._eventCallbacks.set(listener, [eventName, callback]);
  73. // Create the subscription singleton, if needed.
  74. if (this._onLogsSubscriptionId !== undefined) {
  75. return listener;
  76. }
  77. this._onLogsSubscriptionId = this._provider!.connection.onLogs(
  78. this._programId,
  79. (logs, ctx) => {
  80. if (logs.err) {
  81. return;
  82. }
  83. this._eventParser.parseLogs(logs.logs, (event) => {
  84. const allListeners = this._eventListeners.get(event.name);
  85. if (allListeners) {
  86. allListeners.forEach((listener) => {
  87. const listenerCb = this._eventCallbacks.get(listener);
  88. if (listenerCb) {
  89. const [, callback] = listenerCb;
  90. callback(event.data, ctx.slot);
  91. }
  92. });
  93. }
  94. });
  95. }
  96. );
  97. return listener;
  98. }
  99. public async removeEventListener(listener: number): Promise<void> {
  100. // Get the callback.
  101. const callback = this._eventCallbacks.get(listener);
  102. if (!callback) {
  103. throw new Error(`Event listener ${listener} doesn't exist!`);
  104. }
  105. const [eventName] = callback;
  106. // Get the listeners.
  107. let listeners = this._eventListeners.get(eventName);
  108. if (!listeners) {
  109. throw new Error(`Event listeners don't exist for ${eventName}!`);
  110. }
  111. // Update both maps.
  112. this._eventCallbacks.delete(listener);
  113. listeners = listeners.filter((l) => l !== listener);
  114. if (listeners.length === 0) {
  115. this._eventListeners.delete(eventName);
  116. }
  117. // Kill the websocket connection if all listeners have been removed.
  118. if (this._eventCallbacks.size == 0) {
  119. assert.ok(this._eventListeners.size === 0);
  120. if (this._onLogsSubscriptionId !== undefined) {
  121. await this._provider!.connection.removeOnLogsListener(
  122. this._onLogsSubscriptionId
  123. );
  124. this._onLogsSubscriptionId = undefined;
  125. }
  126. }
  127. }
  128. }
  129. export class EventParser {
  130. private coder: Coder;
  131. private programId: PublicKey;
  132. constructor(programId: PublicKey, coder: Coder) {
  133. this.coder = coder;
  134. this.programId = programId;
  135. }
  136. // Each log given, represents an array of messages emitted by
  137. // a single transaction, which can execute many different programs across
  138. // CPI boundaries. However, the subscription is only interested in the
  139. // events emitted by *this* program. In achieving this, we keep track of the
  140. // program execution context by parsing each log and looking for a CPI
  141. // `invoke` call. If one exists, we know a new program is executing. So we
  142. // push the programId onto a stack and switch the program context. This
  143. // allows us to track, for a given log, which program was executing during
  144. // its emission, thereby allowing us to know if a given log event was
  145. // emitted by *this* program. If it was, then we parse the raw string and
  146. // emit the event if the string matches the event being subscribed to.
  147. public parseLogs(logs: string[], callback: (log: Event) => void) {
  148. const logScanner = new LogScanner(logs);
  149. const execution = new ExecutionContext(logScanner.next() as string);
  150. let log = logScanner.next();
  151. while (log !== null) {
  152. let [event, newProgram, didPop] = this.handleLog(execution, log);
  153. if (event) {
  154. callback(event);
  155. }
  156. if (newProgram) {
  157. execution.push(newProgram);
  158. }
  159. if (didPop) {
  160. execution.pop();
  161. }
  162. log = logScanner.next();
  163. }
  164. }
  165. // Main log handler. Returns a three element array of the event, the
  166. // next program that was invoked for CPI, and a boolean indicating if
  167. // a program has completed execution (and thus should be popped off the
  168. // execution stack).
  169. private handleLog(
  170. execution: ExecutionContext,
  171. log: string
  172. ): [Event | null, string | null, boolean] {
  173. // Executing program is this program.
  174. if (
  175. execution.stack.length > 0 &&
  176. execution.program() === this.programId.toString()
  177. ) {
  178. return this.handleProgramLog(log);
  179. }
  180. // Executing program is not this program.
  181. else {
  182. return [null, ...this.handleSystemLog(log)];
  183. }
  184. }
  185. // Handles logs from *this* program.
  186. private handleProgramLog(
  187. log: string
  188. ): [Event | null, string | null, boolean] {
  189. // This is a `msg!` log.
  190. if (log.startsWith("Program log:")) {
  191. const logStr = log.slice(LOG_START_INDEX);
  192. const event = this.coder.events.decode(logStr);
  193. return [event, null, false];
  194. }
  195. // System log.
  196. else {
  197. return [null, ...this.handleSystemLog(log)];
  198. }
  199. }
  200. // Handles logs when the current program being executing is *not* this.
  201. private handleSystemLog(log: string): [string | null, boolean] {
  202. // System component.
  203. const logStart = log.split(":")[0];
  204. // Did the program finish executing?
  205. if (logStart.match(/^Program (.*) success/g) !== null) {
  206. return [null, true];
  207. // Recursive call.
  208. } else if (
  209. logStart.startsWith(`Program ${this.programId.toString()} invoke`)
  210. ) {
  211. return [this.programId.toString(), false];
  212. }
  213. // CPI call.
  214. else if (logStart.includes("invoke")) {
  215. return ["cpi", false]; // Any string will do.
  216. } else {
  217. return [null, false];
  218. }
  219. }
  220. }
  221. // Stack frame execution context, allowing one to track what program is
  222. // executing for a given log.
  223. class ExecutionContext {
  224. stack: string[];
  225. constructor(log: string) {
  226. // Assumes the first log in every transaction is an `invoke` log from the
  227. // runtime.
  228. const program = /^Program (.*) invoke.*$/g.exec(log)?.[1];
  229. if (!program) {
  230. throw new Error(`Could not find program invocation log line`);
  231. }
  232. this.stack = [program];
  233. }
  234. program(): string {
  235. assert.ok(this.stack.length > 0);
  236. return this.stack[this.stack.length - 1];
  237. }
  238. push(newProgram: string) {
  239. this.stack.push(newProgram);
  240. }
  241. pop() {
  242. assert.ok(this.stack.length > 0);
  243. this.stack.pop();
  244. }
  245. }
  246. class LogScanner {
  247. constructor(public logs: string[]) {}
  248. next(): string | null {
  249. if (this.logs.length === 0) {
  250. return null;
  251. }
  252. let l = this.logs[0];
  253. this.logs = this.logs.slice(1);
  254. return l;
  255. }
  256. }