跳转到内容

并发任务控制

刀刀

4/7/2025

0 字

0 分钟

前置代码要求

首先看一段代码:

js
function timeout(time) {
  return new Promise((res) => {
    setTimeout(() => {
      res()
    }, time)
  })
}

class SuperTask {
    
}

const superTask = new SuperTask()
function addTask(time, name) {
  superTask
    .add(() => timeout(time))
  	.then(() => {
      console.log(`任务${name}完成`)
    })
}

addTask(10000, 1) // 10000ms后输出:任务1完成
addTask(5000, 2) // 5000ms后输出:任务2完成
addTask(3000, 3) // 8000ms后输出:任务3完成
addTask(4000, 4) // 12000ms后输出:任务4完成

代码有一个 timeout 函数,用于延迟一段时间后执行。函数 addTask 接收时间和名称,执行类的 add 方法后触发延迟函数,执行完毕后再输出文本。

依次调用 addTask 函数,发现任务3并不是3秒后触发,而是8秒后触发。梳理一下时间顺序不难看出,他同一时间只有两个任务可以执行,如下:

首先执行任务1,需要等待10秒,然后执行任务2,需要等待5秒。

此时任务3需要等待前两个任务其中一个任务执行完毕才可继续执行,最早的是任务2,5秒后执行完毕,此时任务3才开始执行,等待3秒后执行完毕,因此是5+3=8秒。

如果还是不理解,可以想象为类似于现实中的银行柜台。一个银行中只有两个柜台可以办理业务,第三个人想要办理业务,则需要等待前两个人中其中一人办理好才能办理。

配置并发类

基础属性

需要配置以下属性:

  • 并发的数量,默认为2
  • 线程池数组,初始为空
  • 当前正在执行任务的数量,默认为0
js
class SuperTask {
  constructor(parallelCount = 2) {
    this.parallelCount = parallelCount
    this.tasks = []
    this.runningCount = 0
  }
}

添加函数

根据代码,执行完添加方法后调用了 .then() ,因此添加任务函数返回一个 Promise 。添加任务函数执行两个操作:

  1. 把任务添加到任务线程数组中
  2. 尝试执行任务,有可能执行失败

还是以银行例子举例,银行大厅可能空无一人(对应数组为空),一个人拿了号之后叫号为自己的号,可以执行;也有可能前面有很多人了,就要去排队,因此经过判断后执行失败。

js
class SuperTask {
  constructor(parallelCount = 2) {
    // ....
  }
  
  add(task) {
    return new Promise((res, rej) => {
      this.task.push(task)
      this._run()
    })
  }
    
  _run() {}
}

执行函数

_run() 函数中需要跨函数执行 add 函数的 res ,把 Promise 状态改变。因此需要修改 add 函数的代码,不能单纯保存任务 task ,也需要保存其 resrej

循环判断当前的执行任务数量是否小于并发数量,与当前的任务数组是否不为空,都符合条件则拿出第一项任务,执行其任务,并让执行任务的数量自增1.

任务执行完毕后让执行任务的数量自减1,且再次调用 _run() 方法重新判断是否执行下一个任务。

代码如下:

js
class SuperTask {
  constructor(parallelCount = 2) {
    // ....
  }
    
  add(task) {
    return new Promise((res, rej) => {
      this.task.push({ 
        task, 
        res, 
        rej 
      }) 
      this._run()
    })
  }
    
  _run() {
    while(this.runningCount < this.parallelCount && this.tasks.length > 0) { 
      const { task, res, rej } = this.tasks.shift() 
      this.runningCount++
      
      task().then(resolve, reject).finally(() => { 
        this.runningCount--
        this._run() 
      }) 
    } 
  }
}

完整代码

js
function timeout(time) {
  return new Promise((res) => {
    setTimeout(() => {
      res()
    }, time)
  })
}

class SuperTask {
  constructor(parallelCount = 2) {
    this.parallelCount = parallelCount
    this.tasks = []
    this.runningCount = 0
  }
    
  add(task) {
    return new Promise((res, rej) => {
      this.task.push({
        task,
        res,
        rej
      })
      this._run()
    })
  }
    
  _run() {
    while(this.runningCount < this.parallelCount && this.tasks.length > 0) {
      const { task, res, rej } = this.tasks.shift()
      this.runningCount++
      
      task().then(resolve, reject).finally(() => {
        this.runningCount--
        this._run()
      })
    }
  }
}

const superTask = new SuperTask()
function addTask(time, name) {
  superTask
    .add(() => timeout(time))
    .then(() => {
      console.log(`任务${name}完成`)
    })
}

addTask(10000, 1) // 10000ms后输出:任务1完成
addTask(5000, 2) // 5000ms后输出:任务2完成
addTask(3000, 3) // 8000ms后输出:任务3完成
addTask(4000, 4) // 12000ms后输出:任务4完成

拓展:并发队列函数封装

思路

接下来封装一个并发队列的函数,主要思想如下:

  1. 返回一个 Promise 函数
  2. 判断当前任务数组长度,为0则表示任务处理完毕,返回成功状态
  3. 循环,循环条件为可并发数量以及当前索引小于总任务数组索引,符合条件则调用 run() 函数
  4. 定义一个变量 nextIndex ,用于记录下一个任务的索引,默认为初始0;定义一个变量 finishCount ,用于记录任务完成的数量,默认为0
  5. run() 方法获取 nextIndex 索引对应的任务, nextIndex 自增1,执行任务。
  6. 任务执行成功后 finishCount 自增1,并开始判断情况:
    • 判断当前索引是否小于数组长度,小于说明还未执行完毕,继续调用 run() 方法
    • 判断完成任务数量变量是否等于任务数组,等于说明任务都完成了,完成则返回成功状态

总体代码

js
function paralleTask(tasks, parallelCount = 2) {
  return new Promise((resolve) => {
    // 任务数组是否为空
    if(tasks.length === 0) {
      resolve()
      return
    }
    
    let nextIndex = 0
    let finishCount = 0
    
    function _run() {
      // 运行下一个任务
      const task = tasks[nextIndex]
      nextIndex++
      task().then(() => {
        finishCount++
        if(nextIndex < tasks.length) {
          _run()
        } else if(finishCount === tasks.length) {
          resolve()
        }
      })
    }
    
    // 循环可执行任务数,并发数量
    for(let i = 0; i < parallelCount && nextIndex < tasks.length; i++) {
      _run()
    }
  })
}

export default paralleTask

使用

js
import paralleTask from './paralleTask.js'

const tasks = [
  // ...
]

paralleTask(tasks, 4).then(() => {
  console.log('all finish!')
})