esp_hal/
work_queue.rs

1//! A generic work queue.
2//!
3//! Work queues are the backbone of cryptographic drivers. They enable asynchronous and
4//! blocking operations using shared peripherals, like crypto accelerators. Clients
5//! post work items into the work queue, and poll for completion. The act of polling
6//! processes the queue, which allows any number of clients to post work without deadlocking.
7//!
8//! Work queues are configured by backends. Backends register a `process` callback which is
9//! called when a client posts a work item or polls for completion.
10//!
11//! Posting a work item into the queue returns a handle. The handle can be used to poll whether
12//! the work item has been processed. Dropping the handle will cancel the work item.
13#![cfg_attr(esp32c2, allow(unused))]
14
15use core::{future::poll_fn, marker::PhantomData, ptr::NonNull, task::Context};
16
17use embassy_sync::waitqueue::WakerRegistration;
18use esp_sync::NonReentrantMutex;
19
20/// Queue driver operations.
21///
22/// Functions in this VTable are provided by drivers that consume work items.
23/// These functions may be called both in the context of the queue frontends and drivers.
24pub(crate) struct VTable<T: Sync + Send> {
25    /// Starts processing a new work item.
26    ///
27    /// The function returns whether the work item was accepted, and its poll status if it was
28    /// accepted. If there is no driver currently processing the queue, this function will
29    /// return None to prevent removing the work item from the queue.
30    ///
31    /// This function should be as short as possible.
32    pub(crate) post: fn(NonNull<()>, &mut T) -> Option<Poll>,
33
34    /// Polls the status of the current work item.
35    ///
36    /// The work queue ensures that the item passed here has been first passed to the driver by
37    /// `post`.
38    ///
39    /// This function should be as short as possible.
40    pub(crate) poll: fn(NonNull<()>, &mut T) -> Poll,
41
42    /// Attempts to abort processing a work item.
43    ///
44    /// This function should be as short as possible.
45    pub(crate) cancel: fn(NonNull<()>, &mut T),
46
47    /// Called when the driver may be stopped.
48    ///
49    /// This function should be as short as possible.
50    pub(crate) stop: fn(NonNull<()>),
51}
52
53impl<T: Sync + Send> VTable<T> {
54    pub(crate) const fn noop() -> Self {
55        Self {
56            post: |_, _| None,
57            poll: |_, _| unreachable!(),
58            cancel: |_, _| (),
59            stop: |_| (),
60        }
61    }
62}
63
64struct Inner<T: Sync + Send> {
65    head: Option<NonNull<WorkItem<T>>>,
66    tail: Option<NonNull<WorkItem<T>>>,
67    current: Option<NonNull<WorkItem<T>>>,
68
69    // The data pointer will be passed to VTable functions, which may be called in any context.
70    data: NonNull<()>,
71    vtable: VTable<T>,
72
73    // Counts suspend requests. When this reaches 0 again, the all wakers in the queue need to be
74    // waken to continue processing.
75    suspend_count: usize,
76    // The task waiting for the queue to be suspended. There can be multiple tasks, but that's
77    // practically rare (in this setup, it needs both HMAC and DSA to want to work at the same
78    // time).
79    suspend_waker: WakerRegistration,
80}
81
82unsafe impl<T: Sync + Send> Send for Inner<T> {}
83unsafe impl<T: Sync + Send> Sync for Inner<T> {}
84
85impl<T: Sync + Send> Inner<T> {
86    /// Places a work item at the end of the queue.
87    fn enqueue(&mut self, ptr: NonNull<WorkItem<T>>) {
88        if let Some(tail) = self.tail.as_mut() {
89            // Queue contains something, append to `tail`.
90            unsafe { tail.as_mut().next = Some(ptr) };
91        } else {
92            // Queue was empty, set `head` to the first element.
93            self.head = Some(ptr);
94        }
95
96        // Move `tail` to the newly inserted item.
97        self.tail = Some(ptr);
98    }
99
100    /// Places a work item at the front of the queue.
101    fn enqueue_front(&mut self, mut ptr: NonNull<WorkItem<T>>) {
102        // Chain the node into the list.
103        unsafe { ptr.as_mut().next = self.head };
104
105        // Adjust list `head` to point at the new-first element.
106        self.head = Some(ptr);
107        if self.tail.is_none() {
108            // The queue was empty, we need to set `tail` to the last element.
109            self.tail = Some(ptr);
110        }
111    }
112
113    /// Runs one processing iteration.
114    ///
115    /// This function enqueues a new work item or polls the status of the currently processed one.
116    /// Returns whether the function should be re-called by the caller.
117    fn process(&mut self) -> bool {
118        if let Some(mut current) = self.current {
119            let poll_result = (self.vtable.poll)(self.data, &mut unsafe { current.as_mut() }.data);
120
121            match poll_result {
122                Poll::Ready(status) => {
123                    unsafe { current.as_mut() }.complete(status);
124                    self.current = None;
125                    if self.suspend_count > 0 {
126                        // Queue suspended, stop the driver.
127                        (self.vtable.stop)(self.data);
128                        self.suspend_waker.wake();
129                        false
130                    } else {
131                        self.dequeue_and_post(true)
132                    }
133                }
134                Poll::Pending(recall) => recall,
135            }
136        } else {
137            // If the queue is empty, the driver should already have been notified when the queue
138            // became empty, so we don't notify it here.
139            self.dequeue_and_post(false)
140        }
141    }
142
143    /// Retrieves the next work queue item and sends it to the driver.
144    ///
145    /// Returns true if the queue needs to be polled again.
146    // Note: even if the queue itself may be implemented lock-free, dequeuing and posting to the
147    // driver must be done atomically to ensure that the queue can be processed fully by any of
148    // the frontends polling it.
149    fn dequeue_and_post(&mut self, notify_on_empty: bool) -> bool {
150        let Some(mut ptr) = self.dequeue() else {
151            if notify_on_empty {
152                // There are no more work items. Notify the driver that it can stop.
153                (self.vtable.stop)(self.data);
154            }
155            return false;
156        };
157
158        // Start processing a new work item.
159
160        if let Some(poll_status) = (self.vtable.post)(self.data, &mut unsafe { ptr.as_mut() }.data)
161        {
162            match poll_status {
163                Poll::Pending(recall) => {
164                    unsafe { ptr.as_mut().status = Poll::Pending(recall) };
165                    self.current = Some(ptr);
166                    recall
167                }
168                Poll::Ready(status) => {
169                    unsafe { ptr.as_mut() }.complete(status);
170                    // The driver immediately processed the work item.
171                    // Polling again needs to dequeue the next item.
172                    true
173                }
174            }
175        } else {
176            // If the driver didn't accept the work item, place it back to the front of the
177            // queue.
178            self.enqueue_front(ptr);
179            false
180        }
181    }
182
183    /// Pops and returns a work item from the start of the queue.
184    fn dequeue(&mut self) -> Option<NonNull<WorkItem<T>>> {
185        // If the `head` is None, the queue is empty. Return None and do nothing.
186        let ptr = self.head?;
187
188        self.head = unsafe { ptr.as_ref() }.next;
189
190        // If the new `head` is null, the queue is empty. Clear the `tail` pointer.
191        if self.head.is_none() {
192            self.tail = None;
193        }
194
195        Some(ptr)
196    }
197
198    /// Cancels a particular work item.
199    ///
200    /// If the work item is currently being processed, this function notifies the driver. Otherwise,
201    /// it tries to remove the pointer from the work queue.
202    ///
203    /// The function returns true when the item was immediately cancelled.
204    ///
205    /// This function is not `unsafe` because it only dereferences `work_item` if the function has
206    /// determined that the item belongs to this queue.
207    fn cancel(&mut self, mut work_item: NonNull<WorkItem<T>>) -> bool {
208        if self.current == Some(work_item) {
209            // Cancelling an in-progress item is more complicated than plucking it from the
210            // queue. Forward the request to the driver to (maybe) cancel the
211            // operation.
212            (self.vtable.cancel)(
213                self.data,
214                // This is safe to do, because the work item is currently owned by this queue.
215                &mut unsafe { work_item.as_mut() }.data,
216            );
217            // Queue will need to be polled to query item status.
218            return false;
219        }
220
221        if unsafe { work_item.as_ref() }.status.is_ready() {
222            // Nothing to do.
223            return true;
224        }
225
226        // The work item is not the current one, remove it from the queue. This immediately
227        // cancels the work item. `remove` only uses the address of the work item without
228        // dereferencing it.
229        if self.remove(work_item) {
230            unsafe { work_item.as_mut() }.complete(Status::Cancelled);
231            // Cancelled immediately, no further polling necessary for this item.
232            return true;
233        }
234
235        // In this case the item doesn't belong to this queue, it can be in any state. The item will
236        // need to be polled, but this also means something may have gone wrong.
237        false
238    }
239
240    /// Removes the item from the queue.
241    ///
242    /// Returns `true` if the work item was successfully removed, `false` if the work item was not
243    /// found in the queue.
244    ///
245    /// This function is not `unsafe` because it does not dereference `ptr`, so it does not matter
246    /// that `ptr` may belong to a different work queue.
247    fn remove(&mut self, ptr: NonNull<WorkItem<T>>) -> bool {
248        // Walk the queue to find `ptr`.
249        let mut prev = None;
250        let mut current = self.head;
251        while let Some(current_item) = current {
252            let next = unsafe { current_item.as_ref() }.next;
253
254            if current_item != ptr {
255                // Not what we're looking for. Move to the next element.
256                prev = current;
257                current = next;
258                continue;
259            }
260
261            // We've found `ptr`. Remove it from the list.
262            if Some(ptr) == self.head {
263                self.head = next;
264            } else {
265                // Unwrapping is fine, because if the current pointer is not the `head`, the
266                // previous pointer must be Some.
267                unsafe { unwrap!(prev).as_mut() }.next = next;
268            }
269
270            if Some(ptr) == self.tail {
271                self.tail = prev;
272            }
273
274            return true;
275        }
276
277        // Did not find `ptr`.
278        false
279    }
280
281    /// Increases the suspend counter, preventing new work items from starting to be processed.
282    ///
283    /// If the current work item finishes processing, the driver is shut down. Call `is_active` to
284    /// determine when the queue enters suspended state.
285    fn suspend(&mut self, ctx: Option<&Context<'_>>) {
286        self.suspend_count += 1;
287        if let Some(ctx) = ctx {
288            if self.current.is_some() {
289                self.suspend_waker.register(ctx.waker());
290            } else {
291                ctx.waker().wake_by_ref();
292            }
293        }
294    }
295
296    /// Decreases the suspend counter.
297    ///
298    /// When it reaches 0, this function wakes async tasks that poll the queue. They need to be
299    /// waken to ensure that their items don't end up stuck. Blocking pollers will eventually end up
300    /// looping when their turn comes.
301    fn resume(&mut self) {
302        self.suspend_count -= 1;
303        if self.suspend_count == 0 {
304            self.wake_polling_tasks();
305        }
306    }
307
308    fn wake_polling_tasks(&mut self) {
309        if self.data == NonNull::dangling() {
310            // No VTable means no driver, no need to continue processing.
311            return;
312        }
313        // Walk through the list and wake polling tasks.
314        let mut current = self.head;
315        while let Some(mut current_item) = current {
316            let item = unsafe { current_item.as_mut() };
317
318            item.waker.wake();
319
320            current = item.next;
321        }
322    }
323
324    fn is_active(&self) -> bool {
325        self.current.is_some()
326    }
327
328    unsafe fn configure(&mut self, data: NonNull<()>, vtable: VTable<T>) {
329        (self.vtable.stop)(self.data);
330
331        self.data = data;
332        self.vtable = vtable;
333
334        if self.suspend_count == 0 {
335            self.wake_polling_tasks();
336        }
337    }
338}
339
340/// A generic work queue.
341pub(crate) struct WorkQueue<T: Sync + Send> {
342    inner: NonReentrantMutex<Inner<T>>,
343}
344
345impl<T: Sync + Send> WorkQueue<T> {
346    /// Creates a new `WorkQueue`.
347    pub const fn new() -> Self {
348        Self {
349            inner: NonReentrantMutex::new(Inner {
350                head: None,
351                tail: None,
352                current: None,
353
354                data: NonNull::dangling(),
355                vtable: VTable::noop(),
356
357                suspend_count: 0,
358                suspend_waker: WakerRegistration::new(),
359            }),
360        }
361    }
362
363    /// Configures the queue.
364    ///
365    /// The provided data pointer will be passed to the VTable functions.
366    ///
367    /// # Safety
368    ///
369    /// The `data` pointer must be valid as long as the `WorkQueue` is configured with it. The
370    /// driver must access the data pointer appropriately (i.e. it must not move !Send data out of
371    /// it).
372    pub unsafe fn configure<D: Sync + Send>(&self, data: NonNull<D>, vtable: VTable<T>) {
373        self.inner
374            .with(|inner| unsafe { inner.configure(data.cast(), vtable) })
375    }
376
377    /// Enqueues a work item.
378    pub fn post_work<'t>(&'t self, work_item: &'t mut WorkItem<T>) -> Handle<'t, T> {
379        let ptr = unsafe {
380            // Safety: `Handle` and `work_item` have lifetime 't, which ensures this call
381            // can't be called on an in-flight work item. As `Handle` (and the underlying driver
382            // that processed the work queue) does not use the reference, and using the
383            // reference is not possible while `Handle` exists, this should be safe.
384            work_item.prepare()
385        };
386
387        self.inner.with(|inner| inner.enqueue(ptr));
388
389        Handle {
390            queue: self,
391            work_item: ptr,
392            _marker: PhantomData,
393        }
394    }
395
396    /// Polls the queue once.
397    ///
398    /// Returns true if the queue needs to be polled again.
399    #[allow(unused)]
400    pub fn process(&self) -> bool {
401        self.inner.with(|inner| inner.process())
402    }
403
404    /// Polls the queue once and returns the status of the given work item.
405    ///
406    /// ## Safety
407    ///
408    /// The caller must ensure that `item` belongs to the polled queue. An item belongs to the
409    /// **last queue it was enqueued in**, even if the item is no longer in the queue's linked
410    /// list. This relationship is broken when the Handle that owns the WorkItem is dropped.
411    pub unsafe fn poll(&self, item: NonNull<WorkItem<T>>) -> Poll {
412        self.inner.with(|inner| {
413            let status = unsafe { &*item.as_ptr() }.status;
414            if status.is_pending() {
415                inner.process();
416                unsafe { &*item.as_ptr() }.status
417            } else {
418                status
419            }
420        })
421    }
422
423    /// Schedules the work item to be cancelled.
424    ///
425    /// The function returns true when the item was immediately cancelled. If the function returns
426    /// `false`, the item will need to be polled until its status becomes [`Poll::Ready`].
427    ///
428    /// The work item should not be assumed to be immediately cancelled. Polling its handle
429    /// is necessary to ensure it is no longer being processed by the underlying driver.
430    pub fn cancel(&self, work_item: NonNull<WorkItem<T>>) -> bool {
431        self.inner.with(|inner| inner.cancel(work_item))
432    }
433}
434
435/// The status of a work item.
436#[derive(Clone, Copy, PartialEq, Debug)]
437#[cfg_attr(feature = "defmt", derive(defmt::Format))]
438pub enum Status {
439    /// The processing has completed.
440    Completed,
441
442    /// The work item has been cancelled.
443    Cancelled,
444}
445
446/// A unit of work in the work queue.
447pub(crate) struct WorkItem<T: Sync + Send> {
448    next: Option<NonNull<WorkItem<T>>>,
449    status: Poll,
450    data: T,
451    waker: WakerRegistration,
452}
453
454impl<T: Sync + Send> WorkItem<T> {
455    /// Completes the work item.
456    ///
457    /// This function is intended to be called from the underlying drivers.
458    pub fn complete(&mut self, status: Status) {
459        self.status = Poll::Ready(status);
460        self.waker.wake();
461    }
462
463    /// Prepares a work item to be enqueued.
464    ///
465    /// # Safety:
466    ///
467    /// The caller must ensure the reference is not used again while the pointer returned by this
468    /// function is in use.
469    unsafe fn prepare(&mut self) -> NonNull<Self> {
470        self.next = None;
471        self.status = Poll::Pending(false);
472
473        NonNull::from(self)
474    }
475}
476
477/// The status of a work item posted to a work queue.
478#[derive(Clone, Copy, PartialEq, Debug)]
479#[cfg_attr(feature = "defmt", derive(defmt::Format))]
480pub(crate) enum Poll {
481    /// The work item has not yet been fully processed. Contains whether the caller should poll the
482    /// queue again. This only has effect on async pollers, which will need to wake their tasks
483    /// immediately.
484    Pending(bool),
485
486    /// The work item has been processed.
487    Ready(Status),
488}
489
490impl Poll {
491    /// Returns whether the current result is still pending.
492    pub fn is_pending(self) -> bool {
493        matches!(self, Self::Pending(_))
494    }
495
496    /// Returns whether the current result is ready.
497    pub fn is_ready(self) -> bool {
498        !self.is_pending()
499    }
500}
501
502/// A reference to a posted [`WorkItem`].
503///
504/// This struct ensures that the work item is valid until the item is processed or is removed from
505/// the work queue.
506///
507/// Dropping the handle cancels the work item, but may block for some time if the work item is
508/// already being processed.
509pub(crate) struct Handle<'t, T: Sync + Send> {
510    queue: &'t WorkQueue<T>,
511    work_item: NonNull<WorkItem<T>>,
512    // Make sure lifetime is invariant to prevent UB.
513    _marker: PhantomData<&'t mut WorkItem<T>>,
514}
515
516impl<'t, T: Sync + Send> Handle<'t, T> {
517    pub(crate) fn from_completed_work_item(
518        queue: &'t WorkQueue<T>,
519        work_item: &'t mut WorkItem<T>,
520    ) -> Self {
521        // Don't use `complete` here, we don't need to wake anything, just ensure that the item will
522        // not be put into the queue.
523        work_item.status = Poll::Ready(Status::Completed);
524
525        Self {
526            queue,
527            work_item: NonNull::from(work_item),
528            _marker: PhantomData,
529        }
530    }
531
532    fn poll_inner(&mut self) -> Poll {
533        unsafe { self.queue.poll(self.work_item) }
534    }
535
536    /// Returns the status of the work item.
537    pub fn poll(&mut self) -> bool {
538        self.poll_inner().is_ready()
539    }
540
541    /// Polls the work item to completion, by busy-looping.
542    ///
543    /// This function returns immediately if `poll` returns `true`.
544    #[inline]
545    pub fn wait_blocking(mut self) -> Status {
546        loop {
547            if let Poll::Ready(status) = self.poll_inner() {
548                return status;
549            }
550        }
551    }
552
553    /// Waits until the work item is completed.
554    pub fn wait(&mut self) -> impl Future<Output = Status> {
555        poll_fn(|ctx| {
556            unsafe { self.work_item.as_mut() }
557                .waker
558                .register(ctx.waker());
559            match self.poll_inner() {
560                Poll::Pending(recall) => {
561                    if recall {
562                        ctx.waker().wake_by_ref();
563                    }
564                    core::task::Poll::Pending
565                }
566                Poll::Ready(status) => core::task::Poll::Ready(status),
567            }
568        })
569    }
570
571    /// Cancels the work item and asynchronously waits until it is removed from the work queue.
572    pub async fn cancel(&mut self) {
573        if !self.queue.cancel(self.work_item) {
574            self.wait().await;
575        }
576    }
577}
578
579impl<'t, T: Sync + Send> Drop for Handle<'t, T> {
580    fn drop(&mut self) {
581        if !self.queue.cancel(self.work_item) {
582            // We must wait for the driver to release our WorkItem.
583            while self.poll_inner().is_pending() {}
584        }
585    }
586}
587
588pub(crate) struct WorkQueueDriver<'t, D, T>
589where
590    D: Sync + Send,
591    T: Sync + Send,
592{
593    queue: &'t WorkQueue<T>,
594    _marker: PhantomData<&'t mut D>,
595}
596
597impl<'t, D, T> WorkQueueDriver<'t, D, T>
598where
599    D: Sync + Send,
600    T: Sync + Send,
601{
602    pub fn new(driver: &'t mut D, vtable: VTable<T>, queue: &'t WorkQueue<T>) -> Self {
603        unsafe {
604            // Safety: the lifetime 't ensures the pointer remains valid for the lifetime of the
605            // WorkQueueDriver. The Drop implementation (and the general "Don't forget" clause)
606            // ensure the pointer is not used after the WQD has been dropped.
607            queue.configure(NonNull::from(driver), vtable);
608        }
609        Self {
610            queue,
611            _marker: PhantomData,
612        }
613    }
614
615    /// Shuts down the driver.
616    pub fn stop(self) -> impl Future<Output = ()> {
617        let mut suspended = false;
618        poll_fn(move |ctx| {
619            self.queue.inner.with(|inner| {
620                if !inner.is_active() {
621                    unsafe {
622                        // Safety: the noop VTable functions don't use the pointer at all.
623                        self.queue
624                            .configure(NonNull::<D>::dangling(), VTable::noop())
625                    };
626                    // Make sure the queue doesn't remain suspended when the driver is re-started.
627                    if suspended {
628                        inner.resume();
629                    }
630                    return core::task::Poll::Ready(());
631                }
632                // This may kick out other suspend() callers, but that should be okay. They will
633                // only be able to do work if the queue is !active, for them it doesn't matter if
634                // the queue is suspended or stopped completely - just that it isn't running. As for
635                // the possible waker churn, we can use MultiWakerRegistration with a capacity
636                // suitable for the number of possible suspenders (2-3 unless the work queue ends up
637                // being used more widely), if this turns out to be a problem.
638                inner.suspend_waker.register(ctx.waker());
639                if !suspended {
640                    inner.suspend(Some(ctx));
641                    suspended = true;
642                }
643
644                core::task::Poll::Pending
645            })
646        })
647    }
648}
649
650impl<D, T> Drop for WorkQueueDriver<'_, D, T>
651where
652    D: Sync + Send,
653    T: Sync + Send,
654{
655    fn drop(&mut self) {
656        let wait_for_suspended = self.queue.inner.with(|inner| {
657            if inner.is_active() {
658                inner.suspend(None);
659                true
660            } else {
661                unsafe { inner.configure(NonNull::dangling(), VTable::noop()) };
662                false
663            }
664        });
665
666        if !wait_for_suspended {
667            return;
668        }
669
670        loop {
671            let done = self.queue.inner.with(|inner| {
672                if inner.is_active() {
673                    return false;
674                }
675
676                unsafe { inner.configure(NonNull::dangling(), VTable::noop()) };
677
678                inner.resume();
679
680                true
681            });
682            if done {
683                break;
684            }
685        }
686    }
687}
688
689/// Used by work queue clients, allows hiding WorkItem.
690pub(crate) struct WorkQueueFrontend<T: Sync + Send> {
691    work_item: WorkItem<T>,
692}
693
694impl<T: Sync + Send> WorkQueueFrontend<T> {
695    pub fn new(initial: T) -> Self {
696        Self {
697            work_item: WorkItem {
698                next: None,
699                status: Poll::Pending(false),
700                data: initial,
701                waker: WakerRegistration::new(),
702            },
703        }
704    }
705
706    pub fn data_mut(&mut self) -> &mut T {
707        &mut self.work_item.data
708    }
709
710    pub fn post<'t>(&'t mut self, queue: &'t WorkQueue<T>) -> Handle<'t, T> {
711        queue.post_work(&mut self.work_item)
712    }
713
714    /// Creates a Handle for a work item that does not need to be put into the queue.
715    pub fn post_completed<'t>(&'t mut self, queue: &'t WorkQueue<T>) -> Handle<'t, T> {
716        Handle::from_completed_work_item(queue, &mut self.work_item)
717    }
718}
719
720// TODO: implement individual algo context wrappers