树莓派Pico上的协程革命:在264KB内存里跑出“多任务”真功夫
你有没有遇到过这种情况——
想让树莓派Pico一边读取温湿度传感器,一边闪烁LED提示灯,再同时监听串口指令……结果一用time.sleep(),整个程序就卡住了?
按键按了没反应,数据采集也断断续续。
别急,这不怪你代码写得差,而是传统阻塞式编程在资源受限的嵌入式系统中天生短板。
而真正的解决之道,并不是上RTOS、也不是硬啃C++线程调度,而是在MicroPython里玩转一个轻如鸿毛却威力惊人的机制——协程(Coroutine)。
今天我们就来拆解:如何在只有264KB SRAM、没有操作系统的树莓派Pico上,靠uasyncio实现看似“并发”的多任务系统,把双核M0+的潜力榨干。
为什么Pico不能用“线程”?那我们还能怎么做?
先泼一盆冷水:MicroPython不支持真正的多线程安全机制。虽然它提供了_thread模块,但共享变量极易引发竞态条件,且无法与GC(垃圾回收)协同工作。贸然使用,轻则数据错乱,重则直接崩溃。
但RP2040明明是双核处理器啊!难道只能浪费一个核心?
其实我们可以分两步走:
- 单核内做“伪并行”—— 用协程 + 事件循环模拟多任务;
- 双核间做“物理并行”—— 把高实时性任务扔到第二核心独立运行。
前者靠的是软件协作,后者靠的是硬件并行。两者结合,才能真正发挥Pico的全部实力。
协程的本质:让用户自己掌控“什么时候让出CPU”
协程不是线程,它是一种协作式多任务模型。它的核心思想很简单:
“我这个任务暂时干不了事了,比如要等1秒后再继续,那就先把CPU让出来给别人用。”
这个“让出”动作,就是await的意义所在。
MicroPython里的两种协程写法
从语法上看,MicroPython支持两种风格:
# 方法一:生成器形式(老派,少见) def task(): while True: print("Hello") yield # 让出控制权 time.sleep(0.1) # ⚠️ 这样还是阻塞! # 方法二:现代异步语法(推荐) import uasyncio as asyncio async def blink_led(): while True: print("LED ON") await asyncio.sleep(0.5) # ✅ 非阻塞延时 print("LED OFF") await asyncio.sleep(0.5)注意关键区别:
-time.sleep()是死等,期间其他任务完全无法执行;
-await asyncio.sleep()则只是“注册一个定时器”,然后立刻交出控制权,事件循环可以去跑别的任务。
这就是所谓的“非阻成”。
uasyncio 是怎么让多个任务轮流跑起来的?
想象一下你在厨房做饭:
你要煮面、炒菜、烧水泡茶。如果按顺序来,必须等面煮完才开始炒菜,效率极低。
但如果你会“穿插操作”:
- 下面 → 等待3分钟 → 去切菜 → 开火炒 → 等待翻炒 → 去烧水 → 回头看看面好了没……
这种“主动让出+轮询检查”的方式,就是uasyncio的本质。
它的工作流程如下:
- 创建多个
async函数作为任务; - 调用
asyncio.create_task()将它们加入调度队列; - 启动
asyncio.run(main()),进入事件循环; - 循环不断检查每个任务是否已准备好继续执行(例如
sleep时间到了); - 如果准备好了,就恢复该任务运行一小会儿,直到再次遇到
await; - 如此往复,形成“并发假象”。
听起来像不像操作系统的时间片轮转?只不过这里是用户说了算,而不是内核强制打断。
实战案例:三个任务同时跑,互不干扰
下面是一个典型的Pico应用场景:板载LED闪烁 + 外部传感器轮询 + 报警监控。
import uasyncio as asyncio from machine import Pin # 全局资源 led = Pin(25, Pin.OUT) # 板载LED sensor_pin = Pin(26, Pin.IN) # 外接传感器(如红外/按钮) sensor_value = 0 # 共享状态 # 任务1:快速闪烁LED async def blink_task(): while True: led.value(not led.value()) await asyncio.sleep(0.2) # 200ms切换一次 # 任务2:每秒读取一次传感器 async def sensor_task(): global sensor_value while True: sensor_value = sensor_pin.value() print(f"Sensor: {sensor_value}") await asyncio.sleep(1) # 任务3:实时监控传感器变化并报警 async def monitor_task(): while True: if sensor_value == 1: print("!!! ALARM TRIGGERED !!!") await asyncio.sleep(0.1) # 快速响应 else: await asyncio.sleep(0.5) # 放宽检测频率 # 主入口 async def main(): # 启动所有任务 asyncio.create_task(blink_task()) asyncio.create_task(sensor_task()) asyncio.create_task(monitor_task()) # 主循环永不退出 while True: await asyncio.sleep(1) # 运行事件循环 try: asyncio.run(main()) except KeyboardInterrupt: print("Program stopped.")关键点解析:
- 所有任务都通过
await asyncio.sleep()主动让出CPU; - 即使
monitor_task检测到报警后只睡0.1秒,也不会阻塞其他任务; - 整个系统仅运行在一个CPU核心上,却实现了“三线并行”的效果;
- 内存占用极小,适合长期运行。
双核加持:把最忙的任务丢到Core 1去!
刚才的例子都在Core 0跑,但如果某个任务特别“粘人”,比如需要高频PWM输出或编码器采样,怎么办?
RP2040的一大优势就是双核架构。我们可以手动把某些关键任务放到第二核心运行。
使用_thread启动独立线程
import _thread import uasyncio as asyncio import time # Core 1 上运行的心跳任务 def core1_heartbeat(): counter = 0 while True: print(f"[Core1] Tick {counter}") counter += 1 time.sleep(1) # 注意:这是阻塞的,但只影响Core 1 # 在Core 1启动任务 _thread.start_new_thread(core1_heartbeat, ()) # Core 0 继续跑协程 async def core0_tasks(): while True: print("[Core0] Handling async jobs...") await asyncio.sleep(0.5) # 启动主事件循环 asyncio.run(core0_tasks())输出示例:
[Core1] Tick 0 [Core0] Handling async jobs... [Core1] Tick 1 [Core0] Handling async jobs...可以看到两个任务确实是真正并行执行的。
重要提醒:
_thread不提供锁机制,访问全局变量需格外小心;- 不要在
_thread中调用MicroPython的GC敏感接口(如创建大量对象); - 推荐只在第二核心运行简单、独立、无复杂I/O的任务。
协程 vs 状态机:谁才是嵌入式开发的未来?
以前处理多任务,大家习惯写状态机:
switch(state) { case READ_SENSOR: value = adc.read(); state = WAIT_DEBOUNCE; break; case WAIT_DEBOUNCE: delay_ms(10); state = PROCESS_DATA; break; ... }逻辑分散、跳转复杂、维护困难。
而协程让你可以用近乎同步的方式写异步逻辑:
async def debounce_input(): while True: if button.value() == 1: await asyncio.sleep(0.01) # 去抖 if button.value() == 1: print("Button pressed!") await asyncio.sleep(0.05) # 防止空转占用CPU代码直观、结构清晰、易于扩展。
这才是现代嵌入式开发应有的样子:让开发者专注业务逻辑,而不是调度细节。
工程实践中必须避开的5个坑
1. ❌ 别写“永不停歇”的循环
async def bad_task(): while True: do_heavy_computation() # 没有 await → 占用CPU到底→ 结果:其他任务饿死!
✅ 正确做法:定期让出CPU
await asyncio.sleep(0) # 主动交出控制权2. ❌ 高频任务影响整体调度精度
比如一个任务每10ms执行一次,另一个每5秒执行一次。若前者太多,会导致后者延迟严重。
✅ 解决方案:合理设置sleep时间,避免过度抢占。
3. ❌ 共享资源竞争
尽管协程是单线程调度,但中断可能在任意时刻触发,仍可能导致数据不一致。
✅ 建议加标志位保护:
updating = False try: updating = True shared_data = new_value finally: updating = False4. ❌ 内存泄漏风险
每个协程对象都会占用一定内存(约几十字节),任务过多可能导致OOM。
✅ 推荐动态管理任务生命周期:
task = asyncio.create_task(my_task()) # ... 条件满足后取消 task.cancel()5. ❌ 忽视异常处理
一个未捕获的异常可能导致整个事件循环终止。
✅ 一定要包裹异常:
async def safe_task(): try: while True: await work() except Exception as e: print(f"Task failed: {e}") finally: cleanup()这套方案适合哪些场景?
| 应用类型 | 是否适用 | 说明 |
|---|---|---|
| 智能家居节点 | ✅ 强烈推荐 | 多传感器+本地反馈+无线通信 |
| 工业监测终端 | ✅ | 数据采集+报警联动+显示刷新 |
| 教学实验平台 | ✅✅✅ | 学生易理解,代码结构清晰 |
| 高精度电机控制 | ⚠️ 部分适用 | 实时性要求极高时建议用C/C++中断 |
| 音频流处理 | ❌ | 数据量大,需DMA和硬实时保障 |
总的来说,只要不是对微秒级响应有严苛要求的场景,协程都是首选方案。
更进一步:打造你的微型异步框架
一旦掌握了基础,就可以构建更高级的应用模式:
🧩 模式1:异步队列通信
queue = asyncio.Queue() async def producer(): while True: val = sensor.read() await queue.put(val) await asyncio.sleep(0.1) async def consumer(): while True: val = await queue.get() process(val)🧩 模式2:事件触发机制
event = asyncio.Event() # 某处触发 event.set() # 某处等待 await event.wait() event.clear()🧩 模式3:定时任务调度器
async def every(interval, func): while True: await asyncio.sleep(interval) func() # 用法 asyncio.create_task(every(2, lambda: print("Every 2s")))这些原语组合起来,已经有点像一个微型RTOS了。
写在最后:协程不只是技巧,更是一种思维方式
在树莓派Pico这样的设备上编程,我们总想着“省着点用”。但协程告诉我们:
真正的高效,不是压缩功能,而是提升抽象层次。
当你不再纠结于“哪个任务该轮到我了”,而是专注于“我要做什么”,开发体验就会完全不同。
也许未来的某一天,你会在Pico上跑起一个异步Web服务器,或是蓝牙BLE广播服务。而这一切的起点,不过是那一行简单的:
await asyncio.sleep(0.1)所以,下次当你又想敲下time.sleep()的时候,请停下来问一句:
“我真的要让整个系统停下来等我吗?”
如果不是,那就换上await吧。你会发现,264KB的内存里,也能跑出无限可能。
如果你正在做类似的项目,欢迎在评论区分享你的协程实践!