Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务率先运行(一)
假设有这样一种场景:任务接收器会源源不断地收到新任务,并且无法预知发送者何时会把新任务发送过来;每个任务都有各自的优先级,并且随时可能进入任务缓冲池。目标是每次都从任务缓冲池中挑选出优先级最高的那个任务,让它最先运行。
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlinx.coroutines.runBlocking
import java.util.UUID

/**
 * Demo entry point: tasks arrive through a channel, are recorded in a pending list by the
 * upstream side of the flow, and on every collected element the collector runs whichever
 * pending task currently has the highest priority (not necessarily the element it received).
 */
fun main() {
    val bufferCapacity = 5
    val totalTaskSize = 15

    val channel = Channel<TaskInfo>()

    // Pending tasks. `onEach` appends on the thread pool while the collector sorts/removes on
    // the runBlocking thread, so every access is guarded by `synchronized(taskList)`.
    val taskList = mutableListOf<TaskInfo>()

    // `use` closes the dispatcher (and its threads) once runBlocking has finished.
    newFixedThreadPoolContext(4, "my-thread").use { myThreadPool ->
        runBlocking {
            // Receive tasks.
            async {
                // NOTE(review): `buffer(bufferCapacity)` sits upstream of `onEach`, while `flowOn`
                // presumably adds its own default-sized buffer between `onEach` and the collector;
                // confirm which buffer actually bounds the pending list.
                channel.receiveAsFlow()
                    .buffer(bufferCapacity)
                    .onEach { task ->
                        // Producer side: record the incoming task as pending.
                        println("onEach $task at time=${System.currentTimeMillis()} ${Thread.currentThread().name}")
                        synchronized(taskList) { taskList.add(task) }
                    }
                    .flowOn(myThreadPool)
                    .collect { received ->
                        // Consumer side: each received element is only a trigger to run one task.
                        println("collect $received at time=${System.currentTimeMillis()} ${Thread.currentThread().name}")

                        // Snapshot, pick and remove the top task atomically so a concurrent
                        // `add` from the upstream cannot interleave with the sort or removal.
                        val (newOrderList, lastTaskInfo) = synchronized(taskList) {
                            val sorted = taskList.sortedBy { it.priority }
                            val top = sorted.lastOrNull()
                            if (top != null) {
                                taskList.remove(top)
                            }
                            sorted to top
                        }

                        newOrderList.forEach { print("${it.priority} ") }
                        println("\n最大优先级任务:$lastTaskInfo")

                        // Every collected element was added to the list before it was emitted,
                        // so the list should not be empty here; guard instead of using `!!`.
                        if (lastTaskInfo != null) {
                            loader(lastTaskInfo)
                        }
                    }
            }

            // Send the load tasks back-to-back.
            async {
                repeat(totalTaskSize) { id -> enqueue(channel, id) }
                // Closing the channel lets `receiveAsFlow()` complete after the remaining
                // elements are consumed; without it `collect` (and runBlocking) never returns.
                channel.close()
            }
        }
    }
}

/**
 * Creates a task with the given [id] and a random priority in `0..9998`, logs it,
 * and sends it to [channel] (suspending until the channel accepts it).
 */
private suspend fun enqueue(channel: Channel<TaskInfo>, id: Int) {
    val taskInfo = TaskInfo(id, (Math.random() * 9999).toInt())
    println("enqueue $taskInfo")
    channel.send(taskInfo)
}

/**
 * Stand-in for the real time-consuming task body: logs start, suspends for 500 ms, logs end.
 */
private suspend fun loader(info: TaskInfo) {
    println("load start $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}")
    delay(500)
    println("load end $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}")
}

/**
 * A task with a caller-supplied [id] and [priority].
 *
 * Identity is based on a per-instance random UUID, so two tasks with the same [id] and
 * [priority] are still distinct; [equals] and [hashCode] are both derived from that UUID.
 */
private class TaskInfo(val id: Int, val priority: Int) {
    private val taskId: UUID = UUID.randomUUID()

    // Type-checked comparison: returns false (instead of throwing) for null or foreign types.
    override fun equals(other: Any?): Boolean = other is TaskInfo && taskId == other.taskId

    // Kept consistent with equals so instances behave correctly in hash-based collections.
    override fun hashCode(): Int = taskId.hashCode()

