Skip to content

Commit

Permalink
Update dependencies (SOHU-Co#1204)
Browse files Browse the repository at this point in the history
* Remove unused code

* Update dependencies and formatting
  • Loading branch information
hyperlink authored and KeeReal committed Apr 11, 2019
1 parent 3c25d54 commit 7174b52
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 104 deletions.
45 changes: 0 additions & 45 deletions lib/baseClient.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';

var assert = require('assert');
var util = require('util');
var _ = require('lodash');
var async = require('async');
Expand All @@ -11,7 +10,6 @@ var getCodec = require('./codec');
var protocol = require('./protocol');
var encodeMessageSet = protocol.encodeMessageSet;
var Message = protocol.Message;
var url = require('url');
var logger = require('./logging')('kafka-node:BaseClient');
var validateKafkaTopics = require('./utils').validateTopicNames;

Expand All @@ -27,49 +25,6 @@ function Client () {

util.inherits(Client, EventEmitter);

Client.prototype.setupBrokerProfiles = function (brokers) {
this.brokerProfiles = Object.create(null);
var self = this;
var protocol = self.ssl ? 'SSL' : 'PLAINTEXT';

Object.keys(brokers).forEach(function (key) {
var brokerProfile = brokers[key];
var addr;

if (brokerProfile.endpoints && brokerProfile.endpoints.length) {
var endpoint = _.find(brokerProfile.endpoints, function (endpoint) {
var securityProtocolMap = brokerProfile.listener_security_protocol_map;
var listenerName = url
.parse(endpoint)
.protocol.replace(':', '')
.toUpperCase();
if (securityProtocolMap !== undefined) {
return securityProtocolMap[listenerName] === protocol;
} else {
return listenerName === protocol;
}
});

if (endpoint == null) {
throw new Error(['No kafka endpoint found for broker: ', key, ' with protocol ', protocol].join(''));
}

var endpointUrl = url.parse(endpoint);

addr = endpointUrl.hostname + ':' + endpointUrl.port;

brokerProfile.host = endpointUrl.hostname;
brokerProfile.port = endpointUrl.port;
} else {
addr = brokerProfile.host + ':' + brokerProfile.port;
}
assert(brokerProfile.host && brokerProfile.port, 'kafka host or port is empty');

self.brokerProfiles[addr] = brokerProfile;
self.brokerProfiles[addr].id = key;
});
};

Client.prototype.closeBrokers = function (brokers) {
_.each(brokers, function (broker) {
broker.socket.closing = true;
Expand Down
17 changes: 11 additions & 6 deletions lib/commitStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ var DEFAULTS = {
class CommitStream extends Transform {
constructor (client, topics, groupId, options) {
options = options || {};
let parentOptions = _.defaults({highWaterMark: options.highWaterMark}, {objectMode: true});
let parentOptions = _.defaults({ highWaterMark: options.highWaterMark }, { objectMode: true });
super(parentOptions);

this.options = _.defaults((options || {}), DEFAULTS);
this.options = _.defaults(options || {}, DEFAULTS);
this.client = client;
this.topicPartionOffsets = this.buildTopicData(_.cloneDeep(topics));

Expand All @@ -36,9 +36,12 @@ class CommitStream extends Transform {
this.autoCommitIntervalTimer = null;

if (this.autoCommit && this.autoCommitIntervalMs) {
this.autoCommitIntervalTimer = setInterval(function () {
this.commit();
}.bind(this), this.autoCommitIntervalMs);
this.autoCommitIntervalTimer = setInterval(
function () {
this.commit();
}.bind(this),
this.autoCommitIntervalMs
);
}

this.messageCount = 0;
Expand Down Expand Up @@ -132,7 +135,9 @@ class CommitStream extends Transform {

let topicPartionOffsets = self.topicPartionOffsets;

let commits = topicPartionOffsets.filter(function (partition) { return partition.offset !== 0; });
let commits = topicPartionOffsets.filter(function (partition) {
return partition.offset !== 0;
});

if (commits.length) {
self.committing = true;
Expand Down
22 changes: 11 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
"types": "types/index.d.ts",
"license": "MIT",
"dependencies": {
"async": "^2.5.0",
"async": "^2.6.2",
"binary": "~0.3.0",
"bl": "^1.2.0",
"bl": "^2.2.0",
"buffer-crc32": "~0.2.5",
"buffermaker": "~1.2.0",
"debug": "^2.1.3",
Expand All @@ -40,17 +40,17 @@
"snappy": "^6.0.1"
},
"devDependencies": {
"@types/node": "^10.5.2",
"@types/node": "^10.12.27",
"coveralls": "^2.11.12",
"doctoc": "^1.2.0",
"eslint": "^3.17.1",
"eslint-config-semistandard": "^11.0.0",
"eslint-config-standard": "^10.2.1",
"eslint": "^5.14.1",
"eslint-config-semistandard": "^13.0.0",
"eslint-config-standard": "^12.0.0",
"eslint-plugin-dependencies": "^2.2.0",
"eslint-plugin-import": "^2.2.0",
"eslint-plugin-node": "^4.2.2",
"eslint-plugin-promise": "^3.4.0",
"eslint-plugin-standard": "^3.0.1",
"eslint-plugin-import": "^2.16.0",
"eslint-plugin-node": "^8.0.1",
"eslint-plugin-promise": "^4.0.1",
"eslint-plugin-standard": "^4.0.0",
"execa": "^0.6.1",
"istanbul": "^0.4.4",
"mocha": "^3.1.0",
Expand All @@ -59,7 +59,7 @@
"should": "^6.0.0",
"sinon": "^2.0.0",
"through2": "^2.0.3",
"tslint": "^5.10.0",
"tslint": "^5.13.0",
"tslint-config-semistandard": "^7.0.0",
"typescript": "^2.8.3"
},
Expand Down
62 changes: 20 additions & 42 deletions test/assignment/test.range.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,20 @@ const should = require('should');

describe('Range Assignment', function () {
const topicPartition = {
'RebalanceTopic': [
'0',
'1',
'2'
],
'RebalanceTest': [
'0',
'1',
'2'
]
RebalanceTopic: ['0', '1', '2'],
RebalanceTest: ['0', '1', '2']
};

const groupMembers = [
{
'subscription': [
'RebalanceTopic',
'RebalanceTest'
],
'version': 0,
'id': 'consumer1'
subscription: ['RebalanceTopic', 'RebalanceTest'],
version: 0,
id: 'consumer1'
},
{
'subscription': [
'RebalanceTopic',
'RebalanceTest'
],
'version': 0,
'id': 'consumer2'
subscription: ['RebalanceTopic', 'RebalanceTest'],
version: 0,
id: 'consumer2'
}
];

Expand Down Expand Up @@ -65,12 +51,9 @@ describe('Range Assignment', function () {
it('should partition two topics of three partitions between three consumers', function (done) {
const gm = groupMembers.slice(0);
gm.push({
'subscription': [
'RebalanceTopic',
'RebalanceTest'
],
'version': 0,
'id': 'consumer3'
subscription: ['RebalanceTopic', 'RebalanceTest'],
version: 0,
id: 'consumer3'
});

range.assign(topicPartition, gm, function (error, result) {
Expand Down Expand Up @@ -99,21 +82,16 @@ describe('Range Assignment', function () {

it('should partition two topics of three partitions between four consumers', function (done) {
const gm = groupMembers.slice(0);
gm.push({
'subscription': [
'RebalanceTopic',
'RebalanceTest'
],
'version': 0,
'id': 'consumer3'
},
gm.push(
{
subscription: ['RebalanceTopic', 'RebalanceTest'],
version: 0,
id: 'consumer3'
},
{
'subscription': [
'RebalanceTopic',
'RebalanceTest'
],
'version': 0,
'id': 'consumer4'
subscription: ['RebalanceTopic', 'RebalanceTest'],
version: 0,
id: 'consumer4'
}
);

Expand Down

0 comments on commit 7174b52

Please # to comment.