• 0

  • 454

探索 Flutter Stream 底层实现:Microtask

2星期前

首图.png
首图.png

前言

为什么我要写这篇文章?

最近公司来了一个有 Flutter 开发经验的 iOS 妹子,于是我们移动端就尝试用 Flutter 开发新的业务模块。

我们现在的 Flutter 项目用的状态管理框架是 BLoC,但是我在使用 BLoC 时感觉稀里糊涂的,不知道事情是怎么发生的,于是我就想了解一下 Stream 的原理。

这篇文章本来是要讲 Stream 原理的,后面发现这样内容太多了,于是决定先写一篇微任务原理的文章。

什么地方用到了微任务?

我们平时在日常开发中使用的 BLoC 等状态管理框架,大部分都基于事件流 Stream 来实现的。

我们调用 Stream 的 listen() 方法时,其实就是把一个微任务提交到微任务队列,而在 listen() 方法中传入的 onData 回调在微任务执行时,就会被调用。

如果我们的列表有太多的元素,我们想要异步地去遍历这些元素,我们可以使用 IteratorforEachAsync() 方法,这个方法也会把遍历的操作,作为微任务提交到微任务队列中。

而且除了通过 Stream 以外,我们也可以直接调用 scheduleMicrotask() 把任务加入队列。

微任务执行流程大致是怎么样的?

微任务执行流程.gif
微任务执行流程.gif

假如我们给 Stream 的 listen() 方法传入 onData 回调的函数名为 f ,那么在经过 Stream 相关的多个方法后, f 会被封装为延时事件 _DelayedEvent 保存在 _StreamImplEvents 中,然后调用 scheduleMicrotask() 把微任务接入队列,具体被提交到微任务队列的就是 _startMicrotaskLoop() 方法。

Engine 接收到 VSync 信号时,就会调用 BeginFrame() 方法,这个方法会调用微任务队列 DartMicrotaskQueue 的 RunMicroatsks() 方法从队列中取出任务来执行。

当 _startMicrotaskLoop() 方法被执行时,就会调用我们设定的回调 f 方法 ,到此微任务的执行流程就结束了。

内容概览

接下来我会分三个部分来讲,第一部分会讲微任务调度流程,也就是从调用 Stream.value() 方法到把微任务加入到 MicrotaskQueue 的流程。

第二部分会讲微任务执行流程,也就是从 Animator 接收到 VSync 信号开始,到 DartMicrotaskQueue 开始执行微任务的流程。

第三部分会讲从 _startMicrotaskLoop 开始,到调用 onData 回调的过程。

1. 微任务调度流程

Stream.value() 为例,value() 方法会创建一个 Stream 事件 _StreamImplEvents ,并把它作为挂起事件放到 StreamSubscription 中作为挂起事件。

然后 _StreamImplEvents 会把自己的 handleNext() 方法传给 scheduleMicrotask()方法 。

schedueMicrotask() 方法会把接收到的闭包封装为异步回调 _AsynCallback,并保存在异步回调链表中,然后在执行微任务的时候就会调用这些回调。

1.1 Stream.value()

Stream.listen__.png
Stream.listen__.png

在调用 Stream.value() 方法后,value() 方法的内部会创建一个异步 Stream 控制器 _AsyncStreamController ,并调用 _AsnycStreamController 的 _add() 方法。

Stream.value__.gif
Stream.value__.gif

_add() 方法会通过 _StreamController 的 _ensurePendingEvents() 方法来创建一个 _StreamImplEvents ,它是挂起事 _PendingEvents 的子类。

_add() 方法创建好挂起事件后,就会把 value ,在这里也就是 1 ,封装为延时数据事件 _DelayedData,然后调用 _StreamImplEvents 的 add() 方法把 _DelayedData 作为挂起事件 _StreamImplEvents 最后一个挂起事件 lastPendingEvent。

_DelayedEvent.png
_DelayedEvent.png

_DelayedData 是 _DelayedEvent 的子类,除了_DelayedData 以外,_DelayedEvent 还有 _DelayedError_DelayedDone 两个子类。

value() 方法调用完 _ AsnycStreamController 的 _add() 方法后,就会创建一个 _ControllerStream 并返回。

1.2 _StreamImpl.listen()

Before scheduleMicrotask__.png
Before scheduleMicrotask__.png

通过 Stream.value() 获取到 _ControllerStream 后调用的 listen() 方法,是 _ControllerStream 的父类 _StreamImpl 的方法。

_StreamImpl 的 listen() 方法中,会调用 _ControllerStream 的 _createSubscription() 方法创建订阅。

