esp_hal/work_queue.rs
1//! A generic work queue.
2//!
3//! Work queues are the backbone of cryptographic drivers. They enable asynchronous and
4//! blocking operations using shared peripherals, like crypto accelerators. Clients
5//! post work items into the work queue, and poll for completion. The act of polling
6//! processes the queue, which allows any number of clients to post work without deadlocking.
7//!
8//! Work queues are configured by backends. Backends register a `process` callback which is
9//! called when a client posts a work item or polls for completion.
10//!
11//! Posting a work item into the queue returns a handle. The handle can be used to poll whether
12//! the work item has been processed. Dropping the handle will cancel the work item.
13#![cfg_attr(esp32c2, allow(unused))]
14
15use core::{future::poll_fn, marker::PhantomData, ptr::NonNull, task::Context};
16
17use embassy_sync::waitqueue::WakerRegistration;
18use esp_sync::NonReentrantMutex;
19
/// Queue driver operations.
///
/// Functions in this VTable are provided by drivers that consume work items.
/// These functions may be called both in the context of the queue frontends and drivers.
///
/// The `NonNull<()>` argument of each callback is the type-erased driver data pointer that was
/// registered via `WorkQueue::configure`.
pub(crate) struct VTable<T: Sync + Send> {
    /// Starts processing a new work item.
    ///
    /// The function returns whether the work item was accepted, and its poll status if it was
    /// accepted. If there is no driver currently processing the queue, this function will
    /// return None to prevent removing the work item from the queue.
    ///
    /// This function should be as short as possible.
    pub(crate) post: fn(NonNull<()>, &mut T) -> Option<Poll>,

    /// Polls the status of the current work item.
    ///
    /// The work queue ensures that the item passed here has been first passed to the driver by
    /// `post`.
    ///
    /// This function should be as short as possible.
    pub(crate) poll: fn(NonNull<()>, &mut T) -> Poll,

    /// Attempts to abort processing a work item.
    ///
    /// Cancellation is best-effort: the queue keeps polling the item afterwards to learn its
    /// final status.
    ///
    /// This function should be as short as possible.
    pub(crate) cancel: fn(NonNull<()>, &mut T),

    /// Called when the driver may be stopped.
    ///
    /// Invoked when the queue runs out of work items, or when the queue is suspended.
    ///
    /// This function should be as short as possible.
    pub(crate) stop: fn(NonNull<()>),
}
52
impl<T: Sync + Send> VTable<T> {
    /// Returns the VTable of an unconfigured queue: no driver is attached, so no work is
    /// accepted.
    pub(crate) const fn noop() -> Self {
        Self {
            // Never accept a work item; items stay queued until a real driver is configured.
            post: |_, _| None,
            // `poll` is only ever called for an item previously accepted by `post`, and this
            // `post` never accepts one, so this callback is unreachable.
            poll: |_, _| unreachable!(),
            cancel: |_, _| (),
            stop: |_| (),
        }
    }
}
63
/// Queue state; always accessed through the `WorkQueue`'s `NonReentrantMutex`.
struct Inner<T: Sync + Send> {
    // Intrusive singly-linked list of not-yet-posted work items.
    head: Option<NonNull<WorkItem<T>>>,
    tail: Option<NonNull<WorkItem<T>>>,
    // The item currently being processed by the driver. Not part of the linked list.
    current: Option<NonNull<WorkItem<T>>>,

    // The data pointer will be passed to VTable functions, which may be called in any context.
    data: NonNull<()>,
    vtable: VTable<T>,

