|
|
@@ -1,8 +1,4 @@
|
|
|
-import {
|
|
|
- ChainId,
|
|
|
- hexToUint8Array,
|
|
|
- uint8ArrayToHex,
|
|
|
-} from "@certusone/wormhole-sdk";
|
|
|
+import { ChainId, uint8ArrayToHex } from "@certusone/wormhole-sdk";
|
|
|
|
|
|
import {
|
|
|
createSpyRPCServiceClient,
|
|
|
@@ -11,6 +7,8 @@ import {
|
|
|
|
|
|
import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
|
|
|
|
|
|
+import { createHash } from "crypto";
|
|
|
+
|
|
|
import {
|
|
|
getBatchSummary,
|
|
|
parseBatchPriceAttestation,
|
|
|
@@ -25,10 +23,12 @@ import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js";
|
|
|
import { sleep, TimestampInSec } from "./helpers";
|
|
|
import { logger } from "./logging";
|
|
|
import { PromClient } from "./promClient";
|
|
|
+import LRUCache from "lru-cache";
|
|
|
|
|
|
export type PriceInfo = {
|
|
|
- vaaBytes: string;
|
|
|
+ vaa: Buffer;
|
|
|
seqNum: number;
|
|
|
+ publishTime: TimestampInSec;
|
|
|
attestationTime: TimestampInSec;
|
|
|
priceFeed: PriceFeed;
|
|
|
emitterChainId: number;
|
|
|
@@ -52,6 +52,8 @@ type ListenerConfig = {
|
|
|
readiness: ListenerReadinessConfig;
|
|
|
};
|
|
|
|
|
|
+type VaaHash = string;
|
|
|
+
|
|
|
export class Listener implements PriceStore {
|
|
|
// Mapping of Price Feed Id to Vaa
|
|
|
private priceFeedVaaMap = new Map<string, PriceInfo>();
|
|
|
@@ -61,6 +63,7 @@ export class Listener implements PriceStore {
|
|
|
private spyConnectionTime: TimestampInSec | undefined;
|
|
|
private readinessConfig: ListenerReadinessConfig;
|
|
|
private updateCallbacks: ((priceInfo: PriceInfo) => any)[];
|
|
|
+ private observedVaas: LRUCache<VaaHash, boolean>;
|
|
|
|
|
|
constructor(config: ListenerConfig, promClient?: PromClient) {
|
|
|
this.promClient = promClient;
|
|
|
@@ -68,6 +71,10 @@ export class Listener implements PriceStore {
|
|
|
this.loadFilters(config.filtersRaw);
|
|
|
this.readinessConfig = config.readiness;
|
|
|
this.updateCallbacks = [];
|
|
|
+ this.observedVaas = new LRUCache({
|
|
|
+ max: 10000, // At most 10000 items
|
|
|
+ ttl: 60 * 1000, // 60 seconds
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private loadFilters(filtersRaw?: string) {
|
|
|
@@ -114,7 +121,7 @@ export class Listener implements PriceStore {
|
|
|
);
|
|
|
stream = await subscribeSignedVAA(client, { filters: this.filters });
|
|
|
|
|
|
- stream!.on("data", ({ vaaBytes }: { vaaBytes: string }) => {
|
|
|
+ stream!.on("data", ({ vaaBytes }: { vaaBytes: Buffer }) => {
|
|
|
this.processVaa(vaaBytes);
|
|
|
});
|
|
|
|
|
|
@@ -150,19 +157,29 @@ export class Listener implements PriceStore {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- async processVaa(vaaBytes: string) {
|
|
|
+ async processVaa(vaa: Buffer) {
|
|
|
const { parse_vaa } = await importCoreWasm();
|
|
|
- const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes));
|
|
|
+
|
|
|
+ const vaaHash: VaaHash = createHash("md5").update(vaa).digest("base64");
|
|
|
+
|
|
|
+ if (this.observedVaas.has(vaaHash)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ this.observedVaas.set(vaaHash, true);
|
|
|
+ this.promClient?.incReceivedVaa();
|
|
|
+
|
|
|
+ const parsedVaa = parse_vaa(vaa);
|
|
|
|
|
|
let batchAttestation;
|
|
|
|
|
|
try {
|
|
|
batchAttestation = await parseBatchPriceAttestation(
|
|
|
- Buffer.from(parsedVAA.payload)
|
|
|
+ Buffer.from(parsedVaa.payload)
|
|
|
);
|
|
|
} catch (e: any) {
|
|
|
logger.error(e, e.stack);
|
|
|
- logger.error("Parsing failed. Dropping vaa: %o", parsedVAA);
|
|
|
+ logger.error("Parsing failed. Dropping vaa: %o", parsedVaa);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -194,15 +211,30 @@ export class Listener implements PriceStore {
|
|
|
) {
|
|
|
const priceFeed = priceAttestationToPriceFeed(priceAttestation);
|
|
|
const priceInfo = {
|
|
|
- seqNum: parsedVAA.sequence,
|
|
|
- vaaBytes,
|
|
|
+ seqNum: parsedVaa.sequence,
|
|
|
+ vaa,
|
|
|
+ publishTime: priceAttestation.publishTime,
|
|
|
attestationTime: priceAttestation.attestationTime,
|
|
|
priceFeed,
|
|
|
- emitterChainId: parsedVAA.emitter_chain,
|
|
|
+ emitterChainId: parsedVaa.emitter_chain,
|
|
|
priceServiceReceiveTime: Math.floor(new Date().getTime() / 1000),
|
|
|
};
|
|
|
this.priceFeedVaaMap.set(key, priceInfo);
|
|
|
|
|
|
+ if (lastAttestationTime !== undefined) {
|
|
|
+ this.promClient?.addPriceUpdatesAttestationTimeGap(
|
|
|
+ priceAttestation.attestationTime - lastAttestationTime
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ const lastPublishTime = this.priceFeedVaaMap.get(key)?.publishTime;
|
|
|
+
|
|
|
+ if (lastPublishTime !== undefined) {
|
|
|
+ this.promClient?.addPriceUpdatesPublishTimeGap(
|
|
|
+ priceAttestation.publishTime - lastPublishTime
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
for (const callback of this.updateCallbacks) {
|
|
|
callback(priceInfo);
|
|
|
}
|
|
|
@@ -211,16 +243,14 @@ export class Listener implements PriceStore {
|
|
|
|
|
|
logger.info(
|
|
|
"Parsed a new Batch Price Attestation: [" +
|
|
|
- parsedVAA.emitter_chain +
|
|
|
+ parsedVaa.emitter_chain +
|
|
|
":" +
|
|
|
- uint8ArrayToHex(parsedVAA.emitter_address) +
|
|
|
+ uint8ArrayToHex(parsedVaa.emitter_address) +
|
|
|
"], seqNum: " +
|
|
|
- parsedVAA.sequence +
|
|
|
+ parsedVaa.sequence +
|
|
|
", Batch Summary: " +
|
|
|
getBatchSummary(batchAttestation)
|
|
|
);
|
|
|
-
|
|
|
- this.promClient?.incReceivedVaa();
|
|
|
}
|
|
|
|
|
|
getLatestPriceInfo(priceFeedId: string): PriceInfo | undefined {
|