// esp_rtos/embassy/mod.rs
//! OS-aware embassy executors.

use core::{cell::UnsafeCell, mem::MaybeUninit, ptr::NonNull, sync::atomic::Ordering};

use embassy_executor::{SendSpawner, Spawner, raw};
use esp_hal::{
    interrupt::{InterruptHandler, Priority, software::SoftwareInterrupt},
    system::Cpu,
    time::{Duration, Instant},
};
use macros::ram;
use portable_atomic::AtomicPtr;

use crate::{
    SCHEDULER,
    scheduler::SchedulerState,
    task::{TaskPtr, read_thread_pointer},
};

/// A zero-overhead lock that allows mutable access to the contained value through the scheduler.
struct SchedulerLocked<T> {
    // Only ever accessed through `with`, which demands a `&mut SchedulerState`
    // as proof that the scheduler lock is held.
    inner: UnsafeCell<T>,
}

// SAFETY: all access to `inner` goes through `with`, which requires the
// scheduler lock to be held, serializing access. `T: Send` is still required
// because the value itself may end up used from different threads.
unsafe impl<T: Send> Sync for SchedulerLocked<T> {}
unsafe impl<T: Send> Send for SchedulerLocked<T> {}

28impl<T> SchedulerLocked<T> {
29    fn new(inner: T) -> Self {
30        Self {
31            inner: UnsafeCell::new(inner),
32        }
33    }
34
35    fn with<'s>(&'s self, _scheduler: &'s mut SchedulerState) -> &'s mut T {
36        // Safety: The `_scheduler` parameter proves the caller holds the scheduler lock,
37        // so exclusive access to the contained value is safe.
38        unsafe { &mut *self.inner.get() }
39    }
40}
41
/// State for a [`ThreadFlag`], protected by the scheduler lock.
pub(crate) struct FlagsInner {
    // The single task that waits on this flag.
    owner: TaskPtr,
    // `Some(owner)` while the owner is waiting and must be resumed on `set`.
    waiting: Option<TaskPtr>,
    // Set when the flag was signalled while nobody was waiting.
    set: bool,
}
47impl FlagsInner {
48    fn take(&mut self) -> bool {
49        if self.set {
50            // The flag was set while we weren't looking.
51            self.set = false;
52            true
53        } else {
54            // `waiting` signals that the owner should be resumed when the flag is set. Copying
55            // the task pointer is an optimization that allows clearing the
56            // waiting state without computing the address of a separate field.
57            self.waiting = Some(self.owner);
58
59            false
60        }
61    }
62}
63
/// A single event bit, optimized for the thread-mode embassy executor.
///
/// This takes shortcuts, which make it unsuitable for general purpose use (such as no wait
/// queue, no timeout, assumes a single thread waits for the flag, there is only a single bit of
/// flag information).
struct ThreadFlag {
    // Protected by the scheduler lock; see `SchedulerLocked`.
    inner: SchedulerLocked<FlagsInner>,
}

impl ThreadFlag {
    /// Creates a new flag owned by the current task.
    ///
    /// Must be called on the thread that will later `wait` on the flag, since
    /// the owner task pointer is captured here.
    fn new() -> Self {
        let owner = SCHEDULER.with(|scheduler| {
            if let Some(current_task) = NonNull::new(read_thread_pointer()) {
                current_task
            } else {
                // We're cheating, the task hasn't been initialized yet.
                // Fall back to the current CPU's main task.
                let current_cpu = Cpu::current();
                NonNull::from(&scheduler.per_cpu[current_cpu as usize].main_task)
            }
        });
        Self {
            inner: SchedulerLocked::new(FlagsInner {
                owner,
                waiting: None,
                set: false,
            }),
        }
    }

    /// Runs `f` with mutable access to the inner state. The `scheduler`
    /// argument proves the scheduler lock is held.
    fn with<R>(&self, scheduler: &mut SchedulerState, f: impl FnOnce(&mut FlagsInner) -> R) -> R {
        let inner = self.inner.with(scheduler);
        f(inner)
    }

