esp_rtos/embassy/
mod.rs

1//! OS-aware embassy executors.
2
3use core::{cell::UnsafeCell, mem::MaybeUninit, ptr::NonNull, sync::atomic::Ordering};
4
5use embassy_executor::{SendSpawner, Spawner, raw};
6use esp_hal::{
7    interrupt::{InterruptHandler, Priority, software::SoftwareInterrupt},
8    system::Cpu,
9    time::{Duration, Instant},
10};
11use esp_sync::NonReentrantMutex;
12use macros::ram;
13use portable_atomic::AtomicPtr;
14
15use crate::{
16    SCHEDULER,
17    task::{TaskExt, TaskPtr},
18};
19
20pub(crate) struct FlagsInner {
21    owner: TaskPtr,
22    waiting: Option<TaskPtr>,
23    set: bool,
24}
25impl FlagsInner {
26    fn take(&mut self) -> bool {
27        if self.set {
28            // The flag was set while we weren't looking.
29            self.set = false;
30            true
31        } else {
32            // `waiting` signals that the owner should be resumed when the flag is set. Copying
33            // the task pointer is an optimization that allows clearing the
34            // waiting state without computing the address of a separate field.
35            self.waiting = Some(self.owner);
36
37            false
38        }
39    }
40}
41
/// A single event bit, optimized for the thread-mode embassy executor.
///
/// This takes shortcuts, which make it unsuitable for general purpose use (such as no wait
/// queue, no timeout, assumes a single thread waits for the flag, there is only a single bit of
/// flag information).
struct ThreadFlag {
    // Owner/waiting/set state, protected against reentrant access (e.g. from `set`
    // being invoked via the `__pender` while the owner manipulates the flag).
    inner: NonReentrantMutex<FlagsInner>,
}
50
impl ThreadFlag {
    /// Creates a new flag owned by the task currently running on this CPU.
    fn new() -> Self {
        let owner = SCHEDULER.with(|scheduler| {
            let current_cpu = Cpu::current() as usize;
            if let Some(current_task) = scheduler.per_cpu[current_cpu].current_task {
                current_task
            } else {
                // We're cheating, the task hasn't been initialized yet.
                NonNull::from(&scheduler.per_cpu[current_cpu].main_task)
            }
        });
        Self {
            inner: NonReentrantMutex::new(FlagsInner {
                owner,
                waiting: None,
                set: false,
            }),
        }
    }

    /// Runs `f` with the flag state locked.
    fn with<R>(&self, f: impl FnOnce(&mut FlagsInner) -> R) -> R {
        self.inner.with(|inner| f(inner))
    }

    /// Sets the flag, resuming the owner task if it is currently waiting.
    ///
    /// Called from `__pender` when a task on the owning executor is woken.
    fn set(&self) {
        self.with(|inner| {
            if let Some(waiting) = inner.waiting.take() {
                // The task is waiting, there is no need to set the flag - resuming the thread
                // is all the signal we need.
                waiting.resume();
            } else {
                // The task isn't waiting, set the flag.
                inner.set = true;
            }
        });
    }

    /// Returns whether the flag is currently set, without clearing it.
    fn get(&self) -> bool {
        self.with(|inner| inner.set)
    }

    /// Blocks the owner task until the flag is set, consuming the flag.
    fn wait(&self) {
        self.with(|inner| {
            if !inner.take() {
                // SCHEDULER.sleep_until, but we know the current task's ID, and we know there
                // is no timeout.
                // `Instant::EPOCH + Duration::MAX` stands in for "sleep forever"; the task is
                // resumed by `set` instead of by a deadline.
                // NOTE(review): `yield_task` runs while both the flag lock and the scheduler
                // lock are held — assumes the context-switch path tolerates this; confirm.
                SCHEDULER.with(|scheduler| {
                    scheduler.sleep_task_until(inner.owner, Instant::EPOCH + Duration::MAX);
                    crate::task::yield_task();
                });
            }
        });
    }
}
105
/// Embassy pender callback (resolved by the `embassy-executor` runtime via its
/// `__pender` symbol).
///
/// `context` is the value passed to `raw::Executor::new`: the values `0..=3` identify an
/// `InterruptExecutor` by its software-interrupt number; any other value is treated as a
/// pointer to the `ThreadFlag` of a thread-mode `Executor`.
#[unsafe(export_name = "__pender")]
#[ram]
fn __pender(context: *mut ()) {
    match context as usize {
        0 => unsafe { SoftwareInterrupt::<0>::steal().raise() },
        1 => unsafe { SoftwareInterrupt::<1>::steal().raise() },
        2 => unsafe { SoftwareInterrupt::<2>::steal().raise() },
        3 => unsafe { SoftwareInterrupt::<3>::steal().raise() },
        _ => {
            // This forces us to keep the embassy timer queue separate, otherwise we'd need to
            // reentrantly lock SCHEDULER.
            let flags = unwrap!(unsafe { context.cast::<ThreadFlag>().as_ref() });
            flags.set();
        }
    }
}
122
/// Callbacks to run code before/after polling the task queue.
pub trait Callbacks {
    /// Called just before polling the executor.
    fn before_poll(&mut self);

