esp_radio_rtos_driver/queue.rs
1//! # Queues
2//!
3//! Queues are a synchronization primitive used to communicate between tasks.
4//! They allow tasks to send and receive data in a first-in-first-out (FIFO) manner.
5//!
6//! ## Implementation
7//!
//! Implement the `QueueImplementation` trait for an object, and use the
//! `register_queue_implementation!` macro to register that implementation for esp-radio.
10//!
11//! See the [`QueueImplementation`] documentation for more information.
12//!
13//! You may also choose to use the [`CompatQueue`] implementation provided by this crate.
14//!
15//! ## Usage
16//!
17//! Users should use [`QueueHandle`] to interact with queues created by the driver implementation.
18//!
19//! > Note that the only expected user of this crate is esp-radio.
20
21use core::{cell::UnsafeCell, ptr::NonNull};
22
/// Opaque, non-null pointer identifying a queue owned by the driver implementation.
///
/// The pointee type is deliberately erased: only the registered implementation knows what the
/// pointer refers to.
pub type QueuePtr = NonNull<()>;

// Symbols provided by `register_queue_implementation!`. Each one forwards to the
// `QueueImplementation` method of the same name.
unsafe extern "Rust" {
    /// Creates a queue holding up to `capacity` items of `item_size` bytes each.
    fn esp_rtos_queue_create(capacity: usize, item_size: usize) -> QueuePtr;

    /// Destroys a queue previously returned by `esp_rtos_queue_create`.
    fn esp_rtos_queue_delete(queue: QueuePtr);

    /// Enqueues `item` at the front, waiting up to `timeout_us` (`None`: forever).
    fn esp_rtos_queue_send_to_front(
        queue: QueuePtr,
        item: *const u8,
        timeout_us: Option<u32>,
    ) -> bool;

    /// Enqueues `item` at the back, waiting up to `timeout_us` (`None`: forever).
    fn esp_rtos_queue_send_to_back(
        queue: QueuePtr,
        item: *const u8,
        timeout_us: Option<u32>,
    ) -> bool;

    /// Non-blocking enqueue at the back, for use from interrupt context.
    fn esp_rtos_queue_try_send_to_back_from_isr(
        queue: QueuePtr,
        item: *const u8,
        higher_prio_task_waken: Option<&mut bool>,
    ) -> bool;

    /// Dequeues into `item`, waiting up to `timeout_us` (`None`: forever).
    fn esp_rtos_queue_receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool;

    /// Non-blocking dequeue into `item`, for use from interrupt context.
    fn esp_rtos_queue_try_receive_from_isr(
        queue: QueuePtr,
        item: *mut u8,
        higher_prio_task_waken: Option<&mut bool>,
    ) -> bool;

    /// Removes an item matching `item` from the queue.
    fn esp_rtos_queue_remove(queue: QueuePtr, item: *const u8);

    /// Returns the number of items currently stored in the queue.
    fn esp_rtos_queue_messages_waiting(queue: QueuePtr) -> usize;
}
54
55/// A queue primitive.
56///
57/// The following snippet demonstrates the boilerplate necessary to implement a queue using the
58/// `QueueImplementation` trait:
59///
60/// ```rust,no_run
61/// use esp_radio_rtos_driver::{
62/// queue::{QueueImplementation, QueuePtr},
63/// register_queue_implementation,
64/// };
65///
66/// struct MyQueue {
67/// // Queue implementation details
68/// }
69///
70/// impl QueueImplementation for MyQueue {
71/// fn create(capacity: usize, item_size: usize) -> QueuePtr {
72/// unimplemented!()
73/// }
74///
75/// unsafe fn delete(queue: QueuePtr) {
76/// unimplemented!()
77/// }
78///
79/// unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
80/// unimplemented!()
81/// }
82///
83/// unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
84/// unimplemented!()
85/// }
86///
87/// unsafe fn try_send_to_back_from_isr(
88/// queue: QueuePtr,
89/// item: *const u8,
90/// higher_prio_task_waken: Option<&mut bool>,
91/// ) -> bool {
92/// unimplemented!()
93/// }
94///
95/// unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
96/// unimplemented!()
97/// }
98///
99/// unsafe fn try_receive_from_isr(
100/// queue: QueuePtr,
101/// item: *mut u8,
102/// higher_prio_task_waken: Option<&mut bool>,
103/// ) -> bool {
104/// unimplemented!()
105/// }
106///
107/// unsafe fn remove(queue: QueuePtr, item: *const u8) {
108/// unimplemented!()
109/// }
110///
111/// fn messages_waiting(queue: QueuePtr) -> usize {
112/// unimplemented!()
113/// }
114/// }
115///
116/// register_queue_implementation!(MyQueue);
117/// ```
118pub trait QueueImplementation {
119 /// Creates a new, empty queue instance.
120 ///
121 /// The queue must have a capacity for `capacity` number of `item_size` byte items.
122 fn create(capacity: usize, item_size: usize) -> QueuePtr;
123
124 /// Deletes a queue instance.
125 ///
126 /// # Safety
127 ///
128 /// `queue` must be a pointer returned from [`Self::create`].
129 unsafe fn delete(queue: QueuePtr);
130
131 /// Enqueues a high-priority item.
132 ///
133 /// If the queue is full, this function will block for the given timeout. If timeout is None,
134 /// the function will block indefinitely.
135 ///
136 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
137 ///
138 /// # Safety
139 ///
140 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
141 /// a size equal to the queue's item size.
142 unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool;
143
144 /// Enqueues an item.
145 ///
146 /// If the queue is full, this function will block for the given timeout. If timeout is None,
147 /// the function will block indefinitely.
148 ///
149 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
150 ///
151 /// # Safety
152 ///
153 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
154 /// a size equal to the queue's item size.
155 unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool;
156
157 /// Attempts to enqueues an item.
158 ///
159 /// If the queue is full, this function will immediately return `false`.
160 ///
161 /// The `higher_prio_task_waken` parameter is an optional mutable reference to a boolean flag.
162 /// If the flag is `Some`, the implementation may set it to `true` to request a context switch.
163 ///
164 /// # Safety
165 ///
166 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
167 /// a size equal to the queue's item size.
168 unsafe fn try_send_to_back_from_isr(
169 queue: QueuePtr,
170 item: *const u8,
171 higher_prio_task_waken: Option<&mut bool>,
172 ) -> bool;
173
174 /// Dequeues an item from the queue.
175 ///
176 /// If the queue is empty, this function will block for the given timeout. If timeout is None,
177 /// the function will block indefinitely.
178 ///
179 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
180 ///
181 /// # Safety
182 ///
183 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
184 /// a size equal to the queue's item size.
185 unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool;
186
187 /// Attempts to dequeue an item from the queue.
188 ///
189 /// If the queue is empty, this function will return `false` immediately.
190 ///
191 /// The `higher_prio_task_waken` parameter is an optional mutable reference to a boolean flag.
192 /// If the flag is `Some`, the implementation may set it to `true` to request a context switch.
193 ///
194 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
195 ///
196 /// # Safety
197 ///
198 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
199 /// a size equal to the queue's item size.
200 unsafe fn try_receive_from_isr(
201 queue: QueuePtr,
202 item: *mut u8,
203 higher_prio_task_waken: Option<&mut bool>,
204 ) -> bool;
205
206 /// Removes an item from the queue.
207 ///
208 /// # Safety
209 ///
210 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
211 /// a size equal to the queue's item size.
212 unsafe fn remove(queue: QueuePtr, item: *const u8);
213
214 /// Returns the number of messages in the queue.
215 fn messages_waiting(queue: QueuePtr) -> usize;
216}
217
/// Registers `$t` as the queue implementation.
///
/// `$t` must implement `QueueImplementation`. The macro expands to one unmangled
/// `esp_rtos_queue_*` function per trait method, each forwarding its arguments unchanged to
/// `$t`. Because the generated symbols are unmangled, expanding the macro more than once in the
/// same program defines them twice.
///
/// Every type used in the generated signatures is named through `$crate`, so the macro can be
/// invoked without importing anything from the `queue` module.
#[macro_export]
macro_rules! register_queue_implementation {
    ($t: ty) => {
        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_create(capacity: usize, item_size: usize) -> $crate::queue::QueuePtr {
            <$t as $crate::queue::QueueImplementation>::create(capacity, item_size)
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_delete(queue: $crate::queue::QueuePtr) {
            unsafe { <$t as $crate::queue::QueueImplementation>::delete(queue) }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_send_to_front(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::send_to_front(queue, item, timeout_us)
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_send_to_back(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::send_to_back(queue, item, timeout_us)
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_try_send_to_back_from_isr(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            higher_prio_task_waken: Option<&mut bool>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::try_send_to_back_from_isr(
                    queue,
                    item,
                    higher_prio_task_waken,
                )
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_receive(
            queue: $crate::queue::QueuePtr,
            item: *mut u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe { <$t as $crate::queue::QueueImplementation>::receive(queue, item, timeout_us) }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_try_receive_from_isr(
            queue: $crate::queue::QueuePtr,
            item: *mut u8,
            higher_prio_task_waken: Option<&mut bool>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::try_receive_from_isr(
                    queue,
                    item,
                    higher_prio_task_waken,
                )
            }
        }

        // `item` is `*const u8` here, matching both the trait method and the extern
        // declaration of this symbol.
        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_remove(queue: $crate::queue::QueuePtr, item: *const u8) {
            unsafe { <$t as $crate::queue::QueueImplementation>::remove(queue, item) }
        }

        // `messages_waiting` is a safe trait function, so no `unsafe` block is needed.
        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_messages_waiting(queue: $crate::queue::QueuePtr) -> usize {
            <$t as $crate::queue::QueueImplementation>::messages_waiting(queue)
        }
    };
}
308
309/// Queue handle.
310///
311/// This handle is used to interact with queues created by the driver implementation.
312#[repr(transparent)]
313pub struct QueueHandle(QueuePtr);
314impl QueueHandle {
315 /// Creates a new queue instance.
316 #[inline]
317 pub fn new(capacity: usize, item_size: usize) -> Self {
318 let ptr = unsafe { esp_rtos_queue_create(capacity, item_size) };
319 Self(ptr)
320 }
321
322 /// Converts this object into a pointer without dropping it.
323 #[inline]
324 pub fn leak(self) -> QueuePtr {
325 let ptr = self.0;
326 core::mem::forget(self);
327 ptr
328 }
329
330 /// Recovers the object from a leaked pointer.
331 ///
332 /// # Safety
333 ///
334 /// - The caller must only use pointers created using [`Self::leak`].
335 /// - The caller must ensure the pointer is not shared.
336 #[inline]
337 pub unsafe fn from_ptr(ptr: QueuePtr) -> Self {
338 Self(ptr)
339 }
340
341 /// Creates a reference to this object from a leaked pointer.
342 ///
343 /// This function is used in the esp-radio code to interact with the queue.
344 ///
345 /// # Safety
346 ///
347 /// - The caller must only use pointers created using [`Self::leak`].
348 #[inline]
349 pub unsafe fn ref_from_ptr(ptr: &QueuePtr) -> &Self {
350 unsafe { core::mem::transmute(ptr) }
351 }
352
353 /// Enqueues a high-priority item.
354 ///
355 /// If the queue is full, this function will block for the given timeout. If timeout is None,
356 /// the function will block indefinitely.
357 ///
358 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
359 ///
360 /// # Safety
361 ///
362 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
363 /// a size equal to the queue's item size.
364 #[inline]
365 pub unsafe fn send_to_front(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
366 unsafe { esp_rtos_queue_send_to_front(self.0, item, timeout_us) }
367 }
368
369 /// Enqueues an item.
370 ///
371 /// If the queue is full, this function will block for the given timeout. If timeout is None,
372 /// the function will block indefinitely.
373 ///
374 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
375 ///
376 /// # Safety
377 ///
378 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
379 /// a size equal to the queue's item size.
380 #[inline]
381 pub unsafe fn send_to_back(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
382 unsafe { esp_rtos_queue_send_to_back(self.0, item, timeout_us) }
383 }
384
385 /// Attempts to enqueues an item.
386 ///
387 /// If the queue is full, this function will immediately return `false`.
388 ///
389 /// If a higher priority task is woken up by this operation, the `higher_prio_task_waken` flag
390 /// is set to `true`.
391 ///
392 /// # Safety
393 ///
394 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
395 /// a size equal to the queue's item size.
396 #[inline]
397 pub unsafe fn try_send_to_back_from_isr(
398 &self,
399 item: *const u8,
400 higher_priority_task_waken: Option<&mut bool>,
401 ) -> bool {
402 unsafe {
403 esp_rtos_queue_try_send_to_back_from_isr(self.0, item, higher_priority_task_waken)
404 }
405 }
406
407 /// Dequeues an item from the queue.
408 ///
409 /// If the queue is empty, this function will block for the given timeout. If timeout is None,
410 /// the function will block indefinitely.
411 ///
412 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
413 ///
414 /// # Safety
415 ///
416 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
417 /// a size equal to the queue's item size.
418 #[inline]
419 pub unsafe fn receive(&self, item: *mut u8, timeout_us: Option<u32>) -> bool {
420 unsafe { esp_rtos_queue_receive(self.0, item, timeout_us) }
421 }
422
423 /// Attempts to dequeue an item from the queue.
424 ///
425 /// If the queue is empty, this function will return `false` immediately.
426 ///
427 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
428 ///
429 /// If a higher priority task is woken up by this operation, the `higher_prio_task_waken` flag
430 /// is set to `true`.
431 ///
432 /// # Safety
433 ///
434 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
435 /// a size equal to the queue's item size.
436 #[inline]
437 pub unsafe fn try_receive_from_isr(
438 &self,
439 item: *mut u8,
440 higher_priority_task_waken: Option<&mut bool>,
441 ) -> bool {
442 unsafe { esp_rtos_queue_try_receive_from_isr(self.0, item, higher_priority_task_waken) }
443 }
444
445 /// Removes an item from the queue.
446 ///
447 /// # Safety
448 ///
449 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
450 /// a size equal to the queue's item size.
451 #[inline]
452 pub unsafe fn remove(&self, item: *const u8) {
453 unsafe { esp_rtos_queue_remove(self.0, item) }
454 }
455
456 /// Returns the number of messages in the queue.
457 #[inline]
458 pub fn messages_waiting(&self) -> usize {
459 unsafe { esp_rtos_queue_messages_waiting(self.0) }
460 }
461}
462
463impl Drop for QueueHandle {
464 #[inline]
465 fn drop(&mut self) {
466 unsafe { esp_rtos_queue_delete(self.0) };
467 }
468}
469
470use alloc::{boxed::Box, vec};
471
472use crate::semaphore::{SemaphoreHandle, SemaphoreKind};
473
/// Fixed-capacity ring buffer of equally sized byte items.
///
/// `storage` is addressed in slots of `item_size` bytes. `current_read` is the slot of the
/// oldest item, `current_write` the slot the next item appended at the back goes to, and
/// `count` the number of stored items. None of the methods check for a full or empty buffer.
struct QueueInner {
    storage: Box<[u8]>,
    item_size: usize,
    capacity: usize,
    count: usize,
    current_read: usize,
    current_write: usize,
}

impl QueueInner {
    /// Returns the bytes of slot `index`.
    fn get(&self, index: usize) -> &[u8] {
        let offset = index * self.item_size;
        let tail = &self.storage[offset..];
        &tail[..self.item_size]
    }

    /// Returns the bytes of slot `index` for writing.
    fn get_mut(&mut self, index: usize) -> &mut [u8] {
        let offset = index * self.item_size;
        let tail = &mut self.storage[offset..];
        &mut tail[..self.item_size]
    }

    /// Number of items currently stored.
    fn len(&self) -> usize {
        self.count
    }

    /// Slot following `index`, wrapping around at `capacity`.
    fn slot_after(&self, index: usize) -> usize {
        (index + 1) % self.capacity
    }

    /// Slot preceding `index`, wrapping around at `capacity`.
    fn slot_before(&self, index: usize) -> usize {
        (index + self.capacity - 1) % self.capacity
    }

    /// Copies `item_size` bytes from `item` into the write slot and advances it.
    fn send_to_back(&mut self, item: *const u8) {
        // Relies on `item` pointing to at least `item_size` readable bytes.
        let payload = unsafe { core::slice::from_raw_parts(item, self.item_size) };

        let slot = self.current_write;
        self.get_mut(slot).copy_from_slice(payload);

        self.current_write = self.slot_after(slot);
        self.count += 1;
    }

    /// Moves the read slot back by one and copies `item_size` bytes from `item` into it.
    fn send_to_front(&mut self, item: *const u8) {
        // Relies on `item` pointing to at least `item_size` readable bytes.
        let payload = unsafe { core::slice::from_raw_parts(item, self.item_size) };

        let slot = self.slot_before(self.current_read);
        self.current_read = slot;
        self.get_mut(slot).copy_from_slice(payload);

        self.count += 1;
    }

    /// Copies the oldest item into `dst` and drops it from the buffer.
    fn read_from_front(&mut self, dst: *mut u8) {
        // Relies on `dst` pointing to at least `item_size` writable bytes.
        let out = unsafe { core::slice::from_raw_parts_mut(dst, self.item_size) };

        let slot = self.current_read;
        out.copy_from_slice(self.get(slot));

        self.current_read = self.slot_after(slot);
        self.count -= 1;
    }

    /// Removes the first stored item whose bytes equal the `item_size` bytes at `item`.
    ///
    /// Returns `true` if an item was removed. The relative order of the remaining items is
    /// preserved.
    fn remove(&mut self, item: *const u8) -> bool {
        let pending = self.len();
        if pending == 0 {
            return false;
        }

        // Relies on `item` pointing to at least `item_size` readable bytes.
        let needle = unsafe { core::slice::from_raw_parts(item, self.item_size) };
        let mut scratch = vec![0u8; self.item_size];
        let mut removed = false;

        // Every item is popped once and, unless it is the one being removed, pushed to the
        // back again. The full rotation is needed even after a hit to keep insertion order.
        for _ in 0..pending {
            self.read_from_front(scratch.as_mut_ptr());

            let is_first_match = !removed && scratch.as_slice() == needle;
            if is_first_match {
                removed = true;
            } else {
                self.send_to_back(scratch.as_ptr());
            }
        }

        removed
    }
}
556
557/// A suitable queue implementation that only requires semaphores from the OS.
558///
559/// Register in your OS implementation by adding the following code:
560///
561/// ```rust
562/// use esp_radio_rtos_driver::{queue::CompatQueue, register_queue_implementation};
563///
564/// register_queue_implementation!(CompatQueue);
565/// ```
566pub struct CompatQueue {
567 /// Allows interior mutability for the queue's inner state, when the mutex is held.
568 inner: UnsafeCell<QueueInner>,
569
570 semaphore_empty: SemaphoreHandle,
571 semaphore_full: SemaphoreHandle,
572 mutex: SemaphoreHandle,
573}
574
575impl CompatQueue {
576 fn new(capacity: usize, item_size: usize) -> Self {
577 let storage = vec![0; capacity * item_size].into_boxed_slice();
578 let semaphore_empty = SemaphoreHandle::new(SemaphoreKind::Counting {
579 max: capacity as u32,
580 initial: capacity as u32,
581 });
582 let semaphore_full = SemaphoreHandle::new(SemaphoreKind::Counting {
583 max: capacity as u32,
584 initial: 0,
585 });
586 let mutex = SemaphoreHandle::new(SemaphoreKind::Mutex);
587 Self {
588 inner: UnsafeCell::new(QueueInner {
589 storage,
590 item_size,
591 capacity,
592 count: 0,
593 current_read: 0,
594 current_write: 0,
595 }),
596 semaphore_empty,
597 semaphore_full,
598 mutex,
599 }
600 }
601
602 unsafe fn from_ptr<'a>(ptr: QueuePtr) -> &'a Self {
603 unsafe { ptr.cast::<Self>().as_ref() }
604 }
605}
606
607impl QueueImplementation for CompatQueue {
608 fn create(capacity: usize, item_size: usize) -> QueuePtr {
609 let q = Box::new(CompatQueue::new(capacity, item_size));
610 NonNull::from(Box::leak(q)).cast()
611 }
612
613 unsafe fn delete(queue: QueuePtr) {
614 let q = unsafe { Box::from_raw(queue.cast::<CompatQueue>().as_ptr()) };
615 core::mem::drop(q);
616 }
617
618 unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
619 let queue = unsafe { CompatQueue::from_ptr(queue) };
620
621 if queue.semaphore_empty.take(timeout_us) {
622 // The inner mutex shouldn't be held for a long time, but we still shouldn't block
623 // indefinitely.
624 if queue.mutex.take(timeout_us) {
625 let inner = unsafe { &mut *queue.inner.get() };
626 inner.send_to_front(item);
627
628 queue.mutex.give();
629 queue.semaphore_full.give();
630 true
631 } else {
632 queue.semaphore_empty.give();
633 false
634 }
635 } else {
636 false
637 }
638 }
639
640 unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
641 let queue = unsafe { CompatQueue::from_ptr(queue) };
642
643 if queue.semaphore_empty.take(timeout_us) {
644 // The inner mutex shouldn't be held for a long time, but we still shouldn't block
645 // indefinitely.
646 if queue.mutex.take(timeout_us) {
647 let inner = unsafe { &mut *queue.inner.get() };
648 inner.send_to_back(item);
649
650 queue.mutex.give();
651 queue.semaphore_full.give();
652 true
653 } else {
654 queue.semaphore_empty.give();
655 false
656 }
657 } else {
658 false
659 }
660 }
661
662 unsafe fn try_send_to_back_from_isr(
663 queue: QueuePtr,
664 item: *const u8,
665 mut higher_prio_task_waken: Option<&mut bool>,
666 ) -> bool {
667 let queue = unsafe { CompatQueue::from_ptr(queue) };
668
669 if queue
670 .semaphore_empty
671 .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
672 {
673 if queue
674 .mutex
675 .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
676 {
677 let inner = unsafe { &mut *queue.inner.get() };
678 inner.send_to_back(item);
679
680 queue
681 .mutex
682 .try_give_from_isr(higher_prio_task_waken.as_deref_mut());
683 queue
684 .semaphore_full
685 .try_give_from_isr(higher_prio_task_waken);
686 true
687 } else {
688 queue
689 .semaphore_empty
690 .try_give_from_isr(higher_prio_task_waken);
691 false
692 }
693 } else {
694 false
695 }
696 }
697
698 unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
699 let queue = unsafe { CompatQueue::from_ptr(queue) };
700
701 if queue.semaphore_full.take(timeout_us) {
702 if queue.mutex.take(timeout_us) {
703 let inner = unsafe { &mut *queue.inner.get() };
704 inner.read_from_front(item);
705
706 queue.mutex.give();
707 queue.semaphore_empty.give();
708 true
709 } else {
710 queue.semaphore_full.give();
711 false
712 }
713 } else {
714 false
715 }
716 }
717
718 unsafe fn try_receive_from_isr(
719 queue: QueuePtr,
720 item: *mut u8,
721 mut higher_prio_task_waken: Option<&mut bool>,
722 ) -> bool {
723 let queue = unsafe { CompatQueue::from_ptr(queue) };
724
725 if queue
726 .semaphore_full
727 .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
728 {
729 if queue
730 .mutex
731 .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
732 {
733 let inner = unsafe { &mut *queue.inner.get() };
734 inner.read_from_front(item);
735
736 queue
737 .mutex
738 .try_give_from_isr(higher_prio_task_waken.as_deref_mut());
739 queue
740 .semaphore_empty
741 .try_give_from_isr(higher_prio_task_waken);
742 true
743 } else {
744 queue
745 .semaphore_full
746 .try_give_from_isr(higher_prio_task_waken);
747 false
748 }
749 } else {
750 false
751 }
752 }
753
754 unsafe fn remove(queue: QueuePtr, item: *const u8) {
755 let queue = unsafe { CompatQueue::from_ptr(queue) };
756
757 if queue.semaphore_full.take(Some(0)) {
758 queue.mutex.take(None);
759
760 let inner = unsafe { &mut *queue.inner.get() };
761 let item_removed = inner.remove(item);
762
763 queue.mutex.give();
764
765 if item_removed {
766 queue.semaphore_empty.give();
767 } else {
768 queue.semaphore_full.give();
769 }
770 }
771 }
772
773 fn messages_waiting(queue: QueuePtr) -> usize {
774 let queue = unsafe { CompatQueue::from_ptr(queue) };
775
776 queue.semaphore_full.current_count() as usize
777 }
778}