    /// Signals the flag, resuming the owner task if it is currently waiting.
    fn set(&self) {
        SCHEDULER.with(|scheduler| {
            let to_resume = self.with(scheduler, |inner| {
                let to_resume = inner.waiting.take();

                if to_resume.is_none() {
                    // The task isn't waiting, set the flag.
                    inner.set = true;
                }

                to_resume
            });

            if let Some(waiting) = to_resume {
                // The task is waiting, there is no need to set the flag - resuming the thread
                // is all the signal we need.
                scheduler.resume_task(waiting);
            }
        });
    }

    /// Returns whether the flag is currently set, without consuming it.
    fn get(&self) -> bool {
        SCHEDULER.with(|scheduler| self.with(scheduler, |inner| inner.set))
    }

    /// Blocks the current (owner) task until the flag is set, consuming it.
    ///
    /// Returns immediately if the flag is already set.
    fn wait(&self) {
        // SCHEDULER.sleep_until, but we know the current task's ID, and we know there
        // is no timeout.
        SCHEDULER.with(|scheduler| {
            let owner_to_suspend = self.with(scheduler, |inner| {
                if !inner.take() {
                    Some(inner.owner)
                } else {
                    None
                }
            });

            if let Some(owner) = owner_to_suspend {
                // Sleep "forever"; `set` resumes the task. NOTE(review): the
                // yield is issued while still inside the scheduler critical
                // section — presumably the actual context switch happens once
                // the lock is released; confirm against `yield_task`.
                scheduler.sleep_task_until(owner, Instant::EPOCH + Duration::MAX);
                crate::task::yield_task();
            }
        });
    }
}

/// The embassy pender callback.
///
/// `context` encodes which executor woke: the values 0..=3 identify an
/// [`InterruptExecutor`] (the number of the software interrupt to raise),
/// anything else is treated as a pointer to a thread-mode executor's
/// `ThreadFlag` (a stack address, which can never be 0..=3).
#[unsafe(export_name = "__pender")]
#[ram]
fn __pender(context: *mut ()) {
    match context as usize {
        // Interrupt executors: pend the matching software interrupt.
        0 => unsafe { SoftwareInterrupt::<0>::steal().raise() },
        1 => unsafe { SoftwareInterrupt::<1>::steal().raise() },
        2 => unsafe { SoftwareInterrupt::<2>::steal().raise() },
        3 => unsafe { SoftwareInterrupt::<3>::steal().raise() },
        _ => {
            // This forces us to keep the embassy timer queue separate, otherwise we'd need to
            // reentrantly lock SCHEDULER.
            let flags = unwrap!(unsafe { context.cast::<ThreadFlag>().as_ref() });
            flags.set();
        }
    }
}

/// Callbacks to run code before/after polling the task queue.
///
/// Used with [`Executor::run_with_callbacks`].
pub trait Callbacks {
    /// Called just before polling the executor.
    fn before_poll(&mut self);

    /// Called after the executor is polled, if there is no work scheduled.
    ///
    /// Note that tasks can become ready at any point during the execution
    /// of this function.
    fn on_idle(&mut self);
}

/// Thread-mode executor.
///
/// This executor runs in an OS thread, meaning the scheduler needs to be started before using any
/// async operations. If you wish to write async code without the scheduler running, consider
/// using the [`InterruptExecutor`].
#[cfg_attr(
    multi_core,
    doc = r"

If you want to start the executor on the second core, you will need to start the second core using [`crate::start_second_core`].
If you are looking for a way to run code on the second core without the scheduler, use the [`InterruptExecutor`].
"
)]
pub struct Executor {
    // Deferred initialization: the raw executor is created in `run_inner`,
    // once the pender context (the `ThreadFlag`) is known.
    executor: UnsafeCell<MaybeUninit<raw::Executor>>,
}