    // Counts suspend requests. When this reaches 0 again, all the wakers in the queue need to be
    // woken to continue processing.
    suspend_count: usize,
    // The task waiting for the queue to be suspended. There can be multiple tasks, but that's
    // practically rare (in this setup, it needs both HMAC and DSA to want to work at the same
    // time).
    suspend_waker: WakerRegistration,
}
81
// SAFETY: `Inner` is only accessed through the `WorkQueue`'s `NonReentrantMutex`. The raw
// pointers it stores refer to `WorkItem<T>` (with `T: Sync + Send`) and to driver data that
// `WorkQueue::configure` requires to be `Sync + Send`.
unsafe impl<T: Sync + Send> Send for Inner<T> {}
unsafe impl<T: Sync + Send> Sync for Inner<T> {}
84
impl<T: Sync + Send> Inner<T> {
    /// Places a work item at the end of the queue.
    ///
    /// The item's `next` pointer is expected to already be `None` (items are run through
    /// `WorkItem::prepare` before being posted).
    fn enqueue(&mut self, ptr: NonNull<WorkItem<T>>) {
        if let Some(tail) = self.tail.as_mut() {
            // Queue contains something, append to `tail`.
            unsafe { tail.as_mut().next = Some(ptr) };
        } else {
            // Queue was empty, set `head` to the first element.
            self.head = Some(ptr);
        }

        // Move `tail` to the newly inserted item.
        self.tail = Some(ptr);
    }

    /// Places a work item at the front of the queue.
    ///
    /// Used by `dequeue_and_post` to put back an item the driver did not accept.
    fn enqueue_front(&mut self, mut ptr: NonNull<WorkItem<T>>) {
        // Chain the node into the list.
        unsafe { ptr.as_mut().next = self.head };

        // Adjust list `head` to point at the new-first element.
        self.head = Some(ptr);
        if self.tail.is_none() {
            // The queue was empty, we need to set `tail` to the last element.
            self.tail = Some(ptr);
        }
    }

    /// Runs one processing iteration.
    ///
    /// This function enqueues a new work item or polls the status of the currently processed one.
    /// Returns whether the function should be re-called by the caller.
    fn process(&mut self) -> bool {
        if let Some(mut current) = self.current {
            // An item is in flight; ask the driver for its status.
            let poll_result = (self.vtable.poll)(self.data, &mut unsafe { current.as_mut() }.data);

            match poll_result {
                Poll::Ready(status) => {
                    // Completing the item also wakes the task waiting on it.
                    unsafe { current.as_mut() }.complete(status);
                    self.current = None;
                    if self.suspend_count > 0 {
                        // Queue suspended, stop the driver.
                        (self.vtable.stop)(self.data);
                        self.suspend_waker.wake();
                        false
                    } else {
                        self.dequeue_and_post(true)
                    }
                }
                Poll::Pending(recall) => recall,
            }
        } else {
            // If the queue is empty, the driver should already have been notified when the queue
            // became empty, so we don't notify it here.
            //
            // NOTE(review): `suspend_count` is not checked on this path; a suspended/stopped
            // driver is presumably expected to reject `post` (return None) so the item stays
            // queued — confirm drivers uphold this.
            self.dequeue_and_post(false)
        }
    }

    /// Retrieves the next work queue item and sends it to the driver.
    ///
    /// Returns true if the queue needs to be polled again.
    // Note: even if the queue itself may be implemented lock-free, dequeuing and posting to the
    // driver must be done atomically to ensure that the queue can be processed fully by any of
    // the frontends polling it.
    fn dequeue_and_post(&mut self, notify_on_empty: bool) -> bool {
        let Some(mut ptr) = self.dequeue() else {
            if notify_on_empty {
                // There are no more work items. Notify the driver that it can stop.
                (self.vtable.stop)(self.data);
            }
            return false;
        };

        // Start processing a new work item.

        if let Some(poll_status) = (self.vtable.post)(self.data, &mut unsafe { ptr.as_mut() }.data)
        {
            match poll_status {
                Poll::Pending(recall) => {
                    // The driver accepted the item; it becomes `current` until a later poll
                    // reports it Ready.
                    unsafe { ptr.as_mut().status = Poll::Pending(recall) };
                    self.current = Some(ptr);
                    recall
                }
                Poll::Ready(status) => {
                    unsafe { ptr.as_mut() }.complete(status);
                    // The driver immediately processed the work item.
                    // Polling again needs to dequeue the next item.
                    true
                }
            }
        } else {
            // If the driver didn't accept the work item, place it back to the front of the
            // queue.
            self.enqueue_front(ptr);
            false
        }
    }

