9.11.1 生产无限序列
管道(Pipeline)是一种模式, 我们可以用一个协程生产无限序列:
fun produceNumbers() = produce<Long>(CommonPool) {
var x = 1L
while (true) send(x++) // infinite stream of integers starting from 1
}
我们的消费序列的函数如下:
fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
for (x in numbers) send(x * x)
}
主代码启动并连接整个管线:
fun testPipeline() = runBlocking {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = consumeNumbers(numbers) // squares integers
//for (i in 1..6) println(squares.receive())
while (true) {
println(squares.receive())
}
println("Done!")
squares.cancel()
numbers.cancel()
}
运行上面的代码,我们将会发现控制台在打印一个无限序列,完全没有停止的意思。
9.11.2 管道与无穷质数序列
我们使用协程管道来生成一个无穷质数序列。
我们从无穷大的自然数序列开始:
fun numbersProducer(context: CoroutineContext, start: Int) = produce<Int>(context) {
var n = start
while (true) send(n++) // infinite stream of integers from start
}
这次我们引入一个显式上下文参数context, 以便调用方可以控制我们的协程运行的位置。下面的管道将筛选传入的数字流, 过滤掉可以被当前质数整除的所有数字:
fun filterPrimes(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
现在我们通过从2开始, 从当前通道中取一个质数, 并为找到的每个质数启动新的管道阶段, 从而构建出我们的管道:
numbersFrom(2) -> filterPrimes(2) -> filterPrimes(3) -> filterPrimes(5) -> filterPrimes(7) ...
测试无穷质数序列:
fun producePrimesSequences() = runBlocking {
var producerJob = numbersProducer(context, 2)
while (true) {
val prime = producerJob.receive()
print("${prime} \t")
producerJob = filterPrimes(context, producerJob, prime)
}
}
运行上面的代码,我们将会看到控制台一直在无限打印出质数序列:
9.11.3 通道缓冲区
我们可以给通道设置一个缓冲区:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // 创建一个缓冲区容量为4的通道
launch(context) {
repeat(10) {
println("Sending $it")
channel.send(it) // 当缓冲区已满的时候, send将会挂起
}
}
delay(1000)
}
输出:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4