    /// Called after the executor is polled, if there is no work scheduled.
    ///
    /// Note that tasks can become ready at any point during the execution
    /// of this function.
    fn on_idle(&mut self);
}
134
/// Thread-mode executor.
///
/// This executor runs in an OS thread, meaning the scheduler needs to be started before using any
/// async operations. If you wish to write async code without the scheduler running, consider
/// using the [`InterruptExecutor`].
#[cfg_attr(
    multi_core,
    doc = r"

If you want to start the executor on the second core, you will need to start the second core using [`crate::start_second_core`].
If you are looking for a way to run code on the second core without the scheduler, use the [`InterruptExecutor`].
"
)]
pub struct Executor {
    // Written exactly once by `run_inner`; `UnsafeCell<MaybeUninit<_>>` allows the
    // `const fn new` constructor to defer the `raw::Executor` construction to `run`.
    executor: UnsafeCell<MaybeUninit<raw::Executor>>,
}
151
impl Executor {
    /// Create a new thread-mode executor.
    pub const fn new() -> Self {
        Self {
            executor: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Run the executor.
    ///
    /// The `init` closure is called with a [`Spawner`] that spawns tasks on
    /// this executor. Use it to spawn the initial task(s). After `init`
    /// returns, the executor starts running the tasks.
    ///
    /// To spawn more tasks later, you may keep copies of the [`Spawner`] (it is
    /// `Copy`), for example by passing it as an argument to the initial
    /// tasks.
    ///
    /// This function requires `&'static mut self`. This means you have to store
    /// the Executor instance in a place where it'll live forever and grants
    /// you mutable access. There's a few ways to do this:
    ///
    /// - a [StaticCell](https://docs.rs/static_cell/latest/static_cell/) (safe)
    /// - a `static mut` (unsafe, not recommended)
    /// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading
    ///   its lifetime with `transmute`. (unsafe)
    ///
    /// This function never returns.
    pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
        // The flag lives on this (never-unwound) stack frame; `run_inner` never
        // returns, so passing its address as the pender context is sound.
        let flags = ThreadFlag::new();
        struct NoHooks;

        impl Callbacks for NoHooks {
            fn before_poll(&mut self) {}

            fn on_idle(&mut self) {}
        }

        self.run_inner(init, &flags, NoHooks)
    }

    /// Run the executor with callbacks.
    ///
    /// See [Callbacks] on when the callbacks are called.
    ///
    /// See [Self::run] for more information about running the executor.
    ///
    /// This function never returns.
    pub fn run_with_callbacks(
        &'static mut self,
        init: impl FnOnce(Spawner),
        callbacks: impl Callbacks,
    ) -> ! {
        let flags = ThreadFlag::new();
        struct Hooks<'a, CB: Callbacks>(CB, &'a ThreadFlag);

        impl<CB: Callbacks> Callbacks for Hooks<'_, CB> {
            fn before_poll(&mut self) {
                self.0.before_poll()
            }

            fn on_idle(&mut self) {
                // Make sure we only call on_idle if the executor would otherwise go to sleep.
                if !self.1.get() {
                    self.0.on_idle();
                }
            }
        }