impl Executor {
    /// Create a new thread-mode executor.
    pub const fn new() -> Self {
        Self {
            executor: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Run the executor.
    ///
    /// The `init` closure is called with a [`Spawner`] that spawns tasks on
    /// this executor. Use it to spawn the initial task(s). After `init`
    /// returns, the executor starts running the tasks.
    ///
    /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is
    /// `Copy`), for example by passing it as an argument to the initial
    /// tasks.
    ///
    /// This function requires `&'static mut self`. This means you have to store
    /// the Executor instance in a place where it'll live forever and grants
    /// you mutable access. There's a few ways to do this:
    ///
    /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
    /// - a `static mut` (unsafe, not recommended)
    /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading
    ///   its lifetime with `transmute`. (unsafe)
    ///
    /// This function never returns.
    pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
        let flags = ThreadFlag::new();
        // No-op callbacks: `run` adds no hooks around polling.
        struct NoHooks;

        impl Callbacks for NoHooks {
            fn before_poll(&mut self) {}

            fn on_idle(&mut self) {}
        }

        self.run_inner(init, &flags, NoHooks)
    }

    /// Run the executor with callbacks.
    ///
    /// See [Callbacks] on when the callbacks are called.
    ///
    /// See [Self::run] for more information about running the executor.
    ///
    /// This function never returns.
    pub fn run_with_callbacks(
        &'static mut self,
        init: impl FnOnce(Spawner),
        callbacks: impl Callbacks,
    ) -> ! {
        let flags = ThreadFlag::new();
        // Wraps the user callbacks so `on_idle` is suppressed when the flag is
        // already set (i.e. the executor would not actually go to sleep).
        struct Hooks<'a, CB: Callbacks>(CB, &'a ThreadFlag);

        impl<CB: Callbacks> Callbacks for Hooks<'_, CB> {
            fn before_poll(&mut self) {
                self.0.before_poll()
            }

            fn on_idle(&mut self) {
                // Make sure we only call on_idle if the executor would otherwise go to sleep.
                if !self.1.get() {
                    self.0.on_idle();
                }
            }
        }

        self.run_inner(init, &flags, Hooks(callbacks, &flags))
    }

    /// Shared poll loop for [`Self::run`] and [`Self::run_with_callbacks`].
    fn run_inner(
        &'static self,
        init: impl FnOnce(Spawner),
        flags: &ThreadFlag,
        mut hooks: impl Callbacks,
    ) -> ! {
        // Initialize the raw executor with the address of `flags` as the
        // pender context; `__pender` dereferences it to signal this thread.
        // Borrowing the caller's stack slot is sound because this function
        // never returns, so `flags` lives for the life of the executor.
        let executor = unsafe {
            (&mut *self.executor.get()).write(raw::Executor::new(
                (flags as *const ThreadFlag).cast::<()>().cast_mut(),
            ))
        };

        // A thread-mode executor needs the scheduler running on its CPU;
        // fail fast with a clear message.
        #[cfg(multi_core)]
        if Cpu::current() != Cpu::ProCpu
            && crate::SCHEDULER
                .with(|scheduler| !scheduler.per_cpu[Cpu::current() as usize].initialized)
        {
            panic!("Executor cannot be started: the scheduler is not running on the current CPU.");
        }

        init(executor.spawner());

        loop {
            hooks.before_poll();

            unsafe { executor.poll() };

            hooks.on_idle();

            // Wait for work to become available.
            flags.wait();
        }
    }
}

296impl Default for Executor {
297    fn default() -> Self {
298        Self::new()
299    }
300}
301
/// Interrupt mode executor.
///
/// This executor runs tasks in interrupt mode. The interrupt handler is set up
/// to poll tasks, and when a task is woken the interrupt is pended from
/// software.
///
/// Interrupt executors have potentially lower latency than thread-mode executors, but only a
/// limited number can be created.
pub struct InterruptExecutor<const SWI: u8> {
    // Deferred initialization: written in `start`.
    executor: UnsafeCell<MaybeUninit<raw::Executor>>,
    // The software interrupt used to pend polling; its handler is installed in `start`.
    interrupt: SoftwareInterrupt<'static, SWI>,
}

// One registry slot per available software interrupt.
const COUNT: usize = 4;
static INTERRUPT_EXECUTORS: [InterruptExecutorStorage; COUNT] =
    [const { InterruptExecutorStorage::new() }; COUNT];

// SAFETY: the executor pointer is published through an atomic registry slot.
// NOTE(review): soundness also relies on `raw::Executor` and
// `SoftwareInterrupt` tolerating cross-context use — confirm against the
// embassy-executor and esp-hal documentation.
unsafe impl<const SWI: u8> Send for InterruptExecutor<SWI> {}
unsafe impl<const SWI: u8> Sync for InterruptExecutor<SWI> {}

