From 81eaf8ca879bcaca984310e13f556afaae1dbb5d Mon Sep 17 00:00:00 2001 From: kean Date: Sun, 17 Dec 2017 11:08:23 +0300 Subject: [PATCH] Simplify Loader --- Sources/CancellationToken.swift | 5 +- Sources/Loader.swift | 132 +++++++++++++++++--------------- Sources/Request.swift | 9 +-- Tests/LoaderTests.swift | 28 +++++++ 4 files changed, 105 insertions(+), 69 deletions(-) diff --git a/Sources/CancellationToken.swift b/Sources/CancellationToken.swift index d0109a8c0..134eeaf02 100644 --- a/Sources/CancellationToken.swift +++ b/Sources/CancellationToken.swift @@ -14,15 +14,14 @@ public final class CancellationTokenSource { return _observers == nil } - private var _observers: [() -> Void]? = [] - /// Creates a new token associated with the source. public var token: CancellationToken { return CancellationToken(source: self) } + private var _observers: [() -> Void]? = [] + /// Initializes the `CancellationTokenSource` instance. public init() {} - fileprivate func register(_ closure: @escaping () -> Void) { if !_register(closure) { closure() diff --git a/Sources/Loader.swift b/Sources/Loader.swift index 35b5de23e..1725499fa 100644 --- a/Sources/Loader.swift +++ b/Sources/Loader.swift @@ -15,6 +15,7 @@ public protocol Loading { } public typealias ProgressHandler = (_ completed: Int64, _ total: Int64) -> Void +private typealias Completion = (Result) -> Void public extension Loading { /// Loads an image with the given request. @@ -77,15 +78,19 @@ public final class Loader: Loading { self.taskQueue = TaskQueue(maxConcurrentTaskCount: maxConcurrentRequestCount) } + // MARK: Loading + /// Loads an image for the given request using image loading pipeline. public func loadImage(with request: Request, token: CancellationToken?, completion: @escaping (Result) -> Void) { queue.async { if token?.isCancelling == true { return } // Fast preflight check - self._loadImage(with: request, token: token, completion: completion) + self._loadImageDeduplicating(request, token: token, completion: completion) } } - private func _loadImage(with request: Request, token: CancellationToken?, completion: @escaping (Result) -> Void) { + // MARK: Deduplication + + private func _loadImageDeduplicating(_ request: Request, token: CancellationToken?, completion: @escaping Completion) { let task = _startTask(with: request) // Combine requests with the same `loadKey` into a single request. @@ -102,74 +107,31 @@ public final class Loader: Loading { private func _startTask(with request: Request) -> Task { // Check if the task for the same request already exists. let key = Request.loadKey(for: request) - if let task = tasks[key] { return task } - - let task = Task(request: request, key: key) - tasks[key] = task - // Use rate limiter to prevent trashing of the underlying systems - rateLimiter.execute(token: task.cts.token) { [weak self] in - self?._loadImage(with: task) - } - return task - } - - private func _loadImage(with task: Task) { // would be nice to rewrite to async/await - _loadData(with: task) { [weak self] in - switch $0 { - case let .success(val): self?._decode(response: val, task: task) - case let .failure(err): self?._complete(task, result: .failure(err)) + guard let task = tasks[key] else { + let task = Task(request: request, key: key) + tasks[key] = task + + // Start the pipeline + var request = request // make a copy to set a custom progress handler + request.progress = { [weak self, weak task] in + if let task = task { self?._progress(completed: $0, total: $1, task: task) } } + _loadImage(with: request, token: task.cts.token) { [weak self, weak task] in + if let task = task { self?._complete(task, result: $0) } + } + return task } - } - - private func _loadData(with task: Task, completion: @escaping (Result<(Data, URLResponse)>) -> Void) { - let token = task.cts.token - taskQueue.execute(token: token) { [weak self] finish in - self?.loader.loadData(with: task.request.urlRequest, token: token, progress: { - self?._progress(completed: $0, total: $1, task: task) - }, completion: { - finish() - completion($0) - }) - token.register { finish() } - } + return task } private func _progress(completed: Int64, total: Int64, task: Task) { queue.async { let handlers = task.handlers.flatMap { $0.progress } + guard !handlers.isEmpty else { return } DispatchQueue.main.async { handlers.forEach { $0(completed, total) } } } } - private func _decode(response: (Data, URLResponse), task: Task) { - queue.async { - self.decodingQueue.execute(token: task.cts.token) { [weak self] in - if let image = self?.decoder.decode(data: response.0, response: response.1) { - self?._process(image: image, task: task) - } else { - self?._complete(task, result: .failure(Error.decodingFailed)) - } - } - } - } - - private func _process(image: Image, task: Task) { - queue.async { - guard let processor = self.makeProcessor(image, task.request) else { - self._complete(task, result: .success(image)) // no need to process - return - } - self.processingQueue.execute(token: task.cts.token) { [weak self] in - if let image = processor.process(image) { - self?._complete(task, result: .success(image)) - } else { - self?._complete(task, result: .failure(Error.processingFailed)) - } - } - } - } - private func _complete(_ task: Task, result: Result) { queue.async { guard self.tasks[task.key] === task else { return } // check if still registered @@ -199,8 +161,7 @@ public final class Loader: Loading { var retainCount = 0 // number of non-cancelled handlers init(request: Request, key: AnyHashable) { - self.request = request - self.key = key + self.request = request; self.key = key } struct Handler { @@ -209,6 +170,55 @@ public final class Loader: Loading { } } + // MARK: Pipeline + + private func _loadImage(with request: Request, token: CancellationToken, completion: @escaping Completion) { + // Use rate limiter to prevent trashing of the underlying systems + rateLimiter.execute(token: token) { [weak self] in + self?._loadData(with: request, token: token, completion: completion) + } + } + + // would be nice to rewrite to async/await + private func _loadData(with request: Request, token: CancellationToken, completion: @escaping Completion) { + taskQueue.execute(token: token) { [weak self] finish in + self?.loader.loadData(with: request.urlRequest, token: token, progress: { + request.progress?($0, $1) + }, completion: { + finish() + + switch $0 { + case let .success(val): self?._decode(response: val, request: request, token: token, completion: completion) + case let .failure(err): completion(.failure(err)) + } + }) + token.register { finish() } + } + } + + private func _decode(response: (Data, URLResponse), request: Request, token: CancellationToken, completion: @escaping Completion) { + decodingQueue.execute(token: token) { [weak self] in + guard let image = self?.decoder.decode(data: response.0, response: response.1) else { + completion(.failure(Error.decodingFailed)); return + } + self?._process(image: image, request: request, token: token, completion: completion) + } + } + + private func _process(image: Image, request: Request, token: CancellationToken, completion: @escaping Completion) { + guard let processor = makeProcessor(image, request) else { + completion(.success(image)); return // no need to process + } + processingQueue.execute(token: token) { + guard let image = processor.process(image) else { + completion(.failure(Error.processingFailed)); return + } + completion(.success(image)) + } + } + + // MARK: Misc + /// Error returns by `Loader` class itself. `Loader` might also return /// errors from underlying `DataLoading` object. public enum Error: Swift.Error { diff --git a/Sources/Request.swift b/Sources/Request.swift index 6a085066e..d27e0d325 100644 --- a/Sources/Request.swift +++ b/Sources/Request.swift @@ -138,12 +138,11 @@ public extension Request { /// Appends a processor to the request. You can append arbitrary number of /// processors to the request. public mutating func process(with processor: P) { - if let existing = self.processor { - // Chain new processor and the existing one. - self.processor = AnyProcessor(ProcessorComposition([existing, AnyProcessor(processor)])) - } else { - self.processor = AnyProcessor(processor) + guard let existing = self.processor else { + self.processor = AnyProcessor(processor); return } + // Chain new processor and the existing one. + self.processor = AnyProcessor(ProcessorComposition([existing, AnyProcessor(processor)])) } /// Appends a processor to the request. You can append arbitrary number of diff --git a/Tests/LoaderTests.swift b/Tests/LoaderTests.swift index e2964ef1c..869eaca32 100644 --- a/Tests/LoaderTests.swift +++ b/Tests/LoaderTests.swift @@ -190,4 +190,32 @@ class LoaderDeduplicationTests: XCTestCase { XCTAssertEqual(self.dataLoader.createdTaskCount, 1) } } + + func testThatProgressIsReported() { + dataLoader.queue.isSuspended = true + + for _ in 0..<3 { + var request = Request(url: defaultURL) + expect { fulfill in + var expected: [(Int64, Int64)] = [(10, 20), (20, 20)] + request.progress = { + XCTAssertTrue(Thread.isMainThread) + XCTAssertTrue(expected.first?.0 == $0) + XCTAssertTrue(expected.first?.1 == $1) + expected.remove(at: 0) + if expected.isEmpty { + fulfill() + } + } + } + expect { fulfill in + loader.loadImage(with: request) { _ in + fulfill() + } + } + } + dataLoader.queue.isSuspended = false + + wait() + } }