|
|
@@ -227,7 +227,6 @@ pub(crate) mod external {
|
|
|
|
|
|
pub(crate) struct ExternalWorker {
|
|
|
exit: Arc<AtomicBool>,
|
|
|
- receiver: shaq::Consumer<PackToWorkerMessage>,
|
|
|
consumer: Consumer,
|
|
|
sender: shaq::Producer<WorkerToPackMessage>,
|
|
|
allocator: rts_alloc::Allocator,
|
|
|
@@ -244,7 +243,6 @@ pub(crate) mod external {
|
|
|
pub fn new(
|
|
|
id: u32,
|
|
|
exit: Arc<AtomicBool>,
|
|
|
- receiver: shaq::Consumer<PackToWorkerMessage>,
|
|
|
consumer: Consumer,
|
|
|
sender: shaq::Producer<WorkerToPackMessage>,
|
|
|
allocator: rts_alloc::Allocator,
|
|
|
@@ -253,7 +251,6 @@ pub(crate) mod external {
|
|
|
) -> Self {
|
|
|
Self {
|
|
|
exit,
|
|
|
- receiver,
|
|
|
consumer,
|
|
|
sender,
|
|
|
allocator,
|
|
|
@@ -267,7 +264,10 @@ pub(crate) mod external {
|
|
|
self.metrics.clone()
|
|
|
}
|
|
|
|
|
|
- pub fn run(mut self) -> Result<(), ExternalConsumeWorkerError> {
|
|
|
+ pub fn run(
|
|
|
+ mut self,
|
|
|
+ mut receiver: shaq::Consumer<PackToWorkerMessage>,
|
|
|
+ ) -> Result<(), ExternalConsumeWorkerError> {
|
|
|
let mut should_drain_executes = false;
|
|
|
let mut did_work = false;
|
|
|
let mut last_empty_time = Instant::now();
|
|
|
@@ -275,21 +275,19 @@ pub(crate) mod external {
|
|
|
|
|
|
while !self.exit.load(Ordering::Relaxed) {
|
|
|
self.allocator.clean_remote_free_lists();
|
|
|
- if self.receiver.is_empty() {
|
|
|
- self.receiver.sync();
|
|
|
+ if receiver.is_empty() {
|
|
|
+ receiver.sync();
|
|
|
should_drain_executes = false;
|
|
|
}
|
|
|
|
|
|
- match self.receiver.try_read_ptr() {
|
|
|
+ match receiver.try_read() {
|
|
|
Some(message) => {
|
|
|
did_work = true;
|
|
|
self.sender.sync();
|
|
|
- // SAFETY: `try_read` gives a ptr to a properly aligned
|
|
|
- // region for a `PackToWorkerMessage`
|
|
|
- should_drain_executes |= self
|
|
|
- .process_message(unsafe { message.as_ref() }, should_drain_executes)?;
|
|
|
+ should_drain_executes |=
|
|
|
+ self.process_message(message, should_drain_executes)?;
|
|
|
self.sender.commit();
|
|
|
- self.receiver.finalize();
|
|
|
+ receiver.finalize();
|
|
|
}
|
|
|
None => {
|
|
|
let now = Instant::now();
|