_createSubscription() 方法又调用了 Stream 控制器 _StreamController_subscrbe() 方法进行订阅。

_StreamController 的 _subscribe() 方法会创建一个 _ControllerSubscription ,并把在前面调用 _ensurePendingEvents() 方法时创建的 _StreamImplEvents 作为挂起事件 _PendingEvents 传到 _BufferingStreamSubscription_setPendingEvents() 方法中。

而 _setPendingEvents() 方法中,又调用了自己的 schedule() 方法,并把自己的 handleNext() 方法传入 schedule_microtask.dart 文件中的 scheduleMicrotask() 方法中。

下面再来看下 scheduleMicrotask() 的实现。

1.3 scheduleMicrotask()

after scheduleMicrotask__.gif
after scheduleMicrotask__.gif

scheduleMicrotask() 方法会调用 _rootScheduleMicrotask() 方法,在 _rootScheduleMicrotask() 方法中,会把回调方法封装为异步回调 _AsyncCallback ,并传给 _scheduleAsynCallabck() 方法。

在 _scheduleAsyncCallback() 方法中,会先把 _AsyncCallback 封装为 _AsyncCallbackEntry ,然后把 _startMicrotakskLoop() 方法传到 _AsyncRun 的 _scheduleImmedaite() 方法中,_scheduleImmediate() 的实现在 schedule_microtask_patch.dart 文件中。

这些异步回调是以链表的形式存储的,新创建的 _AsyncCallbackEntry 会被放在链表的尾端,_startMcirotaskLoop() 方法执行的时候,就会调用到这些回调。

_AsyncCallbackEntry.png
_AsyncCallbackEntry.png

_AsyncRun 的 _scheduleImmedaite() 方法会调用 _ScheduleImmediate 中的闭包 _closure ,这个闭包其实就是 Engine 层中的 DartMicrotaskQueue 的 ScheduleMicrotask() 方法。

然后在 ScheduleMicrotask() 方法中,会把 _startMicrotaskLoop 放到微任务队列 MicrotaskQueue 中,

MicrotaskQueue.png
MicrotaskQueue.png

DartMicrotaskQueue 有遍历微任务的实现,而 MicrotaskQueue 则是具体保存微任务的地方。

1.4 _ScheduleImmediate._closure

_ScheduleImmediate.png
_ScheduleImmediate.png

_ScheduleImmediate 中的闭包 _closure 是在 DartRuntimeHooks 初始化的时候赋值的,下面说下这个过程。

在 Engine 的启动过程中,会创建 RootIsolate,并调用 DartIsolate 的 LoadLibraries() 方法。

LoadLibraries() 方法会调用 dart_runtime_hooks.cc 中的 InitDartAsync() ,InitDartAsync() 方法首先会从 natives.dart 中获取 DartMicrotaskQueue 的 ScheduleMicrotask() 方法,

这个方法就是用来初始化 _ScheduleImmedate 的 _closure ,有了这个方法,后面 scheduleMicrotask() 要用的时候才能调用得到。

在 natives.dart 中,有一个 _getScheduleMicrotaskClosure() 方法,这个方法就是 DartRuntimeHooks 中的 ScheduleMicrotask() 方法。

而 DartRuntimeHooks 中的 ScheduleMicrotask() 方法会调用 UIDartState 的 ScheduleMicrotask() 方法,最终会调用微任务队列 DartMicrotaskQueue 的 ScheduleMicrotask() 方法。

initDartAsync() 方法获取到了微任务队列的 ScheduleMicrotask 方法后,就会调用 _setScheduleImmediateClosure() 方法把这个函数赋值给 _ScheduleImmediate 的 _closure 。

2. 微任务执行流程

看完了微任务的调度流程,下面我们来看下微任务的执行流程。

RunMicrotasks(Engine 层).png
RunMicrotasks(Engine 层).png

微任务的执行流程从 Engine 层接收到 VSync 垂直同步信号开始,当 DartMicrotaskQueue 的 RunMicrotasks() 方法被调用后,就会通过 Dart 层执行 Dart 代码,最后调用到 Flutter 层的 _startMicrotaskLoop 方法。

关于完整的绘制流程在这里就不展开了,我们从 Animator 的 RequestFrame() 方法开始讲起。

在 RequestFrame() 方法中,会调用自己的 AwaitVSync() 方法,而 AwaitVSync() 方法会调用 VSyncWaiter 的 AsyncWaitForVsync() 方法等待 VSync 信号

当 VSyncWaiter 接收到 VSync 信号后,就会调用 Animator 的 BeginFrame() 方法。

在 Animator 的 BeginFrame() 方法中,会调用 Shell 的 OnAnimatorBeginFrame() 方法,Shell 又会调用 Engine 的 BeginFrame() 方法,然后 Engine 又会调用 RuntimeController 的 BeginFrame() 方法。

在 RuntimeController 的 BeginFrame() 方法中,会调用 PlatformConfiguration 的 BeginFrame() 方法,在 PlatformConfiguration 的 BeginFrame() 方法中,调用 _beginFrame()、FlushMicrotasksNow() 以及 _drawFrame() 三个方法。

PlatformConfiguration.BeginFrame__.png
PlatformConfiguration.BeginFrame__.png

_beginFrame() 方法和 _drawFrame() 方法会调用到 Flutter 层 window 的 onBeginFrame 和 onDrawFrame 回调触发绘制流程。

FlushMicrotasksNow() 是 UIDartState 的方法,这个方法直接调用了 DartMicrotaskQueue 的 RunMicrotasks() 方法。

在 RunMicrotasks() 方法中,会遍历 MicrotaskQueue 中的回调,这些回调是 Dart 代码,所以这里会调用 Dart 层的方法来执行回调。

回调被传入 Dart 层后,会经过下面这些方法。

RunMicrotasks__(Dart 层).png
RunMicrotasks__(Dart 层).png

3. 异步回调执行流程

下面我们来看下 DartMicrotaskQueue 通过 Dart 层调用到 Dart 代码,也就是 _startMicrotaskLoop() 方法后,都经过了哪些方法才调用到了 onData 回调。

_startMicrotaskLoop.gif
_startMicrotaskLoop.gif

_startMicrotaskLoop() 方法被调用后,会通过 Zone 调用到 _StreamImplEvents 的 handleNext() 方法处理下一个事件,然后 _StreamImplEvents 会调用 StreamSubscription 的 \_sendData() 方法发送数据事件,最后就会调用到我们设定的 onData() 回调。

3.1 _startMicrotaskLoop()

_startMicrotaskLoop(Zone).png
_startMicrotaskLoop(Zone).png
1. _microtaskLoop()

在 _startMicrotaskLoop() 方法把 _microtaskLoop() 放在了 try/finally 块中执行,而 _microtaskLoop() 中则会不断读取并执行链表中的 _AsyncCallbackEntry。

之所以要放在 try/finally 块中执行,是为了避免执行某个回调的过程中发生了异常导致遍历的过程中断。

2. Zone.bindCallbackGuarded()

_startMicrotaskLoop() 方法在调用到 handleNext() 方法前,会三番四次经过 Zone,之所以会这样,是因为在 scheduleMicrotask() 方法中,把接收到的回调放到了 Zone 的 bindCallbackGuarded() 方法中,而 _rootScheduleMicrotask() 方法,又把接收到的回调再次放到了 bindCallbackGuarded() 方法中。

而 bindCallbackGuarded() 会调用 runGuarded() 方法,runGuaded() 会把接收到的回调放在 try/catch 代码块中执行,并在遇到异常的时候调用 handleUncaughtError() 方法。

当 handleUncaughtError() 被调用时,如果我们把 runApp() 放在了 runZoneGuarded() 中执行,那么在 onError() 回调中就能够获取到这个异常的信息。

如果我们没有调用 runZoneGuarded() ,也没有在 listen() 方法中传入 onError 回调的话,那么异常就会作为 Unhandled Exception 打印成日志。

3. _PendingEvents.schedule()

这里调用的并不是 schedule() 方法,而是在 schedule() 方法中传给 scheduleMicrotask() 方法的闭包,具体实现如下。

scheduleMicrotask_code_.png
scheduleMicrotask_code_.png

3.2 handleNext()

handleNext__.png
handleNext__.png

之所以这个方法叫 handleNext() ,是因为延时事件 _DelayedEvent 和 _AsyncCallbackEntry 一样,是以链表的形式保存的,handleNext() 会获取并执行第一个挂起事件(firstPendingEvent)的下一个事件。

在 _DelayedData 的 perform() 方法中,只是简单地调用了_EventDispatch 的 _sendData() 方法发送数据。

_EventDispatch 是一个抽象类,_BufferingStreamSubscription 是它的其中一个子类,_BufferingStreamSubscription 会把 _onData 放到 runUnaryGuaded() 中执行,目的也是为了捕获回调中的异常,如果没有异常的话,那就会正常走到 onData 中,然后这个流程就结束了。

参考资料

本文使用 mdnice 排版

免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

ios

454

相关文章推荐

未登录头像

暂无评论