redux-saga源码分析(二)

上一篇中的runSaga作用为启动中间件,最终会执行proc

proc函数

proc主要干了两件事情:

  • 生成事件订阅通道,用于存放回调函数
  • 执行一次Generator对象iterator的next,根据结果执行对应类型的effect
function proc(env, iterator, parentContext, parentEffectId, meta, cont) {
  
  const taskContext = Object.create(parentContext)
  const finalRunEffect = env.finalizeRunEffect(runEffect)
  const task = newTask(parentEffectId, meta, cont)
  const mainTask = { meta, cancel: cancelMain, _isRunning: true, _isCancelled: false }

  const taskQueue = forkQueue(
    mainTask,
    function onAbort() {
      cancelledDueToErrorTasks.push(...taskQueue.getTaskNames())
    },
    end,
  )
  next()

  return task
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

runEffec函数

runEffect根据类型执行逻辑:如果是take类型,封装回调加入当前作用域下的事件channel,初始化操作完成;如果是fork,进入proc逻辑。

function runEffect(effect, effectId, currCb) {
    if (is.promise(effect)) {
      resolvePromise(effect, currCb)
    } else if (is.iterator(effect)) {
      resolveIterator(effect, effectId, meta, currCb)
    } else if (effect && effect[IO]) {
      const { type, payload } = effect
      if (type === effectTypes.TAKE) runTakeEffect(payload, currCb)
      else if (type === effectTypes.PUT) runPutEffect(payload, currCb)
      else if (type === effectTypes.ALL) runAllEffect(payload, effectId, currCb)
      else if (type === effectTypes.RACE) runRaceEffect(payload, effectId, currCb)
      else if (type === effectTypes.CALL) runCallEffect(payload, effectId, currCb)
      else if (type === effectTypes.CPS) runCPSEffect(payload, currCb)
      else if (type === effectTypes.FORK) runForkEffect(payload, effectId, currCb)
      else if (type === effectTypes.JOIN) runJoinEffect(payload, currCb)
      else if (type === effectTypes.CANCEL) runCancelEffect(payload, currCb)
      else if (type === effectTypes.SELECT) runSelectEffect(payload, currCb)
      else if (type === effectTypes.ACTION_CHANNEL) runChannelEffect(payload, currCb)
      else if (type === effectTypes.FLUSH) runFlushEffect(payload, currCb)
      else if (type === effectTypes.CANCELLED) runCancelledEffect(payload, currCb)
      else if (type === effectTypes.GET_CONTEXT) runGetContextEffect(payload, currCb)
      else if (type === effectTypes.SET_CONTEXT) runSetContextEffect(payload, currCb)
      else currCb(effect)
    } else {
      // anything else returned as is
      currCb(effect)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

next是一个自动执行函数

function next(arg, isErr) {
    if (!mainTask._isRunning) {
      throw new Error('Trying to resume an already finished generator')
    }

    try {
      let result
      if (isErr) {
        result = iterator.throw(arg)
      } else if (shouldCancel(arg)) {
        mainTask._isCancelled = true
        
        next.cancel()
        
        result = is.func(iterator.return) ? iterator.return(TASK_CANCEL) : { done: true, value: TASK_CANCEL }
      } else if (shouldTerminate(arg)) {
        result = is.func(iterator.return) ? iterator.return() : { done: true }
      } else {
        result = iterator.next(arg)
      }

      if (!result.done) {
        digestEffect(result.value, parentEffectId, '', next)
      } else {
        mainTask._isRunning = false
        mainTask.cont(result.value)
      }
    } catch (error) {
      if (mainTask._isCancelled) {
        env.logError(error)
      }
      mainTask._isRunning = false
      mainTask.cont(error, true)
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

需要特别注意下sagaHelpers的实现,例如takeEveryHelper,它返回一个类iterator的对象,便于后续的遍历和执行。
当有redux事件进来时,会触发channel中的事件回调。回调函数是基于proc中next函数的封装,执行过程会触发一次fork和take,takeEvery借助的takeEveryHelper核心逻辑

function takeEveryHelper(patternOrChannel, worker, ...args) {
  const yTake = { done: false, value: take(patternOrChannel) }
  const yFork = ac => ({ done: false, value: fork(worker, ...args, ac) }) 
  let action, setAction = ac => (action = ac)
  // 创建iterator函数,实现自定义next逻辑:q1() -> q2() -> q1() 循环进行下去
  // 执行第一步,会进入runTake逻辑
  // 执行第二步,会进入runFork,在runFork中会再执行一次runTake
  // 因此函数会按照:  q1()  -->   q2()->q1()  -->  q2()->q1()  --> ...  这种循环执行下去
  return fsmIterator(
    {
      q1() {
        return ['q2', yTake, setAction]
      },
      q2() {  // 触发事件时,进行此步操作,返回一个fork对象
        return action === END ? [qEnd] : ['q1', yFork(action)]
      },
    },
    'q1',
    `takeEvery(${safeName(patternOrChannel)}, ${worker.name})`,
  )
}

function fsmIterator(fsm, q0, name = 'iterator') {
  let updateState,
    qNext = q0

  function next(arg, error) {
    ...
    let [q, output, _updateState] = fsm[qNext]()
    qNext = q
    return qNext === qEnd ? done : output
  }
  // 封装返回iterator类型对象
  return makeIterator(next, error => next(null, error), name, true)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

Effect

Effect就是一个个对象,而这些对象的解释则交给上面的runEffect来进行
以put—effect为例解释来看

function runPutEffect({ channel, action, resolve }, cb) {
    asap(() => {
      let result
      try {
      // dispatch相应的action
        result = (channel ? channel.put : env.dispatch)(action)
      } catch (error) {
        cb(error, true)
        return
      }
      if (resolve && is.promise(result)) {
        resolvePromise(result, cb)
      } else {
        cb(result)
      }
    })
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

总结

中间件做的事情

  • 根据你传入的generator函数,next执行
  • generator函数中,存在一些副作用操作,比如put、call、race等,然后根据这些操作再进行其他的操作,而这些操作只做了一件事情,就是生成effect
    内部逻辑:
  • proc:proc是运行的核心,effect方法适配其运行机制。
  • 事件channel:每次proc执行都会生成一个事件channel,如果当前上下文执行逻辑后有take类型,那么会往channel中塞入对应的回调函数,否则channel为空。redux的action会通过sagaEmitter.emit(action)触发事件channel中的回调,如果channel中事件匹配的回调,执行对应逻辑。
  • fork:可以对比linux中fork进行理解。每次fork都会执行proc函数生成一个新的task。
  • take:  塞入当前作用域下的事件channel一个回调函数,此回调函数的主逻辑是proc中的next函数。
  • put:遍历事件channel,执行匹配的回调
最后更新时间: 10/8/2019, 7:41:17 PM