    /// Pops and returns a work item from the start of the queue.
    fn dequeue(&mut self) -> Option<NonNull<WorkItem<T>>> {
        // If the `head` is None, the queue is empty. Return None and do nothing.
        let ptr = self.head?;

        self.head = unsafe { ptr.as_ref() }.next;

        // If the new `head` is null, the queue is empty. Clear the `tail` pointer.
        if self.head.is_none() {
            self.tail = None;
        }

        Some(ptr)
    }

    /// Cancels a particular work item.
    ///
    /// If the work item is currently being processed, this function notifies the driver. Otherwise,
    /// it tries to remove the pointer from the work queue.
    ///
    /// The function returns true when the item was immediately cancelled.
    ///
    /// This function is not `unsafe` because it only dereferences `work_item` if the function has
    /// determined that the item belongs to this queue.
    fn cancel(&mut self, mut work_item: NonNull<WorkItem<T>>) -> bool {
        if self.current == Some(work_item) {
            // Cancelling an in-progress item is more complicated than plucking it from the
            // queue. Forward the request to the driver to (maybe) cancel the
            // operation.
            (self.vtable.cancel)(
                self.data,
                // This is safe to do, because the work item is currently owned by this queue.
                &mut unsafe { work_item.as_mut() }.data,
            );
            // Queue will need to be polled to query item status.
            return false;
        }

        if unsafe { work_item.as_ref() }.status.is_ready() {
            // Nothing to do.
            return true;
        }

        // The work item is not the current one, remove it from the queue. This immediately
        // cancels the work item. `remove` only uses the address of the work item without
        // dereferencing it.
        if self.remove(work_item) {
            unsafe { work_item.as_mut() }.complete(Status::Cancelled);
            // Cancelled immediately, no further polling necessary for this item.
            return true;
        }

        // In this case the item doesn't belong to this queue, it can be in any state. The item will
        // need to be polled, but this also means something may have gone wrong.
        false
    }

    /// Removes the item from the queue.
    ///
    /// Returns `true` if the work item was successfully removed, `false` if the work item was not
    /// found in the queue.
    ///
    /// This function is not `unsafe` because it does not dereference `ptr`, so it does not matter
    /// that `ptr` may belong to a different work queue.
    fn remove(&mut self, ptr: NonNull<WorkItem<T>>) -> bool {
        // Walk the queue to find `ptr`.
        let mut prev = None;
        let mut current = self.head;
        while let Some(current_item) = current {
            let next = unsafe { current_item.as_ref() }.next;

            if current_item != ptr {
                // Not what we're looking for. Move to the next element.
                prev = current;
                current = next;
                continue;
            }

            // We've found `ptr`. Remove it from the list.
            if Some(ptr) == self.head {
                self.head = next;
            } else {
                // Unwrapping is fine, because if the current pointer is not the `head`, the
                // previous pointer must be Some.
                unsafe { unwrap!(prev).as_mut() }.next = next;
            }

            if Some(ptr) == self.tail {
                self.tail = prev;
            }

            return true;
        }

        // Did not find `ptr`.
        false
    }

    /// Increases the suspend counter, preventing new work items from starting to be processed.
    ///
    /// If the current work item finishes processing, the driver is shut down. Call `is_active` to
    /// determine when the queue enters suspended state.
    ///
    /// When `ctx` is given, its waker is notified once the queue becomes idle (immediately if
    /// there is no in-flight item).
    fn suspend(&mut self, ctx: Option<&Context<'_>>) {
        self.suspend_count += 1;
        if let Some(ctx) = ctx {
            if self.current.is_some() {
                // Wait for `process` to finish the in-flight item; it wakes this waker.
                self.suspend_waker.register(ctx.waker());
            } else {
                // Already idle; let the caller observe that right away.
                ctx.waker().wake_by_ref();
            }
        }
    }

    /// Decreases the suspend counter.
    ///
    /// When it reaches 0, this function wakes async tasks that poll the queue. They need to be
    /// woken to ensure that their items don't end up stuck. Blocking pollers will eventually end up
    /// looping when their turn comes.
    fn resume(&mut self) {
        self.suspend_count -= 1;
        if self.suspend_count == 0 {
            self.wake_polling_tasks();
        }
    }

