Skip to content

Commit

Permalink
Worker pool: use the transfer feature and shared buffers for message …
Browse files Browse the repository at this point in the history
…posting

This changes the way the worker pool and the workers communicate in the
following way:

- Buffers sent to/from workers are now "transferred". This is a feature
  of `MessagePort.postMessage()` that allows to transfer ownership of
  certain objects to the receiving side, avoiding cloning, thus
  improving the performance of every worker.

- It's now possible to send to the workers an optional
  `SharedArrayBuffer` as an input (in addition to the `Buffer` that is
  already supported).

The `BackgroundNoteDecryptor` used by the wallet scanner has been
updated to make use of `SharedArrayBuffer` to store the account keys.
  • Loading branch information
andiflabs committed Jul 11, 2024
1 parent 6143c20 commit 9b5d616
Show file tree
Hide file tree
Showing 13 changed files with 344 additions and 110 deletions.
6 changes: 3 additions & 3 deletions ironfish-rust-nodejs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ export class NoteEncrypted {
*/
static combineHash(depth: number, jsLeft: Buffer, jsRight: Buffer): Buffer
/** Returns undefined if the note was unable to be decrypted with the given key. */
decryptNoteForOwner(incomingHexKey: string): Buffer | null
decryptNoteForOwners(incomingHexKeys: Array<string>): Array<Buffer | undefined | null>
decryptNoteForOwner(incomingViewKey: Buffer): Buffer | null
decryptNoteForOwners(incomingViewKeys: Array<Buffer>): Array<Buffer | undefined | null>
/** Returns undefined if the note was unable to be decrypted with the given key. */
decryptNoteForSpender(outgoingHexKey: string): Buffer | null
decryptNoteForSpender(outgoingViewKey: Buffer): Buffer | null
}
export type NativeNote = Note
export class Note {
Expand Down
17 changes: 8 additions & 9 deletions ironfish-rust-nodejs/src/structs/note_encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,31 +134,30 @@ impl NativeNoteEncrypted {

/// Returns undefined if the note was unable to be decrypted with the given key.
#[napi]
pub fn decrypt_note_for_owner(&self, incoming_hex_key: String) -> Result<Option<Buffer>> {
pub fn decrypt_note_for_owner(&self, incoming_view_key: JsBuffer) -> Result<Option<Buffer>> {
let incoming_view_key =
IncomingViewKey::from_hex(&incoming_hex_key).map_err(to_napi_err)?;
IncomingViewKey::read(&*incoming_view_key.into_value()?).map_err(to_napi_err)?;
let decrypted_note = self.note.decrypt_note_for_owner(&incoming_view_key);
decrypted_note_to_buffer(decrypted_note).map_err(to_napi_err)
}

#[napi]
pub fn decrypt_note_for_owners(
&self,
incoming_hex_keys: Vec<String>,
incoming_view_keys: Vec<JsBuffer>,
) -> Result<Vec<Option<Buffer>>> {
let incoming_view_keys = try_map(&incoming_hex_keys[..], |hex_key| {
IncomingViewKey::from_hex(hex_key)
})
.map_err(to_napi_err)?;
let incoming_view_keys = try_map(incoming_view_keys, |incoming_view_key| {
IncomingViewKey::read(&*incoming_view_key.into_value()?).map_err(to_napi_err)
})?;
let decrypted_notes = self.note.decrypt_note_for_owners(&incoming_view_keys);
try_map(decrypted_notes, decrypted_note_to_buffer).map_err(to_napi_err)
}

/// Returns undefined if the note was unable to be decrypted with the given key.
#[napi]
pub fn decrypt_note_for_spender(&self, outgoing_hex_key: String) -> Result<Option<Buffer>> {
pub fn decrypt_note_for_spender(&self, outgoing_view_key: JsBuffer) -> Result<Option<Buffer>> {
let outgoing_view_key =
OutgoingViewKey::from_hex(&outgoing_hex_key).map_err(to_napi_err)?;
OutgoingViewKey::read(&*outgoing_view_key.into_value()?).map_err(to_napi_err)?;
let decrypted_note = self.note.decrypt_note_for_spender(&outgoing_view_key);
decrypted_note_to_buffer(decrypted_note).map_err(to_napi_err)
}
Expand Down
6 changes: 3 additions & 3 deletions ironfish-rust-nodejs/tests/demo.test.slow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ describe('Demonstrate the Sapling API', () => {
expect(encryptedNote.hash().byteLength).toBe(32)
expect(encryptedNote.equals(encryptedNote)).toBe(true)

const decryptedNoteBuffer = encryptedNote.decryptNoteForOwner(key.incomingViewKey)
const decryptedNoteBuffer = encryptedNote.decryptNoteForOwner(Buffer.from(key.incomingViewKey, 'hex'))
expect(decryptedNoteBuffer).toBeInstanceOf(Buffer)
expect(decryptedNoteBuffer!.byteLength).toBe(DECRYPTED_NOTE_LENGTH)

const decryptedSpenderNote = encryptedNote.decryptNoteForSpender(key.outgoingViewKey)
const decryptedSpenderNote = encryptedNote.decryptNoteForSpender(Buffer.from(key.outgoingViewKey, 'hex'))
expect(decryptedSpenderNote).toBe(null)

const decryptedNote = Note.deserialize(decryptedNoteBuffer!)
Expand All @@ -104,7 +104,7 @@ describe('Demonstrate the Sapling API', () => {
const transaction = new Transaction(LATEST_TRANSACTION_VERSION)
transaction.setExpiration(10)
const encryptedNote = new NoteEncrypted(postedMinersFeeTransaction.getNote(0))
const decryptedNote = Note.deserialize(encryptedNote.decryptNoteForOwner(key.incomingViewKey)!)
const decryptedNote = Note.deserialize(encryptedNote.decryptNoteForOwner(Buffer.from(key.incomingViewKey, 'hex'))!)
const newNote = new Note(recipientKey.publicAddress, 15n, Buffer.from('receive'), Asset.nativeId(), minersFeeNote.owner())

let currentHash = encryptedNote.hash()
Expand Down
9 changes: 8 additions & 1 deletion ironfish-rust/src/keys/view_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct IncomingViewKey {

impl IncomingViewKey {
/// load view key from a Read implementation
pub fn read<R: io::Read>(reader: &mut R) -> Result<Self, IronfishError> {
pub fn read<R: io::Read>(reader: R) -> Result<Self, IronfishError> {
let view_key = read_scalar(reader)?;
Ok(IncomingViewKey { view_key })
}
Expand Down Expand Up @@ -174,6 +174,13 @@ pub struct OutgoingViewKey {
}

impl OutgoingViewKey {
/// load view key from a Read implementation
pub fn read<R: io::Read>(mut reader: R) -> Result<Self, IronfishError> {
let mut view_key = [0u8; 32];
reader.read_exact(&mut view_key)?;
Ok(OutgoingViewKey { view_key })
}

/// Load a key from a string of hexadecimal digits
pub fn from_hex(value: &str) -> Result<Self, IronfishError> {
match hex_to_bytes(value) {
Expand Down
26 changes: 17 additions & 9 deletions ironfish/src/primitives/noteEncrypted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ export type NoteEncryptedHash = Buffer
export type SerializedNoteEncryptedHash = Buffer
export type SerializedNoteEncrypted = Buffer

const ensureBuffer = (value: Buffer | string): Buffer => {
if (typeof value === 'string') {
return Buffer.from(value, 'hex')
} else {
return value
}
}

export class NoteEncrypted {
private readonly noteEncryptedSerialized: Buffer

Expand Down Expand Up @@ -81,28 +89,28 @@ export class NoteEncrypted {
}
}

decryptNoteForOwner(ownerHexKey: string): Note | undefined {
const note = this.takeReference().decryptNoteForOwner(ownerHexKey)
decryptNoteForOwner(incomingViewKey: Buffer | string): Note | undefined {
const note = this.takeReference().decryptNoteForOwner(ensureBuffer(incomingViewKey))
this.returnReference()
if (note) {
return new Note(note)
}
}

decryptNoteForOwners(ownerHexKeys: Array<string>): Array<Note | undefined> {
if (ownerHexKeys.length === 0) {
decryptNoteForOwners(incomingViewKeys: Array<Buffer>): Array<Note | undefined> {
if (incomingViewKeys.length === 0) {
return []
} else if (ownerHexKeys.length === 1) {
return [this.decryptNoteForOwner(ownerHexKeys[0])]
} else if (incomingViewKeys.length === 1) {
return [this.decryptNoteForOwner(incomingViewKeys[0])]
}

const notes = this.takeReference().decryptNoteForOwners(ownerHexKeys)
const notes = this.takeReference().decryptNoteForOwners(incomingViewKeys)
this.returnReference()
return notes.map((note) => (note ? new Note(note) : undefined))
}

decryptNoteForSpender(spenderHexKey: string): Note | undefined {
const note = this.takeReference().decryptNoteForSpender(spenderHexKey)
decryptNoteForSpender(outgoingViewKey: Buffer | string): Note | undefined {
const note = this.takeReference().decryptNoteForSpender(ensureBuffer(outgoingViewKey))
this.returnReference()
if (note) {
return new Note(note)
Expand Down
6 changes: 3 additions & 3 deletions ironfish/src/rpc/routes/wallet/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ export async function getTransactionNotes(
const accountKeys = [
{
accountId: account.id,
incomingViewKey: account.incomingViewKey,
outgoingViewKey: account.outgoingViewKey,
viewKey: account.viewKey,
incomingViewKey: Buffer.from(account.incomingViewKey, 'hex'),
outgoingViewKey: Buffer.from(account.outgoingViewKey, 'hex'),
viewKey: Buffer.from(account.viewKey, 'hex'),
},
]

Expand Down
53 changes: 40 additions & 13 deletions ironfish/src/wallet/scanner/noteDecryptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
DecryptNotesOptions,
DecryptNotesRequest,
DecryptNotesResponse,
DecryptNotesSharedAccountKeys,
} from '../../workerPool/tasks/decryptNotes'
import { JobAbortedError } from '../../workerPool/tasks/jobAbort'
import { Account } from '../account/account'
Expand All @@ -24,11 +25,11 @@ export type DecryptNotesFromTransactionsCallback = (
transactions: Array<{ transaction: Transaction; decryptedNotes: Array<DecryptedNote> }>,
) => Promise<void>

type DecryptQueueValue = {
type DecryptQueueItem = {
job: Job
accounts: ReadonlyArray<Account>
blockHeader: BlockHeader
transactions: ReadonlyArray<Transaction>
accounts: ReadonlyArray<Account>
callback: DecryptNotesFromTransactionsCallback
}

Expand All @@ -43,7 +44,10 @@ export class BackgroundNoteDecryptor {

private readonly workerPool: WorkerPool
private readonly options: DecryptNotesOptions
private readonly decryptQueue: AsyncQueue<DecryptQueueValue>
private readonly decryptQueue: AsyncQueue<DecryptQueueItem>

private accounts: ReadonlyArray<Account>
private sharedAccountKeys: DecryptNotesSharedAccountKeys

constructor(workerPool: WorkerPool, config: Config, options: DecryptNotesOptions) {
this.workerPool = workerPool
Expand All @@ -56,6 +60,9 @@ export class BackgroundNoteDecryptor {
}
queueSize = Math.max(queueSize, 1)
this.decryptQueue = new AsyncQueue(queueSize)

this.accounts = []
this.sharedAccountKeys = new DecryptNotesSharedAccountKeys([])
}

start(abort?: AbortController) {
Expand Down Expand Up @@ -84,7 +91,7 @@ export class BackgroundNoteDecryptor {
}

private async decryptLoop(): Promise<void> {
let resolve: (value: DecryptQueueValue | void) => unknown
let resolve: (value: DecryptQueueItem | void) => unknown
let reject: (reason?: unknown) => void

this.onStopped.then(
Expand All @@ -98,7 +105,7 @@ export class BackgroundNoteDecryptor {
this.triggerFlushed = null
}

const [promise, resolveNew, rejectNew] = PromiseUtils.split<DecryptQueueValue | void>()
const [promise, resolveNew, rejectNew] = PromiseUtils.split<DecryptQueueItem | void>()
resolve = resolveNew
reject = rejectNew

Expand All @@ -112,7 +119,7 @@ export class BackgroundNoteDecryptor {
break
}

const { job, accounts, blockHeader, transactions, callback } = item
const { job, blockHeader, transactions, accounts, callback } = item

let decryptNotesResponse
try {
Expand Down Expand Up @@ -166,15 +173,12 @@ export class BackgroundNoteDecryptor {
throw new Error('decryptor was not started')
}

this.updateAccounts(accounts)

if (!this.triggerFlushed) {
this.onFlushed = new Promise((resolve) => (this.triggerFlushed = resolve))
}

const accountKeys = accounts.map((account) => ({
incomingViewKey: account.incomingViewKey,
outgoingViewKey: account.outgoingViewKey,
viewKey: account.viewKey,
}))
Assert.isNotNull(blockHeader.noteSize)

const encryptedNotes = []
Expand All @@ -190,20 +194,43 @@ export class BackgroundNoteDecryptor {
}

const decryptNotesRequest = new DecryptNotesRequest(
accountKeys,
this.sharedAccountKeys,
encryptedNotes,
this.options,
)
const job = this.workerPool.execute(decryptNotesRequest)

return this.decryptQueue.push({
job,
accounts,
blockHeader,
transactions,
accounts: this.accounts,
callback,
})
}

private updateAccounts(newAccounts: ReadonlyArray<Account>) {
if (
newAccounts.length === this.accounts.length &&
newAccounts.every((account, index) => account === this.accounts[index])
) {
// No change
return
}

// Because `decryptLoop` does not use `this.accounts` or
// `this.sharedAccountKeys` directly, we can swap their value without the
// need to flush the queue. This is safe as long as the value is not
// mutated.
this.accounts = newAccounts
this.sharedAccountKeys = new DecryptNotesSharedAccountKeys(
newAccounts.map((account) => ({
incomingViewKey: Buffer.from(account.incomingViewKey, 'hex'),
outgoingViewKey: Buffer.from(account.outgoingViewKey, 'hex'),
viewKey: Buffer.from(account.viewKey, 'hex'),
})),
)
}
}

/**
Expand Down
6 changes: 3 additions & 3 deletions ironfish/src/wallet/wallet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,9 @@ export class Wallet {
): Promise<Map<string, Array<DecryptedNote | null>>> {
const accountKeys = accounts.map((account) => ({
accountId: account.id,
incomingViewKey: account.incomingViewKey,
outgoingViewKey: account.outgoingViewKey,
viewKey: account.viewKey,
incomingViewKey: Buffer.from(account.incomingViewKey, 'hex'),
outgoingViewKey: Buffer.from(account.outgoingViewKey, 'hex'),
viewKey: Buffer.from(account.viewKey, 'hex'),
}))

return this.workerPool.decryptNotes(accountKeys, encryptedNotes, {
Expand Down
Loading

0 comments on commit 9b5d616

Please sign in to comment.