diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index efac2bdff..07231e456 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -9,7 +9,7 @@ jobs: strategy: matrix: - node-version: [10.x, 12.x, 14.x] + node-version: [10.x, 12.x, 14.x, 15.x] os: [ubuntu-latest, windows-latest, macOS-latest] steps: diff --git a/lib/Connection.js b/lib/Connection.js index 6bf38c495..a9084ed77 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -25,7 +25,6 @@ const hpagent = require('hpagent') const http = require('http') const https = require('https') const debug = require('debug')('elasticsearch') -const decompressResponse = require('decompress-response') const pump = require('pump') const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/ const { @@ -83,7 +82,6 @@ class Connection { request (params, callback) { this._openRequests++ - var ended = false const requestParams = this.buildRequestObject(params) // https://github.com/nodejs/node/commit/b961d9fd83 @@ -96,53 +94,38 @@ class Connection { debug('Starting a new request', params) const request = this.makeRequest(requestParams) - // listen for the response event - // TODO: handle redirects? - request.on('response', response => { - /* istanbul ignore else */ - if (ended === false) { - ended = true - this._openRequests-- + const onResponse = response => { + cleanListeners() + this._openRequests-- + callback(null, response) + } - if (params.asStream === true) { - callback(null, response) - } else { - callback(null, decompressResponse(response)) - } - } - }) - - // handles request timeout - request.on('timeout', () => { - /* istanbul ignore else */ - if (ended === false) { - ended = true - this._openRequests-- - request.abort() - callback(new TimeoutError('Request timed out', params), null) - } - }) - - // handles request error - request.on('error', err => { - /* istanbul ignore else */ - if (ended === false) { - ended = true - this._openRequests-- - callback(new ConnectionError(err.message), null) - } - }) + const onTimeout = () => { + cleanListeners() + this._openRequests-- + request.once('error', () => {}) // we need to catch the request aborted error + request.abort() + callback(new TimeoutError('Request timed out', params), null) + } - // updates the ended state - request.on('abort', () => { + const onError = err => { + cleanListeners() + this._openRequests-- + callback(new ConnectionError(err.message), null) + } + + const onAbort = () => { + cleanListeners() + request.once('error', () => {}) // we need to catch the request aborted error debug('Request aborted', params) - /* istanbul ignore else */ - if (ended === false) { - ended = true - this._openRequests-- - callback(new RequestAbortedError(), null) - } - }) + this._openRequests-- + callback(new RequestAbortedError(), null) + } + + request.on('response', onResponse) + request.on('timeout', onTimeout) + request.on('error', onError) + request.on('abort', onAbort) // Disables the Nagle algorithm request.setNoDelay(true) @@ -151,8 +134,8 @@ class Connection { if (isStream(params.body) === true) { pump(params.body, request, err => { /* istanbul ignore if */ - if (err != null && /* istanbul ignore next */ ended === false) { - ended = true + if (err != null) { + cleanListeners() this._openRequests-- callback(err, null) } @@ -162,6 +145,13 @@ class Connection { } return request + + function cleanListeners () { + request.removeListener('response', onResponse) + request.removeListener('timeout', onTimeout) + request.removeListener('error', onError) + request.removeListener('abort', onAbort) + } } // TODO: write a better closing logic diff --git a/lib/Transport.js b/lib/Transport.js index 791f1018c..b5bdcba53 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -21,7 +21,7 @@ const debug = require('debug')('elasticsearch') const os = require('os') -const { gzip, createGzip } = require('zlib') +const { gzip, unzip, createGzip } = require('zlib') const ms = require('ms') const { ConnectionError, @@ -174,37 +174,40 @@ class Transport { request = meta.connection.request(params, onResponse) } - const onResponse = (err, response) => { - if (err !== null) { - if (err.name !== 'RequestAbortedError') { - // if there is an error in the connection - // let's mark the connection as dead - this.connectionPool.markDead(meta.connection) - - if (this.sniffOnConnectionFault === true) { - this.sniff({ - reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT, - requestId: meta.request.id - }) - } + const onConnectionError = (err) => { + if (err.name !== 'RequestAbortedError') { + // if there is an error in the connection + // let's mark the connection as dead + this.connectionPool.markDead(meta.connection) + + if (this.sniffOnConnectionFault === true) { + this.sniff({ + reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT, + requestId: meta.request.id + }) + } - // retry logic - if (meta.attempts < maxRetries) { - meta.attempts++ - debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) - makeRequest() - return - } + // retry logic + if (meta.attempts < maxRetries) { + meta.attempts++ + debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) + makeRequest() + return } + } - err.meta = result - this.emit('response', err, result) - return callback(err, result) + err.meta = result + this.emit('response', err, result) + return callback(err, result) + } + + const onResponse = (err, response) => { + if (err !== null) { + return onConnectionError(err) } - const { statusCode, headers } = response - result.statusCode = statusCode - result.headers = headers + result.statusCode = response.statusCode + result.headers = response.headers if (options.asStream === true) { result.body = response @@ -213,74 +216,109 @@ class Transport { return } - var payload = '' - // collect the payload - response.setEncoding('utf8') - response.on('data', chunk => { payload += chunk }) - /* istanbul ignore next */ - response.on('error', err => { - const error = new ConnectionError(err.message, result) - this.emit('response', error, result) - callback(error, result) - }) - response.on('end', () => { - const isHead = params.method === 'HEAD' - // we should attempt the payload deserialization only if: - // - a `content-type` is defined and is equal to `application/json` - // - the request is not a HEAD request - // - the payload is not an empty string - if (headers['content-type'] !== undefined && - headers['content-type'].indexOf('application/json') > -1 && - isHead === false && - payload !== '' - ) { - try { - result.body = this.serializer.deserialize(payload) - } catch (err) { - this.emit('response', err, result) - return callback(err, result) - } - } else { - // cast to boolean if the request method was HEAD - result.body = isHead === true ? true : payload + const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase() + const isCompressed = contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1 + // if the response is compressed, we must handle it + // as buffer for allowing decompression later + let payload = isCompressed ? [] : '' + const onData = isCompressed + ? chunk => { payload.push(chunk) } + : chunk => { payload += chunk } + const onEnd = err => { + response.removeListener('data', onData) + response.removeListener('end', onEnd) + response.removeListener('error', onEnd) + response.removeListener('aborted', onAbort) + + if (err) { + return onConnectionError(new ConnectionError(err.message)) } - // we should ignore the statusCode if the user has configured the `ignore` field with - // the statusCode we just got or if the request method is HEAD and the statusCode is 404 - const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(statusCode) > -1) || - (isHead === true && statusCode === 404) - - if (ignoreStatusCode === false && - (statusCode === 502 || statusCode === 503 || statusCode === 504)) { - // if the statusCode is 502/3/4 we should run our retry strategy - // and mark the connection as dead - this.connectionPool.markDead(meta.connection) - // retry logic (we shoukd not retry on "429 - Too Many Requests") - if (meta.attempts < maxRetries && statusCode !== 429) { - meta.attempts++ - debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) - makeRequest() - return - } + if (isCompressed) { + unzip(Buffer.concat(payload), onBody) } else { - // everything has worked as expected, let's mark - // the connection as alive (or confirm it) - this.connectionPool.markAlive(meta.connection) + onBody(null, payload) } + } - if (ignoreStatusCode === false && statusCode >= 400) { - const error = new ResponseError(result) - this.emit('response', error, result) - callback(error, result) - } else { - // cast to boolean if the request method was HEAD - if (isHead === true && statusCode === 404) { - result.body = false - } - this.emit('response', null, result) - callback(null, result) + const onAbort = () => { + response.destroy() + onEnd(new Error('Response aborted while reading the body')) + } + + if (!isCompressed) { + response.setEncoding('utf8') + } + response.on('data', onData) + response.on('error', onEnd) + response.on('end', onEnd) + response.on('aborted', onAbort) + } + + const onBody = (err, payload) => { + if (err) { + this.emit('response', err, result) + return callback(err, result) + } + if (Buffer.isBuffer(payload)) { + payload = payload.toString() + } + const isHead = params.method === 'HEAD' + // we should attempt the payload deserialization only if: + // - a `content-type` is defined and is equal to `application/json` + // - the request is not a HEAD request + // - the payload is not an empty string + if (result.headers['content-type'] !== undefined && + result.headers['content-type'].indexOf('application/json') > -1 && + isHead === false && + payload !== '' + ) { + try { + result.body = this.serializer.deserialize(payload) + } catch (err) { + this.emit('response', err, result) + return callback(err, result) } - }) + } else { + // cast to boolean if the request method was HEAD + result.body = isHead === true ? true : payload + } + + // we should ignore the statusCode if the user has configured the `ignore` field with + // the statusCode we just got or if the request method is HEAD and the statusCode is 404 + const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(result.statusCode) > -1) || + (isHead === true && result.statusCode === 404) + + if (ignoreStatusCode === false && + (result.statusCode === 502 || result.statusCode === 503 || result.statusCode === 504)) { + // if the statusCode is 502/3/4 we should run our retry strategy + // and mark the connection as dead + this.connectionPool.markDead(meta.connection) + // retry logic (we shoukd not retry on "429 - Too Many Requests") + if (meta.attempts < maxRetries && result.statusCode !== 429) { + meta.attempts++ + debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) + makeRequest() + return + } + } else { + // everything has worked as expected, let's mark + // the connection as alive (or confirm it) + this.connectionPool.markAlive(meta.connection) + } + + if (ignoreStatusCode === false && result.statusCode >= 400) { + const error = new ResponseError(result) + this.emit('response', error, result) + callback(error, result) + } else { + // cast to boolean if the request method was HEAD + if (isHead === true && result.statusCode === 404) { + result.body = false + } + this.emit('response', null, result) + callback(null, result) + } } const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers)) diff --git a/package.json b/package.json index 2bd4da5a0..98e775348 100644 --- a/package.json +++ b/package.json @@ -75,7 +75,6 @@ }, "dependencies": { "debug": "^4.1.1", - "decompress-response": "^4.2.0", "hpagent": "^0.1.1", "ms": "^2.1.1", "pump": "^3.0.0", diff --git a/test/unit/client.test.js b/test/unit/client.test.js index 4991b1ef9..43c411c9d 100644 --- a/test/unit/client.test.js +++ b/test/unit/client.test.js @@ -21,7 +21,7 @@ const { test } = require('tap') const { URL } = require('url') -const { Client, ConnectionPool, Transport } = require('../../index') +const { Client, ConnectionPool, Transport, errors } = require('../../index') const { CloudConnectionPool } = require('../../lib/pool') const { buildServer } = require('../utils') @@ -1191,3 +1191,55 @@ test('name property as symbol', t => { t.strictEqual(client.name, symbol) }) + +// The nodejs http agent will try to wait for the whole +// body to arrive before closing the request, so this +// test might take some time. +test('Bad content length', t => { + t.plan(3) + + let count = 0 + function handler (req, res) { + count += 1 + const body = JSON.stringify({ hello: 'world' }) + res.setHeader('Content-Type', 'application/json;utf=8') + res.setHeader('Content-Length', body.length + '') + res.end(body.slice(0, -5)) + } + + buildServer(handler, ({ port }, server) => { + const client = new Client({ node: `http://localhost:${port}`, maxRetries: 1 }) + client.info((err, { body }) => { + t.ok(err instanceof errors.ConnectionError) + t.is(err.message, 'Response aborted while reading the body') + t.strictEqual(count, 2) + server.stop() + }) + }) +}) + +test('Socket destryed while reading the body', t => { + t.plan(3) + + let count = 0 + function handler (req, res) { + count += 1 + const body = JSON.stringify({ hello: 'world' }) + res.setHeader('Content-Type', 'application/json;utf=8') + res.setHeader('Content-Length', body.length + '') + res.write(body.slice(0, -5)) + setTimeout(() => { + res.socket.destroy() + }, 500) + } + + buildServer(handler, ({ port }, server) => { + const client = new Client({ node: `http://localhost:${port}`, maxRetries: 1 }) + client.info((err, { body }) => { + t.ok(err instanceof errors.ConnectionError) + t.is(err.message, 'Response aborted while reading the body') + t.strictEqual(count, 2) + server.stop() + }) + }) +}) diff --git a/test/unit/connection.test.js b/test/unit/connection.test.js index c83147c8d..95c5e5faf 100644 --- a/test/unit/connection.test.js +++ b/test/unit/connection.test.js @@ -21,7 +21,6 @@ const { test } = require('tap') const { inspect } = require('util') -const { createGzip, createDeflate } = require('zlib') const { URL } = require('url') const { Agent } = require('http') const hpagent = require('hpagent') @@ -400,90 +399,6 @@ test('Send body as stream', t => { }) }) -test('Should handle compression', t => { - t.test('gzip', t => { - t.plan(3) - - function handler (req, res) { - res.writeHead(200, { - 'Content-Type': 'application/json;utf=8', - 'Content-Encoding': 'gzip' - }) - intoStream(JSON.stringify({ hello: 'world' })) - .pipe(createGzip()) - .pipe(res) - } - - buildServer(handler, ({ port }, server) => { - const connection = new Connection({ - url: new URL(`http://localhost:${port}`) - }) - connection.request({ - path: '/hello', - method: 'GET' - }, (err, res) => { - t.error(err) - - t.match(res.headers, { - 'content-type': 'application/json;utf=8', - 'content-encoding': 'gzip' - }) - - var payload = '' - res.setEncoding('utf8') - res.on('data', chunk => { payload += chunk }) - res.on('error', err => t.fail(err)) - res.on('end', () => { - t.deepEqual(JSON.parse(payload), { hello: 'world' }) - server.stop() - }) - }) - }) - }) - - t.test('deflate', t => { - t.plan(3) - - function handler (req, res) { - res.writeHead(200, { - 'Content-Type': 'application/json;utf=8', - 'Content-Encoding': 'deflate' - }) - intoStream(JSON.stringify({ hello: 'world' })) - .pipe(createDeflate()) - .pipe(res) - } - - buildServer(handler, ({ port }, server) => { - const connection = new Connection({ - url: new URL(`http://localhost:${port}`) - }) - connection.request({ - path: '/hello', - method: 'GET' - }, (err, res) => { - t.error(err) - - t.match(res.headers, { - 'content-type': 'application/json;utf=8', - 'content-encoding': 'deflate' - }) - - var payload = '' - res.setEncoding('utf8') - res.on('data', chunk => { payload += chunk }) - res.on('error', err => t.fail(err)) - res.on('end', () => { - t.deepEqual(JSON.parse(payload), { hello: 'world' }) - server.stop() - }) - }) - }) - }) - - t.end() -}) - test('Should not close a connection if there are open requests', t => { t.plan(4) diff --git a/test/unit/transport.test.js b/test/unit/transport.test.js index a46faf572..1026ba0a5 100644 --- a/test/unit/transport.test.js +++ b/test/unit/transport.test.js @@ -22,7 +22,7 @@ const { test } = require('tap') const { URL } = require('url') const FakeTimers = require('@sinonjs/fake-timers') -const { createGunzip } = require('zlib') +const { createGunzip, gzipSync } = require('zlib') const os = require('os') const intoStream = require('into-stream') const { @@ -1665,13 +1665,17 @@ test('Should cast to boolean HEAD request', t => { }) test('Suggest compression', t => { - t.plan(2) + t.plan(3) function handler (req, res) { t.match(req.headers, { 'accept-encoding': 'gzip,deflate' }) + + const body = gzipSync(JSON.stringify({ hello: 'world' })) res.setHeader('Content-Type', 'application/json;utf=8') - res.end(JSON.stringify({ hello: 'world' })) + res.setHeader('Content-Encoding', 'gzip') + res.setHeader('Content-Length', Buffer.byteLength(body)) + res.end(body) } buildServer(handler, ({ port }, server) => { @@ -1694,6 +1698,46 @@ test('Suggest compression', t => { path: '/hello' }, (err, { body }) => { t.error(err) + t.deepEqual(body, { hello: 'world' }) + server.stop() + }) + }) +}) + +test('Broken compression', t => { + t.plan(2) + function handler (req, res) { + t.match(req.headers, { + 'accept-encoding': 'gzip,deflate' + }) + + const body = gzipSync(JSON.stringify({ hello: 'world' })) + res.setHeader('Content-Type', 'application/json;utf=8') + res.setHeader('Content-Encoding', 'gzip') + // we are not setting the content length on purpose + res.end(body.slice(0, -5)) + } + + buildServer(handler, ({ port }, server) => { + const pool = new ConnectionPool({ Connection }) + pool.addConnection(`http://localhost:${port}`) + + const transport = new Transport({ + emit: () => {}, + connectionPool: pool, + serializer: new Serializer(), + maxRetries: 3, + requestTimeout: 30000, + sniffInterval: false, + sniffOnStart: false, + suggestCompression: true + }) + + transport.request({ + method: 'GET', + path: '/hello' + }, (err, { body }) => { + t.ok(err) server.stop() }) }) diff --git a/test/utils/MockConnection.js b/test/utils/MockConnection.js index 5b3aac272..6a031f6e4 100644 --- a/test/utils/MockConnection.js +++ b/test/utils/MockConnection.js @@ -109,7 +109,7 @@ class MockConnectionSniff extends Connection { 'content-type': 'application/json;utf=8', date: new Date().toISOString(), connection: 'keep-alive', - 'content-length': '205' + 'content-length': '191' } process.nextTick(() => { if (!aborted) {