Skip to content

Commit

Permalink
Updates including conflict resolution working
Browse files Browse the repository at this point in the history
  • Loading branch information
marceljuenemann committed Mar 17, 2023
1 parent bd70294 commit 7306094
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 31 deletions.
76 changes: 55 additions & 21 deletions src/__tests__/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import process from "process"
import { SupabaseClient, createClient } from "@supabase/supabase-js"
import { afterEach, beforeAll, beforeEach, describe, expect, it } from "vitest"

import { createRxDatabase, RxCollection, RxDatabase, WithDeleted } from "rxdb";
import { createRxDatabase, RxCollection, RxConflictHandler, RxConflictHandlerInput, RxDatabase, WithDeleted } from "rxdb";
import { getRxStorageMemory } from "rxdb/plugins/storage-memory";
import { Human, HUMAN_SCHEMA } from "./test-types.js";
import { replicateSupabase, SupabaseReplicationCheckpoint, SupabaseReplicationOptions } from "../index.js";
Expand Down Expand Up @@ -37,6 +37,7 @@ describe("replicateSupabase with actual SupabaseClient", () => {

// Start with Alice :)
await replication({}, async() => {
// TODO: remove explicit null, should be set by pull anyways
await collection.insert({id: '1', name: 'Alice', age: null})
})

Expand Down Expand Up @@ -78,19 +79,15 @@ describe("replicateSupabase with actual SupabaseClient", () => {

describe("with custom conflict handler", () => {
it("invokes conflict handler", async () => {
collection.conflictHandler = (input, context) => {
return Promise.resolve({
isEqual: false,
documentData: {...input.newDocumentState, name: 'Conflict resolved'}
})
}
collection.conflictHandler = resolveConflictWithName('Conflict resolved')

await supabase.from('humans').insert({id: '2', name: 'Bob'})
await collection.insert({id: '2', name: 'Bob 2', age: 2})
await replication()

expect(await supabaseContents()).toEqual([
{id: '1', name: 'Alice', age: null, '_deleted': false},
{id: '2', name: 'Bob', age: null, '_deleted': false}
{id: '2', name: 'Conflict resolved', age: 2, '_deleted': false}
])
expect(await rxdbContents()).toEqual([
{id: '1', name: 'Alice', age: null},
Expand All @@ -100,32 +97,58 @@ describe("replicateSupabase with actual SupabaseClient", () => {
})
})

describe.only("on client-side update", () => {
describe("on client-side update", () => {
describe("without conflict", () => {
it("updates supabase", async () => {
await replication({}, async() => {
await collection.insert({id: '2', name: 'Bob', age: 1})
})
await replication({}, async() => {
let doc = await collection.findOne('1').exec()
await doc!.patch({age: 2})
await doc!.patch({age: 42})
})
expect(await rxdbContents()).toEqual([
{id: '1', name: 'Alice', age: null},
{id: '2', name: 'Bob', age: 2}
])
expect(await supabaseContents()).toEqual([
{id: '1', name: 'Alice', age: null, '_deleted': false},
{id: '2', name: 'Bob', age: 2, '_deleted': false}
{id: '1', name: 'Alice', age: 42, '_deleted': false}
])
})
})

describe("with conflict", () => {
beforeEach(async () => {
// Set Alice's age to 42 locally, while changing her name on the server.
let doc = await collection.findOne('1').exec()
await doc!.patch({age: 42})
await supabase.from('humans').update({name: 'Alex'}).eq('id', '1')
})

describe("with default conflict handler", () => {
it("applies supabase changes", async () => {
await replication()
expect(await rxdbContents()).toEqual([
{id: '1', name: 'Alex', age: null}
])
expect(await supabaseContents()).toEqual([
{id: '1', name: 'Alex', age: null, '_deleted': false}
])
})
})

describe("with custom conflict handler", () => {
it("invokes conflict handler", async () => {
collection.conflictHandler = resolveConflictWithName('Conflict resolved')
await replication()
expect(await rxdbContents()).toEqual([
{id: '1', name: 'Conflict resolved', age: 42}
])
expect(await supabaseContents()).toEqual([
{id: '1', name: 'Conflict resolved', age: 42, '_deleted': false}
])
})
})
})
})
})

describe("when supabase changed while offline", () => {
it("pulls new rows", async () => {
collection.conflictHandler = async (input, contet) => {
collection.conflictHandler = async (input, context) => {
console.error("Conflict handler invoked", input)
return { isEqual: false, documentData: input.realMasterState }
}
Expand All @@ -142,9 +165,11 @@ describe("replicateSupabase with actual SupabaseClient", () => {

let replication = async (options: Partial<SupabaseReplicationOptions<Human>> = {}, transactions: () => Promise<void> = async() => {}): Promise<void> => {
let replication = startReplication(options)
await replication.awaitInSync()
await replication.awaitInitialReplication()
await transactions()
await replication.awaitInSync()
replication.reSync() // TODO: should not be necessary with live replication
await replication.awaitInSync()
await replication.cancel()
}

Expand All @@ -164,6 +189,15 @@ describe("replicateSupabase with actual SupabaseClient", () => {
return status
}

let resolveConflictWithName = <T>(name: string): RxConflictHandler<T> => {
return async (input: RxConflictHandlerInput<T>) => {
return {
isEqual: false,
documentData: {...input.newDocumentState, name}
}
}
}

let supabaseContents = async (stripModified: boolean = true): Promise<WithDeleted<Human>[]> => {
const { data, error } = await supabase.from('humans').select().order('id')
if (error) throw error
Expand Down
15 changes: 14 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SupabaseClient } from '@supabase/supabase-js'

import type { ReplicationOptions, ReplicationPullOptions, ReplicationPushOptions } from './rxdb-internal-types.js'
import { SupabaseReplication } from './supabase-replication.js'
import { RxReplicationWriteToMasterRow } from 'rxdb'

export type SupabaseReplicationOptions<RxDocType> = {
/**
Expand Down Expand Up @@ -50,7 +51,19 @@ export type SupabaseReplicationOptions<RxDocType> = {
* options, as no data will be pushed if the field is absent.
*/
// TODO: enable custom batch size (currently always one row at a time)
push?: Omit<ReplicationPushOptions<RxDocType>, 'handler' | 'batchSize'>
push?: Omit<ReplicationPushOptions<RxDocType>, 'handler' | 'batchSize'>,

/**
* Handler for pushing row updates to supabase. Must return true iff the UPDATE was
* applied to the supabase table. Returning false signalises a write conflict, in
* which case the current state of the row will be fetched from supabase and passed to
* the RxDB collection's conflict handler.
* @default the default handler will update the row only iff all fields match the
* local state (before the update was applied), otherwise the conflict handler is
* invoked. The default handler does not support JSON fields at the moment.
*/
// TODO: Support JSON fields
updateHandler?: (row: RxReplicationWriteToMasterRow<RxDocType>) => Promise<boolean>
} & Omit<
// We don't support waitForLeadership. You should just run in a SharedWorker anyways, no?
ReplicationOptions<RxDocType, any>, 'pull' | 'push' | 'waitForLeadership'
Expand Down
32 changes: 23 additions & 9 deletions src/supabase-replication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,31 @@ export class SupabaseReplication<RxDocType> {
* state is fetched and passed to the conflict handler.
*/
private async handleUpdate(row: RxReplicationWriteToMasterRow<RxDocType>): Promise<WithDeleted<RxDocType>[]> {
const query = this.options.supabaseClient
.from(this.table)
.update(row.newDocumentState, { count: 'exact' })
//.eq(this.primaryKey, (row.newDocumentState as any)[this.primaryKey])
.match(row.assumedMasterState!) // TODO: Does not work for null and jsonb fields?
const updateHandler = this.options.updateHandler ? this.options.updateHandler : this.defaultUpdateHandler.bind(this)
if (await updateHandler(row)) return [] // Success :)
// Fetch current state and let conflict handler resolve it.
return [await this.fetchByPrimaryKey((row.newDocumentState as any)[this.primaryKey])]
}

/**
* Updates the row only if all database fields match the expected state.
*/
private async defaultUpdateHandler(row: RxReplicationWriteToMasterRow<RxDocType>): Promise<boolean> {
let query = this.options.supabaseClient.from(this.table).update(row.newDocumentState, { count: 'exact' })
Object.entries(row.assumedMasterState!).forEach(([field, value]) => {
let type = typeof value
if (type === 'string' || type === 'number') {
query = query.eq(field, value)
} else if (type === 'boolean' || value === null) {
query = query.is(field, value)
} else {
throw `replicateSupabase: Unsupported field of type ${type}`
}
})
const { error, count } = await query
console.debug("Update request:", (query as any)['url'].toString())
console.debug("Update request:", (query as any)['url'].toString(), "count", count)
if (error) throw error
if (count == 1) return [] // Success :)
// Fetch current state and let conflict handler resolve it.
return [await this.fetchByPrimaryKey((row as any)[this.primaryKey])]
return count == 1
}

private async fetchByPrimaryKey(primaryKeyValue: any): Promise<WithDeleted<RxDocType>> {
Expand Down

0 comments on commit 7306094

Please sign in to comment.