Skip to content

Connection Hint: supporting connection.recv_timeout_seconds #761

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bolt-connection/src/channel/browser/browser-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ export default class WebSocketChannel {
})
}

/**
* Setup the receive timeout for the channel.
*
* Not supported for the browser channel.
*
* @param {number} receiveTimeout The amount of time the channel will keep without receive any data before timeout (ms)
* @returns {void}
*/
setupReceiveTimeout (receiveTimeout) {}

/**
* Set connection timeout on the given WebSocket, if configured.
* @return {number} the timeout id or null.
Expand Down
43 changes: 37 additions & 6 deletions bolt-connection/src/channel/node/node-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ const TrustStrategy = {
* @param {function} onFailure - callback to execute on connection failure.
* @return {*} socket connection.
*/
function connect (config, onSuccess, onFailure = () => null) {
function _connect (config, onSuccess, onFailure = () => null) {
const trustStrategy = trustStrategyName(config)
if (!isEncrypted(config)) {
const socket = net.connect(
Expand Down Expand Up @@ -230,7 +230,7 @@ export default class NodeChannel {
* Create new instance
* @param {ChannelConfig} config - configuration for this channel.
*/
constructor (config) {
constructor (config, connect = _connect) {
const self = this

this.id = _CONNECTION_IDGEN++
Expand Down Expand Up @@ -305,12 +305,12 @@ export default class NodeChannel {
_setupConnectionTimeout (config, socket) {
const timeout = config.connectionTimeout
if (timeout) {
socket.on('connect', () => {
const connectListener = () => {
// connected - clear connection timeout
socket.setTimeout(0)
})
}

socket.on('timeout', () => {
const timeoutListener = () => {
// timeout fired - not connected within configured time. cancel timeout and destroy socket
socket.setTimeout(0)
socket.destroy(
Expand All @@ -319,12 +319,43 @@ export default class NodeChannel {
config.connectionErrorCode
)
)
})
}

socket.on('connect', connectListener)
socket.on('timeout', timeoutListener)

this._removeConnectionTimeoutListeners = () => {
this._conn.off('connect', connectListener)
this._conn.off('timeout', timeoutListener)
}

socket.setTimeout(timeout)
}
}

/**
* Setup the receive timeout for the channel.
*
* @param {number} receiveTimeout How long the channel will wait for receiving data before timing out (ms)
* @returns {void}
*/
setupReceiveTimeout (receiveTimeout) {
if (this._removeConnectionTimeoutListeners) {
this._removeConnectionTimeoutListeners()
}

this._conn.on('timeout', () => {
this._conn.destroy(
newError(
`Connection lost. Server didn't respond in ${receiveTimeout}ms`,
this._connectionErrorCode
)
)
})

this._conn.setTimeout(receiveTimeout)
}

/**
* Write the passed in buffer to connection
* @param {NodeBuffer} buffer - Buffer to write
Expand Down
24 changes: 23 additions & 1 deletion bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import { Chunker, Dechunker, ChannelConfig, Channel } from '../channel'
import { newError, error, json, internal } from 'neo4j-driver-core'
import { newError, error, json, internal, toNumber } from 'neo4j-driver-core'
import Connection from './connection'
import Bolt from '../bolt'

Expand Down Expand Up @@ -198,6 +198,28 @@ export default class ChannelConnection extends Connection {
if (!this.databaseId) {
this.databaseId = dbConnectionId
}

if (metadata.hints) {
const receiveTimeoutRaw =
metadata.hints['connection.recv_timeout_seconds']
if (
receiveTimeoutRaw !== null &&
receiveTimeoutRaw !== undefined
) {
const receiveTimeoutInSeconds = toNumber(receiveTimeoutRaw)
if (
Number.isInteger(receiveTimeoutInSeconds) &&
receiveTimeoutInSeconds > 0
) {
this._ch.setupReceiveTimeout(receiveTimeoutInSeconds * 1000)
} else {
this._log.info(
`Server located at ${this._address} supplied an invalid connection receive timeout value (${receiveTimeoutInSeconds}). ` +
'Please, verify the server configuration and status because this can be the symptom of a bigger issue.'
)
}
}
}
}
resolve(self)
}
Expand Down
14 changes: 12 additions & 2 deletions bolt-connection/src/rediscovery/routing-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,21 @@ const MIN_ROUTERS = 1
* The routing table object used to determine the role of the servers in the driver.
*/
export default class RoutingTable {
constructor ({ database, routers, readers, writers, expirationTime } = {}) {
constructor ({
database,
routers,
readers,
writers,
expirationTime,
ttl
} = {}) {
this.database = database
this.databaseName = database || 'default database'
this.routers = routers || []
this.readers = readers || []
this.writers = writers || []
this.expirationTime = expirationTime || int(0)
this.ttl = ttl
}

/**
Expand Down Expand Up @@ -139,6 +147,7 @@ export function createValidRoutingTable (
routerAddress,
rawRoutingTable
) {
const ttl = rawRoutingTable.ttl
const expirationTime = calculateExpirationTime(rawRoutingTable, routerAddress)
const { routers, readers, writers } = parseServers(
rawRoutingTable,
Expand All @@ -153,7 +162,8 @@ export function createValidRoutingTable (
routers,
readers,
writers,
expirationTime
expirationTime,
ttl
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
* limitations under the License.
*/

import WebSocketChannel from '../../../bolt-connection/lib/channel/browser/browser-channel'
import ChannelConfig from '../../../bolt-connection/lib/channel/channel-config'
import WebSocketChannel from '../../../src/channel/browser/browser-channel'
import ChannelConfig from '../../../src/channel/channel-config'
import { error, internal } from 'neo4j-driver-core'
import { setTimeoutMock } from '../timers-util'
import { setTimeoutMock } from '../../timers-util'

const {
serverAddress: { ServerAddress },
Expand All @@ -35,7 +35,7 @@ const WS_CLOSING = 2
const WS_CLOSED = 3

/* eslint-disable no-global-assign */
describe('#unit WebSocketChannel', () => {
describe('WebSocketChannel', () => {
let webSocketChannel

afterEach(async () => {
Expand Down Expand Up @@ -173,7 +173,7 @@ describe('#unit WebSocketChannel', () => {
createWebSocketFactory(WS_CLOSED)
)

await expectAsync(channel.close()).toBeResolved()
await expect(channel.close()).resolves.not.toThrow()
})

it('should resolve close when websocket is closed', async () => {
Expand All @@ -186,7 +186,7 @@ describe('#unit WebSocketChannel', () => {
createWebSocketFactory(WS_OPEN)
)

await expectAsync(channel.close()).toBeResolved()
await expect(channel.close()).resolves.not.toThrow()
})

function testFallbackToLiteralIPv6 (boltAddress, expectedWsAddress) {
Expand Down Expand Up @@ -294,6 +294,39 @@ describe('#unit WebSocketChannel', () => {
}
})

describe('.setupReceiveTimeout()', () => {
beforeEach(() => {
const address = ServerAddress.fromUrl('http://localhost:8989')
const channelConfig = new ChannelConfig(
address,
{ connectionTimeout: 0 },
SERVICE_UNAVAILABLE
)
webSocketChannel = new WebSocketChannel(
channelConfig,
undefined,
createWebSocketFactory(WS_OPEN)
)
})

it('should exists', () => {
expect(webSocketChannel).toHaveProperty('setupReceiveTimeout')
expect(typeof webSocketChannel.setupReceiveTimeout).toBe('function')
})

it('should not setTimeout', () => {
const fakeSetTimeout = setTimeoutMock.install()
try {
webSocketChannel.setupReceiveTimeout()

expect(fakeSetTimeout._timeoutIdCounter).toEqual(0)
expect(webSocketChannel._connectionTimeoutId).toBe(null)
} finally {
fakeSetTimeout.uninstall()
}
})
})

function createWebSocketFactory (readyState) {
const ws = {}
ws.readyState = readyState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

import BrowserHostNameResolver from '../../../bolt-connection/lib/channel/browser/browser-host-name-resolver'
import BrowserHostNameResolver from '../../../src/channel/browser/browser-host-name-resolver'

describe('#unit BrowserHostNameResolver', () => {
it('should resolve given address to itself', done => {
Expand Down
Loading