Skip to content

Commit

Permalink
Fixed non-standard cancellation behavior
Browse files Browse the repository at this point in the history
Cancellation shouldn't call completion. More here: https://twitter.com/millenomi/status/1137382877870510080
  • Loading branch information
tcldr committed Jun 26, 2019
1 parent 0602dd3 commit 3e4c03b
Show file tree
Hide file tree
Showing 16 changed files with 130 additions and 100 deletions.
1 change: 0 additions & 1 deletion Assets/EntwineTest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ func testMap() {
(300, .input("A")), // received uppercased input @ 100 + subscription time
(400, .input("B")), // received uppercased input @ 200 + subscription time
(500, .input("C")), // received uppercased input @ 300 + subscription time
(900, .completion(.finished)), // subscription cancelled
])
}
```
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func testMap() {
(300, .input("A")), // received uppercased input @ 100 + subscription time
(400, .input("B")), // received uppercased input @ 200 + subscription time
(500, .input("C")), // received uppercased input @ 300 + subscription time
(900, .completion(.finished)), // subscription cancelled
])
}
```
Expand Down
20 changes: 12 additions & 8 deletions Sources/Common/Utilities/SinkQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,28 @@ class SinkQueue<Sink: Subscriber> {
self.sink = sink
}

deinit {
expediteCompletion(.finished)
}

func requestDemand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
demandRequested += demand
return processDemand()
}

func enqueue(_ input: Sink.Input) -> Subscribers.Demand {
guard isActive else { return .none }
assertPreCompletion()
buffer.enqueue(input)
return processDemand()
}

func enqueue(completion: Subscribers.Completion<Sink.Failure>) -> Subscribers.Demand {
guard isActive else { return .none }
assertPreCompletion()
self.completion = completion
return processDemand()
}

