Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Un-reference intervals and timeouts #5

Merged
merged 5 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,346 changes: 1,177 additions & 1,169 deletions package-lock.json

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "memx",
"version": "0.3.1",
"version": "0.3.2",
"description": "Simple and fast memcached client",
"main": "./dist/index.cjs",
"module": "./dist/index.mjs",
Expand Down Expand Up @@ -28,14 +28,14 @@
"author": "Juit Developers <[email protected]>",
"license": "Apache-2.0",
"devDependencies": {
"@plugjs/build": "^0.4.7",
"@types/chai": "^4.3.5",
"@types/chai-as-promised": "^7.1.5",
"@types/memjs": "^1.3.0",
"chai": "^4.3.7",
"@plugjs/build": "^0.5.22",
"@types/chai": "^4.3.11",
"@types/chai-as-promised": "^7.1.8",
"@types/memjs": "^1.3.3",
"chai": "<5.0.0",
"chai-as-promised": "^7.1.1",
"chai-exclude": "^2.1.0",
"memjs": "^1.3.1"
"memjs": "^1.3.2"
},
"directories": {
"test": "test"
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import assert from 'node:assert'

import { ServerAdapter } from './server'

import type { Adapter, Counter, AdapterResult, Stats } from './types'
import type { ServerOptions } from './server'
import type { Adapter, AdapterResult, Counter, Stats } from './types'

function parseHosts(hosts?: string): ServerOptions[] {
const result: { host: string, port?: number }[] = []
Expand Down
8 changes: 4 additions & 4 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import assert from 'node:assert'
import net from 'node:net'

import { Encoder } from './encode'
import { Decoder } from './decode'
import { BUFFERS, OPCODE } from './constants'
import { Decoder } from './decode'
import { Encoder } from './encode'
import { socketFinalizationRegistry } from './internals'

import type { Socket } from 'node:net'
import type { RawIncomingPacket } from './decode'
import type { RawOutgoingPacket } from './encode'
import type { Socket } from 'node:net'

export type RawIncomingPackets = [ RawIncomingPacket, ...RawIncomingPacket[] ]

Expand Down Expand Up @@ -173,7 +173,7 @@ export class Connection {
if (error) return deferred.reject(error)
})

const timeout = setTimeout(() => deferred.reject(new Error('No response')), this.#timeout)
const timeout = setTimeout(() => deferred.reject(new Error('No response')), this.#timeout).unref()

return deferred.promise.finally(() => {
clearTimeout(timeout)
Expand Down
2 changes: 1 addition & 1 deletion src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import assert from 'node:assert'

import { DATA_TYPE, EMPTY_BUFFER, MAGIC, OFFSETS } from './constants'
import { allocateBuffer } from './buffers'
import { DATA_TYPE, EMPTY_BUFFER, MAGIC, OFFSETS } from './constants'

import type { RecyclableBuffer } from './buffers'

Expand Down
2 changes: 1 addition & 1 deletion src/encode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { allocateBuffer } from './buffers'
import { DATA_TYPE, EMPTY_BUFFER, MAGIC, VBUCKET, OFFSETS } from './constants'
import { DATA_TYPE, EMPTY_BUFFER, MAGIC, OFFSETS, VBUCKET } from './constants'

import type { RecyclableBuffer } from './buffers'
import type { OPCODE } from './constants'
Expand Down
2 changes: 1 addition & 1 deletion src/fake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ export class FakeAdapter implements Adapter {
if (! ttl) return this.#cache.clear()

const wait = toExp(ttl) - Date.now()
setTimeout(() => this.#cache.clear(), wait)
setTimeout(() => this.#cache.clear(), wait).unref()
}

async noop(): Promise<void> {
Expand Down
17 changes: 6 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
export type { RecyclableBuffer } from './buffers'

import * as connection from './connection'
import * as constants from './constants'
import * as decode from './decode'
import * as encode from './encode'
import * as constants from './constants'
import * as connection from './connection'

export {
decode,
encode,
constants,
connection,
}
export { connection, constants, decode, encode }

export * from './fake'
export * from './types'
export * from './client'
export * from './cluster'
export * from './fake'
export * from './server'
export * from './client'
export * from './types'
export * from './utils'
2 changes: 1 addition & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Connection } from './connection'
import { BUFFERS, OPCODE, STATUS } from './constants'

import type { Adapter, Counter, AdapterResult, Stats } from './types'
import type { ConnectionOptions } from './connection'
import type { RawIncomingPacket } from './decode'
import type { Adapter, AdapterResult, Counter, Stats } from './types'

const statsBigInt: readonly string[] = [
'auth_cmds', 'auth_errors', 'bytes', 'bytes_read', 'bytes_written', 'cas_badval', 'cas_hits', 'cas_misses',
Expand Down
4 changes: 2 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ export class PoorManLock {
do {
cas = await this.#client.add(this.#name, owner, { ttl: 2 })
if (cas !== undefined) break
await new Promise((resolve) => setTimeout(resolve, 100))
await new Promise((resolve) => void setTimeout(resolve, 100).unref())
} while (Date.now() < end)

if (cas === undefined) {
Expand All @@ -154,7 +154,7 @@ export class PoorManLock {
assert(replaced !== undefined, `Lock "${this.#client.prefix}${this.#name}" not replaced`)
cas = replaced
})(), `Error extending lock "${this.#client.prefix}${this.#name}"`)
}, 100)
}, 100).unref()

try {
return await executor()
Expand Down
2 changes: 1 addition & 1 deletion test/01-decode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { randomFillSync } from 'node:crypto'

import { expect } from 'chai'

import { decode, constants } from '../src/index'
import { constants, decode } from '../src/index'


describe('Decoding Packets', () => {
Expand Down
2 changes: 1 addition & 1 deletion test/02-encode.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai'

import { encode, constants } from '../src/index'
import { constants, encode } from '../src/index'

describe('Encoding Packets', () => {
it('should encode a packet with all information required', () => {
Expand Down
2 changes: 1 addition & 1 deletion test/06-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { randomBytes } from 'node:crypto'

import { expect } from 'chai'

import { MemxClient, ClusterAdapter, ServerAdapter } from '../src/index'
import { ClusterAdapter, MemxClient, ServerAdapter } from '../src/index'

import type { Adapter } from '../src/index'

Expand Down
32 changes: 30 additions & 2 deletions test/10-utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { randomBytes } from 'node:crypto'

import { exec } from '@plugjs/build'
import { expect } from 'chai'


import { Bundle, MemxClient, Factory, PoorManLock } from '../src/index'
import { Bundle, Factory, MemxClient, PoorManLock } from '../src/index'

describe('Utilities', () => {
const host = process.env.MEMCACHED_HOST || '127.0.0.1'
Expand Down Expand Up @@ -273,5 +273,33 @@ describe('Utilities', () => {
expect((await client.getc(key))?.value).to.be.undefined
expect(record).to.eql([ 'create 1', 'create 2', 'start 1', 'end 1' ])
}, 3000)

it('should acquire when another process holding the lock crashes', async () => {
const lockname = `distributed-${process.pid}-${Math.floor(Math.random() * 100000)}`
const child = exec('tsrun', './test/locker.ts', lockname)

// this should give the child process enough time to start and lock
await new Promise((resolve) => void setTimeout(resolve, 500))

// the first attempt to locking should fail, the child should be locking!
await expect(new PoorManLock(client, lockname).execute(() => {
log.error('Initial lock attempt succesful')
}, { timeout: 100, owner: `test-parent-${process.pid}` }))
.to.be.rejectedWith(/timeout/)

// the second attempt should succeed, once the child dies...
const p1 = new PoorManLock(client, lockname).execute(() => {
log('Parent process executing 1')
}, { owner: `test-parent-${process.pid}@1` })
const p2 = new PoorManLock(client, lockname).execute(() => {
log('Parent process executing 2')
}, { owner: `test-parent-${process.pid}@2` })
const p3 = new PoorManLock(client, lockname).execute(() => {
log('Parent process executing 3')
}, { owner: `test-parent-${process.pid}@3` })

// reap up the child's leftovers...
await Promise.all([ p1, p2, p3, child ])
}, 10000)
})
})
27 changes: 27 additions & 0 deletions test/locker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* eslint-disable no-console */
import { log } from '@plugjs/build'

import { MemxClient, PoorManLock } from '../src/index'

const host = process.env.MEMCACHED_HOST || '127.0.0.1'
const port = parseInt(process.env.MEMCACHED_PORT || '11211')
const client = new MemxClient({ hosts: [ { host, port } ] })

async function test(): Promise<void> {
console.log(`Child process using lock "${process.argv[2]}"`)

const lock = new PoorManLock(client, process.argv[2])
try {
await lock.execute(async () => {
console.log('Child process locking')
await new Promise((resolve) => void setTimeout(resolve, 2000))
console.log('Child process exiting')
process.exit(123)
}, { owner: `test-child-${process.pid}` })
} finally {
console.log('Child process exit interrupted ???')
process.exit(123)
}
}

test().catch((error) => log.error('Error in child process test', error))
Loading