Skip to content

Commit

Permalink
Added referenceCounted() for Multicast publishers
Browse files Browse the repository at this point in the history
- Adds `referenceCounted()` operator for Multicast publishers
- Moves `share(replay:)` to use `referenceCounted()` instead of `autoconnect()` – matching wider reactive community implementations.
  • Loading branch information
tcldr authored Oct 30, 2019
1 parent 8be24a5 commit ce8de84
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 2 deletions.
124 changes: 124 additions & 0 deletions Sources/Entwine/Operators/ReferenceCounted.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//
// Entwine
// https://github.com/tcldr/Entwine
//
// Copyright © 2019 Tristan Celder. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

import Combine

// MARK: - Publisher

extension Publishers {

/// Automates the process of connecting to a multicast publisher. Connects when the first
/// subscriber connects then cancels and discards when the subscriber count falls to zero.
public final class ReferenceCounted<Upstream: Publisher, SubjectType: Subject>: Publisher
where Upstream.Output == SubjectType.Output, Upstream.Failure == SubjectType.Failure
{
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure

private let upstream: Upstream
private let createSubject: () -> SubjectType
private weak var sharedUpstreamReference: Publishers.Autoconnect<Publishers.Multicast<Upstream, SubjectType>>?

init(upstream: Upstream, createSubject: @escaping () -> SubjectType) {
self.upstream = upstream
self.createSubject = createSubject
}

public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let sharedUpstream = sharedUpstreamPublisher()
sharedUpstream.subscribe(ReferenceCountedSink(upstream: sharedUpstream, downstream: subscriber))
}

func sharedUpstreamPublisher() -> Publishers.Autoconnect<Publishers.Multicast<Upstream, SubjectType>> {
guard let shared = sharedUpstreamReference else {
let shared = upstream.multicast(createSubject).autoconnect()
self.sharedUpstreamReference = shared
return shared
}
return shared
}
}

// MARK: - Sink

fileprivate final class ReferenceCountedSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber
where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure
{
typealias Input = Downstream.Input
typealias Failure = Downstream.Failure

private let upstream: Upstream
private let downstream: Downstream

init(upstream: Upstream, downstream: Downstream) {
self.upstream = upstream
self.downstream = downstream
}

func receive(subscription: Subscription) {
downstream.receive(subscription: ReferenceCountedSubscription(wrappedSubscription: subscription, sink: self))
}

func receive(_ input: Input) -> Subscribers.Demand {
downstream.receive(input)
}

func receive(completion: Subscribers.Completion<Failure>) {
downstream.receive(completion: completion)
}
}

fileprivate final class ReferenceCountedSubscription<Sink: Subscriber>: Subscription {

let wrappedSubscription: Subscription
var sink: Sink?

init(wrappedSubscription: Subscription, sink: Sink) {
self.wrappedSubscription = wrappedSubscription
self.sink = sink
}

func request(_ demand: Subscribers.Demand) {
wrappedSubscription.request(demand)
}

func cancel() {
wrappedSubscription.cancel()
sink = nil
}
}
}

// MARK: - Operator

