|
|
@@ -15,18 +15,18 @@ let conditionTimeout = 20000;
|
|
|
|
|
|
type PendingPayload = {
|
|
|
vaa_bytes: string;
|
|
|
- pa: helpers.PythPriceAttestation;
|
|
|
+ batchAttestation: helpers.PythBatchPriceAttestation;
|
|
|
receiveTime: Date;
|
|
|
seqNum: number;
|
|
|
};
|
|
|
|
|
|
-let pendingMap = new Map<string, PendingPayload>(); // The key to this is price_id. Note that Map maintains insertion order, not key order.
|
|
|
+let pendingMap = new Map<string, PendingPayload>(); // The key to this is hash of price_ids in the batch attestation. Note that Map maintains insertion order, not key order.
|
|
|
|
|
|
type ProductData = {
|
|
|
key: string;
|
|
|
lastTimePublished: Date;
|
|
|
numTimesPublished: number;
|
|
|
- lastPa: helpers.PythPriceAttestation;
|
|
|
+ lastBatchAttestation: helpers.PythBatchPriceAttestation;
|
|
|
lastResult: any;
|
|
|
};
|
|
|
|
|
|
@@ -35,7 +35,7 @@ type CurrentEntry = {
|
|
|
currObj: ProductData;
|
|
|
};
|
|
|
|
|
|
-let productMap = new Map<string, ProductData>(); // The key to this is price_id
|
|
|
+let productMap = new Map<string, ProductData>(); // The key to this is hash of price_ids in the batch attestation.
|
|
|
|
|
|
let connectionData: main.ConnectionData;
|
|
|
let metrics: PromHelper;
|
|
|
@@ -233,11 +233,11 @@ async function getPendingEventsAlreadyLocked(
|
|
|
while (pendingMap.size !== 0 && currObjs.length < maxPerBatch) {
|
|
|
const first = pendingMap.entries().next();
|
|
|
logger.debug("processing event with key [" + first.value[0] + "]");
|
|
|
- const pendingValue = first.value[1];
|
|
|
- let pendingKey = pendingValue.pa.priceId;
|
|
|
+ const pendingValue: PendingPayload = first.value[1];
|
|
|
+ let pendingKey = helpers.getBatchAttestationHashKey(pendingValue.batchAttestation);
|
|
|
let currObj = productMap.get(pendingKey);
|
|
|
if (currObj) {
|
|
|
- currObj.lastPa = pendingValue.pa;
|
|
|
+ currObj.lastBatchAttestation = pendingValue.batchAttestation;
|
|
|
currObj.lastTimePublished = new Date();
|
|
|
productMap.set(pendingKey, currObj);
|
|
|
logger.debug(
|
|
|
@@ -257,7 +257,7 @@ async function getPendingEventsAlreadyLocked(
|
|
|
);
|
|
|
currObj = {
|
|
|
key: pendingKey,
|
|
|
- lastPa: pendingValue.pa,
|
|
|
+ lastBatchAttestation: pendingValue.batchAttestation,
|
|
|
lastTimePublished: new Date(),
|
|
|
numTimesPublished: 0,
|
|
|
lastResult: "",
|
|
|
@@ -458,17 +458,11 @@ async function finalizeEventsAlreadyLocked(
|
|
|
);
|
|
|
|
|
|
logger.info(
|
|
|
- "complete: priceId: " +
|
|
|
- currEntry.pa.priceId +
|
|
|
- ", seqNum: " +
|
|
|
+ "complete:" +
|
|
|
+ "seqNum: " +
|
|
|
currEntry.seqNum +
|
|
|
- ", price: " +
|
|
|
- helpers.computePrice(currEntry.pa.price, currEntry.pa.exponent) +
|
|
|
- ", ci: " +
|
|
|
- helpers.computePrice(
|
|
|
- currEntry.pa.confidenceInterval,
|
|
|
- currEntry.pa.exponent
|
|
|
- ) +
|
|
|
+ ", price_ids: " +
|
|
|
+ helpers.getBatchSummary(currEntry.batchAttestation) +
|
|
|
", rcv2SendBegin: " +
|
|
|
(sendTime.getTime() - currEntry.receiveTime.getTime()) +
|
|
|
", rcv2SendComplete: " +
|
|
|
@@ -503,18 +497,17 @@ async function finalizeEventsAlreadyLocked(
|
|
|
|
|
|
export async function postEvent(
|
|
|
vaaBytes: any,
|
|
|
- pa: helpers.PythPriceAttestation,
|
|
|
+ batchAttestation: helpers.PythBatchPriceAttestation,
|
|
|
sequence: number,
|
|
|
receiveTime: Date
|
|
|
) {
|
|
|
let event: PendingPayload = {
|
|
|
vaa_bytes: uint8ArrayToHex(vaaBytes),
|
|
|
- pa: pa,
|
|
|
+ batchAttestation: batchAttestation,
|
|
|
receiveTime: receiveTime,
|
|
|
seqNum: sequence,
|
|
|
};
|
|
|
- let pendingKey = pa.priceId;
|
|
|
- // pendingKey = pendingKey + ":" + sequence;
|
|
|
+ let pendingKey = helpers.getBatchAttestationHashKey(batchAttestation);
|
|
|
await mutex.runExclusive(() => {
|
|
|
logger.debug("posting event with key [" + pendingKey + "]");
|
|
|
pendingMap.set(pendingKey, event);
|
|
|
@@ -537,13 +530,7 @@ export async function getStatus() {
|
|
|
}
|
|
|
|
|
|
let item: object = {
|
|
|
- product_id: value.lastPa.productId,
|
|
|
- price_id: value.lastPa.priceId,
|
|
|
- price: helpers.computePrice(value.lastPa.price, value.lastPa.exponent),
|
|
|
- ci: helpers.computePrice(
|
|
|
- value.lastPa.confidenceInterval,
|
|
|
- value.lastPa.exponent
|
|
|
- ),
|
|
|
+ summary: helpers.getBatchSummary(value.lastBatchAttestation),
|
|
|
num_times_published: value.numTimesPublished,
|
|
|
last_time_published: value.lastTimePublished.toISOString(),
|
|
|
result: value.lastResult,
|
|
|
@@ -559,12 +546,11 @@ export async function getStatus() {
|
|
|
|
|
|
// Note that querying the contract does not update the sequence number, so we don't need to be locked.
|
|
|
export async function getPriceData(
|
|
|
- productId: string,
|
|
|
priceId: string
|
|
|
): Promise<any> {
|
|
|
let result: any;
|
|
|
// await mutex.runExclusive(async () => {
|
|
|
- result = await main.query(productId, priceId);
|
|
|
+ result = await main.query(priceId);
|
|
|
// });
|
|
|
|
|
|
return result;
|