    /// Wakes the task of every queued work item so that it re-polls the queue.
    fn wake_polling_tasks(&mut self) {
        if self.data == NonNull::dangling() {
            // No VTable means no driver, no need to continue processing.
            // (A dangling `data` pointer is how an unconfigured queue is represented; see
            // `WorkQueue::new`.)
            return;
        }
        // Walk through the list and wake polling tasks.
        let mut current = self.head;
        while let Some(mut current_item) = current {
            let item = unsafe { current_item.as_mut() };

            item.waker.wake();

            current = item.next;
        }
    }

    /// Returns whether the driver is currently processing a work item.
    fn is_active(&self) -> bool {
        self.current.is_some()
    }

    /// Replaces the driver data pointer and VTable.
    ///
    /// The previous driver is stopped first. Unless the queue is suspended, the queued items'
    /// tasks are woken so the new driver picks them up.
    ///
    /// # Safety
    ///
    /// `data` must remain valid for as long as the queue is configured with it (see
    /// `WorkQueue::configure`).
    unsafe fn configure(&mut self, data: NonNull<()>, vtable: VTable<T>) {
        // Let the outgoing driver shut down before swapping it out.
        (self.vtable.stop)(self.data);

        self.data = data;
        self.vtable = vtable;

        if self.suspend_count == 0 {
            self.wake_polling_tasks();
        }
    }
}
339
/// A generic work queue.
///
/// A thin wrapper that serializes all access to the queue state through a `NonReentrantMutex`.
pub(crate) struct WorkQueue<T: Sync + Send> {
    inner: NonReentrantMutex<Inner<T>>,
}
344
impl<T: Sync + Send> WorkQueue<T> {
    /// Creates a new `WorkQueue`.
    ///
    /// The queue starts unconfigured: a dangling data pointer and a no-op VTable that rejects
    /// all work items, so posted items stay queued until a driver is configured.
    pub const fn new() -> Self {
        Self {
            inner: NonReentrantMutex::new(Inner {
                head: None,
                tail: None,
                current: None,

                data: NonNull::dangling(),
                vtable: VTable::noop(),

                suspend_count: 0,
                suspend_waker: WakerRegistration::new(),
            }),
        }
    }

    /// Configures the queue.
    ///
    /// The provided data pointer will be passed to the VTable functions.
    ///
    /// # Safety
    ///
    /// The `data` pointer must be valid as long as the `WorkQueue` is configured with it. The
    /// driver must access the data pointer appropriately (i.e. it must not move !Send data out of
    /// it).
    pub unsafe fn configure<D: Sync + Send>(&self, data: NonNull<D>, vtable: VTable<T>) {
        self.inner
            .with(|inner| unsafe { inner.configure(data.cast(), vtable) })
    }

    /// Enqueues a work item.
    ///
    /// The returned `Handle` borrows `work_item`; dropping the handle cancels the item and
    /// waits for the driver to release it.
    pub fn post_work<'t>(&'t self, work_item: &'t mut WorkItem<T>) -> Handle<'t, T> {
        let ptr = unsafe {
            // Safety: `Handle` and `work_item` have lifetime 't, which ensures this call
            // can't be called on an in-flight work item. As `Handle` (and the underlying driver
            // that processed the work queue) does not use the reference, and using the
            // reference is not possible while `Handle` exists, this should be safe.
            work_item.prepare()
        };

        self.inner.with(|inner| inner.enqueue(ptr));

        Handle {
            queue: self,
            work_item: ptr,
            _marker: PhantomData,
        }
    }

    /// Polls the queue once.
    ///
    /// Returns true if the queue needs to be polled again.
    #[allow(unused)]
    pub fn process(&self) -> bool {
        self.inner.with(|inner| inner.process())
    }

    /// Polls the queue once and returns the status of the given work item.
    ///
    /// ## Safety
    ///
    /// The caller must ensure that `item` belongs to the polled queue. An item belongs to the
    /// **last queue it was enqueued in**, even if the item is no longer in the queue's linked
    /// list. This relationship is broken when the Handle that owns the WorkItem is dropped.
    pub unsafe fn poll(&self, item: NonNull<WorkItem<T>>) -> Poll {
        self.inner.with(|inner| {
            let status = unsafe { &*item.as_ptr() }.status;
            if status.is_pending() {
                // Drive the queue one step, then re-read: the iteration may have completed
                // (or even started and finished) this very item.
                inner.process();
                unsafe { &*item.as_ptr() }.status
            } else {
                status
            }
        })
    }

    /// Schedules the work item to be cancelled.
    ///
    /// The function returns true when the item was immediately cancelled. If the function returns
    /// `false`, the item will need to be polled until its status becomes [`Poll::Ready`].
    ///
    /// The work item should not be assumed to be immediately cancelled. Polling its handle
    /// is necessary to ensure it is no longer being processed by the underlying driver.
    pub fn cancel(&self, work_item: NonNull<WorkItem<T>>) -> bool {
        self.inner.with(|inner| inner.cancel(work_item))
    }
}
434
/// The status of a work item.
///
/// This is the final state reported once an item's poll status becomes `Poll::Ready`.
// `Eq` is derived alongside `PartialEq` because equality between statuses is total
// (clippy: `derive_partial_eq_without_eq`).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Status {
    /// The processing has completed.
    Completed,

    /// The work item has been cancelled.
    Cancelled,
}
445
/// A unit of work in the work queue.
pub(crate) struct WorkItem<T: Sync + Send> {
    // Intrusive link to the next item in the queue's list.
    next: Option<NonNull<WorkItem<T>>>,
    // Current processing status; `Poll::Pending` while queued or in flight.
    status: Poll,
    // Driver-specific payload describing the work to be done.
    data: T,
    // Wakes the task awaiting this item's completion.
    waker: WakerRegistration,
}
453
impl<T: Sync + Send> WorkItem<T> {
    /// Completes the work item.
    ///
    /// Records the final status and wakes the task (if any) waiting on this item.
    ///
    /// This function is intended to be called from the underlying drivers.
    pub fn complete(&mut self, status: Status) {
        self.status = Poll::Ready(status);
        self.waker.wake();
    }

    /// Prepares a work item to be enqueued.
    ///
    /// Resets the intrusive link and the status so the item can be (re-)posted.
    ///
    /// # Safety
    ///
    /// The caller must ensure the reference is not used again while the pointer returned by this
    /// function is in use.
    unsafe fn prepare(&mut self) -> NonNull<Self> {
        self.next = None;
        self.status = Poll::Pending(false);

        NonNull::from(self)
    }
}
476
/// The status of a work item posted to a work queue.
///
/// Distinct from `core::task::Poll`: the `Pending` variant carries a "poll again" hint for
/// async pollers.
#[derive(Clone, Copy, PartialEq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) enum Poll {
    /// The work item has not yet been fully processed. Contains whether the caller should poll the
    /// queue again. This only has effect on async pollers, which will need to wake their tasks
    /// immediately.
    Pending(bool),

    /// The work item has been processed.
    Ready(Status),
}
489
490impl Poll {
491 /// Returns whether the current result is still pending.
492 pub fn is_pending(self) -> bool {
493 matches!(self, Self::Pending(_))
494 }
495
496 /// Returns whether the current result is ready.
497 pub fn is_ready(self) -> bool {
498 !self.is_pending()
499 }
500}
501
/// A reference to a posted [`WorkItem`].
///
/// This struct ensures that the work item is valid until the item is processed or is removed from
/// the work queue.
///
/// Dropping the handle cancels the work item, but may block for some time if the work item is
/// already being processed.
pub(crate) struct Handle<'t, T: Sync + Send> {
    queue: &'t WorkQueue<T>,
    // Points at a `WorkItem` borrowed for 't; while the item is in flight, only the queue and
    // its driver touch it.
    work_item: NonNull<WorkItem<T>>,
    // Make sure lifetime is invariant to prevent UB.
    _marker: PhantomData<&'t mut WorkItem<T>>,
}
515
impl<'t, T: Sync + Send> Handle<'t, T> {
    /// Creates a handle for a work item that requires no processing.
    ///
    /// The item is never enqueued; its status is set to completed up front.
    pub(crate) fn from_completed_work_item(
        queue: &'t WorkQueue<T>,
        work_item: &'t mut WorkItem<T>,
    ) -> Self {
        // Don't use `complete` here, we don't need to wake anything, just ensure that the item will
        // not be put into the queue.
        work_item.status = Poll::Ready(Status::Completed);

        Self {
            queue,
            work_item: NonNull::from(work_item),
            _marker: PhantomData,
        }
    }