extension Publishers.Multicast {

/// Automates the process of connecting to a multicast publisher. Connects when the first
/// subscriber connects then cancels and discards when the subscriber count falls to zero.
///
/// - Returns: A publisher which automatically connects to its upstream multicast publisher.
public func referenceCounted() -> Publishers.ReferenceCounted<Upstream, SubjectType> {
.init(upstream: upstream, createSubject: createSubject)
}
}
4 changes: 2 additions & 2 deletions Sources/Entwine/Operators/ShareReplay.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ extension Publisher {
/// replay to new subscribers
/// - Returns: A class instance that republishes its upstream publisher and maintains a
/// buffer of its latest values for replay to new subscribers
public func share(replay maxBufferSize: Int) -> Publishers.Autoconnect<Publishers.Multicast<Self, ReplaySubject<Output, Failure>>> {
multicast(subject: ReplaySubject<Output, Failure>(maxBufferSize: maxBufferSize)).autoconnect()
public func share(replay maxBufferSize: Int) -> Publishers.ReferenceCounted<Self, ReplaySubject<Self.Output, Self.Failure>> {
multicast { ReplaySubject<Output, Failure>(maxBufferSize: maxBufferSize) }.referenceCounted()
}
}
132 changes: 132 additions & 0 deletions Tests/EntwineTests/ReferenceCountedTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//
// Entwine
// https://github.com/tcldr/Entwine
//
// Copyright © 2019 Tristan Celder. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

import XCTest
import Combine

@testable import Entwine
@testable import EntwineTest

final class ReferenceCountedTests: XCTestCase {

// MARK: - Properties

private var scheduler: TestScheduler!

// MARK: - Per test set-up and tear-down

override func setUp() {
scheduler = TestScheduler(initialClock: 0)
}

// MARK: - Tests

func testAutoConnectsAndPassesThroughInitialValue() {

let passthrough = PassthroughSubject<Int, Never>()
let subject = passthrough.prepend(-1).share()

let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)

scheduler.schedule(after: 100) { subject.subscribe(results1) }
scheduler.schedule(after: 200) { passthrough.send(0) }
scheduler.schedule(after: 210) { passthrough.send(1) }

scheduler.resume()

let expected: TestSequence<Int, Never> = [
(100, .subscription),
(100, .input(-1)),
(200, .input( 0)),
(210, .input( 1)),
]

XCTAssertEqual(expected, results1.recordedOutput)
}

func testPassesThroughInitialValueToFirstSubscriberOnly() {

let passthrough = PassthroughSubject<Int, Never>()
let subject = passthrough.prepend(-1).share()

let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
let results2 = scheduler.createTestableSubscriber(Int.self, Never.self)

scheduler.schedule(after: 100) { subject.subscribe(results1) }
scheduler.schedule(after: 110) { subject.subscribe(results2) }
scheduler.schedule(after: 200) { passthrough.send(0) }
scheduler.schedule(after: 210) { passthrough.send(1) }

scheduler.resume()

let expected2: TestSequence<Int, Never> = [
(110, .subscription),
(200, .input( 0)),
(210, .input( 1)),
]

XCTAssertEqual(expected2, results2.recordedOutput)
}

func testResetsWhenReferenceCountReachesZero() {

let passthrough = PassthroughSubject<Int, Never>()
let subject = passthrough.prepend(-1).share()

let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
let results2 = scheduler.createTestableSubscriber(Int.self, Never.self)
let results3 = scheduler.createTestableSubscriber(Int.self, Never.self)

scheduler.schedule(after: 100) { subject.subscribe(results1) }
scheduler.schedule(after: 110) { subject.subscribe(results2) }
scheduler.schedule(after: 200) { passthrough.send(0) }
scheduler.schedule(after: 210) { passthrough.send(1) }
scheduler.schedule(after: 300) { results1.cancel() }
scheduler.schedule(after: 310) { results2.cancel() }
scheduler.schedule(after: 400) { subject.subscribe(results3) }
scheduler.schedule(after: 500) { passthrough.send(0) }
scheduler.schedule(after: 510) { passthrough.send(1) }

scheduler.resume()

let expected3: TestSequence<Int, Never> = [
(400, .subscription),
(400, .input(-1)),
(500, .input( 0)),
(510, .input( 1)),
]

XCTAssertEqual(expected3, results3.recordedOutput)
}

func testMulticastCreateSubjectCalledWhenSubscriberCountGoesFromZeroToOne() {

var cancellables = Set<AnyCancellable>()
let factory: () -> PassthroughSubject<Int, Never> = { Swift.print("createSubject()"); return PassthroughSubject() }
let sut = Just(1)
sut.multicast(factory).autoconnect().sink { print("A:\($0)") }.store(in: &cancellables)
cancellables = Set<AnyCancellable>()
sut.multicast(factory).autoconnect().sink { print("B:\($0)") }.store(in: &cancellables)
}
}
58 changes: 58 additions & 0 deletions Tests/EntwineTests/ShareReplayTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,62 @@ final class ShareReplayTests: XCTestCase {

XCTAssertEqual(expected2, results2.recordedOutput)
}

func testPassesThroughInitialValueToFirstSubscriberOnly() {

let passthrough = PassthroughSubject<Int, Never>()
let subject = passthrough.prepend(-1).share()

let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
let results2 = scheduler.createTestableSubscriber(Int.self, Never.self)

scheduler.schedule(after: 100) { subject.subscribe(results1) }
scheduler.schedule(after: 110) { subject.subscribe(results2) }
scheduler.schedule(after: 200) { passthrough.send(0) }
scheduler.schedule(after: 210) { passthrough.send(1) }

scheduler.resume()

let expected2: TestSequence<Int, Never> = [
(110, .subscription),
(200, .input( 0)),
(210, .input( 1)),
]

XCTAssertEqual(expected2, results2.recordedOutput)
}

func testResetsWhenReferenceCountReachesZero() {

let passthrough = PassthroughSubject<Int, Never>()
let subject = passthrough.prepend(-1).share(replay: 2)

let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
let results2 = scheduler.createTestableSubscriber(Int.self, Never.self)

scheduler.schedule(after: 100) { subject.subscribe(results1) }
scheduler.schedule(after: 110) { passthrough.send(0) }
scheduler.schedule(after: 200) { results1.cancel() }
scheduler.schedule(after: 200) { subject.subscribe(results2) }
scheduler.schedule(after: 300) { passthrough.send(5) }
scheduler.schedule(after: 310) { passthrough.send(6) }

scheduler.resume()

let expected1: TestSequence<Int, Never> = [
(100, .subscription),
(100, .input(-1)),
(110, .input( 0)),
]

let expected2: TestSequence<Int, Never> = [
(200, .subscription),
(200, .input(-1)),
(300, .input( 5)),
(310, .input( 6)),
]

XCTAssertEqual(expected1, results1.recordedOutput)
XCTAssertEqual(expected2, results2.recordedOutput)
}
}

0 comments on commit ce8de84

Please sign in to comment.