Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Improved performance when decoding the entire set of rows with streamable JSON formats #253

Merged
merged 3 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions benchmarks/common/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export function attachExceptionHandlers() {
process.on('uncaughtException', (err) => logAndQuit(err))
process.on('unhandledRejection', (err) => logAndQuit(err))

function logAndQuit(err: unknown) {
console.error(err)
process.exit(1)
}
}
1 change: 1 addition & 0 deletions benchmarks/common/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './handlers'
108 changes: 108 additions & 0 deletions benchmarks/formats/json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { createClient } from '@clickhouse/client'
import { attachExceptionHandlers } from '../common'

/*
Large strings table definition:

CREATE TABLE large_strings
(
`id` UInt32,
`s1` String,
`s2` String,
`s3` String
)
ENGINE = MergeTree
ORDER BY id;

INSERT INTO large_strings
SELECT number + 1,
randomPrintableASCII(randUniform(500, 2500)) AS s1,
randomPrintableASCII(randUniform(500, 2500)) AS s2,
randomPrintableASCII(randUniform(500, 2500)) AS s3
FROM numbers(100000);
*/

const WarmupIterations = 3
const BenchmarkIterations = 10

const largeStringsQuery = `SELECT * FROM large_strings ORDER BY id ASC LIMIT 50000`
const cellTowersQuery = `SELECT * FROM cell_towers ORDER BY (radio, mcc, net, created) ASC LIMIT 200000`
const queries = [largeStringsQuery, cellTowersQuery]

const formats = ['JSONEachRow'] as const

void (async () => {
const client = createClient({
url: 'http://localhost:8123',
compression: {
request: false,
response: false,
},
})

type TotalPerQuery = Record<string, number>
const results: Record<(typeof formats)[number], TotalPerQuery> = {
JSONEachRow: {},
}

async function benchmarkJSON(
format: (typeof formats)[number],
query: string,
keepResults: boolean,
) {
const start = +new Date()
const rs = await client.query({
query,
format,
})
await rs.json() // discard the result
const elapsed = +new Date() - start
if (keepResults) {
const current = results[format][query] ?? 0
results[format][query] = current + elapsed
}
logResult(format, query, elapsed)
}

attachExceptionHandlers()
process.on('SIGINT', closeAndExit)
process.on('SIGINT', closeAndExit)

console.log('Warmup')
for (let i = 0; i < WarmupIterations; i++) {
await runQueries(false)
}
console.log('Benchmarking')
for (let i = 0; i < BenchmarkIterations; i++) {
await runQueries(true)
}
console.log('Results:', results)
console.log('Average results:')
for (const format of formats) {
for (const query of queries) {
const avg = Math.floor(results[format][query] / BenchmarkIterations)
logResult(format, query, avg)
}
}
await closeAndExit()

function logResult(format: string, query: string, elapsed: number) {
const elapsedStr = elapsed.toString(10) + ' ms'
console.log(
`[${elapsedStr.padEnd(10)}][${format.padEnd(18)}][${query.padEnd(80)}]`,
)
}

async function runQueries(keepResults: boolean) {
for (const query of queries) {
for (const format of formats) {
await benchmarkJSON(format, query, keepResults)
}
}
}

async function closeAndExit() {
await client.close()
process.exit(0)
}
})()
6 changes: 3 additions & 3 deletions benchmarks/leaks/memory_leak_arrays.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { v4 as uuid_v4 } from 'uuid'
import { createClient } from '@clickhouse/client'
import { randomInt } from 'crypto'
import { v4 as uuid_v4 } from 'uuid'
import { attachExceptionHandlers } from '../common'
import {
attachExceptionHandlers,
getMemoryUsageInMegabytes,
logFinalMemoryUsage,
logMemoryUsage,
logMemoryUsageOnIteration,
randomArray,
randomStr,
} from './shared'
import { createClient } from '@clickhouse/client'

