Receiving [ERR_STREAM_PREMATURE_CLOSE] in the logs when intentionally closing the ResultSet stream before end #263
Comments
If the desired logic cannot be implemented via the ClickHouse query, you could just use AbortController.

import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  })
  const abortController = new AbortController()
  const resultSet = await clickHouseClient.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })
  let i = 0
  for await (const rows of resultSet.stream()) {
    for (const row of rows) {
      i++
      // If a condition is true, I want to stop consuming the stream prematurely
      if (i === 5) {
        console.log('aborting')
        abortController.abort()
        break
      }
      const result = row.json<string[]>().join(',') + '\n\n'
      console.log('Yielding row', result)
      yield result
    }
  }
}

;(async () => {
  const asyncGen = myAsyncGenerator()
  const outputStream = createWriteStream('output.txt')
  await pipeline(asyncGen, outputStream)
})()

Console output:
The output.txt file contents:

NB: You might want to add https://clickhouse.com/docs/en/operations/settings/settings#cancel-http-readonly-queries-on-client-close. Also, see the abort_request example. |
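For illustration, a minimal sketch of enabling that server setting per query (not part of the original comment; it assumes the `clickhouse_settings` option of `client.query` accepts this setting name as-is):

import { createClient } from '@clickhouse/client'

;(async () => {
  const client = createClient({
    // ...
  })
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    clickhouse_settings: {
      // Ask the server to cancel a read-only query when the HTTP connection
      // is closed, so an aborted client request does not keep running server-side.
      cancel_http_readonly_queries_on_client_close: 1,
    },
  })
  // ... consume resultSet.stream() as in the examples above ...
  await client.close()
})()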
You are right. I tried adjusting the asyncGenerator approach again, which was not trivial. Maybe something like this will do the trick? Please notice the backpressure comment; you'll likely need to handle this with such a manual approach.

import { createWriteStream } from 'fs'
import { createClient, ClickHouseLogLevel } from '@clickhouse/client'

;(async () => {
  process.on('unhandledRejection', (err) => {
    console.error('unhandledRejection:', err)
  })
  process.on('uncaughtException', (err) => {
    console.error('uncaughtException:', err)
  })
  const client = createClient({
    log: {
      // level: ClickHouseLogLevel.TRACE,
    },
  })
  const abortController = new AbortController()
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })
  let i = 0
  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')
  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          if (i++ === 5) {
            console.log('Reached the condition, ending stream...')
            abortController.abort()
            return
          }
          const result = row.json().join(',') + '\n\n'
          console.log('Yielding row', result)
          // important: need to add backpressure handling here
          outputStream.write(result)
        }
      })
      .on('error', (err) => {
        console.error('Error in stream', err)
        reject(err)
      })
      .on('end', () => {
        console.log('End of stream, resolve...')
        resolve(0)
      })
  })
  console.log('Closing client...')
  await client.close()
})()

Prints:
This output looks right to me - the socket is properly released on the abort event and is back in the keep-alive pool, and it is only destroyed when we close the client. |
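As a reference for the backpressure comment above, here is a minimal sketch (not part of the original reply) that assumes the same `stream` and `outputStream` variables and uses only the standard `write()` return value and the writable 'drain' event:

stream.on('data', (rows) => {
  for (const row of rows) {
    const canWriteMore = outputStream.write(row.json().join(',') + '\n\n')
    if (!canWriteMore) {
      // The writable buffer is full: pause the source stream
      // and resume it only after the destination has drained.
      stream.pause()
      outputStream.once('drain', () => stream.resume())
    }
  }
})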
And with just 1 socket and the second consecutive request, all seems OK, too.

import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'
import { ClickHouseLogLevel } from '@clickhouse/client-common'

;(async () => {
  process.on('unhandledRejection', (err) => {
    console.error('unhandledRejection:', err)
  })
  process.on('uncaughtException', (err) => {
    console.error('uncaughtException:', err)
  })
  const client = createClient({
    max_open_connections: 1,
    log: {
      level: ClickHouseLogLevel.TRACE,
    },
  })
  const abortController = new AbortController()
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })
  let i = 0
  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')
  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          if (i++ === 5) {
            console.log('Reached the condition, ending stream...')
            abortController.abort()
            return
          }
          const result = row.json().join(',') + '\n\n'
          console.log('Yielding row', result)
          // important: need to add backpressure handling here
          outputStream.write(result)
        }
      })
      .on('error', (err) => {
        console.error('Error in stream', err)
        reject(err)
      })
      .on('end', () => {
        console.log('End of stream, resolve...')
        resolve(0)
      })
  })
  const rs = await client.query({
    query: 'SELECT 1 AS number',
    format: 'JSONEachRow',
  })
  console.log(
    'Verifying that we can query using the same socket one more time...',
    await rs.json(),
  )
  console.log('Closing client...')
  await client.close()
})()
|
I think I understand what is happening now. Due to the query being in the async generator function body, an unfinished request goes out of scope, and so does the AbortController that is used internally to cancel the request on errors in the Node.js connection src. That internal AbortController is used to prevent the underlying sockets from being stuck while dialing an unreachable host, which can happen even if the request was timed out (and that was the only sensible solution I could find to that issue). A fun fact about the AbortController is that it fires the abort signal when it goes out of scope. This also explains why my stream example does not produce any errors, as there is no extra function there (but I believe it is still incorrect), so the query (and the request) does not go out of scope. Then, the error is printed here. I will check if I can make it less annoying and more transparent to the user so that just the |
Should be fixed in 1.0.2.

import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  })
  const resultSet = await clickHouseClient.query({
    query: `SELECT * FROM system.numbers LIMIT 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  })
  const stream = resultSet.stream()
  let i = 0
  for await (const rows of stream) {
    for (const row of rows) {
      // If a condition is true, I want to stop consuming the stream prematurely
      if (i++ === 5) {
        return stream.destroy()
      }
      yield row.json<string[]>().join(',') + '\n'
    }
  }
}

;(async () => {
  const asyncGen = myAsyncGenerator()
  const outputStream = createWriteStream('output.txt')
  await pipeline(asyncGen, outputStream)
})()

Yields the following file:
Without unnecessary errors in the logs. Please feel free to re-open or create a new one if there are still any issues. |
Describe the bug
I have a stream generated from a ResultSet of a query. When consuming the stream, I need to end it prematurely if a certain condition is met:
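(The original snippet is not preserved here; a minimal reconstruction of the described pattern, based on the examples discussed in the comments above, could look like this:)

import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const client = createClient({
    // ...
  })
  const resultSet = await client.query({
    query: `SELECT * FROM system.numbers LIMIT 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  })
  let i = 0
  for await (const rows of resultSet.stream()) {
    for (const row of rows) {
      // Stop consuming the stream prematurely once a condition is met
      if (i++ === 5) {
        return
      }
      yield row.json<string[]>().join(',') + '\n'
    }
  }
}

;(async () => {
  await pipeline(myAsyncGenerator(), createWriteStream('output.txt'))
})()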
This code terminates successfully (exit code 0), but it logs the following [ERR_STREAM_PREMATURE_CLOSE] error on the console:

Expected behaviour
I could be wrong, but this [ERR_STREAM_PREMATURE_CLOSE] error doesn't seem to originate from the stream object received from the call to the database; I suspect instead that it can originate from another stream that is writing to this one (e.g. a stream from the socket that is writing the data?). In this case I am voluntarily closing the stream before consuming all the data, so I don't want to see this [ERR_STREAM_PREMATURE_CLOSE] logged on the console. Also, it doesn't seem to be an actual error, but just the log of an error, because the program terminates with success (exit code 0). Is there any way to prevent this [ERR_STREAM_PREMATURE_CLOSE] error log from appearing in the console?

Error log
Configuration
Environment
Client version: 0.2.7
Node v18.18.0
Darwin arm64
ClickHouse server