|
@@ -127,6 +127,10 @@ impl Shred {
|
|
|
dispatch!(fn merkle_root(&self) -> Result<Hash, Error>);
|
|
dispatch!(fn merkle_root(&self) -> Result<Hash, Error>);
|
|
|
dispatch!(fn proof_size(&self) -> Result<u8, Error>);
|
|
dispatch!(fn proof_size(&self) -> Result<u8, Error>);
|
|
|
|
|
|
|
|
|
|
+ fn fec_set_index(&self) -> u32 {
|
|
|
|
|
+ self.common_header().fec_set_index
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
fn index(&self) -> u32 {
|
|
fn index(&self) -> u32 {
|
|
|
self.common_header().index
|
|
self.common_header().index
|
|
|
}
|
|
}
|
|
@@ -1098,17 +1102,29 @@ pub(super) fn make_shreds_from_data(
|
|
|
// If shreds.is_empty() then the data argument was empty. In that case we
|
|
// If shreds.is_empty() then the data argument was empty. In that case we
|
|
|
// want to generate one data shred with empty data.
|
|
// want to generate one data shred with empty data.
|
|
|
if !data.is_empty() || shreds.is_empty() {
|
|
if !data.is_empty() || shreds.is_empty() {
|
|
|
|
|
+ // Should generate at least one data shred (which may have no data).
|
|
|
|
|
+ // Last erasure batch should also be padded with empty data shreds to
|
|
|
|
|
+ // make >= 32 data shreds. This gaurantees that the batch cannot be
|
|
|
|
|
+ // recovered unless 32+ shreds are received from turbine or repair.
|
|
|
|
|
+ let min_num_data_shreds = if is_last_in_slot {
|
|
|
|
|
+ DATA_SHREDS_PER_FEC_BLOCK
|
|
|
|
|
+ } else {
|
|
|
|
|
+ 1
|
|
|
|
|
+ };
|
|
|
// Find the Merkle proof_size and data_buffer_size
|
|
// Find the Merkle proof_size and data_buffer_size
|
|
|
// which can embed the remaining data.
|
|
// which can embed the remaining data.
|
|
|
- let (proof_size, data_buffer_size) = (1u8..32)
|
|
|
|
|
|
|
+ let (proof_size, data_buffer_size, num_data_shreds) = (1u8..32)
|
|
|
.find_map(|proof_size| {
|
|
.find_map(|proof_size| {
|
|
|
let data_buffer_size = ShredData::capacity(proof_size, chained, resigned).ok()?;
|
|
let data_buffer_size = ShredData::capacity(proof_size, chained, resigned).ok()?;
|
|
|
let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size;
|
|
let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size;
|
|
|
- let num_data_shreds = num_data_shreds.max(1);
|
|
|
|
|
|
|
+ let num_data_shreds = num_data_shreds.max(min_num_data_shreds);
|
|
|
let erasure_batch_size =
|
|
let erasure_batch_size =
|
|
|
shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
|
|
shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
|
|
|
- (proof_size == get_proof_size(erasure_batch_size))
|
|
|
|
|
- .then_some((proof_size, data_buffer_size))
|
|
|
|
|
|
|
+ (proof_size == get_proof_size(erasure_batch_size)).then_some((
|
|
|
|
|
+ proof_size,
|
|
|
|
|
+ data_buffer_size,
|
|
|
|
|
+ num_data_shreds,
|
|
|
|
|
+ ))
|
|
|
})
|
|
})
|
|
|
.ok_or(Error::UnknownProofSize)?;
|
|
.ok_or(Error::UnknownProofSize)?;
|
|
|
common_header.shred_variant = ShredVariant::MerkleData {
|
|
common_header.shred_variant = ShredVariant::MerkleData {
|
|
@@ -1117,13 +1133,11 @@ pub(super) fn make_shreds_from_data(
|
|
|
resigned,
|
|
resigned,
|
|
|
};
|
|
};
|
|
|
common_header.fec_set_index = common_header.index;
|
|
common_header.fec_set_index = common_header.index;
|
|
|
- let chunks = if data.is_empty() {
|
|
|
|
|
- // Generate one data shred with empty data.
|
|
|
|
|
- Either::Left(std::iter::once(data))
|
|
|
|
|
- } else {
|
|
|
|
|
- Either::Right(data.chunks(data_buffer_size))
|
|
|
|
|
- };
|
|
|
|
|
- for shred in chunks {
|
|
|
|
|
|
|
+ for shred in data
|
|
|
|
|
+ .chunks(data_buffer_size)
|
|
|
|
|
+ .chain(std::iter::repeat(&[][..]))
|
|
|
|
|
+ .take(num_data_shreds)
|
|
|
|
|
+ {
|
|
|
let shred = new_shred_data(common_header, data_header, shred);
|
|
let shred = new_shred_data(common_header, data_header, shred);
|
|
|
shreds.push(shred);
|
|
shreds.push(shred);
|
|
|
common_header.index += 1;
|
|
common_header.index += 1;
|
|
@@ -1132,12 +1146,17 @@ pub(super) fn make_shreds_from_data(
|
|
|
stats.data_buffer_residual += data_buffer_size - shred.data()?.len();
|
|
stats.data_buffer_residual += data_buffer_size - shred.data()?.len();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- // Only the very last shred may have residual data buffer.
|
|
|
|
|
- debug_assert!(shreds.iter().rev().skip(1).all(|shred| {
|
|
|
|
|
- let proof_size = shred.proof_size().unwrap();
|
|
|
|
|
- let capacity = ShredData::capacity(proof_size, chained, resigned).unwrap();
|
|
|
|
|
- shred.data().unwrap().len() == capacity
|
|
|
|
|
- }));
|
|
|
|
|
|
|
+ // Only the trailing data shreds may have residual data buffer.
|
|
|
|
|
+ debug_assert!(shreds
|
|
|
|
|
+ .iter()
|
|
|
|
|
+ .rev()
|
|
|
|
|
+ .skip_while(|shred| is_last_in_slot && shred.data().unwrap().is_empty())
|
|
|
|
|
+ .skip(1)
|
|
|
|
|
+ .all(|shred| {
|
|
|
|
|
+ let proof_size = shred.proof_size().unwrap();
|
|
|
|
|
+ let capacity = ShredData::capacity(proof_size, chained, resigned).unwrap();
|
|
|
|
|
+ shred.data().unwrap().len() == capacity
|
|
|
|
|
+ }));
|
|
|
// Adjust flags for the very last shred.
|
|
// Adjust flags for the very last shred.
|
|
|
if let Some(shred) = shreds.last_mut() {
|
|
if let Some(shred) = shreds.last_mut() {
|
|
|
shred.data_header.flags |= if is_last_in_slot {
|
|
shred.data_header.flags |= if is_last_in_slot {
|
|
@@ -1890,6 +1909,18 @@ mod test {
|
|
|
.contains(ShredFlags::LAST_SHRED_IN_SLOT),
|
|
.contains(ShredFlags::LAST_SHRED_IN_SLOT),
|
|
|
is_last_in_slot
|
|
is_last_in_slot
|
|
|
);
|
|
);
|
|
|
|
|
+ // Assert that the last erasure batch has 32+ data shreds.
|
|
|
|
|
+ if is_last_in_slot {
|
|
|
|
|
+ let fec_set_index = shreds.iter().map(Shred::fec_set_index).max().unwrap();
|
|
|
|
|
+ assert!(
|
|
|
|
|
+ shreds
|
|
|
|
|
+ .iter()
|
|
|
|
|
+ .filter(|shred| shred.fec_set_index() == fec_set_index)
|
|
|
|
|
+ .filter(|shred| shred.shred_type() == ShredType::Data)
|
|
|
|
|
+ .count()
|
|
|
|
|
+ >= 32
|
|
|
|
|
+ )
|
|
|
|
|
+ }
|
|
|
// Assert that data shreds can be recovered from coding shreds.
|
|
// Assert that data shreds can be recovered from coding shreds.
|
|
|
let recovered_data_shreds: Vec<_> = shreds
|
|
let recovered_data_shreds: Vec<_> = shreds
|
|
|
.iter()
|
|
.iter()
|