在 Combine 中实现自定义 Publisher
2020 - 07 - 09
对于整个事件流的另一端,Publisher
,有时候我们也有自定义的需求。在《SwiftUI 和 Combine 编程》中,在“打包”多个请求时,我们用了一种很 naive 的方法:
struct LoadPokemonRequest {
static var all: AnyPublisher<[PokemonViewModel], AppError> {
(1...30)
.map { LoadPokemonRequest(id: $0).publisher }
.zipAll
}
}
其中,zipAll
是 Array
上的 extension:
extension Array where Element: Publisher {
var zipAll: AnyPublisher<[Element.Output], Element.Failure> {
let initial = Just([Element.Output]())
.setFailureType(to: Element.Failure.self)
.eraseToAnyPublisher()
return reduce(initial) { result, publisher in
result.zip(publisher) { $0 + [$1] }.eraseToAnyPublisher()
}
}
}
这个做法创建了多个“临时” Publisher
,并通过 reduce
把它们组合在一起。对于 zip
来说,这么做侥幸可以工作,但是这并不是一个一般性的解决方案。和自定义 Subscriber
一样,Combine 中的 Publisher
也是 protocol,我们可以按照需求去创建那些 Combine 库中还不存在、但是很有用的 Publisher
。在本文里,我们就以创建一个真正的 ZipAll
作为例子,来说明自定义 Publisher
的一般方法。
你可以打开一个 Playground,跟随本文键入代码,也可以在这里直接查看并尝试完整的代码。
在我们正式开始之前,我还是想强调下面这张图,它总结了 Combine 框架的完整工作流程。其实归根溯源,不管我们只是想很初级地使用 Combine 的内建内容,还是想更高级一些去自定义响应式的操作和事件流,归根结底,我们都是在如图定义的工作流中进行操作。只有真正理解和熟悉这张 Combine 的工作流程图,才能说是真正掌握了 Combine 的思维方式。
主要角色和工作
按照上图,我们逐行来梳理在自定义一个 Publisher
时需要做些什么。这可以为自定义 Publisher
的设计提供一个概览性的指导。对于图中的每个步骤,说明如下:
Subscriber
可以通过调用Publisher.subscribe
来告诉Publisher
订阅开始。自然地,我们需要在Publisher
上增加一个方法:subscribe
。Publisher
需要调用Subscriber
上的receive(subscription:)
方法。这个方法接收一个Subscription
。那么显然,Publisher
需要知道如何创建一个合适的Subscription
。Subscriber
通过调用 2 中创建的Subscription
上的request
方法,来首次表明自己需要多少个事件。也就是说,Subscription
上必须要有一个request
方法,它接受并记录Subscribers.Demand
(这也是Subscription
协议中所定义的方法)。如果你对这个过程还不太熟悉,建议你可以参考我之前关于自定义Subscriber
和实现 back pressure 的文章,那边对Demand
的用法和原理进行了详细的说明。- 当新的事件发生,并且当前的 demand 满足要求 (也即
Subscriber
还需要更多事件) 时,调用Subscriber.receive(_:)
来向下游发送一个事件。这件事情可以由Publisher
完成,但是更多的时候,我们会倾向于保持Publisher
的值语义,然后选择在Subscription
中实现这些逻辑。因为Subscription
在大部分情况下会保持某个 buffer,并随着时间进行响应并改变值 (毕竟这正是 Combine 或者说响应式编程所要解决的问题领域),所以一般我们会选择将Subscription
声明为class
并使用引用语义。另外,Subscriber.receive(_:)
返回的Demand
值应该被追加到剩余需要的事件个数中。 - 同 4。
- 如果事件结束 (比如异步操作完全完成,或者出现了错误),需要调用
Subscriber.receive(completion:)
。这一步也经常是由Subscription
来实现的。
上面的 4,5,6 中涉及的都是在 Subscription
中调用 Subscriber
的方法,所以在我们的实现中,在 Subscription
里持有 Subscriber
是一个自然而然的选择。对于自定义的具体的 Publisher
类型来说,它只负责提供一个简单的接口封装,来满足 Publisher
协议的规定,并保持这个角色的值语义 (在 Combine 的实现中,绝大部分的 Publisher
都拥有值语义,这让订阅的声明周期和行为相对简单)。事件发送,值的保持等涉及到具体、时序上的操作,则由一个相对复杂的 Subscription
实现。
Publisher
对于我们要实现的接受数组版本的 zip
来说,最直接的就是实现一个 ZipAll
,让它实现 Publisher
协议。遵循 Combine 的一般方式,我们把 ZipAll
定义在 Publishers
中,并添加上 Publisher
协议所需要的方法:
extension Publishers {
struct ZipAll<Collection: Swift.Collection>: Publisher
where Collection.Element: Publisher
{
// 1
typealias Output = [Collection.Element.Output]
typealias Failure = Collection.Element.Failure
private let publishers: Collection
init(_ publishers: Collection) {
self.publishers = publishers
}
// 2
func receive<S>(subscriber: S)
where S : Subscriber, Failure == S.Failure, Output == S.Input
{
// 3
}
}
}
- 作为新的
Publisher
,ZipAll
也需要自己的Output
。通过限定ZipAll
所接收的子Publisher
具有相同的类型,新的Publisher
的Output
也便可以被确定。 - 这是
Publisher
协议所规定需要实现的方法,不论你自定义的Publisher
具体是什么,这一部分是不会改变的。 - 在
receive(subscriber:)
里,按照 Combine 工作流程,我们创建Subscription
并调用Subscriber.receive(subscription:)
来把这个新创建的Subscription
发送给Subscriber
。(流程图中的 1 和 2),然后等待Subscriber
首次请求数据。现在我们还没有创建合适的Subscription
类型,所以先把它留空。在后面我们会回到这个方法,并填上需要的内容。
在 init
里,我们接收了一个类型满足 Swift.Collection
,且其中元素均为同类型 Publisher
的集合类型作为参数。在实际使用这个 ZipAll
时,我们大概会想要做的步骤如下:
- 订阅每个输入的 publisher,并观察它们的事件。建立符合输入的 publisher 个数的缓冲区。
- 某个 publisher 发出新的值后,先将它保存到对应的缓冲区里。然后检查所有这些缓冲区中是不是都有待处理的元素。如果都有,则将它们的首个元素移出来,形成一个数组并作为新的
ZipAll
值发送出去。 - 某个输入 publisher 发出成功完成的事件后,将它记录下来,并检查是不是所有的输入 publisher 都完成了。如果是,则将
.finish
事件发送出去。 - 如果某个输入 publisher 发出了错误,那么将错误直接作为新
Publisher
的结果发出。
暂时我们现在还不知道要怎么往 receive(subscriber:)
中填写内容,这要求我们需要知道如何创建 Subscription
。好消息是,Subscription
本身也是一个被严格定义的协议,这为我们实现自定义订阅类型提供了一些基本的依据。
Subscription
紧接着 ZipAll
的定义,在 Publishers
中创建一个私有的 ZipAppSubscription
类:
extension Publishers {
struct ZipAll ... {
// ...
}
private class ZipAppSubscription<Output, Failure: Error>: Subscription
{
// 1
}
}
Combine 中,Subscription
协议定义了两个必须实现的方法:
func request(_ demand: Subscribers.Demand)
func cancel()
前者用来接收 Subscriber
的请求,后者用来取消当前订阅。
在 ZipAppSubscription
的 // 1
里添加下面这些内容:
// 1
private var leftDemand: Subscribers.Demand = .none
private var subscriber: AnySubscriber<[Output], Failure>? = nil
private var buffer: [[Output]]
private let publishers: [AnyPublisher<Output, Failure>]
private var childSubscriptions: [AnyCancellable] = []
private var finishedCount = 0
// 2
private var lock = NSRecursiveLock()
init<S: Subscriber>(
subscriber: S,
publishers: [AnyPublisher<Output, Failure>]
) where Failure == S.Failure, [Output] == S.Input
{
self.subscriber = AnySubscriber(subscriber)
self.buffer = Array(repeating: [], count: publishers.count)
self.publishers = publishers
}
func request(_ demand: Subscribers.Demand) {
lock.lock()
defer { lock.unlock() }
self.leftDemand += demand
// 3
send()
}
func cancel() {
lock.lock()
defer { lock.unlock() }
childSubscriptions = []
subscriber = nil
}
- 既然生活在 Combine 的世界中,我们就得遵守 Combine 的游戏规则。
leftDemand
将记录下游订阅者还需要的值的数量,这样我们可以遵守基于 pull 的行为规则。 - 我们不能确定 zip 操作中涉及的各个 publisher 最终会在哪个线程向我们发送数据,这些数据在接收后会被放到
buffer
中待用,因此这里出现了多个线程共享资源的情况。让整个操作线程安全的最简单的方法就是上锁。 - 当收到
request(_:)
调用时,除了将下游告知的需求demand
累加到leftDemand
以外,我们还需要检查buffer
并尝试触发事件send
就是做这件事情的。
另外,我们还需要一个开始订阅的方法 (startSubscribing
),它会负责开始订阅 publishers
发出的值和事件。
这个 startSubscribing
和 3 中的 send
是 Subscription
的关键内容。前者负责把对应的事件进行转发处理:对于接收到的值,将它缓存在 buffer
中,并判断是否应当触发 zip 合并后的事件;对于接收到的结束事件,如果是错误事件,则结束自身事件流,如果是子 publisher 的结束事件,则将它记录下来,直到所有的 publisher 都结束后,再向外发送自身的结束事件。
这些逻辑看起来有些麻烦,但是如果给翻译翻译的话,代码看起来还是比较简单的:
func startSubscribing() {
for (i, publisher) in publishers.enumerated() {
publisher.sink(
receiveCompletion: { [weak self] completion in
self?.receiveCompletion(completion, at: i)
},
receiveValue: { [weak self] value in
self?.receiveValue(value, at: i)
}
).store(in: &childSubscriptions)
}
}
private func receiveValue(
_ value: Output, at index: Int
) {
lock.lock()
defer { lock.unlock() }
buffer[index].append(value)
send()
}
private func receiveCompletion(
_ event: Subscribers.Completion<Failure>, at index: Int
)
{
lock.lock()
defer { lock.unlock() }
guard let subscriber = subscriber else { return }
switch event {
case .finished:
finishedCount += 1
if finishedCount == buffer.count {
subscriber.receive(completion: .finished)
self.subscriber = nil
}
case .failure:
subscriber.receive(completion: event)
self.subscriber = nil
}
}
然后是 Subscription
里的另一个重要方法 send
,它负责检查 buffer
,并在满足 zip 逻辑的时候向外发布一个新值:
private func send() {
guard let subscriber = subscriber else { return }
while leftDemand > .none, let outputs = firstRowOutputItems {
self.leftDemand -= .max(1)
let nextDemand = subscriber.receive(outputs)
self.leftDemand += nextDemand
}
}
private var firstRowOutputItems: [Output]? {
guard buffer.allSatisfy({ !$0.isEmpty }) else { return nil }
var outputs = [Output]()
for i in 0 ..< buffer.count {
var column = buffer[i]
outputs.append(column.remove(at: 0))
buffer[i] = column
}
return outputs
}
这样,我们就有一个完整的 Subscription
角色了。最后,让我们回到 Publishers.ZipAll
中,把刚才剩下的 receive(subscriber:)
方法补完。创建一个 ZipAppSubscription
实例,调用 Subscriber
协议所定义的 receive(subscription:)
方法,并开始订阅所有的 publisher:
struct ZipAll ... {
// ...
func receive<S>(subscriber: S)
where S : Subscriber, Failure == S.Failure, Output == S.Input
{
let subscription = ZipAppSubscription<Collection.Element.Output, Failure>(
subscriber: subscriber, publishers: publishers.map { $0.eraseToAnyPublisher() }
)
subscriber.receive(subscription: subscription)
subscription.startSubscribing()
}
}
现在,我们就可以通过 Publishers.ZipAll
来创建一个真正的 ZipAll
的 Publisher
了。比如:
let p1 = [1,2,3].publisher
let p2 = [4,5,6].publisher
let p3 = [7,8,9,10].publisher
let zipped = Publishers.ZipAll([p1, p2, p3])
let subscription = zipped.sink(
receiveCompletion: {completion in
print("receiveCompletion \(completion)")
},
receiveValue: { values in
print("receiveValues: \(values)")
}
)
// 输出:
// receiveValues: [1, 4, 7]
// receiveValues: [2, 5, 8]
// receiveValues: [3, 6, 9]
// receiveCompletion finished
当然,最后,我们可以学习 Publishers
中的其他类型那样,为 ZipAll
提供一个辅助方法,让创建 Publishers.ZipAll
变得简单一些:
extension Collection where Element: Publisher {
var zipAll: Publishers.ZipAll<Self> {
Publishers.ZipAll(self)
}
}
let zipped = [p1, p2, p3].zipAll
不足之处和改进空间
虽然 ZipAll
应该已经可以正常工作了,但是还有一些值得优化的地方。
性能改进
首先是 firstRowOutputItems
中的数组操作的效率。buffer
的类型是 [[Output]]
,它其中的元素也只是普通的 Array
。因此 firstRowOutputItems
里的移除首个元素 column.remove(at: 0)
的操作,其实时间复杂度是 O(n)
,而它又处于一个 buffer.count
的循环中,所以这里会带来一个 n^2 的复杂度,是难以接收的。我们可以自己创建一个队列的数据结构,把 remove(at: 0)
的操作简化为 O(1)
来避免这个问题。
其次,还是在 firstRowOutputItems
里,我们每次都对“是否 buffer
中所有的列都至少有一个元素”进行了判断:buffer.allSatisfy({ !$0.isEmpty })
,这也是一个 O(n)
。一种更简单的方式,是维护一个变量来记录当前已经收到的可合并值的个数:在每次收到值时,判断 buffer
对应的位置上是否已经有值,来确定需不需要更改这个变量。如果发现已经收到的可合并值的个数与 publishers 的数量相等的话,就说明所有数据都已经准备就绪,可以将它们 zip
并发送。通过这样一个变量,我们可以把这里的 O(n)
也简化为 O(1)
。甚至更进一步,可以自然而然地做到去掉上面提到的 buffer.count
循环,把整个发送流程优化到 O(1)
。
有限 Demand
除了速度优化外,ZipAll
现在的行为逻辑也有值得商榷的地方。在 startSubscribing
里,我们简单地使用了 sink
来对输入的 publishers
进行订阅。Sink
subscriber 在通过 receive(subscription:)
接收到订阅后,会立即 request(_:)
.unlimited
的 Subscribers.Demand
。这其实没有尊重 Combine 事件的拉取模型原则:在我们的 ZipAll
实现中,下游订阅者可以通过控制 Demand
来控制收到的值的数量,但是内部的 publishers
的订阅却可以接受无限多的值。这么一来,一旦在 ZipAll
内部产生 back pressure,比如外部所需要的值的频率小于内部 publishers 产生值的频率的话,buffer
将可以大量积压,导致内存问题。实际上,我们可以根据下游订阅者需要的值的数量,来决定我们所需要的 publishers 给我们的值的数量。这样,我们就能将 back pressure 的处理也应用到被 zip 的 publishers 中去,从而避免溢出问题。
相对于使用 Sink
,我们可以用 AnySubscriber
来在更细的力度上进行一些控制。比如在收到订阅后只请求有限个事件,在收到新值时尊重下游订阅的 Demand
等:
AnySubscriber(
receiveSubscription: { subscription in
},
receiveValue: { value -> Subscribers.Demand in
},
receiveCompletion: { completion in
}
)
总结
可以看出,文中给出的实现有不少缺点,这个参考实现更多地是为了以最简单的方式说明自定义 Publisher
的一般方法,还远远没有达到可以用在产品代码中的质量。不过,通过这种直接的例子,我们可以总结出一些实现自定义 Publisher
时的一般经验:
Publisher
的接口和它需要完成的任务是相对固定的,遵循 Combine 的工作流程图,来实现其中各个职责类型的必要方法即可。- 如果没有特殊的需求,一般我们会将
Publisher
定义为struct
而非class
,这可以让内存管理和多次订阅的行为更加容易预测。但是,如果一个Publisher
有需要共享的话,应该将它定义为引用语义,比如Publishers.Share
。 - 相对于
Publisher
,大部分有关时序的操作都被封装到了Subscription
里。作为Publisher
和Subscriber
之间通讯的桥梁,Subscription
负责大部分逻辑,并维护 Combine 流程的正确性。一般来说,这也是在自定义Publisher
时我们花费最多时间的地方。 - 想要确保你的自定义
Publisher
能在 Combine 的世界中运行良好,需要遵守基本的规则。比如尊重下游的 demand,考虑性能因素等。