    /// Polls the queue once and returns this item's current status.
    fn poll_inner(&mut self) -> Poll {
        // Safety: the handle's existence guarantees the item still belongs to `self.queue`.
        unsafe { self.queue.poll(self.work_item) }
    }

    /// Returns whether the work item has been processed (completed or cancelled).
    pub fn poll(&mut self) -> bool {
        self.poll_inner().is_ready()
    }

    /// Polls the work item to completion, by busy-looping.
    ///
    /// This function returns immediately if `poll` returns `true`.
    ///
    /// Each iteration drives queue processing (via `WorkQueue::poll`), so the loop makes
    /// progress even if no other frontend is polling.
    #[inline]
    pub fn wait_blocking(mut self) -> Status {
        loop {
            if let Poll::Ready(status) = self.poll_inner() {
                return status;
            }
        }
    }

    /// Waits until the work item is completed.
    pub fn wait(&mut self) -> impl Future<Output = Status> {
        poll_fn(|ctx| {
            // Register the waker before polling, so a completion racing with this poll still
            // wakes the task.
            unsafe { self.work_item.as_mut() }
                .waker
                .register(ctx.waker());
            match self.poll_inner() {
                Poll::Pending(recall) => {
                    if recall {
                        // The driver asked to be polled again immediately.
                        ctx.waker().wake_by_ref();
                    }
                    core::task::Poll::Pending
                }
                Poll::Ready(status) => core::task::Poll::Ready(status),
            }
        })
    }

