Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Secondary Index Partial Content #224

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,14 @@ extension DiskPersistence.Datastore.Index {
/// - Parameters:
/// - proposedEntry: The entry to use in comparison with other persisted entries.
/// - pages: A collection of pages to check against.
/// - requiresCompleteEntries: Set to `true` if the comparator requires a complete entry to operate with.
/// - pageBuilder: A closure that provides a cached Page object for the loaded page.
/// - comparator: A comparator to determine order and equality between the proposed entry and a persisted one.
/// - Returns: The index within the pages collection where the entry would reside.
func pageIndex<T>(
for proposedEntry: T,
in pages: [DatastoreIndexManifest.PageInfo],
requiresCompleteEntries: Bool,
pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> Int? {
Expand Down Expand Up @@ -348,11 +350,13 @@ extension DiskPersistence.Datastore.Index {

/// If we have some bytes, attempt to decode them into an entry.
if let bytesForFirstEntry {
firstEntryOfPage = try? DatastorePageEntry(bytes: bytesForFirstEntry, isPartial: false)
firstEntryOfPage = try? DatastorePageEntry(bytes: bytesForFirstEntry, isPartial: true)
}

/// If we have an entry, stop scanning as we can go ahead and operate on it.
if firstEntryOfPage != nil { break pageIterator }
/// If we have an entry, stop scanning as we can go ahead and operate on it. Also make sure that we have a complete entry if one is required by rejecting partial entries when the flag is set.
if let firstEntryOfPage, !(requiresCompleteEntries && firstEntryOfPage.isPartial) {
break pageIterator
}
}
}

Expand Down Expand Up @@ -388,6 +392,7 @@ extension DiskPersistence.Datastore.Index {

func entry<T>(
for proposedEntry: T,
requiresCompleteEntries: Bool,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> (
cursor: DiskPersistence.InstanceCursor,
Expand All @@ -396,6 +401,7 @@ extension DiskPersistence.Datastore.Index {
try await entry(
for: proposedEntry,
in: try await manifest.orderedPages,
requiresCompleteEntries: requiresCompleteEntries,
pageBuilder: { await datastore.page(for: .init(index: self.id, page: $0)) },
comparator: comparator
)
Expand All @@ -404,6 +410,7 @@ extension DiskPersistence.Datastore.Index {
func entry<T>(
for proposedEntry: T,
in pages: [DatastoreIndexManifest.PageInfo],
requiresCompleteEntries: Bool,
pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> (
Expand All @@ -415,6 +422,7 @@ extension DiskPersistence.Datastore.Index {
let startingPageIndex = try await pageIndex(
for: proposedEntry,
in: pages,
requiresCompleteEntries: requiresCompleteEntries,
pageBuilder: pageBuilder,
comparator: comparator
)
Expand Down Expand Up @@ -549,11 +557,13 @@ extension DiskPersistence.Datastore.Index {

func insertionCursor<T>(
for proposedEntry: T,
requiresCompleteEntries: Bool,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> DiskPersistence.InsertionCursor {
try await insertionCursor(
for: proposedEntry,
in: try await manifest.orderedPages,
requiresCompleteEntries: requiresCompleteEntries,
pageBuilder: { await datastore.page(for: .init(index: self.id, page: $0)) },
comparator: comparator
)
Expand All @@ -562,6 +572,7 @@ extension DiskPersistence.Datastore.Index {
func insertionCursor<T>(
for proposedEntry: T,
in pages: [DatastoreIndexManifest.PageInfo],
requiresCompleteEntries: Bool,
pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> DiskPersistence.InsertionCursor {
Expand All @@ -570,6 +581,7 @@ extension DiskPersistence.Datastore.Index {
let startingPageIndex = try await pageIndex(
for: proposedEntry,
in: pages,
requiresCompleteEntries: requiresCompleteEntries,
pageBuilder: pageBuilder,
comparator: comparator
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import Bytes
/// An entry made up of an ordered list of header fields and a content payload.
///
/// NOTE(review): presumably this models a single record persisted within a datastore page — confirm against the initializers and block encoding defined elsewhere.
struct DatastorePageEntry: Hashable {
/// The header fields of the entry. Comparators index into this array positionally (for example `headers[0]`).
var headers: [Bytes]
/// The content bytes of the entry. These may be incomplete when `isPartial` is `true`.
var content: Bytes

/// Whether the entry contains complete headers but only partial content.
///
/// Defaults to `false`, so entries are treated as complete unless explicitly marked otherwise.
var isPartial: Bool = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,11 @@ extension DiskPersistence.Transaction {

let index = try await rootObject.primaryIndex

let (cursor, entry) = try await index.entry(for: identifier, comparator: primaryIndexComparator)
let (cursor, entry) = try await index.entry(
for: identifier,
requiresCompleteEntries: false,
comparator: primaryIndexComparator
)
guard entry.headers.count == 2
else { throw DiskPersistenceError.invalidEntryFormat }

Expand All @@ -551,7 +555,11 @@ extension DiskPersistence.Transaction {

let index = try await rootObject.primaryIndex

return try await index.insertionCursor(for: identifier, comparator: primaryIndexComparator)
return try await index.insertionCursor(
for: identifier,
requiresCompleteEntries: false,
comparator: primaryIndexComparator
)
}

func directIndexCursor<IndexType: Indexable, IdentifierType: Indexable>(
Expand All @@ -572,7 +580,11 @@ extension DiskPersistence.Transaction {
guard let index = try await rootObject.directIndexes[indexName]
else { throw DatastoreInterfaceError.indexNotFound }

let (cursor, entry) = try await index.entry(for: (indexValue, identifier), comparator: directIndexComparator)
let (cursor, entry) = try await index.entry(
for: (indexValue, identifier),
requiresCompleteEntries: false,
comparator: directIndexComparator
)
guard entry.headers.count == 3
else { throw DiskPersistenceError.invalidEntryFormat }

Expand All @@ -597,7 +609,11 @@ extension DiskPersistence.Transaction {
guard let index = try await rootObject.directIndexes[indexName]
else { throw DatastoreInterfaceError.indexNotFound }

return try await index.insertionCursor(for: (indexValue, identifier), comparator: directIndexComparator)
return try await index.insertionCursor(
for: (indexValue, identifier),
requiresCompleteEntries: false,
comparator: directIndexComparator
)
}

func secondaryIndexCursor<IndexType: Indexable, IdentifierType: Indexable>(
Expand All @@ -614,7 +630,11 @@ extension DiskPersistence.Transaction {
guard let index = try await rootObject.secondaryIndexes[indexName]
else { throw DatastoreInterfaceError.indexNotFound }

let (cursor, _) = try await index.entry(for: (indexValue, identifier), comparator: secondaryIndexComparator)
let (cursor, _) = try await index.entry(
for: (indexValue, identifier),
requiresCompleteEntries: true,
comparator: secondaryIndexComparator
)

return cursor
}
Expand All @@ -633,7 +653,11 @@ extension DiskPersistence.Transaction {
guard let index = try await rootObject.secondaryIndexes[indexName]
else { throw DatastoreInterfaceError.indexNotFound }

return try await index.insertionCursor(for: (indexValue, identifier), comparator: secondaryIndexComparator)
return try await index.insertionCursor(
for: (indexValue, identifier),
requiresCompleteEntries: true,
comparator: secondaryIndexComparator
)
}
}

Expand Down Expand Up @@ -698,6 +722,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.lowerBoundExpression, .ascending),
requiresCompleteEntries: false,
comparator: primaryIndexBoundComparator
)
}
Expand All @@ -723,6 +748,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.upperBoundExpression, .descending),
requiresCompleteEntries: false,
comparator: primaryIndexBoundComparator
)
}
Expand Down Expand Up @@ -766,6 +792,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.lowerBoundExpression, .ascending),
requiresCompleteEntries: false,
comparator: directIndexBoundComparator
)
}
Expand All @@ -791,6 +818,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.upperBoundExpression, .descending),
requiresCompleteEntries: false,
comparator: directIndexBoundComparator
)
}
Expand Down Expand Up @@ -834,6 +862,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.lowerBoundExpression, .ascending),
requiresCompleteEntries: false,
comparator: secondaryIndexBoundComparator
)
}
Expand All @@ -857,6 +886,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.upperBoundExpression, .descending),
requiresCompleteEntries: false,
comparator: secondaryIndexBoundComparator
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import XCTest
@testable import CodableDatastore

/// Error thrown by the test comparator when a persisted entry's content count does not match the required content length.
fileprivate struct SortError: Error, Equatable {}

final class DiskPersistenceDatastoreIndexTests: XCTestCase {
var temporaryStoreURL: URL = FileManager.default.temporaryDirectory

Expand All @@ -26,7 +28,10 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
func assertPageSearch(
proposedEntry: UInt8,
pages: [[DatastorePageEntryBlock]],
requiredContentLength: Int? = nil,
requiresCompleteEntries: Bool = false,
expectedIndex: Int?,
expectedSearchFailure: Bool = false,
file: StaticString = #filePath,
line: UInt = #line
) async throws {
Expand Down Expand Up @@ -67,13 +72,25 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
pageInfos.append(.existing(pageID))
}

let result = try await index.pageIndex(for: proposedEntry, in: pageInfos) { [pageLookup] pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
lhs.sortOrder(comparedTo: rhs.headers[0][0])
do {
let result = try await index.pageIndex(
for: proposedEntry,
in: pageInfos,
requiresCompleteEntries: requiresCompleteEntries
) { [pageLookup] pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
if let requiredContentLength, rhs.content.count != requiredContentLength {
throw SortError()
}
return lhs.sortOrder(comparedTo: rhs.headers[0][0])
}

XCTAssertEqual(result, expectedIndex, file: file, line: line)
XCTAssertFalse(expectedSearchFailure, file: file, line: line)
} catch is SortError {
XCTAssertTrue(expectedSearchFailure, "Encountered unexpected error", file: file, line: line)
}

XCTAssertEqual(result, expectedIndex, file: file, line: line)
}

func assertInsertionCursor(
Expand Down Expand Up @@ -121,7 +138,7 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
pageInfos.append(.existing(pageID))
}

let result = try await index.insertionCursor(for: RangeBoundExpression.including(proposedEntry), in: pageInfos) { [pageLookup] pageID in
let result = try await index.insertionCursor(for: RangeBoundExpression.including(proposedEntry), in: pageInfos, requiresCompleteEntries: false) { [pageLookup] pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
lhs.sortOrder(comparedTo: rhs.headers[0][0], order: .ascending)
Expand Down Expand Up @@ -208,6 +225,29 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
try await assertPageSearch(proposedEntry: 4, pages: pages, expectedIndex: 1)
}

/// Verifies page search over entries whose content is split across page boundaries.
///
/// Two entries with 100 elements of content each are broken into blocks using a `remainingPageSpace` of 20, and the resulting blocks are laid out over three pages. The comparator is told to require a content count of 100 and throws otherwise, so every search is expected to fail when complete entries are not required, and to succeed with the expected page index when they are.
func testSplitContentBlockSearch() async throws {
    let contentLength = 100

    let firstEntryBlocks = DatastorePageEntry(
        headers: [[1]],
        content: Array(repeating: 1, count: contentLength)
    ).blocks(remainingPageSpace: 20, maxPageSpace: 1024)

    let thirdEntryBlocks = DatastorePageEntry(
        headers: [[3]],
        content: Array(repeating: 3, count: contentLength)
    ).blocks(remainingPageSpace: 20, maxPageSpace: 1024)

    /// The first entry starts on page 0 and continues onto page 1; the third entry starts on page 1 and continues onto page 2.
    let pages = [
        [firstEntryBlocks[0]],
        [firstEntryBlocks[1], thirdEntryBlocks[0]],
        [thirdEntryBlocks[1]]
    ]

    /// Forwards to `assertPageSearch`, expecting a search failure exactly when complete entries are not required.
    /// The caller's line number is passed through so failures are still attributed to the individual call below.
    func search(
        _ proposedEntry: UInt8,
        requiresCompleteEntries: Bool,
        expectedIndex: Int?,
        line: UInt = #line
    ) async throws {
        try await assertPageSearch(
            proposedEntry: proposedEntry,
            pages: pages,
            requiredContentLength: contentLength,
            requiresCompleteEntries: requiresCompleteEntries,
            expectedIndex: expectedIndex,
            expectedSearchFailure: !requiresCompleteEntries,
            line: line
        )
    }

    /// Partial entries are handed to the comparator, which rejects them.
    try await search(0, requiresCompleteEntries: false, expectedIndex: 0)
    try await search(1, requiresCompleteEntries: false, expectedIndex: 0)
    try await search(2, requiresCompleteEntries: false, expectedIndex: 0)
    try await search(3, requiresCompleteEntries: false, expectedIndex: 1)
    try await search(4, requiresCompleteEntries: false, expectedIndex: 1)

    /// Only complete entries reach the comparator, so the search resolves to a page index.
    try await search(0, requiresCompleteEntries: true, expectedIndex: 0)
    try await search(1, requiresCompleteEntries: true, expectedIndex: 0)
    try await search(2, requiresCompleteEntries: true, expectedIndex: 0)
    try await search(3, requiresCompleteEntries: true, expectedIndex: 1)
    try await search(4, requiresCompleteEntries: true, expectedIndex: 1)
}

func testTwoPageBackwardsBleedingBlockSearch() async throws {
let entry1 = DatastorePageEntry(headers: [[1]], content: [1]).blocks(remainingPageSpace: 1024, maxPageSpace: 1024)
let entry3 = DatastorePageEntry(headers: [[3]], content: [3]).blocks(remainingPageSpace: 7, maxPageSpace: 1024)
Expand Down Expand Up @@ -381,7 +421,7 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
let exp = expectation(description: "Finished")
Task { [pageInfos, pageLookup] in
for _ in 0..<1000 {
_ = try await index.pageIndex(for: UInt64.random(in: 0..<1000000), in: pageInfos) { pageID in
_ = try await index.pageIndex(for: UInt64.random(in: 0..<1000000), in: pageInfos, requiresCompleteEntries: false) { pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
lhs.sortOrder(comparedTo: try UInt64(bigEndianBytes: rhs.headers[0]))
Expand Down
Loading