Skip to content

Handle connectivity issues while reading the body #1343

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 12 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:

strategy:
matrix:
node-version: [10.x, 12.x, 14.x]
node-version: [10.x, 12.x, 14.x, 15.x]
os: [ubuntu-latest, windows-latest, macOS-latest]

steps:
Expand Down
86 changes: 38 additions & 48 deletions lib/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const hpagent = require('hpagent')
const http = require('http')
const https = require('https')
const debug = require('debug')('elasticsearch')
const decompressResponse = require('decompress-response')
const pump = require('pump')
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
const {
Expand Down Expand Up @@ -83,7 +82,6 @@ class Connection {

request (params, callback) {
this._openRequests++
var ended = false

const requestParams = this.buildRequestObject(params)
// https://github.com/nodejs/node/commit/b961d9fd83
Expand All @@ -96,53 +94,38 @@ class Connection {
debug('Starting a new request', params)
const request = this.makeRequest(requestParams)

// listen for the response event
// TODO: handle redirects?
request.on('response', response => {
/* istanbul ignore else */
if (ended === false) {
ended = true
this._openRequests--
const onResponse = response => {
cleanListeners()
this._openRequests--
callback(null, response)
}

if (params.asStream === true) {
callback(null, response)
} else {
callback(null, decompressResponse(response))
}
}
})

// handles request timeout
request.on('timeout', () => {
/* istanbul ignore else */
if (ended === false) {
ended = true
this._openRequests--
request.abort()
callback(new TimeoutError('Request timed out', params), null)
}
})

// handles request error
request.on('error', err => {
/* istanbul ignore else */
if (ended === false) {
ended = true
this._openRequests--
callback(new ConnectionError(err.message), null)
}
})
const onTimeout = () => {
cleanListeners()
this._openRequests--
request.once('error', () => {}) // we need to catch the request aborted error
request.abort()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use request.destroy() it might solve the v14 problem and it will also not emit an error event so you will no longer need the empty error handler.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! I'm looking into this, as request.abort has been deprecated in newer versions of node.
Very likely I'll address this in a separate pr while working on #1297.

callback(new TimeoutError('Request timed out', params), null)
}

// updates the ended state
request.on('abort', () => {
const onError = err => {
cleanListeners()
this._openRequests--
callback(new ConnectionError(err.message), null)
}

const onAbort = () => {
cleanListeners()
request.once('error', () => {}) // we need to catch the request aborted error
debug('Request aborted', params)
/* istanbul ignore else */
if (ended === false) {
ended = true
this._openRequests--
callback(new RequestAbortedError(), null)
}
})
this._openRequests--
callback(new RequestAbortedError(), null)
}

request.on('response', onResponse)
request.on('timeout', onTimeout)
request.on('error', onError)
request.on('abort', onAbort)

// Disables the Nagle algorithm
request.setNoDelay(true)
Expand All @@ -151,8 +134,8 @@ class Connection {
if (isStream(params.body) === true) {
pump(params.body, request, err => {
/* istanbul ignore if */
if (err != null && /* istanbul ignore next */ ended === false) {
ended = true
if (err != null) {
cleanListeners()
this._openRequests--
callback(err, null)
}
Expand All @@ -162,6 +145,13 @@ class Connection {
}

return request

function cleanListeners () {
request.removeListener('response', onResponse)
request.removeListener('timeout', onTimeout)
request.removeListener('error', onError)
request.removeListener('abort', onAbort)
}
}

// TODO: write a better closing logic
Expand Down
216 changes: 127 additions & 89 deletions lib/Transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

const debug = require('debug')('elasticsearch')
const os = require('os')
const { gzip, createGzip } = require('zlib')
const { gzip, unzip, createGzip } = require('zlib')
const ms = require('ms')
const {
ConnectionError,
Expand Down Expand Up @@ -174,37 +174,40 @@ class Transport {
request = meta.connection.request(params, onResponse)
}

const onResponse = (err, response) => {
if (err !== null) {
if (err.name !== 'RequestAbortedError') {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(meta.connection)

if (this.sniffOnConnectionFault === true) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id
})
}
const onConnectionError = (err) => {
if (err.name !== 'RequestAbortedError') {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(meta.connection)

if (this.sniffOnConnectionFault === true) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id
})
}

// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
makeRequest()
return
}
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
makeRequest()
return
}
}

err.meta = result
this.emit('response', err, result)
return callback(err, result)
err.meta = result
this.emit('response', err, result)
return callback(err, result)
}

const onResponse = (err, response) => {
if (err !== null) {
return onConnectionError(err)
}

const { statusCode, headers } = response
result.statusCode = statusCode
result.headers = headers
result.statusCode = response.statusCode
result.headers = response.headers

if (options.asStream === true) {
result.body = response
Expand All @@ -213,74 +216,109 @@ class Transport {
return
}

var payload = ''
// collect the payload
response.setEncoding('utf8')
response.on('data', chunk => { payload += chunk })
/* istanbul ignore next */
response.on('error', err => {
const error = new ConnectionError(err.message, result)
this.emit('response', error, result)
callback(error, result)
})
response.on('end', () => {
const isHead = params.method === 'HEAD'
// we should attempt the payload deserialization only if:
// - a `content-type` is defined and is equal to `application/json`
// - the request is not a HEAD request
// - the payload is not an empty string
if (headers['content-type'] !== undefined &&
headers['content-type'].indexOf('application/json') > -1 &&
isHead === false &&
payload !== ''
) {
try {
result.body = this.serializer.deserialize(payload)
} catch (err) {
this.emit('response', err, result)
return callback(err, result)
}
} else {
// cast to boolean if the request method was HEAD
result.body = isHead === true ? true : payload
const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase()
const isCompressed = contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1
// if the response is compressed, we must handle it
// as buffer for allowing decompression later
let payload = isCompressed ? [] : ''
const onData = isCompressed
? chunk => { payload.push(chunk) }
: chunk => { payload += chunk }
const onEnd = err => {
response.removeListener('data', onData)
response.removeListener('end', onEnd)
response.removeListener('error', onEnd)
response.removeListener('aborted', onAbort)

if (err) {
return onConnectionError(new ConnectionError(err.message))
}

// we should ignore the statusCode if the user has configured the `ignore` field with
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(statusCode) > -1) ||
(isHead === true && statusCode === 404)

if (ignoreStatusCode === false &&
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
// if the statusCode is 502/3/4 we should run our retry strategy
// and mark the connection as dead
this.connectionPool.markDead(meta.connection)
// retry logic (we shoukd not retry on "429 - Too Many Requests")
if (meta.attempts < maxRetries && statusCode !== 429) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
makeRequest()
return
}
if (isCompressed) {
unzip(Buffer.concat(payload), onBody)
} else {
// everything has worked as expected, let's mark
// the connection as alive (or confirm it)
this.connectionPool.markAlive(meta.connection)
onBody(null, payload)
}
}

if (ignoreStatusCode === false && statusCode >= 400) {
const error = new ResponseError(result)
this.emit('response', error, result)
callback(error, result)
} else {
// cast to boolean if the request method was HEAD
if (isHead === true && statusCode === 404) {
result.body = false
}
this.emit('response', null, result)
callback(null, result)
const onAbort = () => {
response.destroy()
onEnd(new Error('Response aborted while reading the body'))
}

if (!isCompressed) {
response.setEncoding('utf8')
}
response.on('data', onData)
response.on('error', onEnd)
response.on('end', onEnd)
response.on('aborted', onAbort)
}

const onBody = (err, payload) => {
if (err) {
this.emit('response', err, result)
return callback(err, result)
}
if (Buffer.isBuffer(payload)) {
payload = payload.toString()
}
const isHead = params.method === 'HEAD'
// we should attempt the payload deserialization only if:
// - a `content-type` is defined and is equal to `application/json`
// - the request is not a HEAD request
// - the payload is not an empty string
if (result.headers['content-type'] !== undefined &&
result.headers['content-type'].indexOf('application/json') > -1 &&
isHead === false &&
payload !== ''
) {
try {
result.body = this.serializer.deserialize(payload)
} catch (err) {
this.emit('response', err, result)
return callback(err, result)
}
})
} else {
// cast to boolean if the request method was HEAD
result.body = isHead === true ? true : payload
}

// we should ignore the statusCode if the user has configured the `ignore` field with
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(result.statusCode) > -1) ||
(isHead === true && result.statusCode === 404)

if (ignoreStatusCode === false &&
(result.statusCode === 502 || result.statusCode === 503 || result.statusCode === 504)) {
// if the statusCode is 502/3/4 we should run our retry strategy
// and mark the connection as dead
this.connectionPool.markDead(meta.connection)
// retry logic (we shoukd not retry on "429 - Too Many Requests")
if (meta.attempts < maxRetries && result.statusCode !== 429) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
makeRequest()
return
}
} else {
// everything has worked as expected, let's mark
// the connection as alive (or confirm it)
this.connectionPool.markAlive(meta.connection)
}

if (ignoreStatusCode === false && result.statusCode >= 400) {
const error = new ResponseError(result)
this.emit('response', error, result)
callback(error, result)
} else {
// cast to boolean if the request method was HEAD
if (isHead === true && result.statusCode === 404) {
result.body = false
}
this.emit('response', null, result)
callback(null, result)
}
}

const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
},
"dependencies": {
"debug": "^4.1.1",
"decompress-response": "^4.2.0",
"hpagent": "^0.1.1",
"ms": "^2.1.1",
"pump": "^3.0.0",
Expand Down
Loading