diff --git a/Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreIndex.swift b/Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreIndex.swift index a46ef07..cf81e20 100644 --- a/Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreIndex.swift +++ b/Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreIndex.swift @@ -289,12 +289,14 @@ extension DiskPersistence.Datastore.Index { /// - Parameters: /// - proposedEntry: The entry to use in comparison with other persisted entries. /// - pages: A collection of pages to check against. + /// - requiresCompleteEntries: Set to `true` if the comparator requires a complete entry to operate with. /// - pageBuilder: A closure that provides a cached Page object for the loaded page. /// - comparator: A comparator to determine order and equality between the proposed entry and a persisted one. /// - Returns: The index within the pages collection where the entry would reside. func pageIndex( for proposedEntry: T, in pages: [DatastoreIndexManifest.PageInfo], + requiresCompleteEntries: Bool, pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page, comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder ) async throws -> Int? { @@ -348,11 +350,13 @@ extension DiskPersistence.Datastore.Index { /// If we have some bytes, attempt to decode them into an entry. if let bytesForFirstEntry { - firstEntryOfPage = try? DatastorePageEntry(bytes: bytesForFirstEntry, isPartial: false) + firstEntryOfPage = try? DatastorePageEntry(bytes: bytesForFirstEntry, isPartial: true) } - /// If we have an entry, stop scanning as we can go ahead and operate on it. - if firstEntryOfPage != nil { break pageIterator } + /// If we have an entry, stop scanning as we can go ahead and operate on it. Also make sure that we have a complete entry if one is required by rejecting partial entries when the flag is set. + if let firstEntryOfPage, !(requiresCompleteEntries && firstEntryOfPage.isPartial) { + break pageIterator + } } } @@ -388,6 +392,7 @@ extension DiskPersistence.Datastore.Index { func entry( for proposedEntry: T, + requiresCompleteEntries: Bool, comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder ) async throws -> ( cursor: DiskPersistence.InstanceCursor, @@ -396,6 +401,7 @@ extension DiskPersistence.Datastore.Index { try await entry( for: proposedEntry, in: try await manifest.orderedPages, + requiresCompleteEntries: requiresCompleteEntries, pageBuilder: { await datastore.page(for: .init(index: self.id, page: $0)) }, comparator: comparator ) @@ -404,6 +410,7 @@ extension DiskPersistence.Datastore.Index { func entry( for proposedEntry: T, in pages: [DatastoreIndexManifest.PageInfo], + requiresCompleteEntries: Bool, pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page, comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder ) async throws -> ( @@ -415,6 +422,7 @@ extension DiskPersistence.Datastore.Index { let startingPageIndex = try await pageIndex( for: proposedEntry, in: pages, + requiresCompleteEntries: requiresCompleteEntries, pageBuilder: pageBuilder, comparator: comparator ) @@ -549,11 +557,13 @@ extension DiskPersistence.Datastore.Index { func insertionCursor( for proposedEntry: T, + requiresCompleteEntries: Bool, comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder ) async throws -> DiskPersistence.InsertionCursor { try await insertionCursor( for: proposedEntry, in: try await manifest.orderedPages, + requiresCompleteEntries: requiresCompleteEntries, pageBuilder: { await datastore.page(for: .init(index: self.id, page: $0)) }, comparator: comparator ) @@ -562,6 +572,7 @@ extension DiskPersistence.Datastore.Index { func insertionCursor( for proposedEntry: T, in pages: [DatastoreIndexManifest.PageInfo], + requiresCompleteEntries: Bool, pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page, comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder ) async throws -> DiskPersistence.InsertionCursor { @@ -570,6 +581,7 @@ extension DiskPersistence.Datastore.Index { let startingPageIndex = try await pageIndex( for: proposedEntry, in: pages, + requiresCompleteEntries: requiresCompleteEntries, pageBuilder: pageBuilder, comparator: comparator ) diff --git a/Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastorePageEntry.swift b/Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastorePageEntry.swift index b8514e1..15084de 100644 --- a/Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastorePageEntry.swift +++ b/Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastorePageEntry.swift @@ -13,6 +13,8 @@ import Bytes struct DatastorePageEntry: Hashable { var headers: [Bytes] var content: Bytes + + /// Whether the entry contains a complete header, but a partial content. var isPartial: Bool = false } diff --git a/Sources/CodableDatastore/Persistence/Disk Persistence/Transaction/Transaction.swift b/Sources/CodableDatastore/Persistence/Disk Persistence/Transaction/Transaction.swift index 0339028..13e6ed0 100644 --- a/Sources/CodableDatastore/Persistence/Disk Persistence/Transaction/Transaction.swift +++ b/Sources/CodableDatastore/Persistence/Disk Persistence/Transaction/Transaction.swift @@ -529,7 +529,11 @@ extension DiskPersistence.Transaction { let index = try await rootObject.primaryIndex - let (cursor, entry) = try await index.entry(for: identifier, comparator: primaryIndexComparator) + let (cursor, entry) = try await index.entry( + for: identifier, + requiresCompleteEntries: false, + comparator: primaryIndexComparator + ) guard entry.headers.count == 2 else { throw DiskPersistenceError.invalidEntryFormat } @@ -551,7 +555,11 @@ extension DiskPersistence.Transaction { let index = try await rootObject.primaryIndex - return try await index.insertionCursor(for: identifier, comparator: primaryIndexComparator) + return try await index.insertionCursor( + for: identifier, + requiresCompleteEntries: false, + comparator: primaryIndexComparator + ) } func directIndexCursor( @@ -572,7 +580,11 @@ extension DiskPersistence.Transaction { guard let index = try await rootObject.directIndexes[indexName] else { throw DatastoreInterfaceError.indexNotFound } - let (cursor, entry) = try await index.entry(for: (indexValue, identifier), comparator: directIndexComparator) + let (cursor, entry) = try await index.entry( + for: (indexValue, identifier), + requiresCompleteEntries: false, + comparator: directIndexComparator + ) guard entry.headers.count == 3 else { throw DiskPersistenceError.invalidEntryFormat } @@ -597,7 +609,11 @@ extension DiskPersistence.Transaction { guard let index = try await rootObject.directIndexes[indexName] else { throw DatastoreInterfaceError.indexNotFound } - return try await index.insertionCursor(for: (indexValue, identifier), comparator: directIndexComparator) + return try await index.insertionCursor( + for: (indexValue, identifier), + requiresCompleteEntries: false, + comparator: directIndexComparator + ) } func secondaryIndexCursor( @@ -614,7 +630,11 @@ extension DiskPersistence.Transaction { guard let index = try await rootObject.secondaryIndexes[indexName] else { throw DatastoreInterfaceError.indexNotFound } - let (cursor, _) = try await index.entry(for: (indexValue, identifier), comparator: secondaryIndexComparator) + let (cursor, _) = try await index.entry( + for: (indexValue, identifier), + requiresCompleteEntries: true, + comparator: secondaryIndexComparator + ) return cursor } @@ -633,7 +653,11 @@ extension DiskPersistence.Transaction { guard let index = try await rootObject.secondaryIndexes[indexName] else { throw DatastoreInterfaceError.indexNotFound } - return try await index.insertionCursor(for: (indexValue, identifier), comparator: secondaryIndexComparator) + return try await index.insertionCursor( + for: (indexValue, identifier), + requiresCompleteEntries: true, + comparator: secondaryIndexComparator + ) } } @@ -698,6 +722,7 @@ extension DiskPersistence.Transaction { } else { try await index.insertionCursor( for: (range.lowerBoundExpression, .ascending), + requiresCompleteEntries: false, comparator: primaryIndexBoundComparator ) } @@ -723,6 +748,7 @@ extension DiskPersistence.Transaction { } else { try await index.insertionCursor( for: (range.upperBoundExpression, .descending), + requiresCompleteEntries: false, comparator: primaryIndexBoundComparator ) } @@ -766,6 +792,7 @@ extension DiskPersistence.Transaction { } else { try await index.insertionCursor( for: (range.lowerBoundExpression, .ascending), + requiresCompleteEntries: false, comparator: directIndexBoundComparator ) } @@ -791,6 +818,7 @@ extension DiskPersistence.Transaction { } else { try await index.insertionCursor( for: (range.upperBoundExpression, .descending), + requiresCompleteEntries: false, comparator: directIndexBoundComparator ) } @@ -834,6 +862,7 @@ extension DiskPersistence.Transaction { } else { try await index.insertionCursor( for: (range.lowerBoundExpression, .ascending), + requiresCompleteEntries: false, comparator: secondaryIndexBoundComparator ) } @@ -857,6 +886,7 @@ extension DiskPersistence.Transaction { } else { try await index.insertionCursor( for: (range.upperBoundExpression, .descending), + requiresCompleteEntries: false, comparator: secondaryIndexBoundComparator ) } diff --git a/Tests/CodableDatastoreTests/DiskPersistenceDatastoreIndexTests.swift b/Tests/CodableDatastoreTests/DiskPersistenceDatastoreIndexTests.swift index 264e1cc..b685287 100644 --- a/Tests/CodableDatastoreTests/DiskPersistenceDatastoreIndexTests.swift +++ b/Tests/CodableDatastoreTests/DiskPersistenceDatastoreIndexTests.swift @@ -12,6 +12,8 @@ import XCTest @testable import CodableDatastore +fileprivate struct SortError: Error, Equatable {} + final class DiskPersistenceDatastoreIndexTests: XCTestCase { var temporaryStoreURL: URL = FileManager.default.temporaryDirectory @@ -26,7 +28,10 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase { func assertPageSearch( proposedEntry: UInt8, pages: [[DatastorePageEntryBlock]], + requiredContentLength: Int? = nil, + requiresCompleteEntries: Bool = false, expectedIndex: Int?, + expectedSearchFailure: Bool = false, file: StaticString = #filePath, line: UInt = #line ) async throws { @@ -67,13 +72,25 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase { pageInfos.append(.existing(pageID)) } - let result = try await index.pageIndex(for: proposedEntry, in: pageInfos) { [pageLookup] pageID in - pageLookup[pageID]! - } comparator: { lhs, rhs in - lhs.sortOrder(comparedTo: rhs.headers[0][0]) + do { + let result = try await index.pageIndex( + for: proposedEntry, + in: pageInfos, + requiresCompleteEntries: requiresCompleteEntries + ) { [pageLookup] pageID in + pageLookup[pageID]! + } comparator: { lhs, rhs in + if let requiredContentLength, rhs.content.count != requiredContentLength { + throw SortError() + } + return lhs.sortOrder(comparedTo: rhs.headers[0][0]) + } + + XCTAssertEqual(result, expectedIndex, file: file, line: line) + XCTAssertFalse(expectedSearchFailure, file: file, line: line) + } catch is SortError { + XCTAssertTrue(expectedSearchFailure, "Encountered unexpected error", file: file, line: line) } - - XCTAssertEqual(result, expectedIndex, file: file, line: line) } func assertInsertionCursor( @@ -121,7 +138,7 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase { pageInfos.append(.existing(pageID)) } - let result = try await index.insertionCursor(for: RangeBoundExpression.including(proposedEntry), in: pageInfos) { [pageLookup] pageID in + let result = try await index.insertionCursor(for: RangeBoundExpression.including(proposedEntry), in: pageInfos, requiresCompleteEntries: false) { [pageLookup] pageID in pageLookup[pageID]! } comparator: { lhs, rhs in lhs.sortOrder(comparedTo: rhs.headers[0][0], order: .ascending) @@ -208,6 +225,29 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase { try await assertPageSearch(proposedEntry: 4, pages: pages, expectedIndex: 1) } + func testSplitContentBlockSearch() async throws { + let entry1 = DatastorePageEntry(headers: [[1]], content: Array(repeating: 1, count: 100)).blocks(remainingPageSpace: 20, maxPageSpace: 1024) + let entry3 = DatastorePageEntry(headers: [[3]], content: Array(repeating: 3, count: 100)).blocks(remainingPageSpace: 20, maxPageSpace: 1024) + + let pages = [ + [entry1[0]], + [entry1[1], entry3[0]], + [entry3[1]] + ] + + try await assertPageSearch(proposedEntry: 0, pages: pages, requiredContentLength: 100, requiresCompleteEntries: false, expectedIndex: 0, expectedSearchFailure: true) + try await assertPageSearch(proposedEntry: 1, pages: pages, requiredContentLength: 100, requiresCompleteEntries: false, expectedIndex: 0, expectedSearchFailure: true) + try await assertPageSearch(proposedEntry: 2, pages: pages, requiredContentLength: 100, requiresCompleteEntries: false, expectedIndex: 0, expectedSearchFailure: true) + try await assertPageSearch(proposedEntry: 3, pages: pages, requiredContentLength: 100, requiresCompleteEntries: false, expectedIndex: 1, expectedSearchFailure: true) + try await assertPageSearch(proposedEntry: 4, pages: pages, requiredContentLength: 100, requiresCompleteEntries: false, expectedIndex: 1, expectedSearchFailure: true) + + try await assertPageSearch(proposedEntry: 0, pages: pages, requiredContentLength: 100, requiresCompleteEntries: true, expectedIndex: 0, expectedSearchFailure: false) + try await assertPageSearch(proposedEntry: 1, pages: pages, requiredContentLength: 100, requiresCompleteEntries: true, expectedIndex: 0, expectedSearchFailure: false) + try await assertPageSearch(proposedEntry: 2, pages: pages, requiredContentLength: 100, requiresCompleteEntries: true, expectedIndex: 0, expectedSearchFailure: false) + try await assertPageSearch(proposedEntry: 3, pages: pages, requiredContentLength: 100, requiresCompleteEntries: true, expectedIndex: 1, expectedSearchFailure: false) + try await assertPageSearch(proposedEntry: 4, pages: pages, requiredContentLength: 100, requiresCompleteEntries: true, expectedIndex: 1, expectedSearchFailure: false) + } + func testTwoPageBackwardsBleedingBlockSearch() async throws { let entry1 = DatastorePageEntry(headers: [[1]], content: [1]).blocks(remainingPageSpace: 1024, maxPageSpace: 1024) let entry3 = DatastorePageEntry(headers: [[3]], content: [3]).blocks(remainingPageSpace: 7, maxPageSpace: 1024) @@ -381,7 +421,7 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase { let exp = expectation(description: "Finished") Task { [pageInfos, pageLookup] in for _ in 0..<1000 { - _ = try await index.pageIndex(for: UInt64.random(in: 0..<1000000), in: pageInfos) { pageID in + _ = try await index.pageIndex(for: UInt64.random(in: 0..<1000000), in: pageInfos, requiresCompleteEntries: false) { pageID in pageLookup[pageID]! } comparator: { lhs, rhs in lhs.sortOrder(comparedTo: try UInt64(bigEndianBytes: rhs.headers[0]))