        self.run_inner(init, &flags, Hooks(callbacks, &flags))
    }

    // Shared body of `run`/`run_with_callbacks`: initializes the raw executor with
    // the flag's address as the pender context, then polls forever.
    fn run_inner(
        &'static self,
        init: impl FnOnce(Spawner),
        flags: &ThreadFlag,
        mut hooks: impl Callbacks,
    ) -> ! {
        let executor = unsafe {
            // The flag pointer becomes the `__pender` context; values >= 4 are
            // interpreted there as `*const ThreadFlag`.
            (&mut *self.executor.get()).write(raw::Executor::new(
                (flags as *const ThreadFlag).cast::<()>().cast_mut(),
            ))
        };

        #[cfg(multi_core)]
        if Cpu::current() != Cpu::ProCpu
            && crate::SCHEDULER
                .with(|scheduler| !scheduler.per_cpu[Cpu::current() as usize].initialized)
        {
            panic!("Executor cannot be started: the scheduler is not running on the current CPU.");
        }

        init(executor.spawner());

        loop {
            hooks.before_poll();

            unsafe { executor.poll() };

            hooks.on_idle();

            // Wait for work to become available.
            flags.wait();
        }
    }
}
258
259impl Default for Executor {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
/// Interrupt mode executor.
///
/// This executor runs tasks in interrupt mode. The interrupt handler is set up
/// to poll tasks, and when a task is woken the interrupt is pended from
/// software.
///
/// Interrupt executors have potentially lower latency than thread-mode executors, but only a
/// limited number can be created.
pub struct InterruptExecutor<const SWI: u8> {
    // Written once by `start`; its address is then published via INTERRUPT_EXECUTORS
    // so the interrupt handler can poll it.
    executor: UnsafeCell<MaybeUninit<raw::Executor>>,
    // The software interrupt this executor is driven by; consumed by `new`.
    interrupt: SoftwareInterrupt<'static, SWI>,
}
277
// Number of software interrupts, and therefore of possible interrupt executors.
const COUNT: usize = 4;
// One slot per software interrupt; slot `NUM` is read by `handle_interrupt::<NUM>`.
static INTERRUPT_EXECUTORS: [InterruptExecutorStorage; COUNT] =
    [const { InterruptExecutorStorage::new() }; COUNT];
281
// SAFETY(review): `executor` is only written in `start` (which takes `&'static mut self`)
// and afterwards only read; the interrupt handler accesses it through INTERRUPT_EXECUTORS,
// not through these references. Presumably this makes cross-thread sharing sound — confirm.
unsafe impl<const SWI: u8> Send for InterruptExecutor<SWI> {}
unsafe impl<const SWI: u8> Sync for InterruptExecutor<SWI> {}
284
285struct InterruptExecutorStorage {
286    raw_executor: AtomicPtr<raw::Executor>,
287}
288
289impl InterruptExecutorStorage {
290    const fn new() -> Self {
291        Self {
292            raw_executor: AtomicPtr::new(core::ptr::null_mut()),
293        }
294    }
295
296    /// # Safety:
297    ///
298    /// The caller must ensure `set` has been called before.
299    unsafe fn get(&self) -> &raw::Executor {
300        unsafe { &*self.raw_executor.load(Ordering::Relaxed) }
301    }
302
303    fn set(&self, executor: *mut raw::Executor) {
304        self.raw_executor.store(executor, Ordering::Relaxed);
305    }
306}
307
/// Interrupt handler that polls the `InterruptExecutor` bound to software interrupt `NUM`.
extern "C" fn handle_interrupt<const NUM: u8>() {
    let swi = unsafe { SoftwareInterrupt::<NUM>::steal() };
    // Clear the pending state before polling, so a wake arriving mid-poll re-raises the
    // interrupt (see `__pender`) instead of being lost.
    swi.reset();

    unsafe {
        // SAFETY: The executor is always initialized before the interrupt is enabled.
        let executor = INTERRUPT_EXECUTORS[NUM as usize].get();
        executor.poll();
    }
}
318
impl<const SWI: u8> InterruptExecutor<SWI> {
    /// Create a new `InterruptExecutor`.
    /// This takes the software interrupt to be used internally.
    #[inline]
    pub const fn new(interrupt: SoftwareInterrupt<'static, SWI>) -> Self {
        Self {
            executor: UnsafeCell::new(MaybeUninit::uninit()),
            interrupt,
        }
    }

    /// Start the executor at the given priority level.
    ///
    /// This initializes the executor, enables the interrupt, and returns.
    /// The executor keeps running in the background through the interrupt.
    ///
    /// This returns a [`SendSpawner`] you can use to spawn tasks on it. A
    /// [`SendSpawner`] is returned instead of a [`Spawner`] because the
    /// executor effectively runs in a different "thread" (the interrupt),
    /// so spawning tasks on it is effectively sending them.
    ///
    /// To obtain a [`Spawner`] for this executor, use [`Spawner::for_current_executor`]
    /// from a task running in it.
    pub fn start(&'static mut self, priority: Priority) -> SendSpawner {
        unsafe {
            // `SWI as usize` is the pender context; `__pender` maps values 0..=3 back to
            // the matching software interrupt.
            (*self.executor.get()).write(raw::Executor::new((SWI as usize) as *mut ()));

            // Publish the executor's address before the interrupt handler is installed below.
            INTERRUPT_EXECUTORS[SWI as usize].set((*self.executor.get()).as_mut_ptr());
        }

        // Monomorphize the handler for this interrupt number.
        let swi_handler = match SWI {
            0 => handle_interrupt::<0>,
            1 => handle_interrupt::<1>,
            2 => handle_interrupt::<2>,
            3 => handle_interrupt::<3>,
            _ => unreachable!(),
        };

        self.interrupt
            .set_interrupt_handler(InterruptHandler::new(swi_handler, priority));

        let executor = unsafe { (*self.executor.get()).assume_init_ref() };
        executor.spawner().make_send()
    }

    /// Get a SendSpawner for this executor
    ///
    /// This returns a [`SendSpawner`] you can use to spawn tasks on this
    /// executor.
    ///
    /// This MUST only be called on an executor that has already been started.
    /// The function will panic otherwise.
    pub fn spawner(&'static self) -> SendSpawner {
        // A null pointer in the storage slot means `start` has not run yet.
        if INTERRUPT_EXECUTORS[SWI as usize]
            .raw_executor
            .load(Ordering::Acquire)
            .is_null()
        {
            panic!("InterruptExecutor::spawner() called on uninitialized executor.");
        }
        let executor = unsafe { (*self.executor.get()).assume_init_ref() };
        executor.spawner().make_send()
    }
}