123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- import { PublicKey } from "@solana/web3.js";
- import * as assert from "assert";
- import { IdlEvent, IdlEventField } from "../idl.js";
- import { Coder } from "../coder/index.js";
- import { DecodeType } from "./namespace/types.js";
- import Provider from "../provider.js";
/**
 * Length of the `"Program log: "` prefix; slicing a log line at this index
 * drops the prefix and leaves only the message that follows it.
 */
const LOG_START_INDEX = "Program log: ".length;
/**
 * A deserialized event.
 *
 * `name` is the name type of the IDL event `E`; `data` maps each of the
 * event's field names to the decoded type of that field.
 */
export type Event<
  E extends IdlEvent = IdlEvent,
  Defined = Record<string, never>
> = {
  name: E["name"];
  data: EventData<E["fields"][number], Defined>;
};
/**
 * Payload of an event: one property per field name in the union `T`, typed
 * as the decoded form (`DecodeType`) of that field's IDL type.
 */
export type EventData<T extends IdlEventField, Defined> = {
  [N in T["name"]]: DecodeType<(T & { name: N })["type"], Defined>;
};
/**
 * Signature of a registered event listener: receives the event payload and
 * the slot number reported alongside the logs it was parsed from.
 */
type EventCallback = (event: any, slot: number) => void;
/**
 * Dispatches program events to registered listeners.
 *
 * Listeners are registered per event name. The first registration opens a
 * single `onLogs` subscription on the provider's connection for the program;
 * removing the last listener removes that subscription again.
 */
export class EventManager {
  /**
   * Program ID for event subscriptions.
   */
  private _programId: PublicKey;

  /**
   * Network and wallet provider.
   */
  private _provider: Provider;

  /**
   * Event parser to handle onLogs callbacks.
   */
  private _eventParser: EventParser;

  /**
   * Maps event listener id to [event-name, callback].
   */
  private _eventCallbacks: Map<number, [string, EventCallback]>;

  /**
   * Maps event name to all listeners for the event.
   */
  private _eventListeners: Map<string, Array<number>>;

  /**
   * The next listener id to allocate.
   */
  private _listenerIdCount: number;

  /**
   * The subscription id from the connection onLogs subscription.
   */
  private _onLogsSubscriptionId: number | undefined;

  /**
   * @param programId Program whose logs are subscribed to.
   * @param provider Provider whose `connection` carries the subscription.
   * @param coder Coder handed to the internal `EventParser`.
   */
  constructor(programId: PublicKey, provider: Provider, coder: Coder) {
    this._programId = programId;
    this._provider = provider;
    this._eventParser = new EventParser(programId, coder);
    this._eventCallbacks = new Map();
    this._eventListeners = new Map();
    this._listenerIdCount = 0;
  }

  /**
   * Registers `callback` for events named `eventName`.
   *
   * @param eventName Name of the event to listen for.
   * @param callback Invoked with the event's data and the slot taken from
   *   the subscription context each time a matching event is parsed.
   * @returns The listener id; pass it to `removeEventListener` to unregister.
   */
  public addEventListener(
    eventName: string,
    callback: (event: any, slot: number) => void
  ): number {
    const listener = this._listenerIdCount;
    this._listenerIdCount += 1;

    // Append to the listeners already registered for this event. A new array
    // is stored (rather than mutating in place) so that a dispatch currently
    // iterating the previous array is unaffected.
    const registered = this._eventListeners.get(eventName) ?? [];
    this._eventListeners.set(eventName, registered.concat(listener));

    // Store the callback into the listener map.
    this._eventCallbacks.set(listener, [eventName, callback]);

    // Create the subscription singleton, if needed.
    if (this._onLogsSubscriptionId !== undefined) {
      return listener;
    }

    this._onLogsSubscriptionId = this._provider.connection.onLogs(
      this._programId,
      (logs, ctx) => {
        // Log batches that carry an error are not parsed.
        if (logs.err) {
          return;
        }
        this._eventParser.parseLogs(logs.logs, (event) => {
          const allListeners = this._eventListeners.get(event.name);
          if (!allListeners) {
            return;
          }
          allListeners.forEach((id) => {
            const entry = this._eventCallbacks.get(id);
            if (entry) {
              const [, listenerCallback] = entry;
              listenerCallback(event.data, ctx.slot);
            }
          });
        });
      }
    );

    return listener;
  }

  /**
   * Unregisters a listener previously returned by `addEventListener`.
   *
   * When the last listener is removed, the `onLogs` subscription is removed
   * from the connection as well.
   *
   * @param listener Listener id to remove.
   * @throws Error if the id is unknown, or if no listener list exists for
   *   the event the id was registered under.
   */
  public async removeEventListener(listener: number): Promise<void> {
    // Get the callback.
    const entry = this._eventCallbacks.get(listener);
    if (!entry) {
      throw new Error(`Event listener ${listener} doesn't exist!`);
    }
    const [eventName] = entry;

    // Get the listeners.
    const listeners = this._eventListeners.get(eventName);
    if (!listeners) {
      throw new Error(`Event listeners don't exist for ${eventName}!`);
    }

    // Update both maps. The filtered list must be written back, otherwise
    // the removed id lingers and the name entry is never cleaned up.
    this._eventCallbacks.delete(listener);
    const remaining = listeners.filter((l) => l !== listener);
    if (remaining.length === 0) {
      this._eventListeners.delete(eventName);
    } else {
      this._eventListeners.set(eventName, remaining);
    }

    // Drop the onLogs subscription if all listeners have been removed.
    if (this._eventCallbacks.size === 0) {
      assert.ok(this._eventListeners.size === 0);
      if (this._onLogsSubscriptionId !== undefined) {
        await this._provider.connection.removeOnLogsListener(
          this._onLogsSubscriptionId
        );
        this._onLogsSubscriptionId = undefined;
      }
    }
  }
}
/**
 * Extracts the events emitted by one program from a list of log lines.
 */
