|
@@ -323,27 +323,27 @@ where
|
|
|
// Update the aggregate state
|
|
// Update the aggregate state
|
|
|
let mut aggregate_state = self.into().data.write().await;
|
|
let mut aggregate_state = self.into().data.write().await;
|
|
|
|
|
|
|
|
- // Check if the update is new or out of order
|
|
|
|
|
- match aggregate_state.latest_completed_slot {
|
|
|
|
|
|
|
+ // Send update event to subscribers. We are purposefully ignoring the result
|
|
|
|
|
+ // because there might be no subscribers.
|
|
|
|
|
+ let _ = match aggregate_state.latest_completed_slot {
|
|
|
None => {
|
|
None => {
|
|
|
aggregate_state.latest_completed_slot.replace(slot);
|
|
aggregate_state.latest_completed_slot.replace(slot);
|
|
|
self.into()
|
|
self.into()
|
|
|
.api_update_tx
|
|
.api_update_tx
|
|
|
- .send(AggregationEvent::New { slot })?;
|
|
|
|
|
|
|
+ .send(AggregationEvent::New { slot })
|
|
|
}
|
|
}
|
|
|
Some(latest) if slot > latest => {
|
|
Some(latest) if slot > latest => {
|
|
|
self.prune_removed_keys(message_state_keys).await;
|
|
self.prune_removed_keys(message_state_keys).await;
|
|
|
aggregate_state.latest_completed_slot.replace(slot);
|
|
aggregate_state.latest_completed_slot.replace(slot);
|
|
|
self.into()
|
|
self.into()
|
|
|
.api_update_tx
|
|
.api_update_tx
|
|
|
- .send(AggregationEvent::New { slot })?;
|
|
|
|
|
|
|
+ .send(AggregationEvent::New { slot })
|
|
|
}
|
|
}
|
|
|
- _ => {
|
|
|
|
|
- self.into()
|
|
|
|
|
- .api_update_tx
|
|
|
|
|
- .send(AggregationEvent::OutOfOrder { slot })?;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ _ => self
|
|
|
|
|
+ .into()
|
|
|
|
|
+ .api_update_tx
|
|
|
|
|
+ .send(AggregationEvent::OutOfOrder { slot }),
|
|
|
|
|
+ };
|
|
|
|
|
|
|
|
aggregate_state.latest_completed_slot = aggregate_state
|
|
aggregate_state.latest_completed_slot = aggregate_state
|
|
|
.latest_completed_slot
|
|
.latest_completed_slot
|