esp_rtos/esp_radio/
queue.rs1use alloc::{boxed::Box, vec};
2use core::ptr::NonNull;
3
4use esp_radio_rtos_driver::{
5 queue::{QueueImplementation, QueuePtr},
6 register_queue_implementation,
7};
8use esp_sync::NonReentrantMutex;
9
10use crate::wait_queue::WaitQueue;
11
/// Ring-buffer state backing a fixed-item-size, byte-copying queue.
///
/// Items are opaque blobs of `item_size` bytes copied into and out of
/// `storage`; the buffer holds `capacity * item_size` bytes total.
struct QueueInner {
    // Backing buffer of `capacity * item_size` bytes.
    storage: Box<[u8]>,
    // Size in bytes of a single queued element.
    item_size: usize,
    // Maximum number of elements the queue can hold.
    capacity: usize,
    // Number of elements currently stored.
    count: usize,
    // Slot index of the next element to dequeue.
    current_read: usize,
    // Slot index the next enqueue writes into.
    current_write: usize,
    // Waiters blocked because the queue was full.
    waiting_for_space: WaitQueue,
    // Waiters blocked because the queue was empty.
    waiting_for_item: WaitQueue,
}
22
23impl QueueInner {
24 fn new(capacity: usize, item_size: usize) -> Self {
25 Self {
26 item_size,
27 capacity,
28 count: 0,
29 current_read: 0,
30 current_write: 0,
31 storage: vec![0; capacity * item_size].into_boxed_slice(),
32 waiting_for_space: WaitQueue::new(),
33 waiting_for_item: WaitQueue::new(),
34 }
35 }
36
37 fn get(&self, index: usize) -> &[u8] {
38 let item_start = self.item_size * index;
39 &self.storage[item_start..][..self.item_size]
40 }
41
42 fn get_mut(&mut self, index: usize) -> &mut [u8] {
43 let item_start = self.item_size * index;
44 &mut self.storage[item_start..][..self.item_size]
45 }
46
47 unsafe fn try_enqueue(&mut self, item: *const u8) -> bool {
48 if self.full() {
49 return false;
50 }
51
52 let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };
53
54 let dst = self.get_mut(self.current_write);
55 dst.copy_from_slice(item);
56
57 self.current_write = (self.current_write + 1) % self.capacity;
58 self.count += 1;
59
60 true
61 }
62
63 unsafe fn try_enqueue_front(&mut self, item: *const u8) -> bool {
64 if self.full() {
65 return false;
66 }
67
68 let item = unsafe { core::slice::from_raw_parts(item, self.item_size) };
69
70 self.current_read = (self.current_read + self.capacity - 1) % self.capacity;
71 let dst = self.get_mut(self.current_read);
72 dst.copy_from_slice(item);
73
74 self.count += 1;
75
76 true
77 }
78
79 unsafe fn try_dequeue(&mut self, dst: *mut u8) -> bool {
80 if self.empty() {
81 return false;
82 }
83
84 let dst = unsafe { core::slice::from_raw_parts_mut(dst, self.item_size) };
85
86 let src = self.get(self.current_read);
87 dst.copy_from_slice(src);
88
89 self.current_read = (self.current_read + 1) % self.capacity;
90 self.count -= 1;
91
92 true
93 }
94
95 unsafe fn remove(&mut self, item: *const u8) {
96 if self.empty() {
97 return;
98 }
99
100 let count = self.len();
104
105 let mut tmp_item = vec![0; self.item_size];
106
107 let item_slice = unsafe { core::slice::from_raw_parts(item, self.item_size) };
108 for _ in 0..count {
109 if !unsafe { self.try_dequeue(tmp_item.as_mut_ptr().cast()) } {
110 break;
111 }
112 if &tmp_item[..] != item_slice {
113 _ = unsafe { self.try_enqueue(tmp_item.as_mut_ptr().cast()) };
114 }
115 }
118 }
119
120 fn len(&self) -> usize {
121 self.count
122 }
123
124 fn empty(&self) -> bool {
125 self.len() == 0
126 }
127
128 fn full(&self) -> bool {
129 self.len() == self.capacity
130 }
131}
132
/// Blocking, fixed-item-size queue exposed to the RTOS driver layer.
///
/// All state lives behind a `NonReentrantMutex`, which serializes every
/// access to the inner ring buffer.
pub struct Queue {
    // Ring buffer plus wait queues; only ever touched via `inner.with(..)`.
    inner: NonReentrantMutex<QueueInner>,
}
136
137impl Queue {
138 pub fn new(capacity: usize, item_size: usize) -> Self {
139 Queue {
140 inner: NonReentrantMutex::new(QueueInner::new(capacity, item_size)),
141 }
142 }
143
144 unsafe fn from_ptr<'a>(ptr: QueuePtr) -> &'a Self {
145 unsafe { ptr.cast::<Self>().as_ref() }
146 }
147
148 unsafe fn send_to_back(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
149 if crate::with_deadline(timeout_us, |deadline| {
150 self.inner.with(|queue| {
151 if unsafe { queue.try_enqueue(item) } {
152 trace!("Queue - notify with item");
153 queue.waiting_for_item.notify();
154 true
155 } else {
156 trace!("Queue - wait for space - {:?}", deadline);
158 queue.waiting_for_space.wait_with_deadline(deadline);
159 false
160 }
161 })
162 }) {
163 debug!("Queue - send to back - success");
164 true
165 } else {
166 debug!("Queue - send to back - timed out");
167 false
168 }
169 }
170
171 unsafe fn send_to_front(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
172 if crate::with_deadline(timeout_us, |deadline| {
173 self.inner.with(|queue| {
174 if unsafe { queue.try_enqueue_front(item) } {
175 trace!("Queue - notify with item");
176 queue.waiting_for_item.notify();
177 true
178 } else {
179 trace!("Queue - wait for space - {:?}", deadline);
181 queue.waiting_for_space.wait_with_deadline(deadline);
182 false
183 }
184 })
185 }) {
186 debug!("Queue - send to front - success");
187 true
188 } else {
189 debug!("Queue - send to front - timed out");
190 false
191 }
192 }
193
194 unsafe fn try_send_to_back(&self, item: *const u8) -> bool {
195 self.inner.with(|queue| {
196 if unsafe { queue.try_enqueue(item) } {
197 queue.waiting_for_item.notify();
198 true
199 } else {
200 false
201 }
202 })
203 }
204
205 unsafe fn receive(&self, item: *mut u8, timeout_us: Option<u32>) -> bool {
206 if crate::with_deadline(timeout_us, |deadline| {
207 self.inner.with(|queue| {
208 if unsafe { queue.try_dequeue(item) } {
209 trace!("Queue - notify with space");
210 queue.waiting_for_space.notify();
211 true
212 } else {
213 trace!("Queue - wait for item - {:?}", deadline);
215 queue.waiting_for_item.wait_with_deadline(deadline);
216 false
217 }
218 })
219 }) {
220 debug!("Queue - dequeued item");
221 true
222 } else {
223 debug!("Queue - timed out waiting for item");
224 false
225 }
226 }
227
228 unsafe fn try_receive(&self, item: *mut u8) -> bool {
229 self.inner.with(|queue| {
230 if unsafe { queue.try_dequeue(item) } {
231 trace!("Queue - notify with space");
232 queue.waiting_for_space.notify();
233 true
234 } else {
235 false
236 }
237 })
238 }
239
240 unsafe fn remove(&self, item: *const u8) {
241 self.inner.with(|queue| {
242 let was_full = queue.full();
243
244 unsafe {
245 queue.remove(item);
246 }
247
248 if was_full && !queue.full() {
249 trace!("Queue - notify with space");
250 queue.waiting_for_space.notify();
251 }
252 })
253 }
254
255 fn messages_waiting(&self) -> usize {
256 self.inner.with(|queue| queue.len())
257 }
258}
259
260impl QueueImplementation for Queue {
261 fn create(capacity: usize, item_size: usize) -> QueuePtr {
262 let q = Box::new(Queue::new(capacity, item_size));
263 NonNull::from(Box::leak(q)).cast()
264 }
265
266 unsafe fn delete(queue: QueuePtr) {
267 let q = unsafe { Box::from_raw(queue.cast::<Queue>().as_ptr()) };
268 core::mem::drop(q);
269 }
270
271 unsafe fn send_to_front(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
272 let queue = unsafe { Queue::from_ptr(queue) };
273
274 unsafe { queue.send_to_front(item, timeout_us) }
275 }
276
277 unsafe fn send_to_back(queue: QueuePtr, item: *const u8, timeout_us: Option<u32>) -> bool {
278 let queue = unsafe { Queue::from_ptr(queue) };
279
280 unsafe { queue.send_to_back(item, timeout_us) }
281 }
282
283 unsafe fn try_send_to_back_from_isr(
284 queue: QueuePtr,
285 item: *const u8,
286 _higher_prio_task_waken: Option<&mut bool>,
287 ) -> bool {
288 let queue = unsafe { Queue::from_ptr(queue) };
289
290 unsafe { queue.try_send_to_back(item) }
291 }
292
293 unsafe fn receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool {
294 let queue = unsafe { Queue::from_ptr(queue) };
295
296 unsafe { queue.receive(item, timeout_us) }
297 }
298
299 unsafe fn try_receive_from_isr(
300 queue: QueuePtr,
301 item: *mut u8,
302 _higher_prio_task_waken: Option<&mut bool>,
303 ) -> bool {
304 let queue = unsafe { Queue::from_ptr(queue) };
305
306 unsafe { queue.try_receive(item) }
307 }
308
309 unsafe fn remove(queue: QueuePtr, item: *const u8) {
310 let queue = unsafe { Queue::from_ptr(queue) };
311
312 unsafe { queue.remove(item) }
313 }
314
315 fn messages_waiting(queue: QueuePtr) -> usize {
316 let queue = unsafe { Queue::from_ptr(queue) };
317
318 queue.messages_waiting()
319 }
320}
321
322register_queue_implementation!(Queue);