Improve query performance for ClickHouse #133

Merged
merged 1 commit into from Sep 4, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -46,6 +46,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
- [#108](https://github.com/kobsio/kobs/pull/108): Improve tooltip position in all nivo charts.
- [#121](https://github.com/kobsio/kobs/pull/121): :warning: *Breaking change:* :warning: Allow multiple queries in the panel options for the Elasticsearch plugin.
- [#130](https://github.com/kobsio/kobs/pull/130): :warning: *Breaking change:* :warning: Allow multiple queries in the panel options for the Jaeger plugin.
- [#133](https://github.com/kobsio/kobs/pull/133): Improve query performance to get logs from ClickHouse.

## [v0.5.0](https://github.com/kobsio/kobs/releases/tag/v0.5.0) (2021-08-03)

54 changes: 8 additions & 46 deletions plugins/clickhouse/clickhouse.go
@@ -124,7 +124,7 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
return
}

documents, fields, took, newOffset, err := i.GetLogs(r.Context(), query, parsedLimit, parsedOffset, parsedTimeStart, parsedTimeEnd)
documents, fields, count, took, buckets, newOffset, newTimeStart, err := i.GetLogs(r.Context(), query, parsedLimit, parsedOffset, parsedTimeStart, parsedTimeEnd)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not get logs")
return
@@ -133,56 +133,19 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
data := struct {
Documents []map[string]interface{} `json:"documents"`
Fields []string `json:"fields"`
Count int64 `json:"count"`
Took int64 `json:"took"`
Buckets []instance.Bucket `json:"buckets"`
Offset int64 `json:"offset"`
TimeStart int64 `json:"timeStart"`
}{
documents,
fields,
took,
newOffset,
}

render.JSON(w, r, data)
}

func (router *Router) getLogsStats(w http.ResponseWriter, r *http.Request) {
name := chi.URLParam(r, "name")
query := r.URL.Query().Get("query")
timeStart := r.URL.Query().Get("timeStart")
timeEnd := r.URL.Query().Get("timeEnd")

log.WithFields(logrus.Fields{"name": name, "query": query, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("getLogsCount")

i := router.getInstance(name)
if i == nil {
errresponse.Render(w, r, nil, http.StatusBadRequest, "Could not find instance name")
return
}

parsedTimeStart, err := strconv.ParseInt(timeStart, 10, 64)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse start time")
return
}

parsedTimeEnd, err := strconv.ParseInt(timeEnd, 10, 64)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse end time")
return
}

count, buckets, err := i.GetLogsStats(r.Context(), query, parsedTimeStart, parsedTimeEnd)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not get logs count")
return
}

data := struct {
Count int64 `json:"count"`
Buckets []instance.Bucket `json:"buckets"`
}{
count,
took,
buckets,
newOffset,
newTimeStart,
}

render.JSON(w, r, data)
@@ -220,8 +183,7 @@ func Register(clusters *clusters.Clusters, plugins *plugin.Plugins, config Confi
}

router.Get("/sql/{name}", router.getSQL)
router.Get("/logs/documents/{name}", router.getLogs)
router.Get("/logs/stats/{name}", router.getLogsStats)
router.Get("/logs/{name}", router.getLogs)

return router
}
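With the stats handler removed, the single `/logs/{name}` endpoint now returns the documents together with the count, the buckets for the distribution chart, the query duration, and the offset and start time for the next page. Below is a minimal sketch of a client-side struct for decoding that response; the field names mirror the json tags in the handler above, while the example payload and the raw-JSON treatment of the bucket entries are assumptions, since the json tags of `instance.Bucket` are not part of this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogsResponse mirrors the json tags of the anonymous struct rendered by getLogs above.
// The bucket entries are kept as raw JSON because their exact shape (the json tags of
// instance.Bucket) is not visible in this diff.
type LogsResponse struct {
	Documents []map[string]interface{} `json:"documents"`
	Fields    []string                 `json:"fields"`
	Count     int64                    `json:"count"`
	Took      int64                    `json:"took"`
	Buckets   []json.RawMessage        `json:"buckets"`
	Offset    int64                    `json:"offset"`
	TimeStart int64                    `json:"timeStart"`
}

func main() {
	// Hypothetical payload with the fields a first-page response would contain.
	payload := []byte(`{"documents":[],"fields":["timestamp","log"],"count":12345,"took":42,"buckets":[],"offset":100,"timeStart":1630000000}`)

	var res LogsResponse
	if err := json.Unmarshal(payload, &res); err != nil {
		panic(err)
	}

	// Offset and timeStart are fed back into the next "load more" request.
	fmt.Println(res.Count, res.Took, res.Offset, res.TimeStart)
}
```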
143 changes: 76 additions & 67 deletions plugins/clickhouse/pkg/instance/instance.go
@@ -73,7 +73,9 @@ func (i *Instance) GetSQL(ctx context.Context, query string) ([][]interface{}, [

// GetLogs parses the given query into the sql syntax, which is then run against the ClickHouse instance. The returned
// rows are converted into a document schema which can be used by our UI.
func (i *Instance) GetLogs(ctx context.Context, query string, limit, offset, timeStart, timeEnd int64) ([]map[string]interface{}, []string, int64, int64, error) {
func (i *Instance) GetLogs(ctx context.Context, query string, limit, offset, timeStart, timeEnd int64) ([]map[string]interface{}, []string, int64, int64, []Bucket, int64, int64, error) {
var count int64
var buckets []Bucket
var documents []map[string]interface{}
fields := defaultFields
queryStartTime := time.Now()
@@ -85,20 +87,88 @@ func (i *Instance) GetLogs(ctx context.Context, query string, limit, offset, tim
if query != "" {
parsedQuery, err := parseLogsQuery(query)
if err != nil {
return nil, nil, 0, offset, err
return nil, nil, 0, 0, nil, offset, timeStart, err
}

conditions = fmt.Sprintf("AND %s", parsedQuery)
}

// The count of documents and the buckets are only needed for the first query where the offset is 0. For the
// following queries we can reuse the data returned by the first query, because the number of documents shouldn't
// change in the selected time range.
if offset == 0 {
// Determine the number of documents that are available in the user's selected time range. We use the same
// query as for getting the documents, but skip the limit and offset parameters.
sqlQueryCount := fmt.Sprintf("SELECT count(*) FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s SETTINGS skip_unavailable_shards = 1", i.database, conditions)
log.WithFields(logrus.Fields{"query": sqlQueryCount, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("sql count query")
rowsCount, err := i.client.QueryContext(ctx, sqlQueryCount, time.Unix(timeStart, 0), time.Unix(timeEnd, 0))
if err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
}
defer rowsCount.Close()

for rowsCount.Next() {
if err := rowsCount.Scan(&count); err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
}
}

// Now we create 30 buckets for the selected time range and count the documents in each bucket. This is used
// to render the distribution chart, which shows how many documents/rows are available within a bucket.
interval := (timeEnd - timeStart) / 30
sqlQueryBuckets := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data , count(*) AS count_data FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s GROUP BY interval_data SETTINGS skip_unavailable_shards = 1", interval, i.database, conditions)
log.WithFields(logrus.Fields{"query": sqlQueryBuckets, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("sql buckets query")
rowsBuckets, err := i.client.QueryContext(ctx, sqlQueryBuckets, time.Unix(timeStart, 0), time.Unix(timeEnd, 0))
if err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
}
defer rowsBuckets.Close()

for rowsBuckets.Next() {
var intervalData time.Time
var countData int64

if err := rowsBuckets.Scan(&intervalData, &countData); err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
}

buckets = append(buckets, Bucket{
Interval: intervalData,
IntervalFormatted: intervalData.Format("01-02 15:04:05"),
Count: countData,
})
}

sort.Slice(buckets, func(i, j int) bool {
return buckets[i].Interval.Before(buckets[j].Interval)
})

// We are only returning the first 10000 documents in buckets of the given limit, to speed up the following
// query to get the documents. For that we are looping through the sorted buckets and using the timestamp from
// the bucket where the sum of all newer buckets contains 10000 documents.
// This new start time is then also returned in the response and can be used for the "load more" call as the new
// start date. In these follow-up calls the start time isn't changed again, because we are skipping the count
// and bucket queries.
// NOTE: If a user has problems with this limit in the future, we can provide an option for this via the
// config.yaml file or maybe even better via an additional field in the Options component in the React UI.
var bucketCount int64
for i := len(buckets) - 1; i >= 0; i-- {
bucketCount = bucketCount + buckets[i].Count
if bucketCount > 10000 {
timeStart = buckets[i].Interval.Unix()
break
}
}
}

// Now we build and execute our sql query. We always return all fields from the logs table where the
// timestamp of a row is within the selected time range and the row matches the parsed query. We also order the
// results by the timestamp field and limit them / use an offset for pagination.
sqlQuery := fmt.Sprintf("SELECT %s FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s ORDER BY timestamp DESC LIMIT %d OFFSET %d SETTINGS skip_unavailable_shards = 1", defaultColumns, i.database, conditions, limit, offset)
log.WithFields(logrus.Fields{"query": sqlQuery, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("sql query")
rows, err := i.client.QueryContext(ctx, sqlQuery, time.Unix(timeStart, 0), time.Unix(timeEnd, 0))
if err != nil {
return nil, nil, 0, offset, err
return nil, nil, 0, 0, nil, offset, timeStart, err
}
defer rows.Close()

@@ -111,7 +181,7 @@ func (i *Instance) GetLogs(ctx context.Context, query string, limit, offset, tim
for rows.Next() {
var r Row
if err := rows.Scan(&r.Timestamp, &r.Cluster, &r.Namespace, &r.App, &r.Pod, &r.Container, &r.Host, &r.FieldsString.Key, &r.FieldsString.Value, &r.FieldsNumber.Key, &r.FieldsNumber.Value, &r.Log); err != nil {
return nil, nil, 0, offset, err
return nil, nil, 0, 0, nil, offset, timeStart, err
}

var document map[string]interface{}
@@ -145,72 +215,11 @@ func (i *Instance) GetLogs(ctx context.Context, query string, limit, offset, tim
}

if err := rows.Err(); err != nil {
return nil, nil, 0, offset, err
return nil, nil, 0, 0, nil, offset, timeStart, err
}

sort.Strings(fields)
return documents, fields, time.Now().Sub(queryStartTime).Milliseconds(), offset + limit, nil
}

// GetLogsStats returns the number of documents, which could be returned by the user provided query and the distribution
// of the logs over the selected time range.
func (i *Instance) GetLogsStats(ctx context.Context, query string, timeStart, timeEnd int64) (int64, []Bucket, error) {
var count int64
var buckets []Bucket

conditions := ""
if query != "" {
parsedQuery, err := parseLogsQuery(query)
if err != nil {
return 0, nil, err
}

conditions = fmt.Sprintf("AND %s", parsedQuery)
}

sqlQueryCount := fmt.Sprintf("SELECT count(*) FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s SETTINGS skip_unavailable_shards = 1", i.database, conditions)
log.WithFields(logrus.Fields{"query": sqlQueryCount, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("sql count query")
rowsCount, err := i.client.QueryContext(ctx, sqlQueryCount, time.Unix(timeStart, 0), time.Unix(timeEnd, 0))
if err != nil {
return 0, nil, err
}
defer rowsCount.Close()

for rowsCount.Next() {
if err := rowsCount.Scan(&count); err != nil {
return 0, nil, err
}
}

interval := (timeEnd - timeStart) / 30
sqlQueryBuckets := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data , count(*) AS count_data FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s GROUP BY interval_data SETTINGS skip_unavailable_shards = 1", interval, i.database, conditions)
log.WithFields(logrus.Fields{"query": sqlQueryBuckets, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("sql buckets query")
rowsBuckets, err := i.client.QueryContext(ctx, sqlQueryBuckets, time.Unix(timeStart, 0), time.Unix(timeEnd, 0))
if err != nil {
return 0, nil, err
}
defer rowsBuckets.Close()

for rowsBuckets.Next() {
var intervalData time.Time
var countData int64

if err := rowsBuckets.Scan(&intervalData, &countData); err != nil {
return 0, nil, err
}

buckets = append(buckets, Bucket{
Interval: intervalData,
IntervalFormatted: intervalData.Format("01-02 15:04:05"),
Count: countData,
})
}

sort.Slice(buckets, func(i, j int) bool {
return buckets[i].Interval.Before(buckets[j].Interval)
})

return count, buckets, nil
return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, offset + limit, timeStart, nil
}

// New returns a new ClickHouse instance for the given configuration.
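On the first page, GetLogs splits the selected range into 30 fixed-width intervals with toStartOfInterval and counts the rows per interval. A small sketch of that arithmetic, reusing the format string from the diff above; the database name "kobs" and the one-hour range are hypothetical, and the empty conditions string stands for a request without a user query.

```go
package main

import "fmt"

func main() {
	// A one-hour range split into the 30 buckets used for the distribution chart.
	var timeStart int64 = 1630000000
	timeEnd := timeStart + 3600
	interval := (timeEnd - timeStart) / 30 // 120 seconds per bucket

	// Format string copied from GetLogs above; "kobs" is a hypothetical database
	// name and conditions is empty because no user query was provided.
	database := "kobs"
	conditions := ""
	sqlQueryBuckets := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data , count(*) AS count_data FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s GROUP BY interval_data SETTINGS skip_unavailable_shards = 1", interval, database, conditions)

	fmt.Println(sqlQueryBuckets)
}
```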
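The main performance change is the start-time adjustment: on the first page the sorted buckets are walked from newest to oldest, and once the running total passes 10000 documents the interval of that bucket becomes the new start time for the document query and for every "load more" request. A standalone sketch of that loop, using a simplified Bucket type; names like adjustTimeStart are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a simplified stand-in for instance.Bucket; only the fields needed
// for the cutoff logic are included here.
type bucket struct {
	Interval time.Time
	Count    int64
}

// adjustTimeStart walks the buckets from newest to oldest and returns the
// interval of the bucket at which the running total of documents first
// exceeds maxDocs. If the total never exceeds maxDocs, the original start
// time is kept, matching the behaviour of the loop in GetLogs.
func adjustTimeStart(buckets []bucket, timeStart, maxDocs int64) int64 {
	var total int64
	for i := len(buckets) - 1; i >= 0; i-- {
		total += buckets[i].Count
		if total > maxDocs {
			return buckets[i].Interval.Unix()
		}
	}
	return timeStart
}

func main() {
	now := time.Now()
	// Three buckets sorted oldest to newest, as GetLogs sorts them before the loop.
	buckets := []bucket{
		{Interval: now.Add(-3 * time.Minute), Count: 8000},
		{Interval: now.Add(-2 * time.Minute), Count: 7000},
		{Interval: now.Add(-1 * time.Minute), Count: 6000},
	}

	// With a 10000 document cutoff, only the two newest buckets are needed, so the
	// start time moves forward to the second bucket's interval.
	fmt.Println(adjustTimeStart(buckets, now.Add(-3*time.Minute).Unix(), 10000))
}
```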
38 changes: 25 additions & 13 deletions plugins/clickhouse/src/components/page/Logs.tsx
@@ -5,7 +5,11 @@ import {
Button,
ButtonVariant,
Card,
CardActions,
CardBody,
CardHeader,
CardHeaderMain,
CardTitle,
Grid,
GridItem,
Spinner,
@@ -16,9 +20,9 @@ import { useHistory } from 'react-router-dom';

import { ILogsData } from '../../utils/interfaces';
import { IPluginTimes } from '@kobsio/plugin-core';
import LogsChart from '../panel/LogsChart';
import LogsDocuments from '../panel/LogsDocuments';
import LogsFields from './LogsFields';
import LogsStats from '../panel/LogsStats';

interface IPageLogsProps {
name: string;
@@ -43,10 +47,11 @@ const PageLogs: React.FunctionComponent<IPageLogsProps> = ({
['clickhouse/logs', query, times],
async ({ pageParam }) => {
try {
console.log(pageParam);
const response = await fetch(
`/api/plugins/clickhouse/logs/documents/${name}?query=${encodeURIComponent(query)}&timeStart=${
times.timeStart
}&timeEnd=${times.timeEnd}&limit=100&offset=${pageParam || ''}`,
`/api/plugins/clickhouse/logs/${name}?query=${encodeURIComponent(query)}&timeStart=${
pageParam && pageParam.timeStart ? pageParam.timeStart : times.timeStart
}&timeEnd=${times.timeEnd}&limit=100&offset=${pageParam && pageParam.offset ? pageParam.offset : ''}`,
{
method: 'get',
},
@@ -67,7 +72,9 @@ const PageLogs: React.FunctionComponent<IPageLogsProps> = ({
}
},
{
getNextPageParam: (lastPage, pages) => lastPage.offset,
getNextPageParam: (lastPage, pages) => {
return { offset: lastPage.offset, timeStart: lastPage.timeStart };
},
keepPreviousData: true,
},
);
@@ -111,14 +118,19 @@ const PageLogs: React.FunctionComponent<IPageLogsProps> = ({
</Card>
</GridItem>
<GridItem sm={12} md={12} lg={9} xl={10} xl2={10}>
<LogsStats
name={name}
query={query}
times={times}
took={data.pages[0].took || 0}
isFetchingDocuments={isFetching}
isPanel={false}
/>
<Card isCompact={true}>
<CardHeader>
<CardHeaderMain>
<CardTitle>
{data.pages[0].count} Documents in {data.pages[0].took} Milliseconds
</CardTitle>
</CardHeaderMain>
<CardActions>{isFetching && <Spinner size="md" />}</CardActions>
</CardHeader>
<CardBody>
<LogsChart buckets={data.pages[0].buckets} />
</CardBody>
</Card>

<p>&nbsp;</p>

18 changes: 7 additions & 11 deletions plugins/clickhouse/src/components/panel/Logs.tsx
@@ -16,8 +16,8 @@ import React, { useState } from 'react';
import { ILogsData, IQuery } from '../../utils/interfaces';
import { IPluginTimes, PluginCard } from '@kobsio/plugin-core';
import LogsActions from './LogsActions';
import LogsChart from './LogsChart';
import LogsDocuments from '../panel/LogsDocuments';
import LogsStats from './LogsStats';

interface ILogsProps {
name: string;
@@ -50,7 +50,7 @@ const Logs: React.FunctionComponent<ILogsProps> = ({
}

const response = await fetch(
`/api/plugins/clickhouse/logs/documents/${name}?query=${encodeURIComponent(selectedQuery.query)}&timeStart=${
`/api/plugins/clickhouse/logs/${name}?query=${encodeURIComponent(selectedQuery.query)}&timeStart=${
times.timeStart
}&timeEnd=${times.timeEnd}&limit=100&offset=${pageParam || ''}`,
{
@@ -138,15 +138,11 @@ const Logs: React.FunctionComponent<ILogsProps> = ({
</Alert>
) : data && data.pages.length > 0 ? (
<div>
{showChart && selectedQuery.query ? (
<LogsStats
name={name}
query={selectedQuery.query}
times={times}
took={data.pages[0].took || 0}
isFetchingDocuments={isFetching}
isPanel={true}
/>
{showChart ? (
<div>
<LogsChart buckets={data.pages[0].buckets} />
<p>&nbsp;</p>
</div>
) : null}

<LogsDocuments pages={data.pages} fields={selectedQuery.fields} showDetails={showDetails} />