    /// Cancels the work item and asynchronously waits until it is removed from the work queue.
    pub async fn cancel(&mut self) {
        if !self.queue.cancel(self.work_item) {
            // Cancellation was not immediate; poll until the driver reports a final status.
            self.wait().await;
        }
    }
}
578
impl<'t, T: Sync + Send> Drop for Handle<'t, T> {
    /// Cancels the work item and busy-waits until the driver no longer references it.
    ///
    /// The wait is required for soundness: once the handle is gone, the borrow of the
    /// `WorkItem` ends and the item may be reused.
    fn drop(&mut self) {
        if !self.queue.cancel(self.work_item) {
            // We must wait for the driver to release our WorkItem.
            while self.poll_inner().is_pending() {}
        }
    }
}
587
/// Binds a driver instance to a work queue for the driver's lifetime.
///
/// Construction configures the queue with the driver's VTable; dropping (or stopping) this
/// object restores the no-op configuration.
pub(crate) struct WorkQueueDriver<'t, D, T>
where
    D: Sync + Send,
    T: Sync + Send,
{
    queue: &'t WorkQueue<T>,
    // Exclusively borrows the driver for 't; the queue holds a type-erased pointer to it.
    _marker: PhantomData<&'t mut D>,
}
596
597impl<'t, D, T> WorkQueueDriver<'t, D, T>
598where
599 D: Sync + Send,
600 T: Sync + Send,
601{
602 pub fn new(driver: &'t mut D, vtable: VTable<T>, queue: &'t WorkQueue<T>) -> Self {
603 unsafe {
604 // Safety: the lifetime 't ensures the pointer remains valid for the lifetime of the
605 // WorkQueueDriver. The Drop implementation (and the general "Don't forget" clause)
606 // ensure the pointer is not used after the WQD has been dropped.
607 queue.configure(NonNull::from(driver), vtable);
608 }
609 Self {
610 queue,
611 _marker: PhantomData,
612 }
613 }
614
615 /// Shuts down the driver.
616 pub fn stop(self) -> impl Future<Output = ()> {
617 let mut suspended = false;
618 poll_fn(move |ctx| {
619 self.queue.inner.with(|inner| {
620 if !inner.is_active() {
621 unsafe {
622 // Safety: the noop VTable functions don't use the pointer at all.
623 self.queue
624 .configure(NonNull::<D>::dangling(), VTable::noop())
625 };
626 // Make sure the queue doesn't remain suspended when the driver is re-started.
627 if suspended {
628 inner.resume();
629 }
630 return core::task::Poll::Ready(());
631 }
632 // This may kick out other suspend() callers, but that should be okay. They will
633 // only be able to do work if the queue is !active, for them it doesn't matter if
634 // the queue is suspended or stopped completely - just that it isn't running. As for
635 // the possible waker churn, we can use MultiWakerRegistration with a capacity
636 // suitable for the number of possible suspenders (2-3 unless the work queue ends up
637 // being used more widely), if this turns out to be a problem.
638 inner.suspend_waker.register(ctx.waker());
639 if !suspended {
640 inner.suspend(Some(ctx));
641 suspended = true;
642 }
643
644 core::task::Poll::Pending
645 })
646 })
647 }
648}
649
impl<D, T> Drop for WorkQueueDriver<'_, D, T>
where
    D: Sync + Send,
    T: Sync + Send,
{
    /// Detaches the driver from the queue, waiting for any in-flight item to finish first.
    ///
    /// The driver data pointer must not be used by the queue after this returns, so the queue
    /// is reset to the no-op configuration before the borrow ends.
    fn drop(&mut self) {
        let wait_for_suspended = self.queue.inner.with(|inner| {
            if inner.is_active() {
                // An item is in flight: suspend the queue so no new item is started, and wait
                // below for the current one to finish.
                inner.suspend(None);
                true
            } else {
                // Idle: deconfigure immediately. `inner.configure` is called directly because
                // we already hold the mutex here.
                unsafe { inner.configure(NonNull::dangling(), VTable::noop()) };
                false
            }
        });

        if !wait_for_suspended {
            return;
        }

        // Busy-wait until the in-flight item completes, then deconfigure and resume.
        //
        // NOTE(review): this loop does not drive `process()` itself; completion appears to rely
        // on a frontend polling the queue (e.g. a Handle's blocking wait) — confirm progress is
        // guaranteed here.
        loop {
            let done = self.queue.inner.with(|inner| {
                if inner.is_active() {
                    return false;
                }

                unsafe { inner.configure(NonNull::dangling(), VTable::noop()) };

                inner.resume();

                true
            });
            if done {
                break;
            }
        }
    }
}
688
/// Used by work queue clients, allows hiding WorkItem.
///
/// Owns the backing [`WorkItem`] storage; posting lends the item to a queue and returns a
/// [`Handle`] tied to that borrow.
pub(crate) struct WorkQueueFrontend<T: Sync + Send> {
    work_item: WorkItem<T>,
}
693
694impl<T: Sync + Send> WorkQueueFrontend<T> {
695 pub fn new(initial: T) -> Self {
696 Self {
697 work_item: WorkItem {
698 next: None,
699 status: Poll::Pending(false),
700 data: initial,
701 waker: WakerRegistration::new(),
702 },
703 }
704 }
705
706 pub fn data_mut(&mut self) -> &mut T {
707 &mut self.work_item.data
708 }
709
710 pub fn post<'t>(&'t mut self, queue: &'t WorkQueue<T>) -> Handle<'t, T> {
711 queue.post_work(&mut self.work_item)
712 }
713
714 /// Creates a Handle for a work item that does not need to be put into the queue.
715 pub fn post_completed<'t>(&'t mut self, queue: &'t WorkQueue<T>) -> Handle<'t, T> {
716 Handle::from_completed_work_item(queue, &mut self.work_item)
717 }
718}
719
720// TODO: implement individual algo context wrappers