func expediteCompletion(_ completion: Subscribers.Completion<Sink.Failure>) {
guard let sink = sink else { return }
guard let sink = sink else {
assertionFailure("Out of sequence. A completion signal has already been sent.")
return
}
self.sink = nil
self.buffer = .empty
sink.receive(completion: completion)
Expand All @@ -74,9 +73,10 @@ class SinkQueue<Sink: Subscriber> {
// Processes as much demand as requested, returns spare capacity that
// can be forwarded to upstream subscriber/s
func processDemand() -> Subscribers.Demand {
guard let sink = sink else { return .none }
while demandProcessed < demandRequested, let next = buffer.next() {
demandProcessed += 1
demandRequested += sink?.receive(next) ?? .none
demandRequested += sink.receive(next)
}
if let completion = completion, demandQueued < 1 {
expediteCompletion(completion)
Expand All @@ -86,4 +86,8 @@ class SinkQueue<Sink: Subscriber> {
demandForwarded += spareDemand
return spareDemand
}

func assertPreCompletion() {
assert(completion == nil && sink != nil, "Out of sequence. A completion signal is queued or has already been sent.")
}
}
35 changes: 13 additions & 22 deletions Sources/Entwine/Operators/Dematerialize.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ extension Publishers {
// for a few more elements to arrive after this is
// called.
func cancel() {
self.sink?.cancelUpstreamSubscription()
self.sink = nil
}
}
Expand All @@ -107,24 +108,20 @@ extension Publishers {
Downstream.Input == AnyPublisher<Upstream.Output.Input, DematerializationError<Upstream.Output.Failure>>,
Downstream.Failure == DematerializationError<Upstream.Output.Failure>
{

typealias Input = Upstream.Output
typealias Failure = Upstream.Failure

enum Status { case pending, active(PassthroughSubject<Input.Input, Downstream.Failure>), complete }

var queue: SinkQueue<Downstream>
var upstreamSubscription: Subscription?
var currentMaterializationSubject: PassthroughSubject<Input.Input, Downstream.Failure>?
var status = Status.pending

init(upstream: Upstream, downstream: Downstream) {
self.queue = SinkQueue(sink: downstream)
upstream.subscribe(self)
}

deinit {
queue.expediteCompletion(.finished)
cancelUpstreamSubscription()
}

// Called by the upstream publisher (or its agent) to signal
// that the subscription has begun
func receive(subscription: Subscription) {
Expand All @@ -141,18 +138,17 @@ extension Publishers {

switch input.signal {
case .subscription:
guard currentMaterializationSubject == nil else {
guard case .pending = status else {
queue.expediteCompletion(.failure(.outOfSequence))
cancelUpstreamSubscription()
return .none
}
currentMaterializationSubject = .init()
return queue.enqueue(currentMaterializationSubject!.eraseToAnyPublisher())
let subject = PassthroughSubject<Input.Input, Downstream.Failure>()
status = .active(subject)
return queue.enqueue(subject.eraseToAnyPublisher())

case .input(let dematerializedInput):
guard let subject = currentMaterializationSubject else {
guard case .active(let subject) = status else {
queue.expediteCompletion(.failure(.outOfSequence))
cancelUpstreamSubscription()
return .none
}
subject.send(dematerializedInput)
Expand All @@ -162,16 +158,12 @@ extension Publishers {
return .max(1)

case .completion(let dematerializedCompletion):
guard let subject = currentMaterializationSubject else {
guard case .active(let subject) = status else {
queue.expediteCompletion(.failure(.outOfSequence))
cancelUpstreamSubscription()
return .none
}
currentMaterializationSubject = nil
status = .complete
subject.send(completion: wrapSourceCompletion(dematerializedCompletion))
// re-imburse the sender as we're not queueing an
// additional element on the outer stream, only
// sending an element on the inner-stream
return .none
}
}
Expand All @@ -185,7 +177,6 @@ extension Publishers {
// that the sequence has terminated
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
_ = queue.enqueue(completion: .finished)
cancelUpstreamSubscription()
}

// Indirectly called by the downstream subscriber via its subscription
Expand Down Expand Up @@ -227,8 +218,8 @@ extension Publisher where Output: SignalConvertible, Failure == Never {
///
/// - Returns: A publisher that materializes an upstream publisher of `Signal`s into the represented
/// sequence.
public func dematerialize() -> Publishers.FlatMap<AnyPublisher<Self.Output.Input, DematerializationError<Self.Output.Failure>>, Publishers.First<Publishers.Dematerialize<Self>>> {
return dematerializedValuesPublisherSequence().first().flatMap { $0 }
public func dematerialize() -> Publishers.FlatMap<AnyPublisher<Self.Output.Input, DematerializationError<Self.Output.Failure>>, Publishers.Dematerialize<Self>> {
Publishers.Dematerialize(upstream: self).flatMap { $0 }
}
}

Expand Down
4 changes: 1 addition & 3 deletions Sources/Entwine/Operators/Materialize.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ extension Publishers {
var sink: MaterializeSink<Upstream, Downstream>?

init(upstream: Upstream, downstream: Downstream) {
let sink = MaterializeSink(upstream: upstream, downstream: downstream)
self.sink = sink
self.sink = MaterializeSink(upstream: upstream, downstream: downstream)
}

// Subscription Methods
Expand Down Expand Up @@ -89,7 +88,6 @@ extension Publishers {
}

deinit {
queue.expediteCompletion(.finished)
cancelUpstreamSubscription()
}

Expand Down
6 changes: 0 additions & 6 deletions Sources/Entwine/Operators/ReplaySubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,13 @@ fileprivate final class ReplaySubjectSubscription<Sink: Subscriber>: Subscriptio

func forwardCompletionToSink(_ completion: Subscribers.Completion<Sink.Failure>) {
queue.expediteCompletion(completion)
cleanup()
}

func request(_ demand: Subscribers.Demand) {
_ = queue.requestDemand(demand)
}

func cancel() {
queue.expediteCompletion(.finished)
cleanup()
}

func cleanup() {
cleanupHandler?()
cleanupHandler = nil
}
Expand Down
70 changes: 52 additions & 18 deletions Sources/Entwine/Operators/WithLatestFrom.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,34 +42,59 @@ extension Publishers {
}

public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
upstream.subscribe(
WithLatestFromSink<Upstream, Other, S>(
downstream: subscriber,
otherSink: WithLatestFromOtherSink(publisher: other),
transform: transform
)
)
let otherSink = WithLatestFromOtherSink(publisher: other)
let upstreamSink = WithLatestFromSink(upstream: upstream, downstream: subscriber, otherSink: otherSink, transform: transform)
subscriber.receive(subscription: WithLatestFromSubscription(sink: upstreamSink))
}
}

// MARK: - Subscription

fileprivate class WithLatestFromSubscription<Upstream: Publisher, Other: Publisher, Downstream: Subscriber>: Subscription
where Upstream.Failure == Other.Failure, Upstream.Failure == Downstream.Failure
{

var sink: WithLatestFromSink<Upstream, Other, Downstream>?

init(sink: WithLatestFromSink<Upstream, Other, Downstream>) {
self.sink = sink
}

func request(_ demand: Subscribers.Demand) {
sink?.signalDemand(demand)
}

func cancel() {
self.sink?.terminateSubscription()
self.sink = nil
}
}

// MARK: - Main Sink

fileprivate class WithLatestFromSink<Upstream: Publisher, Other: Publisher, Downstream: Subscriber>: Subscriber
where Upstream.Failure == Other.Failure, Upstream.Failure == Downstream.Failure {
where Upstream.Failure == Other.Failure, Upstream.Failure == Downstream.Failure
{

typealias Input = Upstream.Output
typealias Failure = Upstream.Failure

let downstream: Downstream
var queue: SinkQueue<Downstream>
var upstreamSubscription: Subscription?

let otherSink: WithLatestFromOtherSink<Other>
let transform: (Upstream.Output, Other.Output) -> Downstream.Input

init(downstream: Downstream, otherSink: WithLatestFromOtherSink<Other>, transform: @escaping (Input, Other.Output) -> Downstream.Input) {
self.downstream = downstream
init(upstream: Upstream, downstream: Downstream, otherSink: WithLatestFromOtherSink<Other>, transform: @escaping (Input, Other.Output) -> Downstream.Input) {
self.queue = SinkQueue(sink: downstream)
self.otherSink = otherSink
self.transform = transform

upstream.subscribe(self)
}

func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
self.upstreamSubscription = subscription
otherSink.subscribe()
}

Expand All @@ -83,15 +108,27 @@ extension Publishers {
// the dropped item by returning a Subscribers.Demand of 1.
return .max(1)
}
return downstream.receive(transform(input, otherInput))
return queue.enqueue(transform(input, otherInput))
}

func receive(completion: Subscribers.Completion<Downstream.Failure>) {
_ = queue.enqueue(completion: completion)
}

func signalDemand(_ demand: Subscribers.Demand) {
let spareDemand = queue.requestDemand(demand)
guard spareDemand > .none else { return }
upstreamSubscription?.request(spareDemand)
}

func terminateSubscription() {
otherSink.terminateSubscription()
downstream.receive(completion: completion)
upstreamSubscription?.cancel()
}
}

// MARK: - Other Sink

fileprivate class WithLatestFromOtherSink<P: Publisher>: Subscriber {

typealias Input = P.Output
Expand All @@ -101,7 +138,6 @@ extension Publishers {
private (set) var lastInput: Input?
private var subscription: Subscription?


init(publisher: P) {
self.publisher = publisher.eraseToAnyPublisher()
}
Expand All @@ -120,9 +156,7 @@ extension Publishers {
return .none
}

func receive(completion: Subscribers.Completion<Failure>) {
subscription = nil
}
func receive(completion: Subscribers.Completion<Failure>) { }

func terminateSubscription() {
subscription?.cancel()
Expand Down
18 changes: 9 additions & 9 deletions Sources/Entwine/Publishers/Factory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,29 +112,29 @@ public class Dispatcher<Input, Failure: Error> {

/// Queues an element to be delivered to the subscriber
///
/// If the subscriber has cancelled the subscription, or either the `forward(completion:)`
/// or the `forwardImmediately(completion:)`method of the dispatcher has already
/// been called, this will be a no-op.
/// - Warning: If either the `forward(completion:)` or the
/// `forwardImmediately(completion:)`method of the dispatcher has already
/// been called this will raise an assertion failure.
///
/// - Parameter input: a value to be delivered to a downstream subscriber
public func forward(_ input: Input) {
fatalError("Abstract class. Override in subclass.")
}

/// Completes the sequence once any queued elements are delivered to the subscriber
/// - Parameter completion: a completion value to be delivered to the subscriber once
///
/// If the subscriber has cancelled the subscription, or either the `forward(completion:)`
/// or the `forwardImmediately(completion:)`method of the dispatcher has already
/// been called, this will be a no-op.
/// - Warning: If either the `forward(completion:)` or the
/// `forwardImmediately(completion:)`method of the dispatcher has already
/// been called this will raise an assertion failure.
///
/// - Parameter completion: a completion value to be delivered to the subscriber once
/// the remaining items in the queue have been delivered
public func forward(completion: Subscribers.Completion<Failure>) {
fatalError("Abstract class. Override in subclass.")
}

/// Completes the sequence immediately regardless of any elements that are waiting to be delivered,
/// subsequent calls to the dispatcher will be a no-op
/// Completes the sequence immediately regardless of any elements that are waiting to be delivered
/// - Warning: subsequent calls to the dispatcher will raise an assertion failure
/// - Parameter completion: a completion value to be delivered immediately to the subscriber
public func forwardImmediately(completion: Subscribers.Completion<Failure>) {
fatalError("Abstract class. Override in subclass.")
Expand Down
2 changes: 2 additions & 0 deletions Sources/Entwine/Utilities/CancellationBag.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import Combine
/// A container for cancellables that will be cancelled when the bag is deallocated or cancelled itself
public final class CancellationBag: Cancellable {

public init() {}

private var cancellables = [AnyCancellable]()

/// Adds a cancellable to the bag which will have its `.cancel()` method invoked
Expand Down
Loading

0 comments on commit 3e4c03b

Please sign in to comment.