From a98c88ed4bd04ea335e0b66cd7892f4721bf5f77 Mon Sep 17 00:00:00 2001 From: ericjordanmossman Date: Tue, 26 Sep 2023 17:19:31 -0400 Subject: [PATCH] `Results.subscribe()` preview API for flexible sync. (#8244) --- CHANGELOG.md | 27 +- .../RLMFlexibleSyncServerTests.mm | 165 ++++++- .../SwiftFlexibleSyncServerTests.swift | 411 ++++++++++++++++++ Realm/RLMAsyncTask.mm | 84 +++- Realm/RLMAsyncTask_Private.h | 12 + Realm/RLMRealm.h | 2 - Realm/RLMResults.h | 177 ++++++++ Realm/RLMResults.mm | 140 ++++++ Realm/RLMResults_Private.h | 8 + Realm/RLMSyncSubscription.h | 31 +- Realm/RLMSyncSubscription.mm | 101 ++++- Realm/RLMSyncSubscription_Private.h | 6 + Realm/RLMSyncSubscription_Private.hpp | 32 +- RealmSwift/Realm.swift | 3 +- RealmSwift/Results.swift | 82 +++- RealmSwift/SyncSubscription.swift | 11 +- 16 files changed, 1255 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ba36f1bce..1c592e2e34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,32 @@ x.y.z Release notes (yyyy-MM-dd) ============================================================= ### Enhancements -* None. +* Added `Results.subscribe` API for flexible sync. + Now you can subscribe and unsubscribe to a flexible sync subscription through an object `Result`. + ```swift + // Named subscription query + let results = try await realm.objects(Dog.self).where { $0.age > 18 }.subscribe(name: "adults") + results.unsubscribe() + + // Unnamed subscription query + let results = try await realm.objects(Dog.self).subscribe() + results.unsubscribe() + ```` + + After committing the subscription to the realm's local subscription set, the method + will wait for downloads according to the `WaitForSyncMode`. + ```swift + let results = try await realm.objects(Dog.self).where { $0.age > 1 }.subscribe(waitForSync: .always) + ``` + Where `.always` will always download the latest data for the subscription, `.onCreation` will do + it only the first time the subscription is created, and `.never` will never wait for the + data to be downloaded. + + This API is currently in `Preview` and may be subject to changes in the future. +* Added a new API which allows to remove all the unnamed subscriptions from the subscription set. + ```swift + realm.subscriptions.removeAll(unnamedOnly: true) + ``` ### Fixed * ([#????](https://github.com/realm/realm-swift/issues/????), since v?.?.?) diff --git a/Realm/ObjectServerTests/RLMFlexibleSyncServerTests.mm b/Realm/ObjectServerTests/RLMFlexibleSyncServerTests.mm index b4aa78119a..45c2baddb9 100644 --- a/Realm/ObjectServerTests/RLMFlexibleSyncServerTests.mm +++ b/Realm/ObjectServerTests/RLMFlexibleSyncServerTests.mm @@ -855,6 +855,40 @@ - (void)testFlexibleSyncRemoveAllQueriesForType { CHECK_COUNT(1, Dog, realm); } +- (void)testRemoveAllUnnamedSubscriptions { + bool didPopulate = [self populateData:^(RLMRealm *realm) { + [self createPeople:realm partition:_cmd]; + [self createDog:realm partition:_cmd]; + }]; + if (!didPopulate) { + return; + } + + RLMRealm *realm = [self getFlexibleSyncRealm:_cmd]; + XCTAssertNotNil(realm); + CHECK_COUNT(0, Person, realm); + + [self writeQueryAndCompleteForRealm:realm block:^(RLMSyncSubscriptionSet *subs) { + [subs addSubscriptionWithClassName:Person.className + where:@"age > 20 and partition == %@", NSStringFromSelector(_cmd)]; + [subs addSubscriptionWithClassName:Person.className + subscriptionName:@"person_age_2" + where:@"firstName == 'firstname_1' and partition == %@", NSStringFromSelector(_cmd)]; + [subs addSubscriptionWithClassName:Dog.className + where:@"breed == 'Labradoodle' and partition == %@", NSStringFromSelector(_cmd)]; + }]; + XCTAssertEqual(realm.subscriptions.count, 3U); + CHECK_COUNT(2, Person, realm); + CHECK_COUNT(1, Dog, realm); + + [self writeQueryAndCompleteForRealm:realm block:^(RLMSyncSubscriptionSet *subs) { + [subs removeAllUnnamedSubscriptions]; + }]; + XCTAssertEqual(realm.subscriptions.count, 1U); + CHECK_COUNT(1, Person, realm); + CHECK_COUNT(0, Dog, realm); +} + - (void)testFlexibleSyncUpdateQuery { bool didPopulate = [self populateData:^(RLMRealm *realm) { [self createPeople:realm partition:_cmd]; @@ -1072,7 +1106,6 @@ - (void)testFlexibleSyncInitialOnConnectionTimeout { syncConfig.cancelAsyncOpenOnNonFatalErrors = true; config.syncConfiguration = syncConfig; - // Set delay above the timeout so it should fail proxy.delay = 2.0; @@ -1090,6 +1123,136 @@ - (void)testFlexibleSyncInitialOnConnectionTimeout { [proxy stop]; } +- (void)testSubscribeWithName { + bool didPopulate = [self populateData:^(RLMRealm *realm) { + Person *person = [[Person alloc] initWithPrimaryKey:[RLMObjectId objectId] + age:30 + firstName:@"Brian" + lastName:@"Epstein"]; + person.partition = NSStringFromSelector(_cmd); + [realm addObject:person]; + }]; + if (!didPopulate) { + return; + } + + RLMRealm *realm = [self getFlexibleSyncRealm:_cmd]; + XCTAssertNotNil(realm); + CHECK_COUNT(0, Person, realm); + + XCTestExpectation *ex = [self expectationWithDescription:@"wait for download"]; + [[[Person allObjectsInRealm:realm] objectsWhere:@"lastName == 'Epstein'"] subscribeWithName:@"5thBeatle" onQueue:dispatch_get_main_queue() completion:^(RLMResults *results, NSError *error) { + XCTAssertNil(error); + XCTAssertEqual(results.count, 1U); + [ex fulfill]; + }]; + [self waitForExpectationsWithTimeout:5.0 handler:nil]; +} + +- (void)testUnsubscribeWithinBlock { + bool didPopulate = [self populateData:^(RLMRealm *realm) { + Person *person = [[Person alloc] initWithPrimaryKey:[RLMObjectId objectId] + age:30 + firstName:@"Joe" + lastName:@"Doe"]; + person.partition = NSStringFromSelector(_cmd); + [realm addObject:person]; + }]; + if (!didPopulate) { + return; + } + + RLMRealm *realm = [self getFlexibleSyncRealm:_cmd]; + XCTAssertNotNil(realm); + CHECK_COUNT(0, Person, realm); + + XCTestExpectation *ex = [self expectationWithDescription:@"wait for download"]; + [[[Person allObjectsInRealm:realm] objectsWhere:@"lastName == 'Doe'"] subscribeWithName:@"unknown" onQueue:dispatch_get_main_queue() completion:^(RLMResults *results, NSError *error) { + XCTAssertNil(error); + XCTAssertEqual(results.count, 1U); + [results unsubscribe]; + [ex fulfill]; + }]; + [self waitForExpectationsWithTimeout:5.0 handler:nil]; + XCTAssertEqual(realm.subscriptions.count, 0U); +} + +- (void)testSubscribeOnQueue { + bool didPopulate = [self populateData:^(RLMRealm *realm) { + Person *person = [[Person alloc] initWithPrimaryKey:[RLMObjectId objectId] + age:30 + firstName:@"Sophia" + lastName:@"Loren"]; + person.partition = NSStringFromSelector(_cmd); + [realm addObject:person]; + }]; + if (!didPopulate) { + return; + } + + RLMUser *user = [self flexibleSyncUser:_cmd]; + RLMRealmConfiguration *config = [user flexibleSyncConfiguration]; + config.objectClasses = @[Person.self]; + + XCTestExpectation *ex = [self expectationWithDescription:@"wait for download"]; + [self dispatchAsyncAndWait:^{ + RLMRealm *realm = [RLMRealm realmWithConfiguration:config error:nil]; + XCTAssertNotNil(realm); + CHECK_COUNT(0, Person, realm); + + [[[Person allObjectsInRealm:realm] objectsWhere:@"lastName == 'Loren'"] + subscribeWithCompletionOnQueue:dispatch_get_main_queue() + completion:^(RLMResults *results, NSError *error) { + XCTAssertNil(error); + XCTAssertEqual(results.realm.subscriptions.count, 1UL); + XCTAssertEqual(results.realm.subscriptions.state, RLMSyncSubscriptionStateComplete); + CHECK_COUNT(1, Person, results.realm); + [ex fulfill]; + }]; + }]; + [self waitForExpectationsWithTimeout:20.0 handler:nil]; + + RLMRealm *realm = [RLMRealm realmWithConfiguration:config error:nil]; + XCTAssertEqual(realm.subscriptions.count, 1UL); + CHECK_COUNT(1, Person, realm); +} + +- (void)testSubscribeWithNameAndTimeout { + bool didPopulate = [self populateData:^(RLMRealm *realm) { + [self createPeople:realm partition:_cmd]; + }]; + if (!didPopulate) { + return; + } + + RLMRealm *realm = [self getFlexibleSyncRealm:_cmd]; + XCTAssertNotNil(realm); + CHECK_COUNT(0, Person, realm); + + [[realm syncSession] suspend]; + XCTestExpectation *ex = [self expectationWithDescription:@"expect timeout"]; + NSTimeInterval timeout = 2.0; + RLMResults *res = [[Person allObjectsInRealm:realm] objectsWhere:@"age >= 20"]; + [res subscribeWithName:@"20up" waitForSync:RLMWaitForSyncModeAlways onQueue:dispatch_get_main_queue() timeout:timeout completion:^(RLMResults *results, NSError *error) { + XCTAssert(error); + NSString *expectedDesc = [NSString stringWithFormat:@"Waiting for update timed out after %.01f seconds.", timeout]; + XCTAssert([error.localizedDescription isEqualToString:expectedDesc]); + XCTAssertNil(results); + [ex fulfill]; + }]; + XCTAssertEqual(realm.subscriptions.count, 1UL); + [self waitForExpectationsWithTimeout:5.0 handler:nil]; + + // resume session and wait for complete + // otherwise test will not tear down successfully + [[realm syncSession] resume]; + NSDate * start = [[NSDate alloc] init]; + while (realm.subscriptions.state != RLMSyncSubscriptionStateComplete && start.timeIntervalSinceNow > -10.0) { + sleep(1); + } + XCTAssertEqual(realm.subscriptions.state, RLMSyncSubscriptionStateComplete); +} + #if 0 // FIXME: this is no longer an error and needs to be updated to something which is - (void)testFlexibleSyncInitialSubscriptionThrowsError { RLMUser *user = [self flexibleSyncUser:_cmd]; diff --git a/Realm/ObjectServerTests/SwiftFlexibleSyncServerTests.swift b/Realm/ObjectServerTests/SwiftFlexibleSyncServerTests.swift index cd1a6db104..9616cdb3f4 100644 --- a/Realm/ObjectServerTests/SwiftFlexibleSyncServerTests.swift +++ b/Realm/ObjectServerTests/SwiftFlexibleSyncServerTests.swift @@ -424,6 +424,33 @@ class SwiftFlexibleSyncTests: SwiftSyncTestCase { XCTAssertEqual(subscriptions.count, 0) } + func testRemoveAllUnnamedSubscriptions() throws { + let realm = try openFlexibleSyncRealm() + let subscriptions = realm.subscriptions + subscriptions.update { + subscriptions.append( + QuerySubscription(name: "alex") { + $0.firstName == "Alex" + }, + QuerySubscription { + $0.firstName == "Belle" + }, + QuerySubscription { + $0.firstName == "Charles" + }) + subscriptions.append(QuerySubscription(name: "zero") { + $0.intCol > 0 + }) + } + XCTAssertEqual(subscriptions.count, 4) + + subscriptions.update { + subscriptions.removeAll(unnamedOnly: true) + } + + XCTAssertEqual(subscriptions.count, 2) + } + func testSubscriptionSetIterate() throws { let realm = try openFlexibleSyncRealm() let subscriptions = realm.subscriptions @@ -1046,6 +1073,20 @@ extension SwiftFlexibleSyncServerTests { try realm.write { block(realm) } + waitForUploads(for: realm) + } + + @MainActor + func populateSwiftPerson() async throws { + try await populateFlexibleSyncData { realm in + realm.deleteAll() // Remove all objects for a clean state + for i in 1...10 { + let person = SwiftPerson(firstName: "\(#function)", + lastName: "lastname_\(i)", + age: i) + realm.add(person) + } + } } @MainActor @@ -1311,6 +1352,370 @@ extension SwiftFlexibleSyncServerTests { XCTAssertEqual(realm.subscriptions.count, 1) } + // MARK: Subscribe + +#if swift(>=5.8) + @MainActor + func testSubscribe() async throws { + try await populateSwiftPerson() + + let realm = try openFlexibleSyncRealm() + let results0 = try await realm.objects(SwiftPerson.self).where { $0.age >= 6 }.subscribe() + XCTAssertEqual(results0.count, 5) + XCTAssertEqual(realm.subscriptions.count, 1) + let results1 = try await realm.objects(SwiftPerson.self).where { $0.lastName == "lastname_3" }.subscribe() + XCTAssertEqual(results1.count, 1) + XCTAssertEqual(results0.count, 5) + XCTAssertEqual(realm.subscriptions.count, 2) + let results2 = realm.objects(SwiftPerson.self) + XCTAssertEqual(results2.count, 6) + } + + @MainActor + func testSubscribeReassign() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + var results0 = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe() + XCTAssertEqual(results0.count, 3) + XCTAssertEqual(realm.subscriptions.count, 1) + results0 = try await results0.where { $0.age < 8 }.subscribe() // results0 local query is { $0.age >= 8 AND $0.age < 8 } + XCTAssertEqual(results0.count, 0) // no matches because local query is impossible + XCTAssertEqual(realm.subscriptions.count, 2) // two subscriptions: "$0.age >= 8 AND $0.age < 8" and "$0.age >= 8" + let results1 = realm.objects(SwiftPerson.self) + XCTAssertEqual(results1.count, 3) // three objects from "$0.age >= 8". None "$0.age >= 8 AND $0.age < 8". + } + + @MainActor + func testSubscribeSameQueryNoName() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + let results0 = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe() + let ex = XCTestExpectation(description: "no attempt to re-create subscription, returns immediately") + realm.syncSession!.suspend() + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe() + _ = try await results0.subscribe() + XCTAssertEqual(realm.subscriptions.count, 1) + ex.fulfill() + await fulfillment(of: [ex], timeout: 2.0) + XCTAssertEqual(realm.subscriptions.count, 1) + } + + @MainActor + func testSubscribeSameQuerySameName() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + let results0 = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "8 or older") + realm.syncSession!.suspend() + let ex = XCTestExpectation(description: "no attempt to re-create subscription, returns immediately") + Task { + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "8 or older") + _ = try await results0.subscribe(name: "8 or older") + XCTAssertEqual(realm.subscriptions.count, 1) + ex.fulfill() + } + await fulfillment(of: [ex], timeout: 5.0) + XCTAssertEqual(realm.subscriptions.count, 1) + } + + @MainActor + func testSubscribeSameQueryDifferentName() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + let results0 = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe() + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "8 or older") + _ = try await results0.subscribe(name: "older than 7") + XCTAssertEqual(realm.subscriptions.count, 3) + let subscriptions = realm.subscriptions + XCTAssertNil(subscriptions[0]!.name) + XCTAssertEqual(subscriptions[1]!.name, "8 or older") + XCTAssertEqual(subscriptions[2]!.name, "older than 7") + } + + @MainActor + func testSubscribeDifferentQuerySameName() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + _ = try await realm.objects(SwiftPerson.self).where { $0.age > 8 }.subscribe(name: "group1") + _ = try await realm.objects(SwiftPerson.self).where { $0.age > 5 }.subscribe(name: "group1") + XCTAssertEqual(realm.subscriptions.count, 1) +XCTAssertNotNil(realm.subscriptions.first(ofType: SwiftPerson.self) { $0.age > 5 }) + } + + @MainActor + func testSubscribeOnRealmConfinedActor() async throws { + try await populateSwiftPerson() + + try await populateSwiftPerson() + + let user = try await logInUser(for: basicCredentials(app: self.flexibleSyncApp), app: self.flexibleSyncApp) + var config = user.flexibleSyncConfiguration() + config.objectTypes = [SwiftPerson.self] + let realm = try await Realm(configuration: config, actor: MainActor.shared) + let results1 = try await realm.objects(SwiftPerson.self).where { $0.age > 8 }.subscribe(waitForSync: .onCreation) + XCTAssertEqual(results1.count, 2) + let results2 = try await realm.objects(SwiftPerson.self).where { $0.age > 6 }.subscribe(waitForSync: .always) + XCTAssertEqual(results2.count, 4) + let results3 = try await realm.objects(SwiftPerson.self).where { $0.age > 4 }.subscribe(waitForSync: .never) + XCTAssertEqual(results3.count, 4) + XCTAssertEqual(realm.subscriptions.count, 3) + } + + @CustomGlobalActor + func testSubscribeOnRealmConfinedCustomActor() async throws { + try await populateSwiftPerson() + + let user = try await logInUser(for: basicCredentials(app: self.flexibleSyncApp), app: self.flexibleSyncApp) + var config = user.flexibleSyncConfiguration() + config.objectTypes = [SwiftPerson.self] + let realm = try await Realm(configuration: config, actor: CustomGlobalActor.shared) + let results1 = try await realm.objects(SwiftPerson.self).where { $0.age > 8 }.subscribe(waitForSync: .onCreation) + XCTAssertEqual(results1.count, 2) + let results2 = try await realm.objects(SwiftPerson.self).where { $0.age > 6 }.subscribe(waitForSync: .always) + XCTAssertEqual(results2.count, 4) + let results3 = try await realm.objects(SwiftPerson.self).where { $0.age > 4 }.subscribe(waitForSync: .never) + XCTAssertEqual(results3.count, 4) + XCTAssertEqual(realm.subscriptions.count, 3) + } + + @MainActor + func testUnsubscribe() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + let results1 = try await realm.objects(SwiftPerson.self).where { $0.lastName == "lastname_3" }.subscribe() + XCTAssertEqual(realm.subscriptions.count, 1) + results1.unsubscribe() + XCTAssertEqual(realm.subscriptions.count, 0) + } + + @MainActor + func testUnsubscribeAfterReassign() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + var results0 = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe() + XCTAssertEqual(results0.count, 3) + XCTAssertEqual(realm.subscriptions.count, 1) + results0 = try await results0.where { $0.age < 8 }.subscribe() // subscribes to "age >= 8 && age < 8" because that's the local query + XCTAssertEqual(results0.count, 0) + XCTAssertEqual(realm.subscriptions.count, 2) // Two subs present:1) "age >= 8" 2) "age >= 8 && age < 8" + let results1 = realm.objects(SwiftPerson.self) + XCTAssertEqual(results1.count, 3) + results0.unsubscribe() // unsubscribes from "age >= 8 && age < 8" + XCTAssertEqual(realm.subscriptions.count, 1) + XCTAssertNotNil(realm.subscriptions.first(ofType: SwiftPerson.self) { $0.age >= 8 }) + XCTAssertEqual(results0.count, 0) // local query is still "age >= 8 && age < 8". + XCTAssertEqual(results1.count, 3) + } + + @MainActor + func testUnsubscribeWithoutSubscriptionExistingNamed() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "sub1") + XCTAssertEqual(realm.subscriptions.count, 1) + let results = realm.objects(SwiftPerson.self).where { $0.age >= 8 } + results.unsubscribe() + XCTAssertEqual(realm.subscriptions.count, 1) + XCTAssertEqual(realm.subscriptions.first!.name, "sub1") + } + + func testUnsubscribeNoExistingMatch() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + XCTAssertEqual(realm.subscriptions.count, 0) + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "age_older_8") + let results0 = realm.objects(SwiftPerson.self).where { $0.age >= 8 } + XCTAssertEqual(realm.subscriptions.count, 1) + XCTAssertEqual(results0.count, 3) + results0.unsubscribe() + XCTAssertEqual(realm.subscriptions.count, 1) + XCTAssertEqual(results0.count, 3) // Results are not modified because there is no subscription associated to the unsubscribed result + } + + @MainActor + func testUnsubscribeNamed() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe() + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "first_named") + let results = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "second_named") + XCTAssertEqual(realm.subscriptions.count, 3) + + results.unsubscribe() + XCTAssertEqual(realm.subscriptions.count, 2) + XCTAssertEqual(realm.subscriptions[0]!.name, nil) + XCTAssertEqual(realm.subscriptions[1]!.name, "first_named") + results.unsubscribe() // check again for case when subscription doesn't exist + XCTAssertEqual(realm.subscriptions.count, 2) + XCTAssertEqual(realm.subscriptions[0]!.name, nil) + XCTAssertEqual(realm.subscriptions[1]!.name, "first_named") + } + + @MainActor + func testUnsubscribeReassign() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "first_named") + var results = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "second_named") + // expect `results` associated subscription to be reassigned to the id which matches the unnamed subscription + results = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe() + XCTAssertEqual(realm.subscriptions.count, 3) + + results.unsubscribe() + // so the two named subscriptions remain. + XCTAssertEqual(realm.subscriptions.count, 2) + XCTAssertEqual(realm.subscriptions[0]!.name, "first_named") + XCTAssertEqual(realm.subscriptions[1]!.name, "second_named") + } + + @MainActor + func testUnsubscribeSameQueryDifferentName() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe() + let results2 = realm.objects(SwiftPerson.self).where { $0.age >= 8 } + XCTAssertEqual(realm.subscriptions.count, 1) + results2.unsubscribe() + XCTAssertEqual(realm.subscriptions.count, 0) + } + + @MainActor + func testSubscribeNameAcrossTypes() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + let results = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(name: "sameName") + XCTAssertEqual(realm.subscriptions.count, 1) + XCTAssertEqual(results.count, 3) + _ = try await realm.objects(SwiftTypesSyncObject.self).subscribe(name: "sameName") + XCTAssertEqual(realm.subscriptions.count, 1) + XCTAssertEqual(results.count, 0) + } + + @MainActor + func testSubscribeOnCreation() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + var results = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(waitForSync: .onCreation) + XCTAssertEqual(results.count, 3) + let expectation = XCTestExpectation(description: "method doesn't hang") + realm.syncSession!.suspend() + Task { + results = try! await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(waitForSync: .onCreation) + XCTAssertEqual(results.count, 3) // expect method to return immediately, and not hang while no connection + XCTAssertEqual(realm.subscriptions.count, 1) + expectation.fulfill() + } + await fulfillment(of: [expectation], timeout: 2.0) + } + + @MainActor + func testSubscribeAlways() async throws { + let collection = try await setupCollection("SwiftPerson") + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + var results = try await realm.objects(SwiftPerson.self).where { $0.age >= 9 }.subscribe(waitForSync: .always) + XCTAssertEqual(results.count, 2) + + // suspend session on client. Add a document that isn't on the client. + realm.syncSession!.suspend() + let serverObject: Document = [ + "_id": .objectId(ObjectId.generate()), + "firstName": .string("Paul"), + "lastName": .string("M"), + "age": .int32(30) + ] + collection.insertOne(serverObject).await(self, timeout: 10.0) + + let start = Date() + while collection.count(filter: [:]).await(self) != 11 && start.timeIntervalSinceNow > -10.0 { + sleep(1) // wait until server sync + } + + // Resume the client session. + realm.syncSession!.resume() + XCTAssertEqual(results.count, 2) + results = try await realm.objects(SwiftPerson.self).where { $0.age >= 9 }.subscribe(waitForSync: .always) + // Expect this subscribe call to wait for sync downloads, even though the subscription already existed + XCTAssertEqual(results.count, 3) // Count is 3 because it includes the object/document that was created while offline. + XCTAssertEqual(realm.subscriptions.count, 1) + } + + @MainActor + func testSubscribeNever() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + let expectation = XCTestExpectation(description: "test doesn't hang") + Task { + let results = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(waitForSync: .never) + XCTAssertEqual(results.count, 0) // expect no objects to be able to sync because of immediate return + XCTAssertEqual(realm.subscriptions.count, 1) + expectation.fulfill() + } + await fulfillment(of: [expectation], timeout: 1) + } + + @MainActor + func testSubscribeTimeout() async throws { + try await populateSwiftPerson() + let realm = try openFlexibleSyncRealm() + + realm.syncSession!.suspend() + let expectation = XCTestExpectation(description: "doesn't wait longer than expected") + Task { + let timeout = 2.0 + do { + _ = try await realm.objects(SwiftPerson.self).where { $0.age >= 8 }.subscribe(waitForSync: .always, timeout: timeout) + } catch let error as NSError { + expectation.fulfill() + XCTAssertEqual(error.code, Int(ETIMEDOUT)) + XCTAssertEqual(error.domain, NSPOSIXErrorDomain) + XCTAssertEqual(error.localizedDescription, "Waiting for update timed out after \(timeout) seconds.") + } + } + await fulfillment(of: [expectation], timeout: 3.0) + + // resume sync session and wait for subscription otherwise tear + // down can't complete successfully + realm.syncSession!.resume() + let start = Date() + while realm.subscriptions.state != .complete && start.timeIntervalSinceNow > -10.0 { + sleep(1) + } + XCTAssertEqual(realm.subscriptions.state, .complete) + } + + func testSubscribeTimeoutSucceeds() async throws { + try await populateSwiftPerson() + + let realm = try openFlexibleSyncRealm() + let results0 = try await realm.objects(SwiftPerson.self).where { $0.age >= 6 }.subscribe(timeout: 2.0) + XCTAssertEqual(results0.count, 5) + XCTAssertEqual(realm.subscriptions.count, 1) + let results1 = try await realm.objects(SwiftPerson.self).where { $0.lastName == "lastname_3" }.subscribe(timeout: 2.0) + XCTAssertEqual(results1.count, 1) + XCTAssertEqual(results0.count, 5) + XCTAssertEqual(realm.subscriptions.count, 2) + + let results2 = realm.objects(SwiftPerson.self) + XCTAssertEqual(results2.count, 6) + } +#endif + // MARK: - Custom Column @MainActor @@ -1567,5 +1972,11 @@ extension SwiftFlexibleSyncServerTests { } #endif } + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +@globalActor actor CustomGlobalActor: GlobalActor { + static var shared = CustomGlobalActor() +} + #endif // canImport(Combine) #endif // os(macOS) diff --git a/Realm/RLMAsyncTask.mm b/Realm/RLMAsyncTask.mm index 6c7d0f85f0..f32d341271 100644 --- a/Realm/RLMAsyncTask.mm +++ b/Realm/RLMAsyncTask.mm @@ -22,7 +22,7 @@ #import "RLMRealm_Private.hpp" #import "RLMRealmConfiguration_Private.hpp" #import "RLMScheduler.h" -#import "RLMSyncSubscription_Private.h" +#import "RLMSyncSubscription_Private.hpp" #import "RLMUtil.hpp" #import @@ -245,6 +245,7 @@ - (void)downloadCompleted { if (subscriptions.state == RLMSyncSubscriptionStatePending) { // FIXME: need cancellation for waiting for the subscription return [subscriptions waitForSynchronizationOnQueue:nil + timeout:0 completionBlock:^(NSError *error) { if (error) { std::lock_guard lock(_mutex); @@ -474,3 +475,84 @@ - (void)wait:(void (^)())completion { completion(); } @end + +@implementation RLMAsyncSubscriptionTask { + RLMUnfairMutex _mutex; + + RLMSyncSubscriptionSet *_subscriptionSet; + dispatch_queue_t _queue; + NSTimeInterval _timeout; + void (^_completion)(NSError *); + + dispatch_block_t _worker; +} + +- (instancetype)initWithSubscriptionSet:(RLMSyncSubscriptionSet *)subscriptionSet + queue:(nullable dispatch_queue_t)queue + timeout:(NSTimeInterval)timeout + completion:(void(^)(NSError *))completion { + if (!(self = [super init])) { + return self; + } + + _subscriptionSet = subscriptionSet; + _queue = queue; + _timeout = timeout; + _completion = completion; + + return self; +} + +- (void)waitForSubscription { + std::lock_guard lock(_mutex); + + if (_timeout != 0) { + // Setup timer + dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_timeout * NSEC_PER_SEC)); + // If the call below doesn't return after `time` seconds, the internal completion is called with an error. + _worker = dispatch_block_create(DISPATCH_BLOCK_ASSIGN_CURRENT, ^{ + NSString* errorMessage = [NSString stringWithFormat:@"Waiting for update timed out after %.01f seconds.", _timeout]; + NSError* error = [NSError errorWithDomain:NSPOSIXErrorDomain code:ETIMEDOUT userInfo:@{NSLocalizedDescriptionKey: errorMessage}]; + [self invokeCompletionWithError:error]; + }); + + dispatch_after(time, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), _worker); + } + + [self waitForSync]; +} + +-(void)waitForSync { + if (_completion) { + _subscriptionSet->_subscriptionSet->get_state_change_notification(realm::sync::SubscriptionSet::State::Complete) + .get_async([self](realm::StatusWith state) noexcept { + NSError *error = makeError(state); + [self invokeCompletionWithError:error]; + }); + } +} + +-(void)invokeCompletionWithError:(NSError * _Nullable)error { + void (^completion)(NSError *); + { + std::lock_guard lock(_mutex); + std::swap(completion, _completion); + } + + if (_worker) { + dispatch_block_cancel(_worker); + _worker = nil; + } + + if (completion) { + if (_queue) { + dispatch_async(_queue, ^{ + completion(error); + }); + return; + } + + completion(error); + } +} +@end diff --git a/Realm/RLMAsyncTask_Private.h b/Realm/RLMAsyncTask_Private.h index 3cce89441f..56d1e19163 100644 --- a/Realm/RLMAsyncTask_Private.h +++ b/Realm/RLMAsyncTask_Private.h @@ -68,4 +68,16 @@ RLM_SWIFT_SENDABLE + (RLMAsyncRefreshTask *)completedRefresh; @end +// A cancellable task for refreshing a Realm +RLM_SWIFT_SENDABLE +@interface RLMAsyncSubscriptionTask : NSObject + +- (instancetype)initWithSubscriptionSet:(RLMSyncSubscriptionSet *)subscriptionSet + queue:(nullable dispatch_queue_t)queue + timeout:(NSTimeInterval)timeout + completion:(void(^)(NSError *))completion; + +- (void)waitForSubscription; +@end + RLM_HEADER_AUDIT_END(nullability) diff --git a/Realm/RLMRealm.h b/Realm/RLMRealm.h index 0935e53015..6c56c39d8e 100644 --- a/Realm/RLMRealm.h +++ b/Realm/RLMRealm.h @@ -865,8 +865,6 @@ NS_REFINED_FOR_SWIFT; Represents the active subscriptions for this realm, which can be used to add/remove/update and search flexible sync subscriptions. Getting the subscriptions from a local or partition-based configured realm will thrown an exception. - - @warning This feature is currently in beta and its API is subject to change. */ @property (nonatomic, readonly, nonnull) RLMSyncSubscriptionSet *subscriptions; diff --git a/Realm/RLMResults.h b/Realm/RLMResults.h index 5911e666a2..3e3608c657 100644 --- a/Realm/RLMResults.h +++ b/Realm/RLMResults.h @@ -20,6 +20,25 @@ RLM_HEADER_AUDIT_BEGIN(nullability, sendability) +/// A block type used for APIs which asynchronously return a `Results`. +typedef void(^RLMResultsCompletionBlock)(RLMResults * _Nullable, NSError * _Nullable); + +/** + Determines wait for download behavior when subscribing on RLMResults. + @see ``[RLMResults subscribeWithName:waitForSync:onQueue:completion:]`` +*/ +typedef NS_ENUM(NSUInteger, RLMWaitForSyncMode) { + /// `subscribeWithName`'s callback will be invoked once matching objects are downloaded + /// from the server only when the subscription is created the first time. If the + /// subscription already exists, the callback is invoked without waiting for new downloads. + RLMWaitForSyncModeOnCreation, + /// `subscribeWithName`'s callback will wait for downloads before being invoked. + /// The callback can't be invoked in this mode unless an internet connection is established or a timeout is set. + RLMWaitForSyncModeAlways, + /// `subscribeWithName`'s callback is always invoked without waiting for downloads. + RLMWaitForSyncModeNever +} NS_SWIFT_NAME(WaitForSyncMode); + @class RLMObject; /** @@ -408,6 +427,164 @@ __attribute__((warn_unused_result)); NSError *_Nullable error))block keyPaths:(nullable NSArray *)keyPaths __attribute__((warn_unused_result)); + +#pragma mark - Flexible Sync + +/** + Creates a RLMSyncSubscription matching the RLMResults's local filter. + + After committing the subscription to the realm's local subscription set, the method + will wait for downloads according to ``RLMWaitForSyncMode`` behavior. + + ### Unnamed subscriptions ### + If `subscribeWithCompletion:` is called without a name whose query matches an unnamed subscription, another subscription is not created. + + If `subscribeWithCompletion:` is called without a name whose query matches a named subscription, an additional unnamed subscription is created. + ### Named Subscriptions ### + If `subscribeWithCompletion:` is called with a name whose query matches an unnamed subscription, an additional named subscription is created. + ### Existing name and query ### + If `subscribeWithCompletion:` is called with a name whose name is taken on a different query, the old subscription is updated with the new query. + + @note This method opens an update block transaction that creates or updates a subscription. + It's advised to *not* loop over this method in order to create multiple subscriptions at once. + This could create a performance bottleneck by opening multiple unnecessary write transactions. + @see: `[RLMSyncSubscription update:queue:onComplete:]` in order to create multiple subscriptions. + + @param queue The queue where the completion dispatches. + @param completion The completion block called after the subscription completes. The callback + will wait for downloads according to the value in `waitForSyncMode`. + @see ``RLMWaitForSyncMode`` + @warning This API is currently in `Preview` and may be subject to changes in the future. + */ +- (void)subscribeWithCompletionOnQueue:(dispatch_queue_t _Nullable)queue + completion:(RLMResultsCompletionBlock)completion; + +/** + Creates a RLMSyncSubscription matching the RLMResults's local filter. + + After committing the subscription to the realm's local subscription set, the method + will wait for downloads according to ``RLMWaitForSyncMode`` behavior. + + ### Unnamed subscriptions ### + If `subscribeWithCompletion:` is called without a name whose query matches an unnamed subscription, another subscription is not created. + + If `subscribeWithCompletion:` is called without a name whose query matches a named subscription, an additional unnamed subscription is created. + ### Named Subscriptions ### + If `subscribeWithCompletion:` is called with a name whose query matches an unnamed subscription, an additional named subscription is created. + ### Existing name and query ### + If `subscribeWithCompletion:` is called with a name whose name is taken on a different query, the old subscription is updated with the new query. + + @note This method opens an update block transaction that creates or updates a subscription. + It's advised to *not* loop over this method in order to create multiple subscriptions at once. + This could create a performance bottleneck by opening multiple unnecessary write transactions. + @see: `[RLMSyncSubscription update:queue:onComplete:]` in order to create multiple subscriptions. + + @param name The name used to identify the subscription. + @param queue The queue where the completion dispatches. + @param completion The completion block called after the subscription completes. The callback + will wait for downloads according to the value in `waitForSyncMode`. + @see ``RLMWaitForSyncMode`` + @warning This API is currently in `Preview` and may be subject to changes in the future. + */ +- (void)subscribeWithName:(NSString *_Nullable)name + onQueue:(dispatch_queue_t _Nullable)queue + completion:(RLMResultsCompletionBlock)completion; + +/** + Creates a RLMSyncSubscription matching the RLMResults's local filter. + + After committing the subscription to the realm's local subscription set, the method + will wait for downloads according to the ``RLMWaitForSyncMode``. + + ### Unnamed subscriptions ### + If `subscribeWithCompletion:` is called without a name whose query matches an unnamed subscription, another subscription is not created. + + If `subscribeWithCompletion:` is called without a name whose query matches a named subscription, an additional unnamed subscription is created. + ### Named Subscriptions ### + If `subscribeWithCompletion:` is called with a name whose query matches an unnamed subscription, an additional named subscription is created. + ### Existing name and query ### + If `subscribeWithCompletion:` is called with a name whose name is taken on a different query, the old subscription is updated with the new query. + + @note This method opens an update block transaction that creates or updates a subscription. + It's advised to *not* loop over this method in order to create multiple subscriptions at once. + This could create a performance bottleneck by opening multiple unnecessary write transactions. + @see: `[RLMSyncSubscription update:queue:onComplete:]` in order to create multiple subscriptions. + + @param name The name used to identify the subscription. + @param waitForSyncMode Dictates when the completion handler is called + @param queue The queue where the completion dispatches. + @param completion The completion block called after the subscription completes. The callback + will wait for downloads according to the value in `waitForSyncMode`. + @see ``RLMWaitForSyncMode`` + @warning This API is currently in `Preview` and may be subject to changes in the future. + */ +- (void)subscribeWithName:(NSString *_Nullable)name + waitForSync:(RLMWaitForSyncMode)waitForSyncMode + onQueue:(dispatch_queue_t _Nullable)queue + completion:(RLMResultsCompletionBlock)completion +__attribute__((swift_attr("@_unsafeInheritExecutor"))); + +/** + Creates a RLMSyncSubscription matching the RLMResults's local filter. + + After committing the subscription to the realm's local subscription set, the method + will wait for downloads according to the ``RLMWaitForSyncMode``. + + ### Unnamed subscriptions ### + If `subscribeWithCompletion:` is called without a name whose query matches an unnamed subscription, another subscription is not created. + + If `subscribeWithCompletion:` is called without a name whose query matches a named subscription, an additional unnamed subscription is created. + ### Named Subscriptions ### + If `subscribeWithCompletion:` is called with a name whose query matches an unnamed subscription, an additional named subscription is created. + ### Existing name and query ### + If `subscribeWithCompletion:` is called with a name whose name is taken on a different query, the old subscription is updated with the new query. + + @note This method opens an update block transaction that creates or updates a subscription. + It's advised to *not* loop over this method in order to create multiple subscriptions at once. + This could create a performance bottleneck by opening multiple unnecessary write transactions. + @see: `[RLMSyncSubscription update:queue:onComplete:]` in order to create multiple subscriptions. + + @param name The name used to identify the subscription. + @param waitForSyncMode Dictates when the completion handler is called + @param queue The queue where the completion dispatches. + @param timeout A timeout which ends waiting for downloads + via the completion handler. If the timeout is exceeded the completion + handler returns an error. + @param completion The completion block called after the subscription completes. The callback + will wait for downloads according to the value in `waitForSyncMode`. + @see ``RLMWaitForSyncMode`` + @warning This API is currently in `Preview` and may be subject to changes in the future. + */ +- (void)subscribeWithName:(NSString *_Nullable)name + waitForSync:(RLMWaitForSyncMode)waitForSyncMode + onQueue:(dispatch_queue_t _Nullable)queue + timeout:(NSTimeInterval)timeout + completion:(RLMResultsCompletionBlock)completion +__attribute__((swift_attr("@_unsafeInheritExecutor"))); + +/** + Removes a RLMSubscription matching the RLMResults'slocal filter. + + The method returns after committing the subscription removal to the + realm's local subscription set. Calling this method will not wait for objects to + be removed from the realm. + + Calling unsubscribe on a RLMResults does not remove the local filter from the RLMResults. + After calling unsubscribe, RLMResults may still contain objects because + other subscriptions may exist in the RLMRealm's subscription set. + + @note In order for a named subscription to be removed, the RLMResults + must have previously created the subscription. + The `RLMResults` returned in the completion block when calling `subscribe` can be used to unsubscribe from the same subscription. + + @note This method opens an update block transaction that creates or updates a subscription. + It's advised to *not* loop over this method in order to create multiple subscriptions at once. + This could create a performance bottleneck by opening multiple unnecessary write transactions. + @see: ``[RLMSyncSubscription update:queue:onComplete:]`` in order to create multiple subscriptions. + @warning This API is currently in `Preview` and may be subject to changes in the future. + */ +- (void)unsubscribe; + #pragma mark - Aggregating Property Values /** diff --git a/Realm/RLMResults.mm b/Realm/RLMResults.mm index 7df0958c7b..0be42e6d03 100644 --- a/Realm/RLMResults.mm +++ b/Realm/RLMResults.mm @@ -28,8 +28,10 @@ #import "RLMProperty_Private.h" #import "RLMQueryUtil.hpp" #import "RLMRealmConfiguration_Private.hpp" +#import "RLMScheduler.h" #import "RLMSchema_Private.h" #import "RLMSectionedResults_Private.hpp" +#import "RLMSyncSubscription_Private.hpp" #import "RLMThreadSafeReference_Private.hpp" #import "RLMUtil.hpp" @@ -53,6 +55,11 @@ - (bool)invalidate { @interface RLMResults () @end +// private properties +@interface RLMResults () +@property (nonatomic, nullable) RLMObjectId *associatedSubscriptionId; +@end + // // RLMResults implementation // @@ -563,6 +570,139 @@ - (RLMNotificationToken *)addNotificationBlock:(void (^)(RLMResults *, RLMCollec return _results.add_notification_callback(RLMWrapCollectionChangeCallback(block, self, true), std::move(keyPaths)); } +- (void)completionWithThreadSafeReference:(RLMThreadSafeReference * _Nullable)reference + confinedTo:(RLMScheduler *)confinement + completion:(RLMResultsCompletionBlock)completion + error:(NSError *_Nullable)error { + auto tsr = (error != nil) ? nil : reference; + RLMRealmConfiguration *configuration = _realm.configuration; + [confinement invoke:^{ + if (tsr) { + NSError *err; + RLMRealm *realm = [RLMRealm realmWithConfiguration:configuration error:&err]; + RLMResults *collection = [realm resolveThreadSafeReference:tsr]; + collection.associatedSubscriptionId = self.associatedSubscriptionId; + completion(collection, err); + } else { + completion(nil, error); + } + }]; +} + +// Returns true if the calling method should call immediately the completion block, this can happen if the subscription +// was already created in case of `onCreation` or we have selected `never` as sync mode (which doesn't require the subscription to complete to return) +- (bool)shouldNotWaitForSubscriptionToComplete:(RLMWaitForSyncMode)waitForSyncMode + name:(NSString *)name { + RLMSyncSubscriptionSet *subscriptions = self.realm.subscriptions; + switch(waitForSyncMode) { + case RLMWaitForSyncModeOnCreation: + // If an existing named subscription matches the provided name and local query, return. + if (name) { + RLMSyncSubscription *sub = [subscriptions subscriptionWithName:name query:_results.get_query()]; + if (sub != nil) { + return true; + } + } else { + // otherwise check if an unnamed subscription already exists. Return if it does exist. + RLMSyncSubscription *sub = [subscriptions subscriptionWithQuery:_results.get_query()]; + if (sub != nil && sub.name == nil) { + return true; + } + } + // If no name was provided and no existing unnamed subscription matches. + // break and create new subscription later. + break; + case RLMWaitForSyncModeAlways: + // never returns early + break; + case RLMWaitForSyncModeNever: + // commit subscription synchronously and return. + [subscriptions update:^{ + self.associatedSubscriptionId = [subscriptions addSubscriptionWithClassName:self.objectClassName + subscriptionName:name + query:_results.get_query() + updateExisting:true]; + }]; + return true; + } + return false; +} + +- (void)subscribeWithName:(NSString *_Nullable)name + waitForSync:(RLMWaitForSyncMode)waitForSyncMode + confinedTo:(RLMScheduler *)confinement + timeout:(NSTimeInterval)timeout + completion:(RLMResultsCompletionBlock)completion { + RLMThreadSafeReference *reference = [RLMThreadSafeReference referenceWithThreadConfined:self]; + if ([self shouldNotWaitForSubscriptionToComplete:waitForSyncMode name:name]) { + [self completionWithThreadSafeReference:reference confinedTo:confinement completion:completion error:nil]; + } else { + RLMThreadSafeReference *reference = [RLMThreadSafeReference referenceWithThreadConfined:self]; + RLMSyncSubscriptionSet *subscriptions = _realm.subscriptions; + [subscriptions update:^{ + // associated subscription id is nil when no name is provided. + self.associatedSubscriptionId = [subscriptions addSubscriptionWithClassName:self.objectClassName + subscriptionName:name + query:_results.get_query() + updateExisting:true]; + } queue:nil timeout:timeout onComplete:^(NSError *error) { + [self completionWithThreadSafeReference:reference confinedTo:confinement completion:completion error:error]; + }]; + } +} + +- (void)subscribeWithCompletionOnQueue:(dispatch_queue_t _Nullable)queue + completion:(RLMResultsCompletionBlock)completion { + return [self subscribeWithName:nil onQueue:queue completion:completion]; +}; + +- (void)subscribeWithName:(NSString *_Nullable)name + onQueue:(dispatch_queue_t _Nullable)queue + completion:(RLMResultsCompletionBlock)completion { + return [self subscribeWithName:name waitForSync:RLMWaitForSyncModeOnCreation onQueue:queue completion:completion]; +} + +- (void)subscribeWithName:(NSString *_Nullable)name + waitForSync:(RLMWaitForSyncMode)waitForSyncMode + onQueue:(dispatch_queue_t _Nullable)queue + completion:(RLMResultsCompletionBlock)completion { + [self subscribeWithName:name + waitForSync:waitForSyncMode + onQueue:queue + timeout:0 + completion:completion]; +} + +- (void)subscribeWithName:(NSString *_Nullable)name + waitForSync:(RLMWaitForSyncMode)waitForSyncMode + onQueue:(dispatch_queue_t _Nullable)queue + timeout:(NSTimeInterval)timeout + completion:(RLMResultsCompletionBlock)completion { + [self subscribeWithName:name + waitForSync:waitForSyncMode + confinedTo:[RLMScheduler dispatchQueue:queue] + timeout:timeout + completion:completion]; +} + +- (void)unsubscribe { + RLMSyncSubscriptionSet *subscriptions = self.realm.subscriptions; + + if (self.associatedSubscriptionId) { + [subscriptions update:^{ + [subscriptions removeSubscriptionWithId:self.associatedSubscriptionId]; + }]; + } else { + RLMSyncSubscription *sub = [subscriptions subscriptionWithQuery:_results.get_query()]; + if (sub.name == nil) { + [subscriptions update:^{ + [subscriptions removeSubscriptionWithClassName:self.objectClassName + query:_results.get_query()]; + }]; + } + } +} + - (BOOL)isAttached { return !!_realm; } diff --git a/Realm/RLMResults_Private.h b/Realm/RLMResults_Private.h index c7e29bebb2..224a39908d 100644 --- a/Realm/RLMResults_Private.h +++ b/Realm/RLMResults_Private.h @@ -18,6 +18,8 @@ #import +#import "RLMRealm_Private.h" + @class RLMObjectSchema; RLM_HEADER_AUDIT_BEGIN(nullability) @@ -28,6 +30,12 @@ RLM_HEADER_AUDIT_BEGIN(nullability) + (instancetype)emptyDetachedResults; - (RLMResults *)snapshot; +- (void)subscribeWithName:(NSString *_Nullable)name + waitForSync:(RLMWaitForSyncMode)waitForSyncMode + confinedTo:(RLMScheduler *)confinement + timeout:(NSTimeInterval)timeout + completion:(RLMResultsCompletionBlock)completion; + @end RLM_HEADER_AUDIT_END(nullability) diff --git a/Realm/RLMSyncSubscription.h b/Realm/RLMSyncSubscription.h index d35be19d0b..314e9392c7 100644 --- a/Realm/RLMSyncSubscription.h +++ b/Realm/RLMSyncSubscription.h @@ -117,13 +117,11 @@ RLM_HEADER_AUDIT_BEGIN(nullability, sendability) - (void)write:(__attribute__((noescape)) void(^)(void))block __attribute__((unavailable("Renamed to -update"))); /** - Synchronously performs any transactions (add/remove/update) to the subscription set within the block, - this will not wait for the server to acknowledge and see all the data associated with this collection of subscriptions, - and will return after committing the subscription transactions. + Synchronously performs any transactions (add/remove/update) to the subscription set within the block. The `onComplete` block is executed after waiting for associated data to be downloaded from the server. @param block The block containing actions to perform to the subscription set. @param onComplete A block which is called upon synchronization of - subscriptions to the server. The block will be passed `nil` + data from the server. The block will be passed `nil` if the update succeeded, and an error describing the problem otherwise. */ @@ -135,6 +133,22 @@ RLM_HEADER_AUDIT_BEGIN(nullability, sendability) - (void)write:(__attribute__((noescape)) void(^)(void))block onComplete:(void(^)(NSError * _Nullable))onComplete __attribute__((unavailable("Renamed to -update:onComplete."))); +/** + Synchronously performs any transactions (add/remove/update) to the subscription set within the block. The `onComplete` block is executed after waiting for associated data to be downloaded from the server. + + @param block The block containing actions to perform to the subscription set. + @param queue The serial queue to deliver notifications to. + @param onComplete A block which is called upon synchronization of + data from the server. The block will be passed `nil` + if the update succeeded, and an error describing the problem + otherwise. + */ + - (void)update:(__attribute__((noescape)) void(^)(void))block + queue:(nullable dispatch_queue_t)queue + onComplete:(void(^)(NSError *))onComplete +__attribute__((swift_attr("@_unsafeInheritExecutor"))); + + #pragma mark - Find subscription /** @@ -296,6 +310,15 @@ RLM_HEADER_AUDIT_BEGIN(nullability, sendability) */ - (void)removeAllSubscriptions; +/** + Removes all subscriptions without a name from the subscription set. + + @warning This method may only be called during a write subscription block. + @warning Removing all subscriptions will result in an error if no new subscription is added. Server should + acknowledge at least one subscription. + */ + - (void)removeAllUnnamedSubscriptions; + /** Removes all subscription with the specified class name. diff --git a/Realm/RLMSyncSubscription.mm b/Realm/RLMSyncSubscription.mm index a31c26bf01..c38891cadd 100644 --- a/Realm/RLMSyncSubscription.mm +++ b/Realm/RLMSyncSubscription.mm @@ -18,10 +18,12 @@ #import "RLMSyncSubscription_Private.hpp" +#import "RLMAsyncTask_Private.h" #import "RLMError_Private.hpp" #import "RLMObjectId_Private.hpp" #import "RLMQueryUtil.hpp" #import "RLMRealm_Private.hpp" +#import "RLMScheduler.h" #import "RLMUtil.hpp" #import @@ -112,7 +114,6 @@ - (void)updateSubscriptionWithPredicate:(NSPredicate *)predicate { #pragma mark - SubscriptionSet @interface RLMSyncSubscriptionSet () { - std::unique_ptr _subscriptionSet; std::unique_ptr _mutableSubscriptionSet; NSHashTable *_enumerators; } @@ -237,10 +238,23 @@ - (RLMSyncSubscriptionState)state { #pragma mark - Batch Update subscriptions - (void)update:(__attribute__((noescape)) void(^)(void))block { - return [self update:block onComplete:nil]; + [self update:block onComplete:nil]; } - (void)update:(__attribute__((noescape)) void(^)(void))block onComplete:(void(^)(NSError *))completionBlock { + [self update:block queue:nil onComplete:completionBlock]; +} + +- (void)update:(__attribute__((noescape)) void(^)(void))block + queue:(nullable dispatch_queue_t)queue + onComplete:(void(^)(NSError *))completionBlock { + [self update:block queue:queue timeout:0 onComplete:completionBlock]; +} + +- (void)update:(__attribute__((noescape)) void(^)(void))block + queue:(nullable dispatch_queue_t)queue + timeout:(NSTimeInterval)timeout + onComplete:(void(^)(NSError *))completionBlock { if (_mutableSubscriptionSet) { @throw RLMException(@"Cannot initiate a write transaction on subscription set that is already being updated."); } @@ -266,21 +280,20 @@ - (void)update:(__attribute__((noescape)) void(^)(void))block onComplete:(void(^ } if (completionBlock) { - [self waitForSynchronizationOnQueue:nil completionBlock:completionBlock]; + [self waitForSynchronizationOnQueue:queue + timeout:timeout + completionBlock:completionBlock]; } } - (void)waitForSynchronizationOnQueue:(nullable dispatch_queue_t)queue + timeout:(NSTimeInterval)timeout completionBlock:(void(^)(NSError *))completionBlock { - _subscriptionSet->get_state_change_notification(realm::sync::SubscriptionSet::State::Complete) - .get_async([completionBlock, queue](realm::StatusWith state) noexcept { - if (queue) { - return dispatch_async(queue, ^{ - completionBlock(makeError(state)); - }); - } - return completionBlock(makeError(state)); - }); + RLMAsyncSubscriptionTask *syncSubscriptionTask = [[RLMAsyncSubscriptionTask alloc] initWithSubscriptionSet:self + queue:queue + timeout:timeout + completion:completionBlock]; + [syncSubscriptionTask waitForSubscription]; } #pragma mark - Find subscription @@ -315,6 +328,10 @@ - (nullable RLMSyncSubscription *)subscriptionWithClassName:(NSString *)objectCl predicate:(NSPredicate *)predicate { RLMClassInfo& info = _realm->_info[objectClassName]; auto query = RLMPredicateToQuery(predicate, info.rlmObjectSchema, _realm.schema, _realm.group); + return [self subscriptionWithQuery:query]; +} + +- (nullable RLMSyncSubscription *)subscriptionWithQuery:(realm::Query)query { auto subscription = _subscriptionSet->find(query); if (subscription) { return [[RLMSyncSubscription alloc] initWithSubscription:*subscription @@ -323,6 +340,17 @@ - (nullable RLMSyncSubscription *)subscriptionWithClassName:(NSString *)objectCl return nil; } +- (nullable RLMSyncSubscription *)subscriptionWithName:(NSString *)name + query:(realm::Query)query { + auto subscription = _subscriptionSet->find([name UTF8String]); + if (subscription && subscription->query_string == query.get_description()) { + return [[RLMSyncSubscription alloc] initWithSubscription:*subscription + subscriptionSet:self]; + } else { + return nil; + } +} + #pragma mark - Add a Subscription @@ -387,20 +415,34 @@ - (void)addSubscriptionWithClassName:(NSString *)objectClassName predicate:(NSPredicate *)predicate updateExisting:(BOOL)updateExisting { [self verifyInWriteTransaction]; - + RLMClassInfo& info = _realm->_info[objectClassName]; auto query = RLMPredicateToQuery(predicate, info.rlmObjectSchema, _realm.schema, _realm.group); - + + [self addSubscriptionWithClassName:objectClassName + subscriptionName:name + query:query + updateExisting:updateExisting]; +} + +- (RLMObjectId *)addSubscriptionWithClassName:(NSString *)objectClassName + subscriptionName:(nullable NSString *)name + query:(realm::Query)query + updateExisting:(BOOL)updateExisting { + [self verifyInWriteTransaction]; + if (name) { if (updateExisting || !_mutableSubscriptionSet->find(name.UTF8String)) { - _mutableSubscriptionSet->insert_or_assign(name.UTF8String, query); + auto it = _mutableSubscriptionSet->insert_or_assign(name.UTF8String, query); + return [[RLMObjectId alloc] initWithValue:it.first->id]; } else { @throw RLMException(@"A subscription named '%@' already exists. If you meant to update the existing subscription please use the `update` method.", name); } } else { - _mutableSubscriptionSet->insert_or_assign(query); + auto it = _mutableSubscriptionSet->insert_or_assign(query); + return [[RLMObjectId alloc] initWithValue:it.first->id]; } } @@ -434,10 +476,15 @@ - (void)removeSubscriptionWithClassName:(NSString *)objectClassName - (void)removeSubscriptionWithClassName:(NSString *)objectClassName predicate:(NSPredicate *)predicate { - [self verifyInWriteTransaction]; - RLMClassInfo& info = _realm->_info[objectClassName]; auto query = RLMPredicateToQuery(predicate, info.rlmObjectSchema, _realm.schema, _realm.group); + [self removeSubscriptionWithClassName:objectClassName query:query]; +} + +- (void)removeSubscriptionWithClassName:(NSString *)objectClassName + query:(realm::Query)query { + [self verifyInWriteTransaction]; + auto subscription = _subscriptionSet->find(query); if (subscription) { _mutableSubscriptionSet->erase(query); @@ -445,10 +492,14 @@ - (void)removeSubscriptionWithClassName:(NSString *)objectClassName } - (void)removeSubscription:(RLMSyncSubscription *)subscription { + [self removeSubscriptionWithId:subscription.identifier]; +} + +- (void)removeSubscriptionWithId:(RLMObjectId *)objectId { [self verifyInWriteTransaction]; for (auto it = _mutableSubscriptionSet->begin(); it != _mutableSubscriptionSet->end();) { - if (it->id == subscription.identifier.value) { + if (it->id == objectId.value) { it = _mutableSubscriptionSet->erase(it); return; } @@ -463,6 +514,18 @@ - (void)removeAllSubscriptions { _mutableSubscriptionSet->clear(); } +- (void)removeAllUnnamedSubscriptions { + [self verifyInWriteTransaction]; + + for (auto it = _mutableSubscriptionSet->begin(); it != _mutableSubscriptionSet->end();) { + if (!it->name) { + it = _mutableSubscriptionSet->erase(it); + } else { + it++; + } + } +} + - (void)removeAllSubscriptionsWithClassName:(NSString *)className { [self verifyInWriteTransaction]; diff --git a/Realm/RLMSyncSubscription_Private.h b/Realm/RLMSyncSubscription_Private.h index dc438a20e4..1aa26a5976 100644 --- a/Realm/RLMSyncSubscription_Private.h +++ b/Realm/RLMSyncSubscription_Private.h @@ -56,7 +56,13 @@ RLM_HEADER_AUDIT_BEGIN(nullability, sendability) predicate:(NSPredicate *)predicate updateExisting:(BOOL)updateExisting; +- (void)update:(__attribute__((noescape)) void(^)(void))block + queue:(nullable dispatch_queue_t)queue + timeout:(NSTimeInterval)timeout + onComplete:(void(^)(NSError *))completionBlock; + - (void)waitForSynchronizationOnQueue:(nullable dispatch_queue_t)queue + timeout:(NSTimeInterval)timeout completionBlock:(void(^)(NSError *))completionBlock; - (RLMSyncSubscriptionEnumerator *)fastEnumerator; diff --git a/Realm/RLMSyncSubscription_Private.hpp b/Realm/RLMSyncSubscription_Private.hpp index 3d9a98f5d5..fee2eddc1f 100644 --- a/Realm/RLMSyncSubscription_Private.hpp +++ b/Realm/RLMSyncSubscription_Private.hpp @@ -18,23 +18,49 @@ #import "RLMSyncSubscription_Private.h" +#import + namespace realm::sync { class Subscription; class SubscriptionSet; } +namespace realm { +class Query; +} RLM_HEADER_AUDIT_BEGIN(nullability, sendability) @interface RLMSyncSubscription () - - (instancetype)initWithSubscription:(realm::sync::Subscription)subscription subscriptionSet:(RLMSyncSubscriptionSet *)subscriptionSet; - @end -@interface RLMSyncSubscriptionSet () +@interface RLMSyncSubscriptionSet () { +@public + std::unique_ptr _subscriptionSet; +} - (instancetype)initWithSubscriptionSet:(realm::sync::SubscriptionSet)subscriptionSet realm:(RLMRealm *)realm; +- (void)update:(__attribute__((noescape)) void(^)(void))block + queue:(nullable dispatch_queue_t)queue + timeout:(NSTimeInterval)timeout + onComplete:(void(^)(NSError *))completionBlock; + +- (RLMObjectId *)addSubscriptionWithClassName:(NSString *)objectClassName + subscriptionName:(nullable NSString *)name + query:(realm::Query)query + updateExisting:(BOOL)updateExisting; + +- (nullable RLMSyncSubscription *)subscriptionWithQuery:(realm::Query)query; + +// Return subscription that matches name *and* query +- (nullable RLMSyncSubscription *)subscriptionWithName:(NSString *)name + query:(realm::Query)query; + +- (void)removeSubscriptionWithClassName:(NSString *)objectClassName + query:(realm::Query)query; + +- (void)removeSubscriptionWithId:(RLMObjectId *)objectId; @end RLM_HEADER_AUDIT_END(nullability, sendability) diff --git a/RealmSwift/Realm.swift b/RealmSwift/Realm.swift index 1391faf279..86e3819472 100644 --- a/RealmSwift/Realm.swift +++ b/RealmSwift/Realm.swift @@ -388,7 +388,7 @@ public typealias AsyncTransactionId = RLMAsyncTransactionId /** Asynchronously performs actions contained within the given block inside a write transaction. The write transaction is begun asynchronously as if calling `beginAsyncWrite`, - and by default the transaction is commited asynchronously after the block completes. + and by default the transaction is committed asynchronously after the block completes. You can also explicitly call `commitWrite` or `cancelWrite` from within the block to synchronously commit or cancel the write transaction. Returning without one of these calls is equivalent to calling `commitWrite`. @@ -1101,7 +1101,6 @@ extension Realm { - returns: A `SyncSubscriptionSet`. - Warning: This feature is currently in beta and its API is subject to change. */ - @available(*, message: "This feature is currently in beta.") public var subscriptions: SyncSubscriptionSet { return SyncSubscriptionSet(rlmRealm.subscriptions) } diff --git a/RealmSwift/Results.swift b/RealmSwift/Results.swift index 5ad198fa99..e0f419d2ee 100644 --- a/RealmSwift/Results.swift +++ b/RealmSwift/Results.swift @@ -17,7 +17,7 @@ //////////////////////////////////////////////////////////////////////////// import Foundation -import Realm +import Realm.Private // MARK: MinMaxType @@ -147,6 +147,86 @@ extension Projection: KeypathSortable {} return RLMIterator(collection: collection) } + // MARK: Flexible Sync + +#if swift(>=5.8) + /** + Creates a SyncSubscription matching the Results' local query. + After committing the subscription to the realm's local subscription set, the method + will wait for downloads according to `WaitForSyncMode`. + + ### Unnamed subscriptions ### + If `.subscribe()` is called without a name whose query matches an unnamed subscription, another subscription is not created. + + If `.subscribe()` is called without a name whose query matches a named subscription, an additional unnamed subscription is created. + ### Named Subscriptions ### + If `.subscribe()` is called with a name whose query matches an unnamed subscription, an additional named subscription is created. + ### Existing name and query ### + If `.subscribe()` is called with a name whose name is taken on a different query, the old subscription is updated with the new query. + + If `.subscribe()` is called with a name that's in already in use by an identical query, no new subscription is created. + + + - Note: This method will wait for all data to be downloaded before returning when `WaitForSyncMode.always` and `.onCreation` (when the subscription is first created) is used. This requires an internet connection if no timeout is set. + + - Note: This method opens a update transaction that creates or updates a subscription. + It's advised to *not* loop over this method in order to create multiple subscriptions. + This could create a performance bottleneck by opening multiple unnecessary update transactions. + To create multiple subscriptions at once use `SyncSubscription.update`. + + - parameter name: The name applied to the subscription + - parameter waitForSync: ``WaitForSyncMode`` Determines the download behavior for the subscription. Defaults to `.onCreation`. + - parameter timeout: An optional client timeout. The client will cancel waiting for subscription downloads after this time has elapsed. Reaching this timeout doesn't imply a server error. + - returns: Returns `self`. + + - warning: This function is only supported for main thread and + actor-isolated Realms. + - warning: This API is currently in `Preview` and may be subject to changes in the future. + */ + @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) + @_unsafeInheritExecutor + public func subscribe(name: String? = nil, waitForSync: WaitForSyncMode = .onCreation, timeout: TimeInterval? = nil) async throws -> Results { + guard let actor = realm?.rlmRealm.actor as? Actor else { + fatalError("`subscribe` can only be called on main thread or actor-isolated Realms") + } + + var rlmResults = ObjectiveCSupport.convert(object: self) + let scheduler = await RLMScheduler.actor(actor, invoke: actor.invoke, verify: actor.verifier()) + rlmResults = try await rlmResults.subscribe(withName: name, waitForSync: waitForSync, confinedTo: scheduler, timeout: timeout ?? 0) + return self + } +#endif + /** + Removes a SyncSubscription matching the Results' local filter. + + The method returns after committing the subscription removal to the realm's + local subscription set. Calling this method will not wait for objects to + be removed from the realm. + + In order for a named subscription to be removed, the Results + must have previously created the subscription. For example: + ``` + let results1 = try await realm.objects(Dog.self).where { $0.age > 1 }.subscribe(name: "adults") + let results2 = try await realm.objects(Dog.self).where { $0.age > 1 }.subscribe(name: "overOne") + let results3 = try await realm.objects(Dog.self).where { $0.age > 1 }.subscribe() + // This will unsubscribe from the subscription named "overOne". The "adults" and unnamed + // subscription still remain. + results2.unsubscribe() + ``` + + - Note: This method opens an update transaction that removes a subscription. + It is advised to *not* use this method to batch multiple subscription changes + to the server. + To unsubscribe multiple subscriptions at once use `SyncSubscription.update`. + + - warning: Calling unsubscribe on a Results does not remove the local filter from the `Results`. After calling unsubscribe, + Results may still contain objects because other subscriptions may exist in the realm's subscription set. + - warning: This API is currently in `Preview` and may be subject to changes in the future. + */ + public func unsubscribe() { + let rlmResults = ObjectiveCSupport.convert(object: self) + rlmResults.unsubscribe() + } } extension Results: Encodable where Element: Encodable {} diff --git a/RealmSwift/SyncSubscription.swift b/RealmSwift/SyncSubscription.swift index 1edff2ba50..4904a6e223 100644 --- a/RealmSwift/SyncSubscription.swift +++ b/RealmSwift/SyncSubscription.swift @@ -218,7 +218,7 @@ import Realm.Private an `Error`describing what went wrong will be returned by the block */ public func update(_ block: (() -> Void), onComplete: (@Sendable (Error?) -> Void)? = nil) { - rlmSyncSubscriptionSet.update(block, onComplete: onComplete ?? { _ in }) + rlmSyncSubscriptionSet.update(block, onComplete: onComplete) } /// :nodoc: @@ -371,12 +371,17 @@ import Realm.Private /** Removes all subscriptions from the subscription set. + - parameter unnamedOnly: If true, only unnamed subscriptions are removed. - warning: This method may only be called during a write subscription block. - warning: Removing all subscriptions will result in an error if no new subscription is added. Server should acknowledge at least one subscription. */ - public func removeAll() { - rlmSyncSubscriptionSet.removeAllSubscriptions() + public func removeAll(unnamedOnly: Bool = false) { + if unnamedOnly { + rlmSyncSubscriptionSet.removeAllUnnamedSubscriptions() + } else { + rlmSyncSubscriptionSet.removeAllSubscriptions() + } } /**