test: test ReadRows logic with local gRPC server #1282
Conversation
Added some comments about comments! Thanks for such a helpful PR description. Also, please double-check: there are some GHA lint warnings.
test/streams.ts
Outdated
it('should create read stream and read synchronously', done => {
  const keyFrom = 0;
  const keyTo = 1000;
Is 1000 a particularly special number that triggers particular behavior? If so, consider adding a quick comment about why that value is used.
For the test that currently fails, 1000 is big enough to trigger the problem, and it does not happen with e.g. 10 or 100. I'll add comments.
test/streams.ts
Outdated
it('should be able to stop reading from the read stream', done => {
  const keyFrom = 0;
  const keyTo = 1000;
  const stopAfter = 42;
Consider adding a comment about why 42 is the chosen "stopAfter" value, if there's anything of note about it.
It's random (and also the answer to life, the universe, and everything). I'll note that!
test/utils/readRowsImpl.ts
Outdated
import {GoogleError, Status} from 'google-gax';

const valueSize = 1024 * 1024;
const chunkSize = 1023 * 1024 - 1; // make it uneven
- Clarification: when you say "make it uneven", do you mean make it an odd number, make it offset from valueSize by one, or make it not a multiple of 1024?
- Double-checking that this should be 1023 * 1024 - I think it should, because the number of chunks would logically be smaller than the number of values, but I just wanted to double-check 🙂
Just making it so that one row occupies multiple chunks (2 in this case), and that these chunks have different sizes.
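For concreteness, here's the arithmetic (just a worked example, not the actual mock code):

// Worked example: how a single value splits into chunks with these constants.
const valueSize = 1024 * 1024;     // 1,048,576 bytes per value
const chunkSize = 1023 * 1024 - 1; // 1,047,551 bytes per chunk

const firstChunkBytes = chunkSize;              // 1,047,551 bytes
const secondChunkBytes = valueSize - chunkSize; // 1,025 bytes
// => each row's value is sent as two chunks, and the two chunks differ in size.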
if (chunkIdx === errorAfterChunkNo) {
  debugLog(`sending error after chunk #${chunkIdx}`);
  errorAfterChunkNo = undefined; // do not send error for the second time
  const error = new GoogleError('Uh oh');
lol, love this test error message
test/utils/readRowsImpl.ts
Outdated
let chunksSent = 0;
const chunks = generateChunks(keyFrom, keyTo, stream);
let lastScannedRowKey: string | undefined;
let firstN: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] = [];
Should we rename this variable to downstreamChunks or something?
Yeah, I was struggling with finding a good name, since chunks is already used :) downstreamChunks or responseChunks maybe. I'll rename.
I renamed it to currentResponseChunks, does it make more sense now?
test/readrows.ts
Outdated
import {MockService} from '../src/util/mock-servers/mock-service';
import {debugLog, readRowsImpl} from './utils/readRowsImpl';

describe('Bigtable/Streams', () => {
Please update to Bigtable/ReadRows as well
In this PR I'm adding some tests for ReadRows logic. This is a pretty straightforward but rather big PR, so please bear with me while I explain what's going on here.

I'm reusing @danieljbruce's awesome implementation of the local gRPC server (#1090) to present a reasonable mock for ReadRows in test/utils/readRowsImpl.ts. This new mock generates rows with incremental keys, which are just numbers converted to strings and padded with zeros; e.g. for readRowsImpl(0, 3) the generated keys will be 00000000, 00000001, 00000002. The rows are unevenly split into chunks, and chunks are grouped into response messages that are sent over the server stream. There is some primitive support for range queries, which is important for stream retries, and it's also possible to cancel the stream and request an error to be emitted.
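To illustrate the key scheme, here's a rough sketch (not necessarily the exact helper in test/utils/readRowsImpl.ts, just how the padding works):

// Rough sketch: a row index becomes a zero-padded 8-character key.
function rowKey(n: number): string {
  return n.toString().padStart(8, '0');
}

// For readRowsImpl(0, 3) the generated keys would be:
//   rowKey(0) === '00000000'
//   rowKey(1) === '00000001'
//   rowKey(2) === '00000002'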
The server implementation I suggest in this PR is backpressure-aware (it checks the return value of stream.write(...)), and - which is the most crucial part - it is asynchronous, imitating the behavior of a remote gRPC server; by that I mean that all stream.write(...) calls are made after a zero setTimeout to move them to the next event loop iteration. This was the main missing piece to reliably reproduce the issue described in #607.
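As a minimal sketch of that idea (illustrative names only, not the mock's actual helpers): every write is deferred with a zero setTimeout, and when stream.write(...) returns false the sender waits for 'drain' before continuing.

import {Writable} from 'stream';

// Illustrative sketch: asynchronous, backpressure-aware writes.
async function writeAsync(stream: Writable, message: unknown): Promise<void> {
  // Defer to the next event loop iteration, like a remote server would.
  await new Promise(resolve => setTimeout(resolve, 0));
  if (!stream.write(message)) {
    // Backpressure: wait for 'drain' before sending anything else.
    await new Promise(resolve => stream.once('drain', resolve));
  }
}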
Now, the new test file I'm adding checks five basic use cases:

The first test case runs the very basic scenario where a table scan is requested and the result is consumed using the regular .on('data', ...) event handler. It works.

The second test case pipes the read stream to a Transform stream, which then pipes it to a PassThrough, but all components run synchronously. It works as well.

The third test case does the same, but the Transform stream uses setTimeout to delay processing of each row, triggering the issue described in #607. Since this test currently fails, it's skipped.
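For reference, that third case is shaped roughly like this (a simplified sketch, not the exact test code):

import {PassThrough, Readable, Transform, pipeline} from 'stream';

// Simplified sketch of the failing scenario: the read stream is piped through
// a Transform that defers every row with setTimeout, then into a PassThrough.
function consumeWithDelay(readStream: Readable, onRow: (row: unknown) => void) {
  const delay = new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      setTimeout(() => callback(null, row), 0); // asynchronous per-row work
    },
  });
  const pass = new PassThrough({objectMode: true});
  pipeline(readStream, delay, pass, err => {
    if (err) {
      throw err;
    }
  });
  pass.on('data', onRow); // the test counts the rows it receives here
}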
The fourth test case stops streaming from the user's code by calling .end() on a read stream; it works (with one caveat caused by grpc/grpc-node#2446, which required me to set a custom timeout for that server call).
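That fourth case is shaped roughly like this (again just a sketch; it only assumes the row stream exposes end(), which is what the test calls):

// Sketch: stop the scan from user code after `stopAfter` rows.
function readUpTo(
  readStream: NodeJS.ReadableStream & {end(): void},
  stopAfter: number,
  done: (rowsRead: number) => void
) {
  let rowsRead = 0;
  readStream.on('data', () => {
    rowsRead++;
    if (rowsRead === stopAfter) {
      readStream.end(); // user code stops reading here
    }
  });
  readStream.on('end', () => done(rowsRead)); // expect rowsRead === stopAfter
}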
Now, the fifth test looks like a new issue to me. It tests the case when the server emits a retryable error event, and from what I see, this test case uncovers a problem in the ChunkTransformer implementation, where it prematurely updates lastRowKey before this row is actually fully received and committed. We can discuss it offline and fix it if it's indeed a bug.

I suggest merging this PR, which will help us fix the code (and un-skip the two tests in this PR whenever they start passing). As I said, it's a lot of code, but mostly straightforward, so I will really appreciate some extra 👀 :)
Thanks folks!