const program = async () => {
const client = createClient({})
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/leaks/memory_leak_brown.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { v4 as uuid_v4 } from 'uuid'
import Path from 'path'
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import Path from 'path'
import { v4 as uuid_v4 } from 'uuid'
import { attachExceptionHandlers } from '../common'
import {
attachExceptionHandlers,
getMemoryUsageInMegabytes,
logFinalMemoryUsage,
logMemoryUsage,
logMemoryUsageDiff,
} from './shared'
import { createClient } from '@clickhouse/client'

const program = async () => {
const client = createClient({})
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/leaks/memory_leak_random_integers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Stream from 'stream'
import { createClient } from '@clickhouse/client'
import { v4 as uuid_v4 } from 'uuid'
import { randomInt } from 'crypto'
import Stream from 'stream'
import { v4 as uuid_v4 } from 'uuid'
import { attachExceptionHandlers } from '../common'
import {
attachExceptionHandlers,
getMemoryUsageInMegabytes,
logFinalMemoryUsage,
logMemoryUsage,
Expand Down
10 changes: 0 additions & 10 deletions benchmarks/leaks/shared.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
import { memoryUsage } from 'process'

export function attachExceptionHandlers() {
process.on('uncaughtException', (err) => logAndQuit(err))
process.on('unhandledRejection', (err) => logAndQuit(err))

function logAndQuit(err: unknown) {
console.error(err)
process.exit(1)
}
}

export interface MemoryUsage {
rss: number
heapTotal: number
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"extends": "../tsconfig.json",
"include": ["leaks/**/*.ts"],
"include": ["dev/**/*.ts", "formats/**/*.ts", "leaks/**/*.ts"],
"compilerOptions": {
"noUnusedLocals": false,
"noUnusedParameters": false,
Expand Down
57 changes: 27 additions & 30 deletions packages/client-common/src/data_formatter/formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ const streamableJSONFormats = [
'JSONCompactStringsEachRowWithNames',
'JSONCompactStringsEachRowWithNamesAndTypes',
] as const
// Returned as { row_1: T, row_2: T, ...}
const recordsJSONFormats = ['JSONObjectEachRow'] as const
// See ResponseJSON<T> type
const singleDocumentJSONFormats = [
'JSON',
'JSONStrings',
Expand All @@ -37,38 +35,58 @@ const supportedRawFormats = [
'Parquet',
] as const

/** CSV, TSV, etc. - can be streamed, but cannot be decoded as JSON. */
export type RawDataFormat = (typeof supportedRawFormats)[number]

/** Each row is returned as a separate JSON object or an array, and these formats can be streamed. */
export type StreamableJSONDataFormat = (typeof streamableJSONFormats)[number]

/** Returned as a single {@link ResponseJSON} object, cannot be streamed. */
export type SingleDocumentJSONFormat =
(typeof singleDocumentJSONFormats)[number]

/** Returned as a single object { row_1: T, row_2: T, ...} <br/>
* (i.e. Record<string, T>), cannot be streamed. */
export type RecordsJSONFormat = (typeof recordsJSONFormats)[number]

/** All allowed JSON formats, whether streamable or not. */
export type JSONDataFormat =
| StreamableJSONDataFormat
| SingleDocumentJSONFormat
| RecordsJSONFormat

/** Data formats that are currently supported by the client. <br/>
* This is a union of the following types:<br/>
* * {@link JSONDataFormat}
* * {@link RawDataFormat}
* * {@link StreamableDataFormat}
* * {@link StreamableJSONDataFormat}
* * {@link SingleDocumentJSONFormat}
* * {@link RecordsJSONFormat}
* @see https://clickhouse.com/docs/en/interfaces/formats */
export type DataFormat = JSONDataFormat | RawDataFormat

// TODO add others formats
const streamableFormat = [
...streamableJSONFormats,
...supportedRawFormats,
] as const

/** All data formats that can be streamed, whether it can be decoded as JSON or not. */
export type StreamableDataFormat = (typeof streamableFormat)[number]

function isNotStreamableJSONFamily(
export function isNotStreamableJSONFamily(
format: DataFormat,
): format is SingleDocumentJSONFormat {
// @ts-expect-error JSON is not assignable to notStreamableJSONFormats
return singleDocumentJSONFormats.includes(format)
return (
(singleDocumentJSONFormats as readonly string[]).includes(format) ||
(recordsJSONFormats as readonly string[]).includes(format)
)
}

function isStreamableJSONFamily(
export function isStreamableJSONFamily(
format: DataFormat,
): format is StreamableJSONDataFormat {
// @ts-expect-error JSON is not assignable to streamableJSONFormats
return streamableJSONFormats.includes(format)
return (streamableJSONFormats as readonly string[]).includes(format)
}

export function isSupportedRawFormat(dataFormat: DataFormat) {
Expand All @@ -88,27 +106,6 @@ export function validateStreamFormat(
return true
}

/**
* Decodes a string in a ClickHouse format into a plain JavaScript object or an array of objects.
* @param text a string in a ClickHouse data format
* @param format One of the supported formats: https://clickhouse.com/docs/en/interfaces/formats/
*/
export function decode(text: string, format: DataFormat): any {
if (isNotStreamableJSONFamily(format)) {
return JSON.parse(text)
}
if (isStreamableJSONFamily(format)) {
return text
.split('\n')
.filter(Boolean)
.map((l) => decode(l, 'JSON'))
}
if (isSupportedRawFormat(format)) {
throw new Error(`Cannot decode ${format} to JSON`)
}
throw new Error(`The client does not support [${format}] format decoding.`)
}

/**
* Encodes a single row of values into a string in a JSON format acceptable by ClickHouse.
* @param value a single value to encode.
Expand Down
19 changes: 11 additions & 8 deletions packages/client-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,21 @@ export {
} from './settings'

/** For implementations usage only - should not be re-exported */
export type {
RawDataFormat,
JSONDataFormat,
StreamableDataFormat,
StreamableJSONDataFormat,
SingleDocumentJSONFormat,
} from './data_formatter'
export {
formatQuerySettings,
formatQueryParams,
encodeJSON,
isSupportedRawFormat,
decode,
isStreamableJSONFamily,
isNotStreamableJSONFamily,
validateStreamFormat,
StreamableDataFormat,
} from './data_formatter'
export {
type ValuesEncoder,
Expand Down Expand Up @@ -84,11 +93,5 @@ export type {
ConnPingResult,
ConnOperation,
} from './connection'
export {
type RawDataFormat,
type JSONDataFormat,
formatQuerySettings,
formatQueryParams,
} from './data_formatter'
export type { QueryParamsWithFormat } from './client'
export type { IsSame } from './ts_utils'
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ResultSet } from '../../src'
import type {
ClickHouseClient as BaseClickHouseClient,
DataFormat,
Expand Down Expand Up @@ -525,10 +526,13 @@ xdescribe('[Node.js] Query and ResultSet types', () => {
})

describe('Type inference with ambiguous format variants', () => {
// expect-type itself fails a bit here sometimes. It can get a wrong order of the variants = flaky ESLint run.
type JSONFormat = 'JSON' | 'JSONEachRow'
type ResultSetJSONFormat = ResultSet<JSONFormat>

// TODO: Maybe there is a way to infer the format without an extra type parameter?
it('should infer types for JSON or JSONEachRow (no extra type params)', async () => {
// $ExpectType (format: "JSONEachRow" | "JSON") => Promise<ResultSet<"JSONEachRow" | "JSON">>
function runQuery(format: 'JSONEachRow' | 'JSON') {
function runQuery(format: JSONFormat): Promise<ResultSetJSONFormat> {
return client.query({
query,
format,
Expand All @@ -537,21 +541,21 @@ xdescribe('[Node.js] Query and ResultSet types', () => {

// ResultSet falls back to both possible formats (both JSON and JSONEachRow); 'JSON' string provided to `runQuery`
// cannot be used to narrow down the literal type, since the function argument is just DataFormat.
// $ExpectType ResultSet<"JSONEachRow" | "JSON">
// $ExpectType ResultSetJSONFormat
const rs = await runQuery('JSON')
// $ExpectType unknown[] | ResponseJSON<unknown>
await rs.json()
// $ExpectType Data[] | ResponseJSON<Data>
await rs.json<Data>()
// $ExpectType string
await rs.text()
// $ExpectType StreamReadable<Row<unknown, "JSONEachRow" | "JSON">[]>
// $ExpectType StreamReadable<Row<unknown, JSONFormat>[]>
rs.stream()
})

it('should infer types for JSON or JSONEachRow (with extra type parameter)', async () => {
// $ExpectType <F extends "JSONEachRow" | "JSON">(format: F) => Promise<QueryResult<F>>
function runQuery<F extends 'JSON' | 'JSONEachRow'>(format: F) {
// $ExpectType <F extends JSONFormat>(format: F) => Promise<QueryResult<F>>
function runQuery<F extends JSONFormat>(format: F) {
return client.query({
query,
format,
Expand Down
Loading
Loading