esp_rtos/task/mod.rs

#[cfg_attr(riscv, path = "riscv.rs")]
#[cfg_attr(xtensa, path = "xtensa.rs")]
pub(crate) mod arch_specific;

#[cfg(feature = "esp-radio")]
use core::ffi::c_void;
use core::{marker::PhantomData, mem::MaybeUninit, ptr::NonNull};

#[cfg(feature = "alloc")]
use allocator_api2::alloc::{Allocator, Layout};
pub(crate) use arch_specific::*;
use esp_hal::{
    system::Cpu,
    time::{Duration, Instant},
};
#[cfg(feature = "rtos-trace")]
use rtos_trace::TaskInfo;

#[cfg(feature = "alloc")]
use crate::InternalMemory;
#[cfg(feature = "esp-radio")]
use crate::semaphore::Semaphore;
use crate::{
    SCHEDULER,
    run_queue::{Priority, RunQueue},
    scheduler::SchedulerState,
    wait_queue::WaitQueue,
};

pub type IdleFn = extern "C" fn() -> !;

#[derive(Clone, Copy, PartialEq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) enum TaskState {
    Ready,
    Sleeping,
    Deleted,
}

pub(crate) type TaskPtr = NonNull<Task>;
pub(crate) type TaskListItem = Option<TaskPtr>;

/// An abstraction that allows the task to contain multiple different queue pointers.
pub(crate) trait TaskListElement: Default {
    /// Returns the pointer to the next element in the list.
    fn next(task: TaskPtr) -> Option<TaskPtr>;

    /// Sets the pointer to the next element in the list.
    fn set_next(task: TaskPtr, next: Option<TaskPtr>);

    /// Returns whether the task is in the list. If this function returns `None`, we don't know.
    fn is_in_queue(_task: TaskPtr) -> Option<bool> {
        // By default we don't store this information, so we return "don't know".
        None
    }

    /// Marks whether the task is in the list.
    fn mark_in_queue(_task: TaskPtr, _in_queue: bool) {}
}

macro_rules! task_list_item {
    ($struct:ident, $field:ident $(, $in_queue_field:ident)?) => {
        #[derive(Default)]
        pub(crate) struct $struct;
        impl TaskListElement for $struct {
            fn next(task: TaskPtr) -> Option<TaskPtr> {
                unsafe { task.as_ref().$field }
            }

            fn set_next(mut task: TaskPtr, next: Option<TaskPtr>) {
                unsafe {
                    task.as_mut().$field = next;
                }
            }

            $(
                fn is_in_queue(task: TaskPtr) -> Option<bool> {
                    Some(unsafe { task.as_ref().$in_queue_field })
                }

                fn mark_in_queue(mut task: TaskPtr, in_queue: bool) {
                    unsafe {
                        task.as_mut().$in_queue_field = in_queue;
                    }
                }
            )?
        }
    };
}

task_list_item!(TaskReadyQueueElement, ready_queue_item, run_queued);
task_list_item!(TaskTimerQueueElement, timer_queue_item, timer_queued);
// These aren't perf critical, no need to waste memory on caching list status:
task_list_item!(TaskAllocListElement, alloc_list_item);
task_list_item!(TaskDeleteListElement, delete_list_item);
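
// For illustration, `task_list_item!(TaskAllocListElement, alloc_list_item)` expands to roughly
// the following; without a cached in-queue flag, the trait's default `is_in_queue` and
// `mark_in_queue` implementations apply:
//
//     #[derive(Default)]
//     pub(crate) struct TaskAllocListElement;
//     impl TaskListElement for TaskAllocListElement {
//         fn next(task: TaskPtr) -> Option<TaskPtr> {
//             unsafe { task.as_ref().alloc_list_item }
//         }
//
//         fn set_next(mut task: TaskPtr, next: Option<TaskPtr>) {
//             unsafe {
//                 task.as_mut().alloc_list_item = next;
//             }
//         }
//     }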

/// Extension trait for common task operations. These should be inherent methods, but we can't
/// add inherent methods to `NonNull`.
pub(crate) trait TaskExt {
    #[cfg(feature = "rtos-trace")]
    fn rtos_trace_id(self) -> u32;
    #[cfg(feature = "rtos-trace")]
    fn rtos_trace_info(self, run_queue: &mut RunQueue) -> TaskInfo;

    #[cfg(any(feature = "esp-radio", feature = "embassy"))]
    fn resume(self);
    fn priority(self, _: &mut RunQueue) -> Priority;
    fn set_priority(self, _: &mut RunQueue, new_priority: Priority);
    fn state(self) -> TaskState;
    fn set_state(self, state: TaskState);
}

impl TaskExt for TaskPtr {
    #[cfg(feature = "rtos-trace")]
    fn rtos_trace_id(self) -> u32 {
        self.addr().get() as u32
    }

    #[cfg(feature = "rtos-trace")]
    fn rtos_trace_info(self, run_queue: &mut RunQueue) -> TaskInfo {
        TaskInfo {
            name: "<todo>",
            priority: self.priority(run_queue).get() as u32,
            stack_base: unsafe { self.as_ref().stack.addr() },
            stack_size: unsafe { self.as_ref().stack.len() },
        }
    }

    #[cfg(any(feature = "esp-radio", feature = "embassy"))]
    fn resume(self) {
        SCHEDULER.with(|scheduler| scheduler.resume_task(self))
    }

    fn priority(self, _: &mut RunQueue) -> Priority {
        unsafe { self.as_ref().priority }
    }

    fn set_priority(mut self, run_queue: &mut RunQueue, new_priority: Priority) {
        run_queue.remove(self);
        unsafe { self.as_mut().priority = new_priority };
    }

    fn state(self) -> TaskState {
        unsafe { self.as_ref().state }
    }

    fn set_state(mut self, state: TaskState) {
        trace!("Task {:?} state changed to {:?}", self, state);

        #[cfg(feature = "rtos-trace")]
        match state {
            TaskState::Ready => rtos_trace::trace::task_ready_begin(self.rtos_trace_id()),
            TaskState::Sleeping => rtos_trace::trace::task_ready_end(self.rtos_trace_id()),
            TaskState::Deleted => rtos_trace::trace::task_terminate(self.rtos_trace_id()),
        }

        unsafe { self.as_mut().state = state };
    }
}

/// A singly linked list of tasks.
///
/// Use this where you don't care about the order of list elements.
///
/// The `E` type parameter is used to access the data in the task object that belongs to this list.
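///
/// Illustrative sketch (hypothetical `task_a`/`task_b` pointers; note the LIFO order):
///
/// ```ignore
/// let mut list = TaskList::<TaskAllocListElement>::new();
/// list.push(task_a);
/// list.push(task_b);
///
/// // Elements are pushed to and popped from the head.
/// assert_eq!(list.pop(), Some(task_b));
/// assert_eq!(list.pop(), Some(task_a));
/// assert_eq!(list.pop(), None);
/// ```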
#[derive(Default)]
pub(crate) struct TaskList<E> {
    head: Option<TaskPtr>,
    _item: PhantomData<E>,
}

impl<E: TaskListElement> TaskList<E> {
    pub const fn new() -> Self {
        Self {
            head: None,
            _item: PhantomData,
        }
    }

    pub fn push(&mut self, task: TaskPtr) {
        if E::is_in_queue(task) == Some(true) {
            return;
        }
        E::mark_in_queue(task, true);

        debug_assert!(E::next(task).is_none());
        E::set_next(task, self.head);
        self.head = Some(task);
    }

    pub fn pop(&mut self) -> Option<TaskPtr> {
        let popped = self.head.take();

        if let Some(task) = popped {
            self.head = E::next(task);
            E::set_next(task, None);
            E::mark_in_queue(task, false);
        }

        popped
    }

    pub fn remove(&mut self, task: TaskPtr) {
        if E::is_in_queue(task) == Some(false) {
            return;
        }
        E::mark_in_queue(task, false);

        // TODO: this (and TaskQueue::remove) may prove too expensive.
        let mut list = core::mem::take(self);
        while let Some(popped) = list.pop() {
            if popped != task {
                self.push(popped);
            }
        }
    }

    #[cfg(feature = "rtos-trace")]
    pub fn iter(&self) -> impl Iterator<Item = TaskPtr> {
        let mut current = self.head;
        core::iter::from_fn(move || {
            let task = current?;
            current = E::next(task);
            Some(task)
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.head.is_none()
    }
}

/// A singly linked queue of tasks.
///
/// Use this where you care about the order of list elements. Elements are popped from the front,
/// and pushed to the back.
///
/// The `E` type parameter is used to access the data in the task object that belongs to this list.
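///
/// Illustrative sketch (hypothetical `task_a`/`task_b` pointers; note the FIFO order):
///
/// ```ignore
/// let mut queue = TaskQueue::<TaskReadyQueueElement>::new();
/// queue.push(task_a);
/// queue.push(task_b);
///
/// // Elements are popped in the order they were pushed.
/// assert_eq!(queue.pop(), Some(task_a));
/// assert_eq!(queue.pop(), Some(task_b));
/// ```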
#[derive(Default)]
pub(crate) struct TaskQueue<E> {
    head: Option<TaskPtr>,
    tail: Option<TaskPtr>,
    _item: PhantomData<E>,
}

impl<E: TaskListElement> TaskQueue<E> {
    pub const fn new() -> Self {
        Self {
            head: None,
            tail: None,
            _item: PhantomData,
        }
    }

    pub fn push(&mut self, task: TaskPtr) {
        if E::is_in_queue(task) == Some(true) {
            return;
        }
        E::mark_in_queue(task, true);

        debug_assert!(E::next(task).is_none());
        if let Some(tail) = self.tail {
            E::set_next(tail, Some(task));
        } else {
            self.head = Some(task);
        }
        self.tail = Some(task);
    }

    pub fn pop(&mut self) -> Option<TaskPtr> {
        let popped = self.head.take();

        if let Some(task) = popped {
            self.head = E::next(task);
            E::set_next(task, None);
            if self.head.is_none() {
                self.tail = None;
            }
            E::mark_in_queue(task, false);
        }

        popped
    }

    #[cfg(multi_core)]
    pub fn pop_if(&mut self, cond: impl Fn(&Task) -> bool) -> Option<TaskPtr> {
        let mut popped = None;

        let mut list = core::mem::take(self);
        while let Some(task) = list.pop() {
            if popped.is_none() && cond(unsafe { task.as_ref() }) {
                E::mark_in_queue(task, false);
                popped = Some(task);
            } else {
                self.push(task);
            }
        }

        popped
    }

    pub fn remove(&mut self, task: TaskPtr) {
        if E::is_in_queue(task) == Some(false) {
            return;
        }

        let mut list = core::mem::take(self);
        while let Some(popped) = list.pop() {
            if popped == task {
                E::mark_in_queue(task, false);
            } else {
                self.push(popped);
            }
        }
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.head.is_none()
    }
}

#[cfg(feature = "embassy")]
pub(crate) mod flags {
    use esp_sync::NonReentrantMutex;

    use crate::{
        CurrentThreadHandle,
        SCHEDULER,
        task::{TaskExt, TaskPtr},
    };

    pub(crate) struct FlagsInner {
        owner: Option<TaskPtr>,
        flags: u32,
    }
    impl FlagsInner {
        fn wait(&mut self, wait_flags: u32) -> bool {
            if self.flags & wait_flags == wait_flags {
                self.flags &= !wait_flags;
                true
            } else {
                if self.owner.is_none() {
                    self.owner = Some(CurrentThreadHandle::get().task);
                }

                false
            }
        }
    }

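    /// Event flags that a single task can block on.
    ///
    /// A hedged usage sketch (the flag bits and timeout are arbitrary; `wait` returns
    /// whether the flags were observed before the timeout):
    ///
    /// ```ignore
    /// static FLAGS: ThreadFlags = ThreadFlags::new();
    ///
    /// // Waiting task: block until bit 0 is set, or time out after 1000 us.
    /// let ok = FLAGS.wait(0b1, Some(1_000));
    ///
    /// // Notifying task: set bit 0, resuming the waiter if there is one.
    /// FLAGS.set(0b1);
    /// ```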
    pub(crate) struct ThreadFlags {
        inner: NonReentrantMutex<FlagsInner>,
    }

    impl ThreadFlags {
        pub(crate) const fn new() -> Self {
            Self {
                inner: NonReentrantMutex::new(FlagsInner {
                    owner: None,
                    flags: 0,
                }),
            }
        }

        pub(crate) fn set(&self, flag: u32) {
            self.inner.with(|inner| {
                inner.flags |= flag;
                if let Some(owner) = inner.owner {
                    owner.resume();
                }
            });
        }

        pub(crate) fn get(&self) -> u32 {
            self.inner.with(|inner| inner.flags)
        }

        pub(crate) fn wait(&self, wait_flags: u32, timeout_us: Option<u32>) -> bool {
            if crate::with_deadline(timeout_us, |deadline| {
                self.inner.with(|inner| {
                    if inner.wait(wait_flags) {
                        true
                    } else {
                        SCHEDULER.sleep_until(deadline);
                        false
                    }
                })
            }) {
                debug!("Flags - wait - success");
                true
            } else {
                trace!("Flags - wait - timed out");
                false
            }
        }
    }
}

#[repr(C)]
pub(crate) struct Task {
    pub cpu_context: CpuContext,
    #[cfg(feature = "esp-radio")]
    pub thread_semaphore: Option<Semaphore>,
    pub state: TaskState,
    pub stack: *mut [MaybeUninit<u32>],

    #[cfg(any(hw_task_overflow_detection, sw_task_overflow_detection))]
    pub stack_guard: *mut u32,
    #[cfg(sw_task_overflow_detection)]
    pub(crate) stack_guard_value: u32,

    pub priority: Priority,
    #[cfg(multi_core)]
    pub pinned_to: Option<Cpu>,

    pub wakeup_at: u64,

    /// Whether the task is currently queued in the run queue.
    pub run_queued: bool,
    /// Whether the task is currently queued in the timer queue.
    pub timer_queued: bool,

    /// The current wait queue this task is in.
    pub(crate) current_queue: Option<NonNull<WaitQueue>>,

    // Lists a task can be in:
    /// The list of all allocated tasks
    pub alloc_list_item: TaskListItem,

    /// Either the RunQueue or the WaitQueue
    pub ready_queue_item: TaskListItem,

    /// The timer queue
    pub timer_queue_item: TaskListItem,

    /// The list of tasks scheduled for deletion
    pub delete_list_item: TaskListItem,

    /// Whether the task was allocated on the heap.
    #[cfg(feature = "alloc")]
    pub(crate) heap_allocated: bool,
}

#[cfg(feature = "esp-radio")]
extern "C" fn task_wrapper(task_fn: extern "C" fn(*mut c_void), param: *mut c_void) {
    task_fn(param);
    schedule_task_deletion(core::ptr::null_mut());
}

impl Task {
    #[cfg(feature = "esp-radio")]
    pub(crate) fn new(
        name: &str,
        task_fn: extern "C" fn(*mut c_void),
        param: *mut c_void,
        task_stack_size: usize,
        priority: usize,
        pinned_to: Option<Cpu>,
    ) -> Self {
        debug!(
            "task_create {} {:?}({:?}) stack_size = {} priority = {} pinned_to = {:?}",
            name, task_fn, param, task_stack_size, priority, pinned_to
        );

        // Make sure the stack guard doesn't eat into the stack size.
        let extra_stack = if cfg!(any(hw_task_overflow_detection, sw_task_overflow_detection)) {
            4 + esp_config::esp_config_int!(usize, "ESP_HAL_CONFIG_STACK_GUARD_OFFSET")
        } else {
            0
        };

        #[cfg(debug_build)]
        // This is a lot, but debug builds fail in various ways without it.
        let extra_stack = extra_stack.max(6 * 1024);

        let task_stack_size = task_stack_size + extra_stack;

        // Round the stack size up to the next multiple of 16 bytes (a full 16 is added if the
        // size is already aligned).
        let task_stack_size = (task_stack_size & !0xF) + 16;
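        // Worked example with hypothetical numbers: a requested task_stack_size of 4096 plus an
        // extra_stack of 36 gives 4132, and (4132 & !0xF) + 16 = 4144.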

        let stack = unwrap!(
            Layout::from_size_align(task_stack_size, 16)
                .ok()
                .and_then(|layout| InternalMemory.allocate(layout).ok()),
            "Failed to allocate stack",
        )
        .as_ptr();

        let stack_bottom = stack.cast::<MaybeUninit<u32>>();
        let stack_len_bytes = stack.len();

        let stack_guard_offset =
            esp_config::esp_config_int!(usize, "ESP_HAL_CONFIG_STACK_GUARD_OFFSET");

        let stack_words = core::ptr::slice_from_raw_parts_mut(stack_bottom, stack_len_bytes / 4);
        let stack_top = unsafe { stack_bottom.add(stack_words.len()).cast() };

        let mut task = Task {
            cpu_context: new_task_context(task_fn, param, stack_top),
            #[cfg(feature = "esp-radio")]
            thread_semaphore: None,
            state: TaskState::Ready,
            stack: stack_words,
            #[cfg(any(hw_task_overflow_detection, sw_task_overflow_detection))]
            stack_guard: stack_words.cast(),
            #[cfg(sw_task_overflow_detection)]
            stack_guard_value: 0,
            current_queue: None,
            priority: Priority::new(priority),
            #[cfg(multi_core)]
            pinned_to,

            wakeup_at: 0,
            timer_queued: false,
            run_queued: false,

            alloc_list_item: TaskListItem::None,
            ready_queue_item: TaskListItem::None,
            timer_queue_item: TaskListItem::None,
            delete_list_item: TaskListItem::None,

            #[cfg(feature = "alloc")]
            heap_allocated: false,
        };

        task.set_up_stack_guard(stack_guard_offset, 0xDEED_BAAD);

        task
    }

    fn set_up_stack_guard(&mut self, offset: usize, _value: u32) {
        let stack_bottom = self.stack.cast::<MaybeUninit<u32>>();
        let stack_guard = unsafe { stack_bottom.byte_add(offset) };

        #[cfg(sw_task_overflow_detection)]
        unsafe {
            // Avoid touching the main stack's canary on the first core.
            if stack_guard.read().assume_init() != _value {
                stack_guard.write(MaybeUninit::new(_value));
            }
            self.stack_guard_value = _value;
        }

        #[cfg(any(hw_task_overflow_detection, sw_task_overflow_detection))]
        {
            self.stack_guard = stack_guard.cast();
        }
    }

    pub(crate) fn ensure_no_stack_overflow(&self) {
        #[cfg(sw_task_overflow_detection)]
        assert_eq!(
            // Reading this word as an initialized u32 is sound: it is the canary word we wrote
            // in set_up_stack_guard.
            unsafe { self.stack_guard.read() },
            self.stack_guard_value,
            "Stack overflow detected in {:?}",
            self as *const Task
        );
    }

    pub(crate) fn set_up_stack_watchpoint(&self) {
        #[cfg(hw_task_overflow_detection)]
        unsafe {
            esp_hal::debugger::set_stack_watchpoint(self.stack_guard as usize);
        }
    }
}

impl Drop for Task {
    fn drop(&mut self) {
        debug!("Dropping task: {:?}", self as *mut Task);

        #[cfg(feature = "esp-radio")]
        let _ = self.thread_semaphore.take();

        #[cfg(feature = "alloc")]
        if self.heap_allocated {
            let layout = unwrap!(
                Layout::from_size_align(self.stack.len() * 4, 16).ok(),
                "Cannot compute Layout for stack"
            );
            unsafe { InternalMemory.deallocate(unwrap!(NonNull::new(self.stack.cast())), layout) };
        }
    }
}

pub(super) fn allocate_main_task(
    scheduler: &mut SchedulerState,
    stack: *mut [MaybeUninit<u32>],
    stack_guard_offset: usize,
    stack_guard_value: u32,
) {
    let cpu = Cpu::current();
    let current_cpu = cpu as usize;

    debug_assert!(
        !scheduler.per_cpu[current_cpu].initialized,
        "Tried to allocate main task multiple times"
    );

    scheduler.per_cpu[current_cpu].initialized = true;

    // Reset main task properties. The rest should be cleared when the task is deleted.
    scheduler.per_cpu[current_cpu].main_task.priority = Priority::ZERO;
    scheduler.per_cpu[current_cpu].main_task.state = TaskState::Ready;
    scheduler.per_cpu[current_cpu].main_task.stack = stack;
    scheduler.per_cpu[current_cpu].main_task.run_queued = false;
    scheduler.per_cpu[current_cpu].main_task.timer_queued = false;
    #[cfg(multi_core)]
    {
        scheduler.per_cpu[current_cpu].main_task.pinned_to = Some(cpu);
    }

    scheduler.per_cpu[current_cpu]
        .main_task
        .set_up_stack_guard(stack_guard_offset, stack_guard_value);

    scheduler.per_cpu[current_cpu]
        .main_task
        .set_up_stack_watchpoint();

    // This is slightly questionable as we don't ensure SchedulerState is pinned, but it's always
    // part of a static object so taking the pointer is fine.
    let main_task_ptr = NonNull::from(&scheduler.per_cpu[current_cpu].main_task);
    debug!("Main task created: {:?}", main_task_ptr);

    #[cfg(feature = "rtos-trace")]
    rtos_trace::trace::task_new(main_task_ptr.rtos_trace_id());

    // The main task is already running, no need to add it to the ready queue.
    scheduler.all_tasks.push(main_task_ptr);
    scheduler.per_cpu[current_cpu].current_task = Some(main_task_ptr);
    scheduler
        .run_queue
        .mark_task_ready(&scheduler.per_cpu, main_task_ptr);
}

pub(super) fn with_current_task<R>(mut cb: impl FnMut(&mut Task) -> R) -> R {
    SCHEDULER.with(|state| {
        cb(unsafe {
            let current_cpu = Cpu::current() as usize;
            unwrap!(state.per_cpu[current_cpu].current_task).as_mut()
        })
    })
}

pub(super) fn current_task() -> TaskPtr {
    with_current_task(|task| NonNull::from(task))
}

/// A handle to the current thread.
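///
/// A hedged usage sketch:
///
/// ```ignore
/// use esp_hal::time::Duration;
///
/// let handle = CurrentThreadHandle::get();
/// handle.set_priority(2);
/// // Put the current task to sleep for 10 ms.
/// handle.delay(Duration::from_millis(10));
/// ```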
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct CurrentThreadHandle {
    task: TaskPtr,
}

impl CurrentThreadHandle {
    /// Retrieves a handle to the current task.
    pub fn get() -> Self {
        Self {
            task: current_task(),
        }
    }

    /// Delays the current task for the specified duration.
    pub fn delay(self, duration: Duration) {
        self.delay_until(Instant::now() + duration);
    }

    /// Delays the current task until the specified deadline.
    pub fn delay_until(self, deadline: Instant) {
        SCHEDULER.sleep_until(deadline);
    }

    /// Sets the priority of the current task.
    pub fn set_priority(self, priority: usize) {
        let priority = Priority::new(priority);
        SCHEDULER.with(|state| {
            let old = self.task.priority(&mut state.run_queue);
            self.task.set_priority(&mut state.run_queue, priority);

            // If we're dropping in priority, trigger a context switch in case another task can be
            // scheduled or time slicing needs to be started.
            if old > priority {
                crate::task::yield_task();
            }
        });
    }
}

#[cfg(feature = "esp-radio")]
pub(super) fn schedule_task_deletion(task: *mut Task) {
    trace!("schedule_task_deletion {:?}", task);
    if SCHEDULER.with(|scheduler| scheduler.schedule_task_deletion(task)) {
        loop {
            yield_task();
        }
    }
}

#[inline]
#[cfg(multi_core)]
pub(crate) fn schedule_other_core() {
    use esp_hal::interrupt::software::SoftwareInterrupt;
    match Cpu::current() {
        Cpu::ProCpu => unsafe { SoftwareInterrupt::<'static, 1>::steal() }.raise(),
        Cpu::AppCpu => unsafe { SoftwareInterrupt::<'static, 0>::steal() }.raise(),
    }

    // It takes a bit for the software interrupt to be serviced, but since it's happening on the
    // other core, we don't need to wait.
}