Skip to content

Commit

Permalink
fix: reduce async iterator loops per package in _createSink (libp2p#224)
Browse files Browse the repository at this point in the history
Javascript abstractions are not free. While using pipe here looks nice, it adds a non-neglible cost allocating Promises for each extra `for await ()` iteration.

- Similar rationale to libp2p#1420 (comment)

This PR merges sink pipe components in a single iteration

Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
dapplion and achingbrain authored Nov 25, 2022
1 parent 9d4dd87 commit e2a32ad
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 35 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@
"any-signal": "^3.0.0",
"benchmark": "^2.1.4",
"err-code": "^3.0.1",
"it-pipe": "^2.0.3",
"it-pushable": "^3.1.0",
"it-stream-types": "^1.0.4",
"rate-limiter-flexible": "^2.3.9",
Expand All @@ -172,6 +171,7 @@
"it-drain": "^2.0.0",
"it-foreach": "^1.0.0",
"it-map": "^2.0.0",
"it-pipe": "^2.0.3",
"it-to-buffer": "^3.0.0",
"p-defer": "^4.0.0",
"random-int": "^3.0.0",
Expand Down
20 changes: 1 addition & 19 deletions src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { MessageTypeNames, MessageTypes } from './message-types.js'
import { Uint8ArrayList } from 'uint8arraylist'
import type { Source } from 'it-stream-types'
import type { Message } from './message-types.js'

export const MAX_MSG_SIZE = 1 << 20 // 1MB
Expand All @@ -13,7 +12,7 @@ interface MessageHeader {
length: number
}

class Decoder {
export class Decoder {
private readonly _buffer: Uint8ArrayList
private _headerInfo: MessageHeader | null
private readonly _maxMessageSize: number
Expand Down Expand Up @@ -136,20 +135,3 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
offset
}
}

/**
* Decode a chunk and yield an _array_ of decoded messages
*/
export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
const msgs = decoder.write(chunk)

if (msgs.length > 0) {
yield * msgs
}
}
}
}
17 changes: 7 additions & 10 deletions src/mplex.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { pipe } from 'it-pipe'
import { pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { Decoder } from './decode.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
Expand Down Expand Up @@ -201,15 +200,13 @@ export class MplexStreamMuxer implements StreamMuxer {
source = abortableSource(source, anySignal(abortSignals))

try {
await pipe(
source,
decode(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize),
async source => {
for await (const msg of source) {
await this._handleIncoming(msg)
}
const decoder = new Decoder(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
for (const msg of decoder.write(chunk)) {
await this._handleIncoming(msg)
}
)
}

this._source.end()
} catch (err: any) {
Expand Down
2 changes: 1 addition & 1 deletion test/coder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { expect } from 'aegir/chai'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import { decode } from './fixtures/decode.js'
import all from 'it-all'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { messageWithBytes } from './fixtures/utils.js'
Expand Down
19 changes: 19 additions & 0 deletions test/fixtures/decode.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* eslint-env mocha */

import type { Message } from '../../src/message-types.js'
import { Decoder, MAX_MSG_QUEUE_SIZE, MAX_MSG_SIZE } from '../../src/decode.js'
import type { Source } from 'it-stream-types'

export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
const msgs = decoder.write(chunk)

if (msgs.length > 0) {
yield * msgs
}
}
}
}
6 changes: 3 additions & 3 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import all from 'it-all'
import type { Source } from 'it-stream-types'
import delay from 'delay'
import pDefer from 'p-defer'
import { decode } from '../src/decode.js'
import { decode } from './fixtures/decode.js'
import { pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'

Expand Down Expand Up @@ -135,8 +135,8 @@ describe('mplex', () => {
streamSourceError.reject(new Error('Stream source did not error'))
})
.catch(err => {
// should have errored before all messages were sent
expect(sent).to.equal(2)
// should have errored before all 102 messages were sent
expect(sent).to.be.lessThan(10)
streamSourceError.resolve(err)
})
}
Expand Down
2 changes: 1 addition & 1 deletion test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import drain from 'it-drain'
import each from 'it-foreach'
import { Message, MessageTypes } from '../src/message-types.js'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import { decode } from './fixtures/decode.js'
import { Uint8ArrayList } from 'uint8arraylist'
import toBuffer from 'it-to-buffer'

Expand Down

0 comments on commit e2a32ad

Please # to comment.