esp_rtos/esp_radio/
queue.rs

1use alloc::{boxed::Box, vec};
2use core::ptr::NonNull;
3
4use esp_radio_rtos_driver::{
5    queue::{QueueImplementation, QueuePtr},
6    register_queue_implementation,
7};
8use esp_sync::NonReentrantMutex;
9
10use crate::wait_queue::WaitQueue;
11
12struct QueueInner {
13    storage: Box<[u8]>,
14    item_size: usize,
15    capacity: usize,
16    count: usize,
17    current_read: usize,
18    current_write: usize,
19    waiting_for_space: WaitQueue,
20    waiting_for_item: WaitQueue,
21}
22
23impl QueueInner {
24    fn new(capacity: usize, item_size: usize) -> Self {
25        Self {
26            item_size,
27            capacity,
28            count: 0,
29            current_read: 0,
30            current_write: 0,
31            storage: vec![0; capacity * item_size].into_boxed_slice(),
32            waiting_for_space: WaitQueue::new(),
33            waiting_for_item: WaitQueue::new(),
34        }
35    }
36
37    fn get(&self, index: usize) -> &[u8] {
38        let item_start = self.item_size * index;
39        &self.storage[item_start..][..self.item_size]
40    }
41
42    fn get_mut(&mut self, index: usize) -> &mut [u8] {
43        let item_start = self.item_size * index;
44        &mut self.storage[item_start..][..self.item_size]
45    }
46
47    unsafe fn try_enqueue(&mut self, item: *const u8) -> bool {
48        if self.full() {
49            return false;
50        }
51
52        let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };
53
54        let dst = self.get_mut(self.current_write);
55        dst.copy_from_slice(item);
56
57        self.current_write = (self.current_write + 1) % self.capacity;
58        self.count += 1;
59
60        true
61    }
62
63    unsafe fn try_enqueue_front(&mut self, item: *const u8) -> bool {
64        if self.full() {
65            return false;
66        }
67
68        let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };
69
70        self.current_read = (self.current_read + self.capacity - 1) % self.capacity;
71        let dst = self.get_mut(self.current_read);
72        dst.copy_from_slice(item);
73
74        self.count += 1;
75
76        true
77    }
78
79    unsafe fn try_dequeue(&mut self, dst: *mut u8) -> bool {
80        if self.empty() {
81            return false;
82        }
83
84        let dst = unsafe { core::slice::from_raw_parts_mut(dst, self.item_size) };
85
86        let src = self.get(self.current_read);
87        dst.copy_from_slice(src);
88
89        self.current_read = (self.current_read + 1) % self.capacity;
90        self.count -= 1;
91
92        true
93    }
94
95    unsafe fn remove(&mut self, item: *const u8) {
96        if self.empty() {
97            return;
98        }
99
100        // do what the ESP-IDF implementations does...
101        // just remove all elements and add them back except the one we need to remove -
102        // good enough for now
103        let count = self.len();
104
105        let mut tmp_item = vec![0; self.item_size];
106
107        let item_slice = unsafe { core::slice::from_raw_parts(item, self.item_size) };
108        for _ in 0..count {
109            if !unsafe { self.try_dequeue(tmp_item.as_mut_ptr().cast()) } {
110                break;
111            }
112            if &tmp_item[..] != item_slice {
113                _ = unsafe { self.try_enqueue(tmp_item.as_mut_ptr().cast()) };
114            }
115            // Note that even if we find our item, we'll need to keep cycling through everything to
116            // keep insertion order.
117        }
118    }
119
120    fn len(&self) -> usize {
121        self.count
122    }
123
124    fn empty(&self) -> bool {
125        self.len() == 0
126    }
127
128    fn full(&self) -> bool {
129        self.len() == self.capacity
130    }
131}
132
133pub struct Queue {
134    inner: NonReentrantMutex<QueueInner>,
135}
136
137impl Queue {
138    pub fn new(capacity: usize, item_size: usize) -> Self {
139        Queue {
140            inner: NonReentrantMutex::new(QueueInner::new(capacity, item_size)),
141        }
142    }
143
144    unsafe fn from_ptr<'a>(ptr: QueuePtr) -> &'a Self {
145        unsafe { ptr.cast::<Self>().as_ref() }
146    }
147
148    unsafe fn send_to_back(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
149        if crate::with_deadline(timeout_us, |deadline| {
150            self.inner.with(|queue| {
151                if unsafe { queue.try_enqueue(item) } {
152                    trace!("Queue - notify with item");
153                    queue.waiting_for_item.notify();
154                    true
155                } else {
156                    // The task will go to sleep when the above critical section is released.
157                    trace!("Queue - wait for space - {:?}", deadline);
158                    queue.waiting_for_space.wait_with_deadline(deadline);
159                    false
160                }
161            })
162        }) {
163            debug!("Queue - send to back - success");
164            true
165        } else {
166            debug!("Queue - send to back - timed out");
167            false
168        }
169    }
170
171    unsafe fn send_to_front(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
172        if crate::with_deadline(timeout_us, |deadline| {
173            self.inner.with(|queue| {
174                if unsafe { queue.try_enqueue_front(item) } {
175                    trace!("Queue - notify with item");
176                    queue.waiting_for_item.notify();
177                    true
178                } else {
179                    // The task will go to sleep when the above critical section is released.
180                    trace!("Queue - wait for space - {:?}", deadline);
181                    queue.waiting_for_space.wait_with_deadline(deadline);
182                    false
183                }
184            })
185        }) {
186            debug!("Queue - send to front - success");
187            true
188        } else {
189            debug!("Queue - send to front - timed out");
190            false
191        }
192    }
193
194    unsafe fn try_send_to_back(&self, item: *const u8) -> bool {
195        self.inner.with(|queue| {
196            if unsafe { queue.try_enqueue(item) } {
197                queue.waiting_for_item.notify();
198                true
199            } else {
200                false
201            }
202        })
203    }
204
205    unsafe fn receive(&self, item: *mut u8, timeout_us: Option<u32>) -> bool {
206        if crate::with_deadline(timeout_us, |deadline| {
207            self.inner.with(|queue| {
208                if unsafe { queue.try_dequeue(item) } {
209                    trace!("Queue - notify with space");
210                    queue.waiting_for_space.notify();
211                    true
212                } else {
213                    // The task will go to sleep when the above critical section is released.
214                    trace!("Queue - wait for item - {:?}", deadline);
215                    queue.waiting_for_item.wait_with_deadline(deadline);
216                    false
217                }
218            })
219        }) {
220            debug!("Queue - dequeued item");
221            true
222        } else {
223            debug!("Queue - timed out waiting for item");
224            false
225        }
226    }
227
228    unsafe fn try_receive(&self, item: *mut u8) -> bool {
229        self.inner.with(|queue| {
230            if unsafe { queue.try_dequeue(item) } {
231                trace!("Queue - notify with space");
232                queue.waiting_for_space.notify();
233                true
234            } else {
235                false
236            }
237        })
238    }
239
240    unsafe fn remove(&self, item: *const u8) {
241        self.inner.with(|queue| {
242            let was_full = queue.full();
243
244            unsafe {
245                queue.remove(item);
246            }
247
248            if was_full && !queue.full() {
249                trace!("Queue - notify with space");
250                queue.waiting_for_space.notify();
251            }
252        })
253    }
254
255    fn messages_waiting(&self) -> usize {
256        self.inner.with(|queue| queue.len())
257    }
258}
259
260impl QueueImplementation for Queue {
261    fn create(capacity: usize, item_size: usize) -> QueuePtr {
262        let q = Box::new(Queue::new(capacity, item_size));
263        NonNull::from(Box::leak(q)).cast()
264    }
265
266    unsafe fn delete(queue: QueuePtr) {
267        let q = unsafe { Box::from_raw(queue.cast::<Queue>().as_ptr()) };
268        core::mem::drop(q);
269    }
270
271    unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
272        let queue = unsafe { Queue::from_ptr(queue) };
273
274        unsafe { queue.send_to_front(item, timeout_us) }
275    }
276
277    unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
278        let queue = unsafe { Queue::from_ptr(queue) };
279
280        unsafe { queue.send_to_back(item, timeout_us) }
281    }
282
283    unsafe fn try_send_to_back_from_isr(
284        queue: QueuePtr,
285        item: *const u8,
286        _higher_prio_task_waken: Option<&mut bool>,
287    ) -> bool {
288        let queue = unsafe { Queue::from_ptr(queue) };
289
290        unsafe { queue.try_send_to_back(item) }
291    }
292
293    unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
294        let queue = unsafe { Queue::from_ptr(queue) };
295
296        unsafe { queue.receive(item, timeout_us) }
297    }
298
299    unsafe fn try_receive_from_isr(
300        queue: QueuePtr,
301        item: *mut u8,
302        _higher_prio_task_waken: Option<&mut bool>,
303    ) -> bool {
304        let queue = unsafe { Queue::from_ptr(queue) };
305
306        unsafe { queue.try_receive(item) }
307    }
308
309    unsafe fn remove(queue: QueuePtr, item: *const u8) {
310        let queue = unsafe { Queue::from_ptr(queue) };
311
312        unsafe { queue.remove(item) }
313    }
314
315    fn messages_waiting(queue: QueuePtr) -> usize {
316        let queue = unsafe { Queue::from_ptr(queue) };
317
318        queue.messages_waiting()
319    }
320}
321
322register_queue_implementation!(Queue);