import type {
  BaseResultSet,
  DataFormat,
  ResponseHeaders,
  ResultJSONType,
  ResultStream,
  Row,
} from '@clickhouse/client-common'
import {
  isNotStreamableJSONFamily,
  isStreamableJSONFamily,
  validateStreamFormat,
} from '@clickhouse/client-common'
import { Buffer } from 'buffer'
import type { Readable, TransformCallback } from 'stream'
import Stream, { Transform } from 'stream'
import { getAsText } from './utils'
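
// Streamable formats (JSONEachRow, CSV, etc.) arrive newline-delimited:
// a single 0x0A byte ('\n') terminates each serialized row in the stream.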
const NEWLINE = 0x0a as const

/** {@link Stream.Readable} with additional types for the `on(data)` method and the async iterator.
 * Everything else is an exact copy from stream.d.ts */
export type StreamReadable<T> = Omit<Stream.Readable, 'on'> & {
  [Symbol.asyncIterator](): AsyncIterableIterator<T>
  on(event: 'data', listener: (chunk: T) => void): Stream.Readable
  on(event: 'close', listener: () => void): Stream.Readable
  on(event: 'drain', listener: () => void): Stream.Readable
  on(event: 'end', listener: () => void): Stream.Readable
  on(event: 'error', listener: (err: Error) => void): Stream.Readable
  on(event: 'finish', listener: () => void): Stream.Readable
  on(event: 'pause', listener: () => void): Stream.Readable
  on(event: 'pipe', listener: (src: Readable) => void): Stream.Readable
  on(event: 'readable', listener: () => void): Stream.Readable
  on(event: 'resume', listener: () => void): Stream.Readable
  on(event: 'unpipe', listener: (src: Readable) => void): Stream.Readable
  on(
    event: string | symbol,
    listener: (...args: any[]) => void,
  ): Stream.Readable
}
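
// What the narrowed typings give you (a sketch; assumes `resultSet` wraps a
// streamable format such as JSONEachRow): `on('data')` receives decoded row
// batches rather than raw Buffers, and the async iterator is typed the same way.
//
//   const stream = resultSet.stream<{ id: number }>()
//   stream.on('data', (rows) => {
//     rows.forEach((row) => console.log(row.json().id))
//   })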

export interface ResultSetOptions<Format extends DataFormat> {
  stream: Stream.Readable
  format: Format
  query_id: string
  log_error: (error: Error) => void
  response_headers: ResponseHeaders
}

export class ResultSet<Format extends DataFormat | unknown>
  implements BaseResultSet<Stream.Readable, Format>
{
  public readonly response_headers: ResponseHeaders
  private readonly log_error: (error: Error) => void

  constructor(
    private _stream: Stream.Readable,
    private readonly format: Format,
    public readonly query_id: string,
    log_error?: (error: Error) => void,
    _response_headers?: ResponseHeaders,
  ) {
    // eslint-disable-next-line no-console
    this.log_error = log_error ?? ((err: Error) => console.error(err))
    this.response_headers =
      _response_headers !== undefined ? Object.freeze(_response_headers) : {}
  }

  /** See {@link BaseResultSet.text}. */
  async text(): Promise<string> {
    if (this._stream.readableEnded) {
      throw Error(streamAlreadyConsumedMessage)
    }
    return (await getAsText(this._stream)).toString()
  }

  /** See {@link BaseResultSet.json}. */
  async json<T>(): Promise<ResultJSONType<T, Format>> {
    if (this._stream.readableEnded) {
      throw Error(streamAlreadyConsumedMessage)
    }
    // JSONEachRow, etc.
    if (isStreamableJSONFamily(this.format as DataFormat)) {
      const result: T[] = []
      const stream = this.stream<T>()
      for await (const rows of stream) {
        for (const row of rows) {
          result.push(row.json() as T)
        }
      }
      return result as any
    }
    // JSON, JSONObjectEachRow, etc.
    if (isNotStreamableJSONFamily(this.format as DataFormat)) {
      const text = await getAsText(this._stream)
      return JSON.parse(text)
    }
    // should not be called for CSV, etc.
    throw new Error(`Cannot decode ${this.format} as JSON`)
  }

  /** See {@link BaseResultSet.stream}. */
  stream<T>(): ResultStream<Format, StreamReadable<Row<T, Format>[]>> {
    // If the underlying stream has already ended (e.g. `text` or `json` was
    // called first), Stream.pipeline would create a new empty stream,
    // but without the "readableEnded" flag set to true
    if (this._stream.readableEnded) {
      throw Error(streamAlreadyConsumedMessage)
    }

    validateStreamFormat(this.format)

    let incompleteChunks: Buffer[] = []
    const logError = this.log_error
    const toRows = new Transform({
      transform(
        chunk: Buffer,
        _encoding: BufferEncoding,
        callback: TransformCallback,
      ) {
        const rows: Row[] = []
        let lastIdx = 0
        // first pass on the current chunk,
        // using the incomplete row left over from the previous chunks
        let idx = chunk.indexOf(NEWLINE)
        if (idx !== -1) {
          let text: string
          if (incompleteChunks.length > 0) {
            text = Buffer.concat(
              [...incompleteChunks, chunk.subarray(0, idx)],
              incompleteChunks.reduce((sz, buf) => sz + buf.length, 0) + idx,
            ).toString()
            incompleteChunks = []
          } else {
            text = chunk.subarray(0, idx).toString()
          }
          rows.push({
            text,
            json<T>(): T {
              return JSON.parse(text)
            },
          })
          lastIdx = idx + 1 // skip the newline character
          // subsequent passes on the current chunk, with at least one row parsed;
          // all previous chunks with incomplete rows were already processed
          do {
            idx = chunk.indexOf(NEWLINE, lastIdx)
            if (idx !== -1) {
              const text = chunk.subarray(lastIdx, idx).toString()
              rows.push({
                text,
                json<T>(): T {
                  return JSON.parse(text)
                },
              })
            } else {
              // to be processed during the first pass for the next chunk
              incompleteChunks.push(chunk.subarray(lastIdx))
              this.push(rows)
            }
            lastIdx = idx + 1 // skip the newline character
          } while (idx !== -1)
        } else {
          incompleteChunks.push(chunk) // this chunk does not contain a full row
        }
        callback()
      },
      autoDestroy: true,
      objectMode: true,
    })
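
    // Stream.pipeline wires the raw response stream into the row transform and
    // destroys both streams if either side fails. AbortError (request
    // cancellation) and an explicit ResultSet.close() are expected shutdown
    // paths, so only unexpected errors are forwarded to log_error.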
    const pipeline = Stream.pipeline(
      this._stream,
      toRows,
      function pipelineCb(err) {
        if (
          err &&
          err.name !== 'AbortError' &&
          err.message !== resultSetClosedMessage
        ) {
          logError(err)
        }
      },
    )
    return pipeline as any
  }

  /** See {@link BaseResultSet.close}. */
  close() {
    this._stream.destroy(new Error(resultSetClosedMessage))
  }

  static instance<Format extends DataFormat>({
    stream,
    format,
    query_id,
    log_error,
    response_headers,
  }: ResultSetOptions<Format>): ResultSet<Format> {
    return new ResultSet(stream, format, query_id, log_error, response_headers)
  }
}

const streamAlreadyConsumedMessage = 'Stream has been already consumed'
const resultSetClosedMessage = 'ResultSet has been closed'
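
// A minimal end-to-end sketch (assumes `client` is a connected instance
// created via `createClient` from @clickhouse/client; the query is
// illustrative only). Note that a ResultSet is single-use: each instance
// supports exactly one call to `text`, `json`, or `stream`.
//
//   const rs = await client.query({
//     query: 'SELECT number FROM system.numbers LIMIT 5',
//     format: 'JSONEachRow',
//   })
//   for await (const rows of rs.stream<{ number: string }>()) {
//     rows.forEach((row) => console.log(row.json().number))
//   }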