Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft of an electricStreamToD2Input method #11

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions packages/d2ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
"./sqlite": {
"types": "./dist/sqlite/index.d.ts",
"default": "./dist/sqlite/index.js"
},
"./electric": {
"types": "./dist/electric/index.d.ts",
"default": "./dist/electric/index.js"
}
},
"scripts": {
Expand All @@ -24,6 +28,7 @@
"format:check": "prettier --check \"src/**/*.{ts,tsx,js,jsx,json}\" \"tests/**/*.{ts,tsx,js,jsx,json}\""
},
"devDependencies": {
"@electric-sql/client": "1.0.0-beta.2",
"@types/better-sqlite3": "^7.6.12",
"@types/node": "^22.10.2",
"@typescript-eslint/eslint-plugin": "^8.18.0",
Expand All @@ -38,9 +43,13 @@
"vitest": "^2.1.8"
},
"peerDependencies": {
"@electric-sql/client": "1.0.0-beta.2",
"better-sqlite3": "^11.7.0"
},
"peerDependenciesMeta": {
"@electric-sql/client": {
"optional": true
},
"better-sqlite3": {
"optional": true
}
Expand Down
160 changes: 160 additions & 0 deletions packages/d2ts/src/electric/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { D2, RootStreamBuilder } from '../d2.js'
import { MultiSetArray } from '../multiset.js'
import { type Version, type Antichain } from '../order.js'
import {
type Row,
type ShapeStreamInterface,
isChangeMessage,
isControlMessage,
} from '@electric-sql/client'

/*
Electric uses Postgres LSNs to track progress; each message is annotated with an LSN.
Currently the LSN is embedded in the message offset, a string in the format
"LSN_sequence", so we need to extract the number from it to use as the version for each
message. In the future we intend to add the LSN as a header to each message, so we can
remove this logic.
We need to keep track of these LSNs and send them as the version for each message to
the D2 input stream.
D2 also requires a frontier message to be sent; this is the lower bound for all
future messages.
To do this we:
- Keep track of the last LSN we've seen
- Send the LSN as the version for each message
- When we receive an `up-to-date` message, we send the last LSN + 1 as the frontier. The
addition of 1 accounts for the fact that the last LSN is the version of the last
message, and the frontier must be the version of the next message.
*/

/** Matches an optionally negative, plain base-10 integer with nothing else around it. */
const LSN_PATTERN = /^-?\d+$/

/**
 * Extracts the numeric LSN from an offset string of the form "LSN_sequence".
 *
 * Only the part before the first underscore is considered, and it must be a
 * plain base-10 integer. Unlike a bare `parseInt`, partially numeric input
 * such as "100abc_1" or hex-looking input such as "0x1F_0" is rejected rather
 * than being silently truncated or reinterpreted.
 *
 * @param offset The message offset, e.g. "100_001"
 * @returns The LSN as a number, e.g. 100
 * @throws Error with message `Invalid LSN format: <offset>` when the LSN part
 *   is missing or is not a base-10 integer
 */
function extractLSN(offset: string): number {
  // Default to '' so an empty offset fails validation instead of yielding undefined
  const [lsnPart = ''] = offset.split('_')
  if (!LSN_PATTERN.test(lsnPart)) {
    throw new Error(`Invalid LSN format: ${offset}`)
  }
  return Number.parseInt(lsnPart, 10)
}

/**
 * Options accepted by `electricStreamToD2Input`.
 * `T` is the row type carried by the Electric ShapeStream.
 */
export interface ElectricStreamToD2InputOptions<T extends Row<unknown> = Row> {
/** D2 graph that owns `input`; its `run()` method is called according to `runOn` */
graph: D2
/** The Electric ShapeStream to subscribe to and consume messages from */
stream: ShapeStreamInterface<T>
/** The D2 input stream that receives data (as `[key, row]` entries) and frontier messages */
input: RootStreamBuilder<[key: string, T]>
/** Optional function to convert an LSN to the version passed to `input.sendData` (defaults to the LSN itself) */
lsnToVersion?: (lsn: number) => number | Version
/** Optional function to convert an LSN to the frontier passed to `input.sendFrontier` (defaults to the LSN itself) */
lsnToFrontier?: (lsn: number) => number | Antichain
/** LSN assumed to be the "last seen" one before any change message arrives (defaults to 0) */
initialLsn?: number
/**
 * When to run the graph (defaults to 'up-to-date'):
 * - 'up-to-date': after each `up-to-date` control message
 * - 'lsn-advance': additionally whenever a change message carries a different LSN than the previous one
 * - false: never; the caller is responsible for running the graph
 */
runOn?: 'up-to-date' | 'lsn-advance' | false
}

/**
 * Connects an Electric ShapeStream to a D2 input stream.
 *
 * Subscribes to `stream` and translates each change message into multiset
 * changes on `input`, versioned by the LSN extracted from the message offset.
 * On every `up-to-date` control message any buffered changes are flushed and
 * the frontier is advanced to the last seen LSN + 1.
 *
 * IMPORTANT: Requires the ShapeStream to be configured with `replica: 'full'`
 *
 * The last row sent for every key is kept in memory so that updates and
 * deletes can retract the row that was actually inserted earlier. Memory use
 * therefore grows with the number of live keys in the shape.
 *
 * NOTE(review): control messages other than `up-to-date` are ignored here;
 * confirm whether a shape refetch needs explicit handling.
 *
 * @param options Configuration options
 * @param options.graph The D2 graph to run, according to `runOn`
 * @param options.stream The Electric ShapeStream to consume
 * @param options.input The D2 input stream to send messages to
 * @param options.lsnToVersion Optional function to convert LSN to version number/Version
 * @param options.lsnToFrontier Optional function to convert LSN to frontier number/Antichain
 * @param options.initialLsn LSN treated as the last seen one before any message arrives (default 0)
 * @param options.runOn When to call `graph.run()`: on `'up-to-date'` (default), additionally on
 *   `'lsn-advance'`, or never (`false`)
 * @returns The input stream for chaining
 * @throws Error (from the subscriber callback) when a change message has an offset with no valid LSN
 */
export function electricStreamToD2Input<T extends Row<unknown> = Row>({
  graph,
  stream,
  input,
  lsnToVersion = (lsn: number) => lsn,
  lsnToFrontier = (lsn: number) => lsn,
  initialLsn = 0,
  runOn = 'up-to-date',
}: ElectricStreamToD2InputOptions<T>): RootStreamBuilder<[key: string, T]> {
  let lastLsn: number = initialLsn
  let changes: MultiSetArray<[key: string, T]> = []
  // Last row sent downstream per key, so retractions can reference the old row
  const currentRows = new Map<string, T>()

  // Sends all buffered changes at the version derived from `lsn`, then clears the buffer
  const sendChanges = (lsn: number) => {
    const version = lsnToVersion(lsn)
    if (changes.length > 0) {
      input.sendData(version, [...changes])
    }
    changes = []
  }

  stream.subscribe((messages) => {
    for (const message of messages) {
      if (isControlMessage(message)) {
        if (message.headers.control === 'up-to-date') {
          sendChanges(lastLsn)
          // +1 because lastLsn is the version of the last message; the frontier
          // must be the lowest version any future message can have
          const frontier = lsnToFrontier(lastLsn + 1)
          input.sendFrontier(frontier)
          if (runOn === 'up-to-date' || runOn === 'lsn-advance') {
            graph.run()
          }
        }
      } else if (isChangeMessage(message)) {
        const lsn = extractLSN(message.offset)
        const lsnAdvanced = lsn !== lastLsn
        if (lsnAdvanced) {
          // Anything still buffered belongs to the previous LSN: flush it under
          // that version before recording a change for the new LSN
          sendChanges(lastLsn)
        }
        switch (message.headers.operation) {
          case 'insert':
            changes.push([[message.key, message.value], 1])
            currentRows.set(message.key, message.value)
            break
          case 'update': {
            // An update is a delete of the old row followed by an insert of the
            // new one. The message only carries the new row, so retract the row
            // we last sent for this key; fall back to the new row if the key
            // has not been seen before.
            const previous = currentRows.get(message.key) ?? message.value
            changes.push([[message.key, previous], -1])
            changes.push([[message.key, message.value], 1])
            currentRows.set(message.key, message.value)
            break
          }
          case 'delete': {
            // Retract exactly what was inserted, when known
            const previous = currentRows.get(message.key) ?? message.value
            changes.push([[message.key, previous], -1])
            currentRows.delete(message.key)
            break
          }
        }
        if (lsnAdvanced) {
          sendChanges(lsn)
          if (runOn === 'lsn-advance') {
            graph.run()
          }
        }
        lastLsn = lsn
      }
    }
  })

  return input
}

/*
// Used something like this:

// Create D2 graph
const graph = new D2({ initialFrontier: 0 })

// Create D2 input
const input = graph.newInput<any>()

// Configure the pipeline
input
.pipe(
map(([key, data]) => data.value),
filter(value => value > 10)
)

// Finalize graph
graph.finalize()

// Create Electric stream (example)
const electricStream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'items',
replica: 'full',
}
})

// Connect Electric stream to D2 input
// (`graph` is required so the graph can be run when the stream is up to date)
electricStreamToD2Input({
  graph,
  stream: electricStream,
  input,
})
*/
194 changes: 194 additions & 0 deletions packages/d2ts/tests/electric.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import { electricStreamToD2Input } from '../src/electric'
import { D2 } from '../src/d2'
import type { ShapeStreamInterface, Message, Offset } from '@electric-sql/client'

// Unit tests for electricStreamToD2Input. The Electric ShapeStream is replaced by a
// hand-written mock that captures the subscriber callback, so each test can push
// messages into the adapter synchronously and inspect the calls made on the D2 input.
describe('electricStreamToD2Input', () => {
let mockStream: ShapeStreamInterface
// Holds the callback that electricStreamToD2Input registers via stream.subscribe
let mockSubscribeCallback: (messages: Message[]) => void
let d2: D2
let input: any

beforeEach(() => {
mockSubscribeCallback = vi.fn()
mockStream = {
// Capture the subscriber so tests can deliver messages by calling it directly
subscribe: (callback) => {
mockSubscribeCallback = callback
return () => {} // Return unsubscribe function
},
unsubscribeAll: vi.fn(),
isLoading: () => false,
lastSyncedAt: () => Date.now(),
lastSynced: () => 0,
isConnected: () => true,
isUpToDate: true,
lastOffset: '0_0',
shapeHandle: 'test-handle',
error: undefined
}

// Fresh graph and input per test; spy on the two methods the adapter calls
d2 = new D2({ initialFrontier: 0 })
input = d2.newInput()
vi.spyOn(input, 'sendData')
vi.spyOn(input, 'sendFrontier')
})

// Offset '100_001' yields LSN 100, which differs from the default initial LSN (0),
// so the change is sent as soon as the message is processed.
it('should handle insert operations correctly', () => {
electricStreamToD2Input({
graph: d2,
stream: mockStream,
input
})

const messages: Message[] = [{
headers: {
operation: 'insert'
},
offset: '100_001',
key: 'test-1',
value: { id: 1, name: 'test' }
}]

mockSubscribeCallback(messages)

expect(input.sendData).toHaveBeenCalledWith(
100,
expect.arrayContaining([
[['test-1', { id: 1, name: 'test' }], 1]
])
)
})

// The key has not been seen before, so both the retraction (-1) and the
// insertion (+1) carry the value from the update message itself.
it('should handle update operations as delete + insert', () => {
electricStreamToD2Input({
graph: d2,
stream: mockStream,
input
})

const messages: Message[] = [{
headers: {
operation: 'update'
},
offset: '100_001',
key: 'test-1',
value: { id: 1, name: 'updated' }
}]

mockSubscribeCallback(messages)

expect(input.sendData).toHaveBeenCalledWith(
100,
expect.arrayContaining([
[['test-1', { id: 1, name: 'updated' }], -1],
[['test-1', { id: 1, name: 'updated' }], 1]
])
)
})

// A delete becomes a single retraction (multiplicity -1) of the row
it('should handle delete operations correctly', () => {
electricStreamToD2Input({
graph: d2,
stream: mockStream,
input
})

const messages: Message[] = [{
headers: {
operation: 'delete'
},
offset: '100_001',
key: 'test-1',
value: { id: 1, name: 'deleted' }
}]

mockSubscribeCallback(messages)

expect(input.sendData).toHaveBeenCalledWith(
100,
expect.arrayContaining([
[['test-1', { id: 1, name: 'deleted' }], -1]
])
)
})

// After an 'up-to-date' control message the frontier is the last seen LSN + 1
it('should handle control messages and send frontier', () => {
electricStreamToD2Input({
graph: d2,
stream: mockStream,
input
})

const messages: Message[] = [{
headers: {
operation: 'insert'
},
offset: '100_001',
key: 'test-1',
value: { id: 1 }
}, {
headers: {
control: 'up-to-date'
}
}]

mockSubscribeCallback(messages)

expect(input.sendFrontier).toHaveBeenCalledWith(101)
})

// Custom converters are applied to the LSN (for data) and to LSN + 1 (for the frontier)
it('should use custom lsnToVersion and lsnToFrontier functions', () => {
const customLsnToVersion = (lsn: number) => lsn * 2
const customLsnToFrontier = (lsn: number) => lsn * 3

electricStreamToD2Input({
graph: d2,
stream: mockStream,
input,
lsnToVersion: customLsnToVersion,
lsnToFrontier: customLsnToFrontier
})

const messages: Message[] = [{
headers: {
operation: 'insert'
},
offset: '100_001',
key: 'test-1',
value: { id: 1 }
}, {
headers: {
control: 'up-to-date'
}
}]

mockSubscribeCallback(messages)

expect(input.sendData).toHaveBeenCalledWith(
200, // 100 * 2
expect.arrayContaining([
[['test-1', { id: 1 }], 1]
])
)
expect(input.sendFrontier).toHaveBeenCalledWith(303) // (100 + 1) * 3
})

// An offset whose LSN part is not numeric makes the subscriber callback throw
it('should throw error for invalid LSN format', () => {
electricStreamToD2Input({
graph: d2,
stream: mockStream,
input
})

const messages: Message[] = [{
headers: {
operation: 'insert'
},
// Cast needed because the string does not match the Offset type
offset: 'invalid_lsn' as unknown as Offset,
key: 'test-1',
value: { id: 1 }
}]

expect(() => mockSubscribeCallback(messages)).toThrow('Invalid LSN format: invalid_lsn')
})
})
Loading
Loading