322struct InterruptExecutorStorage {
323    raw_executor: AtomicPtr<raw::Executor>,
324}
325
326impl InterruptExecutorStorage {
327    const fn new() -> Self {
328        Self {
329            raw_executor: AtomicPtr::new(core::ptr::null_mut()),
330        }
331    }
332
333    /// # Safety:
334    ///
335    /// The caller must ensure `set` has been called before.
336    #[inline(always)]
337    unsafe fn get(&self) -> &raw::Executor {
338        unsafe { &*self.raw_executor.load(Ordering::Relaxed) }
339    }
340
341    fn set(&self, executor: *mut raw::Executor) {
342        self.raw_executor.store(executor, Ordering::Relaxed);
343    }
344}
345
/// Software interrupt handler that polls the interrupt executor for `NUM`.
extern "C" fn handle_interrupt<const NUM: u8>() {
    let swi = unsafe { SoftwareInterrupt::<NUM>::steal() };
    // Clear the pending interrupt before polling, so a wake that happens
    // during `poll` re-pends the interrupt rather than being swallowed.
    swi.reset();

    unsafe {
        // SAFETY: The executor is always initialized before the interrupt is enabled.
        let executor = INTERRUPT_EXECUTORS[NUM as usize].get();
        executor.poll();
    }
}

impl<const SWI: u8> InterruptExecutor<SWI> {
    /// Create a new `InterruptExecutor`.
    /// This takes the software interrupt to be used internally.
    #[inline]
    pub const fn new(interrupt: SoftwareInterrupt<'static, SWI>) -> Self {
        Self {
            executor: UnsafeCell::new(MaybeUninit::uninit()),
            interrupt,
        }
    }

    /// Start the executor at the given priority level.
    ///
    /// This initializes the executor, enables the interrupt, and returns.
    /// The executor keeps running in the background through the interrupt.
    ///
    /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A
    /// [`SendSpawner`] is returned instead of a [`Spawner`] because the
    /// executor effectively runs in a different "thread" (the interrupt),
    /// so spawning tasks on it is effectively sending them.
    ///
    /// To obtain a [`Spawner`] for this executor, use [`Spawner::for_current_executor`]
    /// from a task running in it.
    pub fn start(&'static mut self, priority: Priority) -> SendSpawner {
        unsafe {
            // The pender context for interrupt executors is the SWI number
            // itself (0..=3); `__pender` matches on it and raises the interrupt.
            (*self.executor.get()).write(raw::Executor::new((SWI as usize) as *mut ()));

            // Publish the executor pointer before the handler is installed
            // below, so `handle_interrupt` always finds it initialized.
            INTERRUPT_EXECUTORS[SWI as usize].set((*self.executor.get()).as_mut_ptr());
        }

        // Map the const generic's runtime value to the matching monomorphized
        // handler function.
        let swi_handler = match SWI {
            0 => handle_interrupt::<0>,
            1 => handle_interrupt::<1>,
            2 => handle_interrupt::<2>,
            3 => handle_interrupt::<3>,
            _ => unreachable!(),
        };

        self.interrupt
            .set_interrupt_handler(InterruptHandler::new(swi_handler, priority));

        // SAFETY: the executor was written above.
        let executor = unsafe { (*self.executor.get()).assume_init_ref() };
        executor.spawner().make_send()
    }

    /// Get a SendSpawner for this executor
    ///
    /// This returns a [`SendSpawner`] you can use to spawn tasks on this
    /// executor.
    ///
    /// This MUST only be called on an executor that has already been started.
    /// The function will panic otherwise.
    pub fn spawner(&'static self) -> SendSpawner {
        // A non-null registry entry means `start` has initialized the executor.
        if INTERRUPT_EXECUTORS[SWI as usize]
            .raw_executor
            .load(Ordering::Acquire)
            .is_null()
        {
            panic!("InterruptExecutor::spawner() called on uninitialized executor.");
        }
        // SAFETY: checked above that `start` has run and written the executor.
        let executor = unsafe { (*self.executor.get()).assume_init_ref() };
        executor.spawner().make_send()
    }
}