esp_radio_rtos_driver/
queue.rs

//! # Queues
//!
//! Queues are a synchronization primitive used to communicate between tasks.
//! They allow tasks to send and receive data in a first-in-first-out (FIFO) manner.
//!
//! ## Implementation
//!
//! Implement the `QueueImplementation` trait for an object, and use the
//! `register_queue_implementation!` macro to register that implementation for esp-radio.
//!
//! See the [`QueueImplementation`] documentation for more information.
//!
//! You may also choose to use the [`CompatQueue`] implementation provided by this crate.
//!
//! ## Usage
//!
//! Users should use [`QueueHandle`] to interact with queues created by the driver
//! implementation, as shown in the example below.
//!
//! > Note that the only expected user of this crate is esp-radio.
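//!
//! The snippet below is a minimal sketch of the intended call pattern. It assumes a driver
//! implementation has already been registered; the item type, capacity and values are
//! illustrative only:
//!
//! ```rust,ignore
//! use esp_radio_rtos_driver::queue::QueueHandle;
//!
//! // A queue that holds up to 10 items of 4 bytes each.
//! let queue = QueueHandle::new(10, 4);
//!
//! // Items are passed as raw byte pointers; the pointee must match the item size.
//! let item = 42u32.to_ne_bytes();
//! unsafe { queue.send_to_back(item.as_ptr(), None) };
//!
//! let mut received = [0u8; 4];
//! if unsafe { queue.receive(received.as_mut_ptr(), Some(1_000)) } {
//!     assert_eq!(u32::from_ne_bytes(received), 42);
//! }
//! ```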

use core::{cell::UnsafeCell, ptr::NonNull};

/// Pointer to an opaque queue created by the driver implementation.
pub type QueuePtr = NonNull<()>;

unsafe extern "Rust" {
    fn esp_rtos_queue_create(capacity: usize, item_size: usize) -> QueuePtr;
    fn esp_rtos_queue_delete(queue: QueuePtr);

    fn esp_rtos_queue_send_to_front(
        queue: QueuePtr,
        item: *const u8,
        timeout_us: Option<u32>,
    ) -> bool;
    fn esp_rtos_queue_send_to_back(
        queue: QueuePtr,
        item: *const u8,
        timeout_us: Option<u32>,
    ) -> bool;
    fn esp_rtos_queue_try_send_to_back_from_isr(
        queue: QueuePtr,
        item: *const u8,
        higher_prio_task_waken: Option<&mut bool>,
    ) -> bool;
    fn esp_rtos_queue_receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool;
    fn esp_rtos_queue_try_receive_from_isr(
        queue: QueuePtr,
        item: *mut u8,
        higher_prio_task_waken: Option<&mut bool>,
    ) -> bool;
    fn esp_rtos_queue_remove(queue: QueuePtr, item: *const u8);
    fn esp_rtos_queue_messages_waiting(queue: QueuePtr) -> usize;
}

/// A queue primitive.
///
/// The following snippet demonstrates the boilerplate necessary to implement a queue using the
/// `QueueImplementation` trait:
///
/// ```rust,no_run
/// use esp_radio_rtos_driver::{
///     queue::{QueueImplementation, QueuePtr},
///     register_queue_implementation,
/// };
///
/// struct MyQueue {
///     // Queue implementation details
/// }
///
/// impl QueueImplementation for MyQueue {
///     fn create(capacity: usize, item_size: usize) -> QueuePtr {
///         unimplemented!()
///     }
///
///     unsafe fn delete(queue: QueuePtr) {
///         unimplemented!()
///     }
///
///     unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
///         unimplemented!()
///     }
///
///     unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
///         unimplemented!()
///     }
///
///     unsafe fn try_send_to_back_from_isr(
///         queue: QueuePtr,
///         item: *const u8,
///         higher_prio_task_waken: Option<&mut bool>,
///     ) -> bool {
///         unimplemented!()
///     }
///
///     unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
///         unimplemented!()
///     }
///
///     unsafe fn try_receive_from_isr(
///         queue: QueuePtr,
///         item: *mut u8,
///         higher_prio_task_waken: Option<&mut bool>,
///     ) -> bool {
///         unimplemented!()
///     }
///
///     unsafe fn remove(queue: QueuePtr, item: *const u8) {
///         unimplemented!()
///     }
///
///     fn messages_waiting(queue: QueuePtr) -> usize {
///         unimplemented!()
///     }
/// }
///
/// register_queue_implementation!(MyQueue);
/// ```
pub trait QueueImplementation {
    /// Creates a new, empty queue instance.
    ///
    /// The queue must have capacity for `capacity` items of `item_size` bytes each.
    fn create(capacity: usize, item_size: usize) -> QueuePtr;

    /// Deletes a queue instance.
    ///
    /// # Safety
    ///
    /// `queue` must be a pointer returned from [`Self::create`].
    unsafe fn delete(queue: QueuePtr);

    /// Enqueues a high-priority item.
    ///
    /// The item is placed at the front of the queue, so it will be dequeued first.
    ///
    /// If the queue is full, this function will block for the given timeout. If `timeout_us` is
    /// `None`, the function will block indefinitely.
    ///
    /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool;

    /// Enqueues an item.
    ///
    /// If the queue is full, this function will block for the given timeout. If `timeout_us` is
    /// `None`, the function will block indefinitely.
    ///
    /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool;

    /// Attempts to enqueue an item.
    ///
    /// If the queue is full, this function will immediately return `false`.
    ///
    /// The `higher_prio_task_waken` parameter is an optional mutable reference to a boolean flag.
    /// If the flag is `Some`, the implementation may set it to `true` to request a context switch.
    ///
    /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    unsafe fn try_send_to_back_from_isr(
        queue: QueuePtr,
        item: *const u8,
        higher_prio_task_waken: Option<&mut bool>,
    ) -> bool;

    /// Dequeues an item from the queue.
    ///
    /// If the queue is empty, this function will block for the given timeout. If `timeout_us` is
    /// `None`, the function will block indefinitely.
    ///
    /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool;

    /// Attempts to dequeue an item from the queue.
    ///
    /// If the queue is empty, this function will return `false` immediately.
    ///
    /// The `higher_prio_task_waken` parameter is an optional mutable reference to a boolean flag.
    /// If the flag is `Some`, the implementation may set it to `true` to request a context switch.
    ///
    /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    unsafe fn try_receive_from_isr(
        queue: QueuePtr,
        item: *mut u8,
        higher_prio_task_waken: Option<&mut bool>,
    ) -> bool;

    /// Removes the first item that is byte-wise equal to `item` from the queue.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    unsafe fn remove(queue: QueuePtr, item: *const u8);

    /// Returns the number of messages in the queue.
    fn messages_waiting(queue: QueuePtr) -> usize;
}

/// Registers a queue implementation.
///
/// See [`QueueImplementation`] for details.
#[macro_export]
macro_rules! register_queue_implementation {
    ($t: ty) => {
        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_create(capacity: usize, item_size: usize) -> $crate::queue::QueuePtr {
            <$t as $crate::queue::QueueImplementation>::create(capacity, item_size)
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_delete(queue: $crate::queue::QueuePtr) {
            unsafe { <$t as $crate::queue::QueueImplementation>::delete(queue) }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_send_to_front(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::send_to_front(queue, item, timeout_us)
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_send_to_back(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::send_to_back(queue, item, timeout_us)
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_try_send_to_back_from_isr(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            higher_prio_task_waken: Option<&mut bool>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::try_send_to_back_from_isr(
                    queue,
                    item,
                    higher_prio_task_waken,
                )
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_receive(
            queue: $crate::queue::QueuePtr,
            item: *mut u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe { <$t as $crate::queue::QueueImplementation>::receive(queue, item, timeout_us) }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_try_receive_from_isr(
            queue: $crate::queue::QueuePtr,
            item: *mut u8,
            higher_prio_task_waken: Option<&mut bool>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::try_receive_from_isr(
                    queue,
                    item,
                    higher_prio_task_waken,
                )
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_remove(queue: $crate::queue::QueuePtr, item: *const u8) {
            unsafe { <$t as $crate::queue::QueueImplementation>::remove(queue, item) }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_messages_waiting(queue: $crate::queue::QueuePtr) -> usize {
            <$t as $crate::queue::QueueImplementation>::messages_waiting(queue)
        }
    };
}

/// Queue handle.
///
/// This handle is used to interact with queues created by the driver implementation.
#[repr(transparent)]
pub struct QueueHandle(QueuePtr);
impl QueueHandle {
    /// Creates a new queue instance.
    #[inline]
    pub fn new(capacity: usize, item_size: usize) -> Self {
        let ptr = unsafe { esp_rtos_queue_create(capacity, item_size) };
        Self(ptr)
    }

    /// Converts this object into a pointer without dropping it.
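    ///
    /// The returned pointer can later be passed to [`Self::from_ptr`] to reclaim ownership.
    /// A minimal usage sketch (capacity and item size are illustrative; a driver implementation
    /// must be registered for this to link):
    ///
    /// ```rust,ignore
    /// use esp_radio_rtos_driver::queue::QueueHandle;
    ///
    /// let queue = QueueHandle::new(10, 4);
    ///
    /// // Hand the queue to code that can only store raw pointers.
    /// let ptr = queue.leak();
    ///
    /// // ... later, recover ownership so that dropping the handle deletes the queue.
    /// let queue = unsafe { QueueHandle::from_ptr(ptr) };
    /// ```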
    #[inline]
    pub fn leak(self) -> QueuePtr {
        let ptr = self.0;
        core::mem::forget(self);
        ptr
    }

    /// Recovers the object from a leaked pointer.
    ///
    /// # Safety
    ///
    /// - The caller must only use pointers created using [`Self::leak`].
    /// - The caller must ensure the pointer is not shared.
    #[inline]
    pub unsafe fn from_ptr(ptr: QueuePtr) -> Self {
        Self(ptr)
    }

    /// Creates a reference to this object from a leaked pointer.
    ///
    /// This function is used in the esp-radio code to interact with the queue.
    ///
    /// # Safety
    ///
    /// - The caller must only use pointers created using [`Self::leak`].
    #[inline]
    pub unsafe fn ref_from_ptr(ptr: &QueuePtr) -> &Self {
        unsafe { core::mem::transmute(ptr) }
    }

    /// Enqueues a high-priority item.
    ///
    /// The item is placed at the front of the queue, so it will be dequeued first.
    ///
    /// If the queue is full, this function will block for the given timeout. If `timeout_us` is
    /// `None`, the function will block indefinitely.
    ///
    /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    #[inline]
    pub unsafe fn send_to_front(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
        unsafe { esp_rtos_queue_send_to_front(self.0, item, timeout_us) }
    }

    /// Enqueues an item.
    ///
    /// If the queue is full, this function will block for the given timeout. If `timeout_us` is
    /// `None`, the function will block indefinitely.
    ///
    /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    #[inline]
    pub unsafe fn send_to_back(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
        unsafe { esp_rtos_queue_send_to_back(self.0, item, timeout_us) }
    }

    /// Attempts to enqueue an item.
    ///
    /// If the queue is full, this function will immediately return `false`.
    ///
    /// If a higher-priority task is woken up by this operation, the
    /// `higher_priority_task_waken` flag is set to `true`.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
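    ///
    /// A minimal sketch of the intended call pattern from an interrupt handler (the
    /// context-switch step stands in for whatever mechanism the embedding OS provides):
    ///
    /// ```rust,ignore
    /// let mut task_woken = false;
    /// let sent = unsafe { queue.try_send_to_back_from_isr(item.as_ptr(), Some(&mut task_woken)) };
    /// if task_woken {
    ///     // Request a context switch when the interrupt returns.
    /// }
    /// ```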
    #[inline]
    pub unsafe fn try_send_to_back_from_isr(
        &self,
        item: *const u8,
        higher_priority_task_waken: Option<&mut bool>,
    ) -> bool {
        unsafe {
            esp_rtos_queue_try_send_to_back_from_isr(self.0, item, higher_priority_task_waken)
        }
    }

    /// Dequeues an item from the queue.
    ///
    /// If the queue is empty, this function will block for the given timeout. If `timeout_us` is
    /// `None`, the function will block indefinitely.
    ///
    /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    #[inline]
    pub unsafe fn receive(&self, item: *mut u8, timeout_us: Option<u32>) -> bool {
        unsafe { esp_rtos_queue_receive(self.0, item, timeout_us) }
    }

    /// Attempts to dequeue an item from the queue.
    ///
    /// If the queue is empty, this function will return `false` immediately.
    ///
    /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
    ///
    /// If a higher-priority task is woken up by this operation, the
    /// `higher_priority_task_waken` flag is set to `true`.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    #[inline]
    pub unsafe fn try_receive_from_isr(
        &self,
        item: *mut u8,
        higher_priority_task_waken: Option<&mut bool>,
    ) -> bool {
        unsafe { esp_rtos_queue_try_receive_from_isr(self.0, item, higher_priority_task_waken) }
    }

    /// Removes the first item that is byte-wise equal to `item` from the queue.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `item` can be dereferenced and points to an allocation of
    /// a size equal to the queue's item size.
    #[inline]
    pub unsafe fn remove(&self, item: *const u8) {
        unsafe { esp_rtos_queue_remove(self.0, item) }
    }

    /// Returns the number of messages in the queue.
    #[inline]
    pub fn messages_waiting(&self) -> usize {
        unsafe { esp_rtos_queue_messages_waiting(self.0) }
    }
}

impl Drop for QueueHandle {
    #[inline]
    fn drop(&mut self) {
        unsafe { esp_rtos_queue_delete(self.0) };
    }
}

use alloc::{boxed::Box, vec};

use crate::semaphore::{SemaphoreHandle, SemaphoreKind};

/// A fixed-capacity ring buffer that backs [`CompatQueue`].
struct QueueInner {
    storage: Box<[u8]>,
    item_size: usize,
    capacity: usize,
    count: usize,
    current_read: usize,
    current_write: usize,
}

impl QueueInner {
    fn get(&self, index: usize) -> &[u8] {
        let item_start = self.item_size * index;
        &self.storage[item_start..][..self.item_size]
    }

    fn get_mut(&mut self, index: usize) -> &mut [u8] {
        let item_start = self.item_size * index;
        &mut self.storage[item_start..][..self.item_size]
    }

    fn len(&self) -> usize {
        self.count
    }

    fn send_to_back(&mut self, item: *const u8) {
        let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };

        let dst = self.get_mut(self.current_write);
        dst.copy_from_slice(item);

        // Advance the write index, wrapping around at the end of the storage.
        self.current_write = (self.current_write + 1) % self.capacity;
        self.count += 1;
    }

    fn send_to_front(&mut self, item: *const u8) {
        let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };

        // Step the read index back by one (wrapping), so the new item is dequeued first.
        self.current_read = (self.current_read + self.capacity - 1) % self.capacity;

        let dst = self.get_mut(self.current_read);
        dst.copy_from_slice(item);

        self.count += 1;
    }

    fn read_from_front(&mut self, dst: *mut u8) {
        let dst = unsafe { core::slice::from_raw_parts_mut(dst, self.item_size) };

        let src = self.get(self.current_read);
        dst.copy_from_slice(src);

        self.current_read = (self.current_read + 1) % self.capacity;
        self.count -= 1;
    }

    fn remove(&mut self, item: *const u8) -> bool {
        let count = self.len();

        if count == 0 {
            return false;
        }

        let mut tmp_item = vec![0; self.item_size];

        let mut found = false;
        let item_slice = unsafe { core::slice::from_raw_parts(item, self.item_size) };
        for _ in 0..count {
            self.read_from_front(tmp_item.as_mut_ptr().cast());

            if found || &tmp_item[..] != item_slice {
                self.send_to_back(tmp_item.as_mut_ptr().cast());
            } else {
                found = true;
            }

            // Note that even if we find our item, we'll need to keep cycling through everything to
            // keep insertion order.
        }

        found
    }
}

/// A queue implementation that only requires semaphores from the OS.
///
/// Internally this is a classic bounded buffer: one counting semaphore tracks free slots,
/// another tracks occupied slots, and a mutex guards the ring buffer itself.
///
/// Register in your OS implementation by adding the following code:
///
/// ```rust
/// use esp_radio_rtos_driver::{queue::CompatQueue, register_queue_implementation};
///
/// register_queue_implementation!(CompatQueue);
/// ```
pub struct CompatQueue {
    /// Allows interior mutability for the queue's inner state, when the mutex is held.
    inner: UnsafeCell<QueueInner>,

    /// Counts the free slots; taken by senders, given by receivers.
    semaphore_empty: SemaphoreHandle,
    /// Counts the occupied slots; given by senders, taken by receivers.
    semaphore_full: SemaphoreHandle,
    /// Guards access to `inner`.
    mutex: SemaphoreHandle,
}

impl CompatQueue {
    fn new(capacity: usize, item_size: usize) -> Self {
        let storage = vec![0; capacity * item_size].into_boxed_slice();
        let semaphore_empty = SemaphoreHandle::new(SemaphoreKind::Counting {
            max: capacity as u32,
            initial: capacity as u32,
        });
        let semaphore_full = SemaphoreHandle::new(SemaphoreKind::Counting {
            max: capacity as u32,
            initial: 0,
        });
        let mutex = SemaphoreHandle::new(SemaphoreKind::Mutex);
        Self {
            inner: UnsafeCell::new(QueueInner {
                storage,
                item_size,
                capacity,
                count: 0,
                current_read: 0,
                current_write: 0,
            }),
            semaphore_empty,
            semaphore_full,
            mutex,
        }
    }

    unsafe fn from_ptr<'a>(ptr: QueuePtr) -> &'a Self {
        unsafe { ptr.cast::<Self>().as_ref() }
    }
}

impl QueueImplementation for CompatQueue {
    fn create(capacity: usize, item_size: usize) -> QueuePtr {
        let q = Box::new(CompatQueue::new(capacity, item_size));
        NonNull::from(Box::leak(q)).cast()
    }

    unsafe fn delete(queue: QueuePtr) {
        let q = unsafe { Box::from_raw(queue.cast::<CompatQueue>().as_ptr()) };
        core::mem::drop(q);
    }

    unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
        let queue = unsafe { CompatQueue::from_ptr(queue) };

        // Reserve a free slot, then lock the ring buffer.
        if queue.semaphore_empty.take(timeout_us) {
            // The inner mutex shouldn't be held for a long time, but we still shouldn't block
            // indefinitely.
            if queue.mutex.take(timeout_us) {
                let inner = unsafe { &mut *queue.inner.get() };
                inner.send_to_front(item);

                queue.mutex.give();
                queue.semaphore_full.give();
                true
            } else {
                // Locking failed, so return the free slot we reserved.
                queue.semaphore_empty.give();
                false
            }
        } else {
            false
        }
    }

    unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
        let queue = unsafe { CompatQueue::from_ptr(queue) };

        // Reserve a free slot, then lock the ring buffer.
        if queue.semaphore_empty.take(timeout_us) {
            // The inner mutex shouldn't be held for a long time, but we still shouldn't block
            // indefinitely.
            if queue.mutex.take(timeout_us) {
                let inner = unsafe { &mut *queue.inner.get() };
                inner.send_to_back(item);

                queue.mutex.give();
                queue.semaphore_full.give();
                true
            } else {
                // Locking failed, so return the free slot we reserved.
                queue.semaphore_empty.give();
                false
            }
        } else {
            false
        }
    }

    unsafe fn try_send_to_back_from_isr(
        queue: QueuePtr,
        item: *const u8,
        mut higher_prio_task_waken: Option<&mut bool>,
    ) -> bool {
        let queue = unsafe { CompatQueue::from_ptr(queue) };

        if queue
            .semaphore_empty
            .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
        {
            if queue
                .mutex
                .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
            {
                let inner = unsafe { &mut *queue.inner.get() };
                inner.send_to_back(item);

                queue
                    .mutex
                    .try_give_from_isr(higher_prio_task_waken.as_deref_mut());
                queue
                    .semaphore_full
                    .try_give_from_isr(higher_prio_task_waken);
                true
            } else {
                // Locking failed, so return the free slot we reserved.
                queue
                    .semaphore_empty
                    .try_give_from_isr(higher_prio_task_waken);
                false
            }
        } else {
            false
        }
    }

    unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
        let queue = unsafe { CompatQueue::from_ptr(queue) };

        // Reserve an occupied slot, then lock the ring buffer.
        if queue.semaphore_full.take(timeout_us) {
            if queue.mutex.take(timeout_us) {
                let inner = unsafe { &mut *queue.inner.get() };
                inner.read_from_front(item);

                queue.mutex.give();
                queue.semaphore_empty.give();
                true
            } else {
                // Locking failed, so return the occupied slot we reserved.
                queue.semaphore_full.give();
                false
            }
        } else {
            false
        }
    }

    unsafe fn try_receive_from_isr(
        queue: QueuePtr,
        item: *mut u8,
        mut higher_prio_task_waken: Option<&mut bool>,
    ) -> bool {
        let queue = unsafe { CompatQueue::from_ptr(queue) };

        if queue
            .semaphore_full
            .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
        {
            if queue
                .mutex
                .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
            {
                let inner = unsafe { &mut *queue.inner.get() };
                inner.read_from_front(item);

                queue
                    .mutex
                    .try_give_from_isr(higher_prio_task_waken.as_deref_mut());
                queue
                    .semaphore_empty
                    .try_give_from_isr(higher_prio_task_waken);
                true
            } else {
                // Locking failed, so return the occupied slot we reserved.
                queue
                    .semaphore_full
                    .try_give_from_isr(higher_prio_task_waken);
                false
            }
        } else {
            false
        }
    }

    unsafe fn remove(queue: QueuePtr, item: *const u8) {
        let queue = unsafe { CompatQueue::from_ptr(queue) };

        // Only proceed if the queue is non-empty; reserve an occupied slot without blocking.
        if queue.semaphore_full.take(Some(0)) {
            queue.mutex.take(None);

            let inner = unsafe { &mut *queue.inner.get() };
            let item_removed = inner.remove(item);

            queue.mutex.give();

            if item_removed {
                // An item was removed, so a slot was freed up.
                queue.semaphore_empty.give();
            } else {
                // Nothing matched; return the occupied slot we reserved.
                queue.semaphore_full.give();
            }
        }
    }

    fn messages_waiting(queue: QueuePtr) -> usize {
        let queue = unsafe { CompatQueue::from_ptr(queue) };

        // The "full" semaphore's count equals the number of enqueued items.
        queue.semaphore_full.current_count() as usize
    }
}
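
// A minimal sketch of how `QueueInner`'s ring-buffer behaviour could be unit tested. Note that
// building this crate's test harness requires linking against registered driver implementations
// (the semaphore and queue symbols declared above), so this is illustrative only.
#[cfg(test)]
mod tests {
    use alloc::vec;

    use super::QueueInner;

    #[test]
    fn fifo_order_is_preserved_across_wrap_around() {
        let mut inner = QueueInner {
            storage: vec![0; 3 * 4].into_boxed_slice(),
            item_size: 4,
            capacity: 3,
            count: 0,
            current_read: 0,
            current_write: 0,
        };

        for value in [1u32, 2, 3] {
            inner.send_to_back(value.to_ne_bytes().as_ptr());
        }

        // Dequeue one item and enqueue another to exercise index wrap-around.
        let mut out = [0u8; 4];
        inner.read_from_front(out.as_mut_ptr());
        assert_eq!(u32::from_ne_bytes(out), 1);

        inner.send_to_back(4u32.to_ne_bytes().as_ptr());

        for expected in [2u32, 3, 4] {
            inner.read_from_front(out.as_mut_ptr());
            assert_eq!(u32::from_ne_bytes(out), expected);
        }
        assert_eq!(inner.len(), 0);
    }

    #[test]
    fn remove_deletes_first_match_and_keeps_order() {
        let mut inner = QueueInner {
            storage: vec![0; 4 * 4].into_boxed_slice(),
            item_size: 4,
            capacity: 4,
            count: 0,
            current_read: 0,
            current_write: 0,
        };

        for value in [1u32, 2, 1, 3] {
            inner.send_to_back(value.to_ne_bytes().as_ptr());
        }

        // Only the first `1` is removed; the remaining items keep their order.
        assert!(inner.remove(1u32.to_ne_bytes().as_ptr()));

        let mut out = [0u8; 4];
        for expected in [2u32, 1, 3] {
            inner.read_from_front(out.as_mut_ptr());
            assert_eq!(u32::from_ne_bytes(out), expected);
        }
    }
}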