    override fun toString(): String = "TaskInfo(id=$id, priority=$priority)"
}
输出:
enqueue TaskInfo(id=0, priority=7947)
enqueue TaskInfo(id=1, priority=1045)
enqueue TaskInfo(id=2, priority=4478)
onEach TaskInfo(id=0, priority=7947) at time=1765979341859 my-thread-2
onEach TaskInfo(id=1, priority=1045) at time=1765979341859 my-thread-2
onEach TaskInfo(id=2, priority=4478) at time=1765979341859 my-thread-2
enqueue TaskInfo(id=3, priority=5964)
enqueue TaskInfo(id=4, priority=2658)
onEach TaskInfo(id=3, priority=5964) at time=1765979341859 my-thread-4
onEach TaskInfo(id=4, priority=2658) at time=1765979341859 my-thread-4
enqueue TaskInfo(id=5, priority=3495)
onEach TaskInfo(id=5, priority=3495) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=6, priority=1461)
onEach TaskInfo(id=6, priority=1461) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=7, priority=4860)
onEach TaskInfo(id=7, priority=4860) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=8, priority=7226)
onEach TaskInfo(id=8, priority=7226) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=9, priority=1939)
enqueue TaskInfo(id=10, priority=133)
onEach TaskInfo(id=9, priority=1939) at time=1765979341861 my-thread-3
onEach TaskInfo(id=10, priority=133) at time=1765979341861 my-thread-3
enqueue TaskInfo(id=11, priority=1818)
enqueue TaskInfo(id=12, priority=7695)
onEach TaskInfo(id=11, priority=1818) at time=1765979341861 my-thread-2
onEach TaskInfo(id=12, priority=7695) at time=1765979341861 my-thread-2
enqueue TaskInfo(id=13, priority=4365)
onEach TaskInfo(id=13, priority=4365) at time=1765979341862 my-thread-4
enqueue TaskInfo(id=14, priority=4889)
onEach TaskInfo(id=14, priority=4889) at time=1765979341862 my-thread-2
collect TaskInfo(id=0, priority=7947) at time=1765979341862 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695 7947
最大优先级任务:TaskInfo(id=0, priority=7947)
load start TaskInfo(id=0, priority=7947) @time=1765979341887 main
load end TaskInfo(id=0, priority=7947) @time=1765979342391 main
collect TaskInfo(id=1, priority=1045) at time=1765979342392 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695
最大优先级任务:TaskInfo(id=12, priority=7695)
load start TaskInfo(id=12, priority=7695) @time=1765979342392 main
load end TaskInfo(id=12, priority=7695) @time=1765979342901 main
collect TaskInfo(id=2, priority=4478) at time=1765979342901 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226
最大优先级任务:TaskInfo(id=8, priority=7226)
load start TaskInfo(id=8, priority=7226) @time=1765979342902 main
load end TaskInfo(id=8, priority=7226) @time=1765979343412 main
collect TaskInfo(id=3, priority=5964) at time=1765979343412 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964
最大优先级任务:TaskInfo(id=3, priority=5964)
load start TaskInfo(id=3, priority=5964) @time=1765979343412 main
load end TaskInfo(id=3, priority=5964) @time=1765979343922 main
collect TaskInfo(id=4, priority=2658) at time=1765979343922 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889
最大优先级任务:TaskInfo(id=14, priority=4889)
load start TaskInfo(id=14, priority=4889) @time=1765979343923 main
load end TaskInfo(id=14, priority=4889) @time=1765979344433 main
collect TaskInfo(id=5, priority=3495) at time=1765979344433 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860
最大优先级任务:TaskInfo(id=7, priority=4860)
load start TaskInfo(id=7, priority=4860) @time=1765979344434 main
load end TaskInfo(id=7, priority=4860) @time=1765979344943 main
collect TaskInfo(id=6, priority=1461) at time=1765979344943 main
133 1045 1461 1818 1939 2658 3495 4365 4478
最大优先级任务:TaskInfo(id=2, priority=4478)
load start TaskInfo(id=2, priority=4478) @time=1765979344943 main
load end TaskInfo(id=2, priority=4478) @time=1765979345452 main
collect TaskInfo(id=7, priority=4860) at time=1765979345452 main
133 1045 1461 1818 1939 2658 3495 4365
最大优先级任务:TaskInfo(id=13, priority=4365)
load start TaskInfo(id=13, priority=4365) @time=1765979345452 main
load end TaskInfo(id=13, priority=4365) @time=1765979345960 main
collect TaskInfo(id=8, priority=7226) at time=1765979345960 main
133 1045 1461 1818 1939 2658 3495
最大优先级任务:TaskInfo(id=5, priority=3495)
load start TaskInfo(id=5, priority=3495) @time=1765979345960 main
load end TaskInfo(id=5, priority=3495) @time=1765979346467 main
collect TaskInfo(id=9, priority=1939) at time=1765979346467 main
133 1045 1461 1818 1939 2658
最大优先级任务:TaskInfo(id=4, priority=2658)
load start TaskInfo(id=4, priority=2658) @time=1765979346467 main
load end TaskInfo(id=4, priority=2658) @time=1765979346973 main
collect TaskInfo(id=10, priority=133) at time=1765979346973 main
133 1045 1461 1818 1939
最大优先级任务:TaskInfo(id=9, priority=1939)
load start TaskInfo(id=9, priority=1939) @time=1765979346974 main
load end TaskInfo(id=9, priority=1939) @time=1765979347482 main
collect TaskInfo(id=11, priority=1818) at time=1765979347482 main
133 1045 1461 1818
最大优先级任务:TaskInfo(id=11, priority=1818)
load start TaskInfo(id=11, priority=1818) @time=1765979347483 main
load end TaskInfo(id=11, priority=1818) @time=1765979347986 main
collect TaskInfo(id=12, priority=7695) at time=1765979347986 main
133 1045 1461
最大优先级任务:TaskInfo(id=6, priority=1461)
load start TaskInfo(id=6, priority=1461) @time=1765979347987 main
load end TaskInfo(id=6, priority=1461) @time=1765979348498 main
collect TaskInfo(id=13, priority=4365) at time=1765979348498 main
133 1045
最大优先级任务:TaskInfo(id=1, priority=1045)
load start TaskInfo(id=1, priority=1045) @time=1765979348498 main
load end TaskInfo(id=1, priority=1045) @time=1765979349006 main
collect TaskInfo(id=14, priority=4889) at time=1765979349006 main
133
最大优先级任务:TaskInfo(id=10, priority=133)
load start TaskInfo(id=10, priority=133) @time=1765979349007 main
load end TaskInfo(id=10, priority=133) @time=1765979349513 main
相关:
https://blog.csdn.net/zhangphil/article/details/154843029