esp_radio_rtos_driver/queue.rs
1//! # Queues
2//!
3//! Queues are a synchronization primitive used to communicate between tasks.
4//! They allow tasks to send and receive data in a first-in-first-out (FIFO) manner.
5//!
6//! ## Implementation
7//!
//! Implement the [`QueueImplementation`] trait for a type, then use the
//! `register_queue_implementation!` macro to register that implementation for esp-radio.
10//!
11//! See the [`QueueImplementation`] documentation for more information.
12//!
13//! You may also choose to use the [`CompatQueue`] implementation provided by this crate.
14//!
15//! ## Usage
16//!
17//! Users should use [`QueueHandle`] to interact with queues created by the driver implementation.
18//!
19//! > Note that the only expected user of this crate is esp-radio.
20
21use core::ptr::NonNull;
22
/// Pointer to an opaque queue created by the driver implementation.
///
/// The pointee type is erased (`()`): only the registered queue implementation knows the concrete
/// type behind the pointer. Because this is a [`NonNull`], the pointer is never null.
pub type QueuePtr = NonNull<()>;
25
26unsafe extern "Rust" {
27 fn esp_rtos_queue_create(capacity: usize, item_size: usize) -> QueuePtr;
28 fn esp_rtos_queue_delete(queue: QueuePtr);
29
30 fn esp_rtos_queue_send_to_front(
31 queue: QueuePtr,
32 item: *const u8,
33 timeout_us: Option<u32>,
34 ) -> bool;
35 fn esp_rtos_queue_send_to_front_with_deadline(
36 queue: QueuePtr,
37 item: *const u8,
38 deadline_instant: Option<u64>,
39 ) -> bool;
40
41 fn esp_rtos_queue_send_to_back(
42 queue: QueuePtr,
43 item: *const u8,
44 timeout_us: Option<u32>,
45 ) -> bool;
46 fn esp_rtos_queue_send_to_back_with_deadline(
47 queue: QueuePtr,
48 item: *const u8,
49 deadline_instant: Option<u64>,
50 ) -> bool;
51
52 fn esp_rtos_queue_try_send_to_back_from_isr(
53 queue: QueuePtr,
54 item: *const u8,
55 higher_prio_task_waken: Option<&mut bool>,
56 ) -> bool;
57 fn esp_rtos_queue_receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool;
58 fn esp_rtos_queue_receive_with_deadline(
59 queue: QueuePtr,
60 item: *mut u8,
61 deadline_instant: Option<u64>,
62 ) -> bool;
63 fn esp_rtos_queue_try_receive_from_isr(
64 queue: QueuePtr,
65 item: *mut u8,
66 higher_prio_task_waken: Option<&mut bool>,
67 ) -> bool;
68 fn esp_rtos_queue_remove(queue: QueuePtr, item: *const u8);
69 fn esp_rtos_queue_messages_waiting(queue: QueuePtr) -> usize;
70}
71
72/// A queue primitive.
73///
74/// The following snippet demonstrates the boilerplate necessary to implement a queue using the
75/// `QueueImplementation` trait:
76///
77/// ```rust,no_run
78/// use esp_radio_rtos_driver::{
79/// queue::{QueueImplementation, QueuePtr},
80/// register_queue_implementation,
81/// };
82///
83/// struct MyQueue {
84/// // Queue implementation details
85/// }
86///
87/// impl QueueImplementation for MyQueue {
88/// fn create(capacity: usize, item_size: usize) -> QueuePtr {
89/// unimplemented!()
90/// }
91///
92/// unsafe fn delete(queue: QueuePtr) {
93/// unimplemented!()
94/// }
95///
96/// unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
97/// unimplemented!()
98/// }
99///
100/// unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
101/// unimplemented!()
102/// }
103///
104/// unsafe fn try_send_to_back_from_isr(
105/// queue: QueuePtr,
106/// item: *const u8,
107/// higher_prio_task_waken: Option<&mut bool>,
108/// ) -> bool {
109/// unimplemented!()
110/// }
111///
112/// unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
113/// unimplemented!()
114/// }
115///
116/// unsafe fn try_receive_from_isr(
117/// queue: QueuePtr,
118/// item: *mut u8,
119/// higher_prio_task_waken: Option<&mut bool>,
120/// ) -> bool {
121/// unimplemented!()
122/// }
123///
124/// unsafe fn remove(queue: QueuePtr, item: *const u8) {
125/// unimplemented!()
126/// }
127///
128/// fn messages_waiting(queue: QueuePtr) -> usize {
129/// unimplemented!()
130/// }
131/// }
132///
133/// register_queue_implementation!(MyQueue);
134/// ```
135pub trait QueueImplementation {
136 /// Creates a new, empty queue instance.
137 ///
138 /// The queue must have a capacity for `capacity` number of `item_size` byte items.
139 fn create(capacity: usize, item_size: usize) -> QueuePtr;
140
141 /// Deletes a queue instance.
142 ///
143 /// # Safety
144 ///
145 /// `queue` must be a pointer returned from [`Self::create`].
146 unsafe fn delete(queue: QueuePtr);
147
148 /// Enqueues a high-priority item.
149 ///
150 /// If the queue is full, this function will block for the given timeout. If timeout is None,
151 /// the function will block indefinitely.
152 ///
153 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
154 ///
155 /// # Safety
156 ///
157 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
158 /// a size equal to the queue's item size.
159 unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool;
160
161 /// Enqueues a high-priority item.
162 ///
163 /// If the queue is full, this function will block until the deadline is reached. If the
164 /// deadline is None, the function will block indefinitely.
165 ///
166 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
167 ///
168 /// # Safety
169 ///
170 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
171 /// a size equal to the queue's item size.
172 unsafe fn send_to_front_with_deadline(
173 queue: QueuePtr,
174 item: *const u8,
175 deadline_instant: Option<u64>,
176 ) -> bool;
177
178 /// Enqueues an item.
179 ///
180 /// If the queue is full, this function will block for the given timeout. If timeout is None,
181 /// the function will block indefinitely.
182 ///
183 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
184 ///
185 /// # Safety
186 ///
187 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
188 /// a size equal to the queue's item size.
189 unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool;
190
191 /// Enqueues an item.
192 ///
193 /// If the queue is full, this function will block until the given deadline. If deadline is
194 /// None, the function will block indefinitely.
195 ///
196 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
197 ///
198 /// # Safety
199 ///
200 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
201 /// a size equal to the queue's item size.
202 unsafe fn send_to_back_with_deadline(
203 queue: QueuePtr,
204 item: *const u8,
205 deadline_instant: Option<u64>,
206 ) -> bool;
207
208 /// Attempts to enqueues an item.
209 ///
210 /// If the queue is full, this function will immediately return `false`.
211 ///
212 /// The `higher_prio_task_waken` parameter is an optional mutable reference to a boolean flag.
213 /// If the flag is `Some`, the implementation may set it to `true` to request a context switch.
214 ///
215 /// # Safety
216 ///
217 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
218 /// a size equal to the queue's item size.
219 unsafe fn try_send_to_back_from_isr(
220 queue: QueuePtr,
221 item: *const u8,
222 higher_prio_task_waken: Option<&mut bool>,
223 ) -> bool;
224
225 /// Dequeues an item from the queue.
226 ///
227 /// If the queue is empty, this function will block for the given timeout. If timeout is None,
228 /// the function will block indefinitely.
229 ///
230 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
231 ///
232 /// # Safety
233 ///
234 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
235 /// a size equal to the queue's item size.
236 unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool;
237
238 /// Dequeues an item from the queue.
239 ///
240 /// If the queue is empty, this function will block until the given deadline is reached. If the
241 /// deadline is None, the function will block indefinitely.
242 ///
243 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
244 ///
245 /// # Safety
246 ///
247 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
248 /// a size equal to the queue's item size.
249 unsafe fn receive_with_deadline(
250 queue: QueuePtr,
251 item: *mut u8,
252 deadline_instant: Option<u64>,
253 ) -> bool;
254
255 /// Attempts to dequeue an item from the queue.
256 ///
257 /// If the queue is empty, this function will return `false` immediately.
258 ///
259 /// The `higher_prio_task_waken` parameter is an optional mutable reference to a boolean flag.
260 /// If the flag is `Some`, the implementation may set it to `true` to request a context switch.
261 ///
262 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
263 ///
264 /// # Safety
265 ///
266 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
267 /// a size equal to the queue's item size.
268 unsafe fn try_receive_from_isr(
269 queue: QueuePtr,
270 item: *mut u8,
271 higher_prio_task_waken: Option<&mut bool>,
272 ) -> bool;
273
274 /// Removes an item from the queue.
275 ///
276 /// # Safety
277 ///
278 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
279 /// a size equal to the queue's item size.
280 unsafe fn remove(queue: QueuePtr, item: *const u8);
281
282 /// Returns the number of messages in the queue.
283 fn messages_waiting(queue: QueuePtr) -> usize;
284}
285
/// Registers a [`QueueImplementation`](crate::queue::QueueImplementation) as the queue backend.
///
/// The macro expands to the `esp_rtos_queue_*` functions that this crate declares as external
/// symbols. Each generated function forwards its arguments unchanged to the method of the same
/// name on `$t`.
///
/// The generated functions are `#[no_mangle]` and have fixed names, so the macro should be invoked
/// at most once in a final binary.
#[macro_export]
macro_rules! register_queue_implementation {
    ($t: ty) => {
        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_create(capacity: usize, item_size: usize) -> $crate::queue::QueuePtr {
            <$t as $crate::queue::QueueImplementation>::create(capacity, item_size)
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_delete(queue: $crate::queue::QueuePtr) {
            unsafe { <$t as $crate::queue::QueueImplementation>::delete(queue) }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_send_to_front(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::send_to_front(queue, item, timeout_us)
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_send_to_front_with_deadline(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            deadline_instant: Option<u64>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::send_to_front_with_deadline(
                    queue,
                    item,
                    deadline_instant,
                )
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_send_to_back(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::send_to_back(queue, item, timeout_us)
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_send_to_back_with_deadline(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            deadline_instant: Option<u64>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::send_to_back_with_deadline(
                    queue,
                    item,
                    deadline_instant,
                )
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_try_send_to_back_from_isr(
            queue: $crate::queue::QueuePtr,
            item: *const u8,
            higher_prio_task_waken: Option<&mut bool>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::try_send_to_back_from_isr(
                    queue,
                    item,
                    higher_prio_task_waken,
                )
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_receive(
            queue: $crate::queue::QueuePtr,
            item: *mut u8,
            timeout_us: Option<u32>,
        ) -> bool {
            unsafe { <$t as $crate::queue::QueueImplementation>::receive(queue, item, timeout_us) }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_receive_with_deadline(
            queue: $crate::queue::QueuePtr,
            item: *mut u8,
            deadline_instant: Option<u64>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::receive_with_deadline(
                    queue,
                    item,
                    deadline_instant,
                )
            }
        }

        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_try_receive_from_isr(
            queue: $crate::queue::QueuePtr,
            item: *mut u8,
            higher_prio_task_waken: Option<&mut bool>,
        ) -> bool {
            unsafe {
                <$t as $crate::queue::QueueImplementation>::try_receive_from_isr(
                    queue,
                    item,
                    higher_prio_task_waken,
                )
            }
        }

        // `item` is `*const u8` so the definition matches both the extern declaration of this
        // symbol and `QueueImplementation::remove`.
        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_remove(queue: $crate::queue::QueuePtr, item: *const u8) {
            unsafe { <$t as $crate::queue::QueueImplementation>::remove(queue, item) }
        }

        // `messages_waiting` is a safe trait method, so no `unsafe` block is needed here.
        #[unsafe(no_mangle)]
        #[inline]
        fn esp_rtos_queue_messages_waiting(queue: $crate::queue::QueuePtr) -> usize {
            <$t as $crate::queue::QueueImplementation>::messages_waiting(queue)
        }
    };
}
428
429/// Queue handle.
430///
431/// This handle is used to interact with queues created by the driver implementation.
432#[repr(transparent)]
433pub struct QueueHandle(QueuePtr);
434impl QueueHandle {
435 /// Creates a new queue instance.
436 #[inline]
437 pub fn new(capacity: usize, item_size: usize) -> Self {
438 let ptr = unsafe { esp_rtos_queue_create(capacity, item_size) };
439 Self(ptr)
440 }
441
442 /// Converts this object into a pointer without dropping it.
443 #[inline]
444 pub fn leak(self) -> QueuePtr {
445 let ptr = self.0;
446 core::mem::forget(self);
447 ptr
448 }
449
450 /// Recovers the object from a leaked pointer.
451 ///
452 /// # Safety
453 ///
454 /// - The caller must only use pointers created using [`Self::leak`].
455 /// - The caller must ensure the pointer is not shared.
456 #[inline]
457 pub unsafe fn from_ptr(ptr: QueuePtr) -> Self {
458 Self(ptr)
459 }
460
461 /// Creates a reference to this object from a leaked pointer.
462 ///
463 /// This function is used in the esp-radio code to interact with the queue.
464 ///
465 /// # Safety
466 ///
467 /// - The caller must only use pointers created using [`Self::leak`].
468 #[inline]
469 pub unsafe fn ref_from_ptr(ptr: &QueuePtr) -> &Self {
470 unsafe { core::mem::transmute(ptr) }
471 }
472
473 /// Enqueues a high-priority item.
474 ///
475 /// If the queue is full, this function will block for the given timeout. If timeout is None,
476 /// the function will block indefinitely.
477 ///
478 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
479 ///
480 /// # Safety
481 ///
482 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
483 /// a size equal to the queue's item size.
484 #[inline]
485 pub unsafe fn send_to_front(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
486 unsafe { esp_rtos_queue_send_to_front(self.0, item, timeout_us) }
487 }
488
489 /// Enqueues a high-priority item.
490 ///
491 /// If the queue is full, this function will block until the deadline is reached. If the
492 /// deadline is None, the function will block indefinitely.
493 ///
494 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
495 ///
496 /// # Safety
497 ///
498 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
499 /// a size equal to the queue's item size.
500 #[inline]
501 pub unsafe fn send_to_front_with_deadline(
502 &self,
503 item: *const u8,
504 deadline_instant: Option<u64>,
505 ) -> bool {
506 unsafe { esp_rtos_queue_send_to_front_with_deadline(self.0, item, deadline_instant) }
507 }
508
509 /// Enqueues an item.
510 ///
511 /// If the queue is full, this function will block for the given timeout. If timeout is None,
512 /// the function will block indefinitely.
513 ///
514 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
515 ///
516 /// # Safety
517 ///
518 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
519 /// a size equal to the queue's item size.
520 #[inline]
521 pub unsafe fn send_to_back(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
522 unsafe { esp_rtos_queue_send_to_back(self.0, item, timeout_us) }
523 }
524
525 /// Enqueues an item.
526 ///
527 /// If the queue is full, this function will block until the given deadline. If deadline is
528 /// None, the function will block indefinitely.
529 ///
530 /// This function returns `true` if the item was successfully enqueued, `false` otherwise.
531 ///
532 /// # Safety
533 ///
534 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
535 /// a size equal to the queue's item size.
536 #[inline]
537 pub unsafe fn send_to_back_with_deadline(
538 &self,
539 item: *const u8,
540 deadline_instant: Option<u64>,
541 ) -> bool {
542 unsafe { esp_rtos_queue_send_to_back_with_deadline(self.0, item, deadline_instant) }
543 }
544
545 /// Attempts to enqueues an item.
546 ///
547 /// If the queue is full, this function will immediately return `false`.
548 ///
549 /// If a higher priority task is woken up by this operation, the `higher_prio_task_waken` flag
550 /// is set to `true`.
551 ///
552 /// # Safety
553 ///
554 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
555 /// a size equal to the queue's item size.
556 #[inline]
557 pub unsafe fn try_send_to_back_from_isr(
558 &self,
559 item: *const u8,
560 higher_priority_task_waken: Option<&mut bool>,
561 ) -> bool {
562 unsafe {
563 esp_rtos_queue_try_send_to_back_from_isr(self.0, item, higher_priority_task_waken)
564 }
565 }
566
567 /// Dequeues an item from the queue.
568 ///
569 /// If the queue is empty, this function will block for the given timeout. If timeout is None,
570 /// the function will block indefinitely.
571 ///
572 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
573 ///
574 /// # Safety
575 ///
576 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
577 /// a size equal to the queue's item size.
578 #[inline]
579 pub unsafe fn receive(&self, item: *mut u8, timeout_us: Option<u32>) -> bool {
580 unsafe { esp_rtos_queue_receive(self.0, item, timeout_us) }
581 }
582
583 /// Dequeues an item from the queue.
584 ///
585 /// If the queue is empty, this function will block until the given deadline is reached. If
586 /// deadline is None, the function will block indefinitely.
587 ///
588 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
589 ///
590 /// # Safety
591 ///
592 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
593 /// a size equal to the queue's item size.
594 #[inline]
595 pub unsafe fn receive_with_deadline(
596 &self,
597 item: *mut u8,
598 deadline_instant: Option<u64>,
599 ) -> bool {
600 unsafe { esp_rtos_queue_receive_with_deadline(self.0, item, deadline_instant) }
601 }
602
603 /// Attempts to dequeue an item from the queue.
604 ///
605 /// If the queue is empty, this function will return `false` immediately.
606 ///
607 /// This function returns `true` if the item was successfully dequeued, `false` otherwise.
608 ///
609 /// If a higher priority task is woken up by this operation, the `higher_prio_task_waken` flag
610 /// is set to `true`.
611 ///
612 /// # Safety
613 ///
614 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
615 /// a size equal to the queue's item size.
616 #[inline]
617 pub unsafe fn try_receive_from_isr(
618 &self,
619 item: *mut u8,
620 higher_priority_task_waken: Option<&mut bool>,
621 ) -> bool {
622 unsafe { esp_rtos_queue_try_receive_from_isr(self.0, item, higher_priority_task_waken) }
623 }
624
625 /// Removes an item from the queue.
626 ///
627 /// # Safety
628 ///
629 /// The caller must ensure that `item` can be dereferenced and points to an allocation of
630 /// a size equal to the queue's item size.
631 #[inline]
632 pub unsafe fn remove(&self, item: *const u8) {
633 unsafe { esp_rtos_queue_remove(self.0, item) }
634 }
635
636 /// Returns the number of messages in the queue.
637 #[inline]
638 pub fn messages_waiting(&self) -> usize {
639 unsafe { esp_rtos_queue_messages_waiting(self.0) }
640 }
641}
642
643impl Drop for QueueHandle {
644 #[inline]
645 fn drop(&mut self) {
646 unsafe { esp_rtos_queue_delete(self.0) };
647 }
648}
649
650#[cfg(feature = "ipc-implementations")]
651mod implementation {
652 use alloc::{boxed::Box, vec};
653
654 use esp_sync::NonReentrantMutex;
655
656 use super::*;
657 use crate::{
658 now,
659 semaphore::{SemaphoreHandle, SemaphoreKind},
660 };
661
/// Fixed-capacity ring buffer of equally sized, untyped items.
///
/// The buffer does no locking or blocking of its own and never checks for over- or underflow:
/// callers must only insert while `count < capacity` and only read while `count > 0`.
///
/// The methods keep `current_write == (current_read + count) % capacity` true at all times.
struct QueueInner {
    /// Backing storage, divided into `capacity` slots of `item_size` bytes.
    storage: Box<[u8]>,
    /// Size of one item in bytes.
    item_size: usize,
    /// Number of slots in `storage`.
    capacity: usize,
    /// Number of items currently stored.
    count: usize,
    /// Slot index of the oldest item (the front of the queue).
    current_read: usize,
    /// Slot index that the next `send_to_back` writes to.
    current_write: usize,
}

impl QueueInner {
    /// Returns the bytes of the slot at `index`.
    fn get(&self, index: usize) -> &[u8] {
        let item_start = self.item_size * index;
        &self.storage[item_start..][..self.item_size]
    }

    /// Returns the bytes of the slot at `index` for writing.
    fn get_mut(&mut self, index: usize) -> &mut [u8] {
        let item_start = self.item_size * index;
        &mut self.storage[item_start..][..self.item_size]
    }

    /// Returns the number of items currently stored.
    fn len(&self) -> usize {
        self.count
    }

    /// Returns the slot index of the item `offset` positions behind the front of the queue.
    fn slot_at(&self, offset: usize) -> usize {
        (self.current_read + offset) % self.capacity
    }

    /// Copies the item behind `item` into the slot at the back of the queue.
    ///
    /// `item` must point to `item_size` readable bytes and the queue must not be full.
    fn send_to_back(&mut self, item: *const u8) {
        // SAFETY: relies on the caller passing a pointer to `item_size` readable bytes.
        let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };

        let dst = self.get_mut(self.current_write);
        dst.copy_from_slice(item);

        self.current_write = (self.current_write + 1) % self.capacity;
        self.count += 1;
    }

    /// Copies the item behind `item` into a new slot in front of the current front item.
    ///
    /// `item` must point to `item_size` readable bytes and the queue must not be full.
    fn send_to_front(&mut self, item: *const u8) {
        // SAFETY: relies on the caller passing a pointer to `item_size` readable bytes.
        let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };

        // Step the read index back by one slot; adding `capacity` first avoids underflow.
        self.current_read = (self.current_read + self.capacity - 1) % self.capacity;

        let dst = self.get_mut(self.current_read);
        dst.copy_from_slice(item);

        self.count += 1;
    }

    /// Copies the front item into `dst` and removes it from the queue.
    ///
    /// `dst` must point to `item_size` writable bytes and the queue must not be empty.
    fn read_from_front(&mut self, dst: *mut u8) {
        // SAFETY: relies on the caller passing a pointer to `item_size` writable bytes.
        let dst = unsafe { core::slice::from_raw_parts_mut(dst, self.item_size) };

        let src = self.get(self.current_read);
        dst.copy_from_slice(src);

        self.current_read = (self.current_read + 1) % self.capacity;
        self.count -= 1;
    }

    /// Removes the first item (counting from the front) whose bytes equal those behind `item`.
    ///
    /// Returns `true` if an item was removed. The relative order of the remaining items is kept.
    /// The removal happens in place and does not allocate.
    ///
    /// `item` must point to `item_size` readable bytes.
    fn remove(&mut self, item: *const u8) -> bool {
        let count = self.len();

        if count == 0 {
            return false;
        }

        // SAFETY: relies on the caller passing a pointer to `item_size` readable bytes.
        let needle = unsafe { core::slice::from_raw_parts(item, self.item_size) };

        let Some(found_at) = (0..count).find(|&offset| self.get(self.slot_at(offset)) == needle)
        else {
            return false;
        };

        // Close the gap by moving every younger item one slot towards the front.
        let item_size = self.item_size;
        for offset in found_at..count - 1 {
            let dst = self.slot_at(offset) * item_size;
            let src = self.slot_at(offset + 1) * item_size;
            self.storage.copy_within(src..src + item_size, dst);
        }

        // The back of the queue moved one slot towards the front.
        self.current_write = (self.current_write + self.capacity - 1) % self.capacity;
        self.count -= 1;

        true
    }
}
744
745 /// A suitable queue implementation that only requires semaphores from the OS.
746 ///
747 /// Register in your OS implementation by adding the following code:
748 ///
749 /// ```rust
750 /// use esp_radio_rtos_driver::{queue::CompatQueue, register_queue_implementation};
751 ///
752 /// register_queue_implementation!(CompatQueue);
753 /// ```
754 pub struct CompatQueue {
755 /// Allows interior mutability for the queue's inner state, when the mutex is held.
756 inner: NonReentrantMutex<QueueInner>,
757
758 semaphore_empty: SemaphoreHandle,
759 semaphore_full: SemaphoreHandle,
760 }
761
762 impl CompatQueue {
763 fn new(capacity: usize, item_size: usize) -> Self {
764 let storage = vec![0; capacity * item_size].into_boxed_slice();
765 let semaphore_empty = SemaphoreHandle::new(SemaphoreKind::Counting {
766 max: capacity as u32,
767 initial: capacity as u32,
768 });
769 let semaphore_full = SemaphoreHandle::new(SemaphoreKind::Counting {
770 max: capacity as u32,
771 initial: 0,
772 });
773 Self {
774 inner: NonReentrantMutex::new(QueueInner {
775 storage,
776 item_size,
777 capacity,
778 count: 0,
779 current_read: 0,
780 current_write: 0,
781 }),
782 semaphore_empty,
783 semaphore_full,
784 }
785 }
786
787 unsafe fn from_ptr<'a>(ptr: QueuePtr) -> &'a Self {
788 unsafe { ptr.cast::<Self>().as_ref() }
789 }
790
791 fn with<R>(&self, f: impl FnOnce(&mut QueueInner) -> R) -> R {
792 self.inner.with(f)
793 }
794 }
795
796 impl QueueImplementation for CompatQueue {
797 fn create(capacity: usize, item_size: usize) -> QueuePtr {
798 let q = Box::new(CompatQueue::new(capacity, item_size));
799 NonNull::from(Box::leak(q)).cast()
800 }
801
802 unsafe fn delete(queue: QueuePtr) {
803 let q = unsafe { Box::from_raw(queue.cast::<CompatQueue>().as_ptr()) };
804 core::mem::drop(q);
805 }
806
807 unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
808 let deadline_instant = timeout_us.map(|timeout| now() + timeout as u64);
809 unsafe { Self::send_to_front_with_deadline(queue, item, deadline_instant) }
810 }
811
812 unsafe fn send_to_front_with_deadline(
813 queue: QueuePtr,
814 item: *const u8,
815 deadline_instant: Option<u64>,
816 ) -> bool {
817 let queue = unsafe { CompatQueue::from_ptr(queue) };
818
819 if queue.semaphore_empty.take_with_deadline(deadline_instant) {
820 queue.with(|inner| inner.send_to_front(item));
821 queue.semaphore_full.give();
822 true
823 } else {
824 false
825 }
826 }
827
828 unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
829 let deadline_instant = timeout_us.map(|timeout| now() + timeout as u64);
830 unsafe { Self::send_to_back_with_deadline(queue, item, deadline_instant) }
831 }
832
833 unsafe fn send_to_back_with_deadline(
834 queue: QueuePtr,
835 item: *const u8,
836 deadline_instant: Option<u64>,
837 ) -> bool {
838 let queue = unsafe { CompatQueue::from_ptr(queue) };
839
840 if queue.semaphore_empty.take_with_deadline(deadline_instant) {
841 queue.with(|inner| inner.send_to_back(item));
842 queue.semaphore_full.give();
843 true
844 } else {
845 false
846 }
847 }
848
849 unsafe fn try_send_to_back_from_isr(
850 queue: QueuePtr,
851 item: *const u8,
852 mut higher_prio_task_waken: Option<&mut bool>,
853 ) -> bool {
854 let queue = unsafe { CompatQueue::from_ptr(queue) };
855
856 if queue
857 .semaphore_empty
858 .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
859 {
860 queue.with(|inner| inner.send_to_back(item));
861 queue
862 .semaphore_full
863 .try_give_from_isr(higher_prio_task_waken);
864 true
865 } else {
866 false
867 }
868 }
869
870 unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
871 let deadline_instant = timeout_us.map(|timeout| now() + timeout as u64);
872 unsafe { Self::receive_with_deadline(queue, item, deadline_instant) }
873 }
874
875 unsafe fn receive_with_deadline(
876 queue: QueuePtr,
877 item: *mut u8,
878 deadline_instant: Option<u64>,
879 ) -> bool {
880 let queue = unsafe { CompatQueue::from_ptr(queue) };
881
882 if queue.semaphore_full.take_with_deadline(deadline_instant) {
883 queue.with(|inner| inner.read_from_front(item));
884 queue.semaphore_empty.give();
885 true
886 } else {
887 false
888 }
889 }
890
891 unsafe fn try_receive_from_isr(
892 queue: QueuePtr,
893 item: *mut u8,
894 mut higher_prio_task_waken: Option<&mut bool>,
895 ) -> bool {
896 let queue = unsafe { CompatQueue::from_ptr(queue) };
897
898 if queue
899 .semaphore_full
900 .try_take_from_isr(higher_prio_task_waken.as_deref_mut())
901 {
902 queue.with(|inner| inner.read_from_front(item));
903 queue
904 .semaphore_empty
905 .try_give_from_isr(higher_prio_task_waken);
906 true
907 } else {
908 false
909 }
910 }
911
912 unsafe fn remove(queue: QueuePtr, item: *const u8) {
913 let queue = unsafe { CompatQueue::from_ptr(queue) };
914
915 if queue.semaphore_full.take(Some(0)) {
916 let item_removed = queue.with(|inner| inner.remove(item));
917
918 if item_removed {
919 queue.semaphore_empty.give();
920 } else {
921 queue.semaphore_full.give();
922 }
923 }
924 }
925
926 fn messages_waiting(queue: QueuePtr) -> usize {
927 let queue = unsafe { CompatQueue::from_ptr(queue) };
928
929 queue.semaphore_full.current_count() as usize
930 }
931 }
932}
933
934#[cfg(feature = "ipc-implementations")]
935pub use implementation::CompatQueue;