export class EventParser {
  private coder: Coder;
  private programId: PublicKey;

  constructor(programId: PublicKey, coder: Coder) {
    this.coder = coder;
    this.programId = programId;
  }

  /**
   * Scans `logs` and calls `callback` once for every event decoded from a
   * line that was logged while *this* program was executing.
   *
   * The given logs are the messages emitted by a single transaction, which
   * can execute many different programs across CPI boundaries. To attribute
   * each line to a program, the parser keeps an execution stack: an `invoke`
   * line pushes a frame, a `success` line pops one. Only lines seen while the
   * top frame is this program are handed to the event decoder.
   *
   * The first line seeds the stack and must be an `invoke` line, otherwise
   * constructing the execution context throws.
   */
  public parseLogs(logs: string[], callback: (log: Event) => void) {
    const scanner = new LogScanner(logs);
    const execution = new ExecutionContext(scanner.next() as string);

    for (let line = scanner.next(); line !== null; line = scanner.next()) {
      const [event, invokedProgram, finished] = this.handleLog(
        execution,
        line
      );
      if (event) {
        callback(event);
      }
      if (invokedProgram) {
        execution.push(invokedProgram);
      }
      if (finished) {
        execution.pop();
      }
    }
  }

  /**
   * Main log handler. Returns a three element array of the event, the next
   * program that was invoked for CPI, and a boolean indicating whether a
   * program has completed execution (and thus should be popped off the
   * execution stack).
   */
  private handleLog(
    execution: ExecutionContext,
    log: string
  ): [Event | null, string | null, boolean] {
    const runningThisProgram =
      execution.stack.length > 0 &&
      execution.program() === this.programId.toString();

    if (runningThisProgram) {
      return this.handleProgramLog(log);
    }

    // Some other program (or nothing) is executing: no event can result.
    const [invokedProgram, finished] = this.handleSystemLog(log);
    return [null, invokedProgram, finished];
  }

  /**
   * Handles a log line emitted while *this* program is executing.
   */
  private handleProgramLog(
    log: string
  ): [Event | null, string | null, boolean] {
    // Anything that is not a `msg!` log is treated as a system log.
    if (!log.startsWith("Program log:")) {
      const [invokedProgram, finished] = this.handleSystemLog(log);
      return [null, invokedProgram, finished];
    }

    const payload = log.slice(LOG_START_INDEX);
    return [this.coder.events.decode(payload), null, false];
  }

  /**
   * Handles a runtime (non-`msg!`) log line. Returns the program to push
   * onto the execution stack, if any, and whether the current program has
   * finished.
   */
  private handleSystemLog(log: string): [string | null, boolean] {
    // Only the part before the first ":" is inspected.
    const colonAt = log.indexOf(":");
    const head = colonAt === -1 ? log : log.slice(0, colonAt);

    // Did the program finish executing?
    if (/^Program (.*) success/g.test(head)) {
      return [null, true];
    }

    // Recursive call into this program.
    const self = this.programId.toString();
    if (head.startsWith(`Program ${self} invoke`)) {
      return [self, false];
    }

    // CPI call into another program.
    if (head.includes("invoke")) {
      return ["cpi", false]; // Any string will do.
    }

    return [null, false];
  }
}
/**
 * Stack of program frames used to tell which program was executing when a
 * given log line was emitted. The last element is the current program.
 */
class ExecutionContext {
  stack: string[];

  /**
   * Seeds the stack from `log`, which is assumed to be the first log of a
   * transaction and therefore an `invoke` line from the runtime.
   *
   * @throws Error if `log` does not match the invoke pattern.
   */
  constructor(log: string) {
    const invokeMatch = /^Program (.*) invoke.*$/g.exec(log);
    const rootProgram = invokeMatch?.[1];
    if (!rootProgram) {
      throw new Error(`Could not find program invocation log line`);
    }
    this.stack = [rootProgram];
  }

  /** Returns the program on top of the stack; asserts it is non-empty. */
  program(): string {
    const depth = this.stack.length;
    assert.ok(depth > 0);
    return this.stack[depth - 1];
  }

  /** Enters a new program frame. */
  push(newProgram: string) {
    this.stack.push(newProgram);
  }

  /** Leaves the current program frame; asserts the stack is non-empty. */
  pop() {
    assert.ok(this.stack.length > 0);
    this.stack.pop();
  }
}
/**
 * Forward-only cursor over a list of log lines. `logs` always holds the
 * lines not yet consumed; the array passed to the constructor is never
 * modified in place.
 */
class LogScanner {
  constructor(public logs: string[]) {}

  /** Returns the next log line, or `null` once all lines are consumed. */
  next(): string | null {
    if (this.logs.length === 0) {
      return null;
    }
    const [head, ...rest] = this.logs;
    this.logs = rest;
    return head;
  }
}
|