Skip to content

Commit

Permalink
Simplify Loader
Browse files Browse the repository at this point in the history
  • Loading branch information
kean committed Dec 17, 2017
1 parent 2e0a0f9 commit 81eaf8c
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 69 deletions.
5 changes: 2 additions & 3 deletions Sources/CancellationToken.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ public final class CancellationTokenSource {
return _observers == nil
}

private var _observers: [() -> Void]? = []

/// Creates a new token associated with the source.
public var token: CancellationToken { return CancellationToken(source: self) }

private var _observers: [() -> Void]? = []

/// Initializes the `CancellationTokenSource` instance.
public init() {}


fileprivate func register(_ closure: @escaping () -> Void) {
if !_register(closure) {
closure()
Expand Down
132 changes: 71 additions & 61 deletions Sources/Loader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public protocol Loading {
}

public typealias ProgressHandler = (_ completed: Int64, _ total: Int64) -> Void
private typealias Completion = (Result<Image>) -> Void

public extension Loading {
/// Loads an image with the given request.
Expand Down Expand Up @@ -77,15 +78,19 @@ public final class Loader: Loading {
self.taskQueue = TaskQueue(maxConcurrentTaskCount: maxConcurrentRequestCount)
}

// MARK: Loading

/// Loads an image for the given request using image loading pipeline.
public func loadImage(with request: Request, token: CancellationToken?, completion: @escaping (Result<Image>) -> Void) {
queue.async {
if token?.isCancelling == true { return } // Fast preflight check
self._loadImage(with: request, token: token, completion: completion)
self._loadImageDeduplicating(request, token: token, completion: completion)
}
}

private func _loadImage(with request: Request, token: CancellationToken?, completion: @escaping (Result<Image>) -> Void) {
// MARK: Deduplication

private func _loadImageDeduplicating(_ request: Request, token: CancellationToken?, completion: @escaping Completion) {
let task = _startTask(with: request)

// Combine requests with the same `loadKey` into a single request.
Expand All @@ -102,74 +107,31 @@ public final class Loader: Loading {
private func _startTask(with request: Request) -> Task {
// Check if the task for the same request already exists.
let key = Request.loadKey(for: request)
if let task = tasks[key] { return task }

let task = Task(request: request, key: key)
tasks[key] = task
// Use rate limiter to prevent trashing of the underlying systems
rateLimiter.execute(token: task.cts.token) { [weak self] in
self?._loadImage(with: task)
}
return task
}

private func _loadImage(with task: Task) { // would be nice to rewrite to async/await
_loadData(with: task) { [weak self] in
switch $0 {
case let .success(val): self?._decode(response: val, task: task)
case let .failure(err): self?._complete(task, result: .failure(err))
guard let task = tasks[key] else {
let task = Task(request: request, key: key)
tasks[key] = task

// Start the pipeline
var request = request // make a copy to set a custom progress handler
request.progress = { [weak self, weak task] in
if let task = task { self?._progress(completed: $0, total: $1, task: task) }
}
_loadImage(with: request, token: task.cts.token) { [weak self, weak task] in
if let task = task { self?._complete(task, result: $0) }
}
return task
}
}

private func _loadData(with task: Task, completion: @escaping (Result<(Data, URLResponse)>) -> Void) {
let token = task.cts.token
taskQueue.execute(token: token) { [weak self] finish in
self?.loader.loadData(with: task.request.urlRequest, token: token, progress: {
self?._progress(completed: $0, total: $1, task: task)
}, completion: {
finish()
completion($0)
})
token.register { finish() }
}
return task
}

private func _progress(completed: Int64, total: Int64, task: Task) {
queue.async {
let handlers = task.handlers.flatMap { $0.progress }
guard !handlers.isEmpty else { return }
DispatchQueue.main.async { handlers.forEach { $0(completed, total) } }
}
}

private func _decode(response: (Data, URLResponse), task: Task) {
queue.async {
self.decodingQueue.execute(token: task.cts.token) { [weak self] in
if let image = self?.decoder.decode(data: response.0, response: response.1) {
self?._process(image: image, task: task)
} else {
self?._complete(task, result: .failure(Error.decodingFailed))
}
}
}
}

private func _process(image: Image, task: Task) {
queue.async {
guard let processor = self.makeProcessor(image, task.request) else {
self._complete(task, result: .success(image)) // no need to process
return
}
self.processingQueue.execute(token: task.cts.token) { [weak self] in
if let image = processor.process(image) {
self?._complete(task, result: .success(image))
} else {
self?._complete(task, result: .failure(Error.processingFailed))
}
}
}
}

private func _complete(_ task: Task, result: Result<Image>) {
queue.async {
guard self.tasks[task.key] === task else { return } // check if still registered
Expand Down Expand Up @@ -199,8 +161,7 @@ public final class Loader: Loading {
var retainCount = 0 // number of non-cancelled handlers

init(request: Request, key: AnyHashable) {
self.request = request
self.key = key
self.request = request; self.key = key
}

struct Handler {
Expand All @@ -209,6 +170,55 @@ public final class Loader: Loading {
}
}

// MARK: Pipeline

private func _loadImage(with request: Request, token: CancellationToken, completion: @escaping Completion) {
// Use rate limiter to prevent trashing of the underlying systems
rateLimiter.execute(token: token) { [weak self] in
self?._loadData(with: request, token: token, completion: completion)
}
}

// would be nice to rewrite to async/await
private func _loadData(with request: Request, token: CancellationToken, completion: @escaping Completion) {
taskQueue.execute(token: token) { [weak self] finish in
self?.loader.loadData(with: request.urlRequest, token: token, progress: {
request.progress?($0, $1)
}, completion: {
finish()

switch $0 {
case let .success(val): self?._decode(response: val, request: request, token: token, completion: completion)
case let .failure(err): completion(.failure(err))
}
})
token.register { finish() }
}
}

private func _decode(response: (Data, URLResponse), request: Request, token: CancellationToken, completion: @escaping Completion) {
decodingQueue.execute(token: token) { [weak self] in
guard let image = self?.decoder.decode(data: response.0, response: response.1) else {
completion(.failure(Error.decodingFailed)); return
}
self?._process(image: image, request: request, token: token, completion: completion)
}
}

private func _process(image: Image, request: Request, token: CancellationToken, completion: @escaping Completion) {
guard let processor = makeProcessor(image, request) else {
completion(.success(image)); return // no need to process
}
processingQueue.execute(token: token) {
guard let image = processor.process(image) else {
completion(.failure(Error.processingFailed)); return
}
completion(.success(image))
}
}

// MARK: Misc

/// Error returns by `Loader` class itself. `Loader` might also return
/// errors from underlying `DataLoading` object.
public enum Error: Swift.Error {
Expand Down
9 changes: 4 additions & 5 deletions Sources/Request.swift
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,11 @@ public extension Request {
/// Appends a processor to the request. You can append arbitrary number of
/// processors to the request.
public mutating func process<P: Processing>(with processor: P) {
if let existing = self.processor {
// Chain new processor and the existing one.
self.processor = AnyProcessor(ProcessorComposition([existing, AnyProcessor(processor)]))
} else {
self.processor = AnyProcessor(processor)
guard let existing = self.processor else {
self.processor = AnyProcessor(processor); return
}
// Chain new processor and the existing one.
self.processor = AnyProcessor(ProcessorComposition([existing, AnyProcessor(processor)]))
}

/// Appends a processor to the request. You can append arbitrary number of
Expand Down
28 changes: 28 additions & 0 deletions Tests/LoaderTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,32 @@ class LoaderDeduplicationTests: XCTestCase {
XCTAssertEqual(self.dataLoader.createdTaskCount, 1)
}
}

func testThatProgressIsReported() {
dataLoader.queue.isSuspended = true

for _ in 0..<3 {
var request = Request(url: defaultURL)
expect { fulfill in
var expected: [(Int64, Int64)] = [(10, 20), (20, 20)]
request.progress = {
XCTAssertTrue(Thread.isMainThread)
XCTAssertTrue(expected.first?.0 == $0)
XCTAssertTrue(expected.first?.1 == $1)
expected.remove(at: 0)
if expected.isEmpty {
fulfill()
}
}
}
expect { fulfill in
loader.loadImage(with: request) { _ in
fulfill()
}
}
}
dataLoader.queue.isSuspended = false

wait()
}
}

0 comments on commit 81eaf8c

Please sign in to comment.