并发任务控制
刀刀
4/7/2025
0 字
0 分钟
前置代码要求
首先看一段代码:
function timeout(time) {
return new Promise((res) => {
setTimeout(() => {
res()
}, time)
})
}
class SuperTask {
}
const superTask = new SuperTask()
function addTask(time, name) {
superTask
.add(() => timeout(time))
.then(() => {
console.log(`任务${name}完成`)
})
}
addTask(10000, 1) // 10000ms后输出:任务1完成
addTask(5000, 2) // 5000ms后输出:任务2完成
addTask(3000, 3) // 8000ms后输出:任务3完成
addTask(4000, 4) // 12000ms后输出:任务4完成
代码有一个 timeout
函数,用于延迟一段时间后执行。函数 addTask
接收时间和名称,执行类的 add
方法后触发延迟函数,执行完毕后再输出文本。
依次调用 addTask
函数,发现任务3并不是3秒后触发,而是8秒后触发。梳理一下时间顺序不难看出,他同一时间只有两个任务可以执行,如下:
首先执行任务1,需要等待10秒,然后执行任务2,需要等待5秒。
此时任务3需要等待前两个任务其中一个任务执行完毕才可继续执行,最早的是任务2,5秒后执行完毕,此时任务3才开始执行,等待3秒后执行完毕,因此是5+3=8秒。
如果还是不理解,可以想象为类似于现实中的银行柜台。一个银行中只有两个柜台可以办理业务,第三个人想要办理业务,则需要等待前两个人中其中一人办理好才能办理。
配置并发类
基础属性
需要配置以下属性:
- 并发的数量,默认为2
- 线程池数组,初始为空
- 当前正在执行任务的数量,默认为0
class SuperTask {
constructor(parallelCount = 2) {
this.parallelCount = parallelCount
this.tasks = []
this.runningCount = 0
}
}
添加函数
根据代码,执行完添加方法后调用了 .then()
,因此添加任务函数返回一个 Promise
。添加任务函数执行两个操作:
- 把任务添加到任务线程数组中
- 尝试执行任务,有可能执行失败
还是以银行例子举例,银行大厅可能空无一人(对应数组为空),一个人拿了号之后叫号为自己的号,可以执行;也有可能前面有很多人了,就要去排队,因此经过判断后执行失败。
class SuperTask {
constructor(parallelCount = 2) {
// ....
}
add(task) {
return new Promise((res, rej) => {
this.task.push(task)
this._run()
})
}
_run() {}
}
执行函数
在 _run()
函数中需要跨函数执行 add
函数的 res
,把 Promise
状态改变。因此需要修改 add
函数的代码,不能单纯保存任务 task
,也需要保存其 res
和 rej
。
循环判断当前的执行任务数量是否小于并发数量,与当前的任务数组是否不为空,都符合条件则拿出第一项任务,执行其任务,并让执行任务的数量自增1.
任务执行完毕后让执行任务的数量自减1,且再次调用 _run()
方法重新判断是否执行下一个任务。
代码如下:
class SuperTask {
constructor(parallelCount = 2) {
// ....
}
add(task) {
return new Promise((res, rej) => {
this.task.push({
task,
res,
rej
})
this._run()
})
}
_run() {
while(this.runningCount < this.parallelCount && this.tasks.length > 0) {
const { task, res, rej } = this.tasks.shift()
this.runningCount++
task().then(resolve, reject).finally(() => {
this.runningCount--
this._run()
})
}
}
}
完整代码
function timeout(time) {
return new Promise((res) => {
setTimeout(() => {
res()
}, time)
})
}
class SuperTask {
constructor(parallelCount = 2) {
this.parallelCount = parallelCount
this.tasks = []
this.runningCount = 0
}
add(task) {
return new Promise((res, rej) => {
this.task.push({
task,
res,
rej
})
this._run()
})
}
_run() {
while(this.runningCount < this.parallelCount && this.tasks.length > 0) {
const { task, res, rej } = this.tasks.shift()
this.runningCount++
task().then(resolve, reject).finally(() => {
this.runningCount--
this._run()
})
}
}
}
const superTask = new SuperTask()
function addTask(time, name) {
superTask
.add(() => timeout(time))
.then(() => {
console.log(`任务${name}完成`)
})
}
addTask(10000, 1) // 10000ms后输出:任务1完成
addTask(5000, 2) // 5000ms后输出:任务2完成
addTask(3000, 3) // 8000ms后输出:任务3完成
addTask(4000, 4) // 12000ms后输出:任务4完成
拓展:并发队列函数封装
思路
接下来封装一个并发队列的函数,主要思想如下:
- 返回一个
Promise
函数 - 判断当前任务数组长度,为0则表示任务处理完毕,返回成功状态
- 循环,循环条件为可并发数量以及当前索引小于总任务数组索引,符合条件则调用
run()
函数 - 定义一个变量
nextIndex
,用于记录下一个任务的索引,默认为初始0;定义一个变量finishCount
,用于记录任务完成的数量,默认为0 run()
方法获取nextIndex
索引对应的任务,nextIndex
自增1,执行任务。- 任务执行成功后
finishCount
自增1,并开始判断情况:- 判断当前索引是否小于数组长度,小于说明还未执行完毕,继续调用
run()
方法 - 判断完成任务数量变量是否等于任务数组,等于说明任务都完成了,完成则返回成功状态
- 判断当前索引是否小于数组长度,小于说明还未执行完毕,继续调用
总体代码
function paralleTask(tasks, parallelCount = 2) {
return new Promise((resolve) => {
// 任务数组是否为空
if(tasks.length === 0) {
resolve()
return
}
let nextIndex = 0
let finishCount = 0
function _run() {
// 运行下一个任务
const task = tasks[nextIndex]
nextIndex++
task().then(() => {
finishCount++
if(nextIndex < tasks.length) {
_run()
} else if(finishCount === tasks.length) {
resolve()
}
})
}
// 循环可执行任务数,并发数量
for(let i = 0; i < parallelCount && nextIndex < tasks.length; i++) {
_run()
}
})
}
export default paralleTask
使用
import paralleTask from './paralleTask.js'
const tasks = [
// ...
]
paralleTask(tasks, 4).then(() => {
console.log('all finish!')
})