Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADR 30 : New handshake for the bolt protocol #1243

Open
wants to merge 18 commits into
base: 5.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 88 additions & 2 deletions packages/bolt-connection/src/bolt/handshake.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { alloc } from '../channel'
import { newError } from 'neo4j-driver-core'

const BOLT_MAGIC_PREAMBLE = 0x6060b017
const AVAILABLE_BOLT_PROTOCOLS = [5.8, 5.7, 5.6, 5.4, 5.3, 5.2, 5.1, 5.0, 4.4, 4.3, 4.2, 3.0] // bolt protocols the client supports, ordered by preference
const DESIRED_CAPABILITES = 0

function version (major, minor) {
return {
Expand Down Expand Up @@ -70,15 +72,65 @@ function parseNegotiatedResponse (buffer, log) {
return Number(h[3] + '.' + h[2])
}

function handshakeNegotiationV2 (channel, buffer, log) {
const numVersions = buffer.readVarInt()
let versions = []
for (let i = 0; i < numVersions; i++) {
const versionRange = [
buffer.readUInt8(),
buffer.readUInt8(),
buffer.readUInt8(),
buffer.readUInt8()
]
versions = versions.concat(getVersions(versionRange))
}
const capabilityBitMask = buffer.readVarInt()
const capabilites = selectCapabilites(capabilityBitMask)

// parse supported capabilities
// select preferrable protocol and respond
let major
let minor
versions.sort((a, b) => Number(b.major + '.' + b.minor) - Number(a.major + '.' + a.minor))
for (let i = 0; i < versions.length; i++) {
const version = versions[i]
if (AVAILABLE_BOLT_PROTOCOLS.includes(Number(version.major + '.' + version.minor))) {
major = version.major
minor = version.minor
break
}
}

return new Promise((resolve, reject) => {
try {
const selectionBuffer = alloc(5)
selectionBuffer.writeInt32((minor << 8) | major)
selectionBuffer.writeVarInt(capabilites)
channel.write(selectionBuffer)
resolve({
protocolVersion: Number(major + '.' + minor),
capabilites,
consumeRemainingBuffer: consumer => {
if (buffer.hasRemaining()) {
consumer(buffer.readSlice(buffer.remaining()))
}
}
})
} catch (e) {
reject(e)
}
})
}

/**
* @return {BaseBuffer}
* @private
*/
function newHandshakeBuffer () {
return createHandshakeMessage([
version(255, 1),
[version(5, 8), version(5, 0)],
[version(4, 4), version(4, 2)],
version(4, 1),
version(3, 0)
])
}
Expand All @@ -91,8 +143,10 @@ function newHandshakeBuffer () {
/**
* @typedef HandshakeResult
* @property {number} protocolVersion The protocol version negotiated in the handshake
* @property {number} capabilites A bitmask representing the capabilities negotiated in the handshake
* @property {function(BufferConsumerCallback)} consumeRemainingBuffer A function to consume the remaining buffer if it exists
*/

/**
* Shake hands using the channel and return the protocol version
*
Expand All @@ -101,6 +155,23 @@ function newHandshakeBuffer () {
* @returns {Promise<HandshakeResult>} Promise of protocol version and consumeRemainingBuffer
*/
export default function handshake (channel, log) {
return initialHandshake(channel, log).then((result) => {
if (result.protocolVersion === 255.1) {
return handshakeNegotiationV2(channel, result.buffer, log)
} else {
return result
}
})
}

/**
* Shake hands using the channel and return the protocol version, or the improved handshake protocol if communicating with a newer server.
*
* @param {Channel} channel the channel use to shake hands
* @param {Logger} log the log object
* @returns {Promise<HandshakeResult>} Promise of protocol version and consumeRemainingBuffer
*/
function initialHandshake (channel, log) {
return new Promise((resolve, reject) => {
const handshakeErrorHandler = error => {
reject(error)
Expand All @@ -115,9 +186,10 @@ export default function handshake (channel, log) {
try {
// read the response buffer and initialize the protocol
const protocolVersion = parseNegotiatedResponse(buffer, log)

resolve({
protocolVersion,
capabilites: 0,
buffer,
consumeRemainingBuffer: consumer => {
if (buffer.hasRemaining()) {
consumer(buffer.readSlice(buffer.remaining()))
Expand All @@ -132,3 +204,17 @@ export default function handshake (channel, log) {
channel.write(newHandshakeBuffer())
})
}

function getVersions (versionArray) {
const resultArr = []
const major = versionArray[3]
const minor = versionArray[2]
for (let i = 0; i <= versionArray[1]; i++) {
resultArr.push({ major, minor: minor - i })
}
return resultArr
}

function selectCapabilites (capabilityBitMask) {
return DESIRED_CAPABILITES // capabilites are currently unused and will always be 0.
}
24 changes: 24 additions & 0 deletions packages/bolt-connection/src/buf/base-buf.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ export default class BaseBuffer {
throw new Error('Not implemented')
}

getVarInt (position) {
throw new Error('Not implemented')
}

putUInt8 (position, val) {
throw new Error('Not implemented')
}
Expand Down Expand Up @@ -244,6 +248,15 @@ export default class BaseBuffer {
return this.getFloat64(this._updatePos(8))
}

/**
* Read from state position
*/
readVarInt () {
const int = this.getVarInt(this.position)
this._updatePos(int.length)
return int.value
}

/**
* Write to state position.
* @param val
Expand Down Expand Up @@ -300,6 +313,17 @@ export default class BaseBuffer {
this.putFloat64(this._updatePos(8), val)
}

writeVarInt (val) {
while (val > 1) {
let int = val % 128
if (val >= 128) {
int += 128
}
val = val / 128
this.putInt8(this._updatePos(1), int)
}
}

/**
* Write to state position.
* @param val
Expand Down
12 changes: 12 additions & 0 deletions packages/bolt-connection/src/channel/channel-buf.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ export default class ChannelBuffer extends BaseBuffer {
return this._buffer.readDoubleBE(position)
}

getVarInt (position) {
let i = 0
let currentValue = this._buffer.readInt8(position + i)
let total = currentValue % 128
while (currentValue / 128 >= 1) {
i += 1
currentValue = this._buffer.readInt8(position + i)
total += currentValue % 128
}
return { length: i + 1, value: total }
}

putUInt8 (position, val) {
this._buffer.writeUInt8(val, position)
}
Expand Down
4 changes: 2 additions & 2 deletions packages/bolt-connection/test/bolt/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ describe('#unit Bolt', () => {
const writtenBuffer = channel.written[0]

const boltMagicPreamble = '60 60 b0 17'
const handshakev2 = '00 00 01 ff'
const protocolVersion5x7to5x0 = '00 08 08 05'
const protocolVersion4x4to4x2 = '00 02 04 04'
const protocolVersion4x1 = '00 00 01 04'
const protocolVersion3 = '00 00 00 03'

expect(writtenBuffer.toHex()).toEqual(
`${boltMagicPreamble} ${protocolVersion5x7to5x0} ${protocolVersion4x4to4x2} ${protocolVersion4x1} ${protocolVersion3}`
`${boltMagicPreamble} ${handshakev2} ${protocolVersion5x7to5x0} ${protocolVersion4x4to4x2} ${protocolVersion3}`
)
})

Expand Down
90 changes: 88 additions & 2 deletions packages/neo4j-driver-deno/lib/bolt-connection/bolt/handshake.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading