diff --git a/src/commands/registration/retr.js b/src/commands/registration/retr.js index 766e984b..f1b361fa 100644 --- a/src/commands/registration/retr.js +++ b/src/commands/registration/retr.js @@ -10,20 +10,26 @@ module.exports = { .tap(() => this.commandSocket.pause()) .then(() => when.try(this.fs.read.bind(this.fs), command.arg, {start: this.restByteCount})) .then(stream => { - this.restByteCount = 0; + const destroyConnection = (connection, reject) => err => connection && connection.destroy(err) && reject(err); const eventsPromise = when.promise((resolve, reject) => { - this.connector.socket.once('error', err => reject(err)); - - stream.on('data', data => this.connector.socket - && this.connector.socket.write(data, this.transferType)); - stream.once('error', err => reject(err)); + stream.on('data', data => { + if (stream) stream.pause(); + if (this.connector.socket) { + this.connector.socket.write(data, this.transferType, () => stream && stream.resume()); + } + }); stream.once('end', () => resolve()); + stream.once('error', destroyConnection(this.connector.socket, reject)); + + this.connector.socket.once('error', destroyConnection(stream, reject)); }); - return this.reply(150).then(() => this.connector.socket.resume()) + this.restByteCount = 0; + + return this.reply(150).then(() => stream.resume() && this.connector.socket.resume()) .then(() => eventsPromise) - .finally(() => stream.destroy ? stream.destroy() : null); + .finally(() => stream.destroy && stream.destroy()); }) .then(() => this.reply(226)) .catch(when.TimeoutError, err => {