Skip to content

Commit 062bc9b

Browse files
committed
Add dedicated instant/range query handlers
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 7046357 commit 062bc9b

File tree

7 files changed

+697
-26
lines changed

7 files changed

+697
-26
lines changed

pkg/api/handlers.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/cortexproject/cortex/pkg/querier"
2929
"github.com/cortexproject/cortex/pkg/querier/codec"
30+
"github.com/cortexproject/cortex/pkg/querier/queryapi"
3031
"github.com/cortexproject/cortex/pkg/querier/stats"
3132
"github.com/cortexproject/cortex/pkg/util"
3233
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -195,10 +196,13 @@ func NewQuerierHandler(
195196
Help: "Current number of inflight requests to the querier.",
196197
}, []string{"method", "route"})
197198

199+
statsRenderer := querier.StatsRenderer
200+
corsOrigin := regexp.MustCompile(".*")
201+
translateSampleAndChunkQueryable := querier.NewErrorTranslateSampleAndChunkQueryable(queryable)
198202
api := v1.NewAPI(
199203
engine,
200-
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
201-
nil, // No remote write support.
204+
translateSampleAndChunkQueryable, // Translate errors to errors expected by API.
205+
nil, // No remote write support.
202206
exemplarQueryable,
203207
func(ctx context.Context) v1.ScrapePoolsRetriever { return nil },
204208
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
@@ -214,7 +218,7 @@ func NewQuerierHandler(
214218
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
215219
0, 0, 0, // Remote read samples and concurrency limit.
216220
false,
217-
regexp.MustCompile(".*"),
221+
corsOrigin,
218222
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
219223
&v1.PrometheusVersion{
220224
Version: version.Version,
@@ -229,7 +233,7 @@ func NewQuerierHandler(
229233
// This is used for the stats API which we should not support. Or find other ways to.
230234
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
231235
reg,
232-
querier.StatsRenderer,
236+
statsRenderer,
233237
false,
234238
nil,
235239
false,
@@ -240,11 +244,18 @@ func NewQuerierHandler(
240244
api.ClearCodecs()
241245
cm := codec.NewInstrumentedCodecMetrics(reg)
242246

243-
api.InstallCodec(codec.NewInstrumentedCodec(v1.JSONCodec{}, cm))
244-
// Install Protobuf codec to give the option for using either.
245-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm))
246-
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
247-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm))
247+
codecs := []v1.Codec{
248+
codec.NewInstrumentedCodec(v1.JSONCodec{}, cm),
249+
// Protobuf codec to give the option for using either.
250+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm),
251+
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
252+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm),
253+
}
254+
255+
// Install codecs
256+
for _, c := range codecs {
257+
api.InstallCodec(c)
258+
}
248259

249260
router := mux.NewRouter()
250261

@@ -269,13 +280,15 @@ func NewQuerierHandler(
269280
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
270281
api.Register(legacyPromRouter)
271282

283+
c := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
284+
272285
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
273286
// https://github.com/prometheus/prometheus/pull/7125/files
274287
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
275288
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
276289
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
277-
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
278-
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter)
290+
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(c.Wrap(c.InstantHandler))
291+
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(c.Wrap(c.RangeQueryHandler))
279292
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
280293
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
281294
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
@@ -287,8 +300,8 @@ func NewQuerierHandler(
287300
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
288301
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
289302
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
290-
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
291-
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(legacyPromRouter)
303+
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(c.Wrap(c.InstantHandler))
304+
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(c.Wrap(c.RangeQueryHandler))
292305
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
293306
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
294307
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)

pkg/querier/queryapi/query_api.go

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package queryapi
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
"github.com/grafana/regexp"
12+
"github.com/munnerz/goautoneg"
13+
"github.com/prometheus/prometheus/promql"
14+
"github.com/prometheus/prometheus/storage"
15+
"github.com/prometheus/prometheus/util/annotations"
16+
"github.com/prometheus/prometheus/util/httputil"
17+
v1 "github.com/prometheus/prometheus/web/api/v1"
18+
"github.com/weaveworks/common/httpgrpc"
19+
20+
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
21+
"github.com/cortexproject/cortex/pkg/util"
22+
"github.com/cortexproject/cortex/pkg/util/api"
23+
)
24+
25+
type QueryAPI struct {
26+
queryable storage.SampleAndChunkQueryable
27+
queryEngine promql.QueryEngine
28+
now func() time.Time
29+
statsRenderer v1.StatsRenderer
30+
logger log.Logger
31+
codecs []v1.Codec
32+
CORSOrigin *regexp.Regexp
33+
}
34+
35+
func NewQueryAPI(
36+
qe promql.QueryEngine,
37+
q storage.SampleAndChunkQueryable,
38+
statsRenderer v1.StatsRenderer,
39+
logger log.Logger,
40+
codecs []v1.Codec,
41+
CORSOrigin *regexp.Regexp,
42+
) *QueryAPI {
43+
return &QueryAPI{
44+
queryEngine: qe,
45+
queryable: q,
46+
statsRenderer: statsRenderer,
47+
logger: logger,
48+
codecs: codecs,
49+
CORSOrigin: CORSOrigin,
50+
now: time.Now,
51+
}
52+
}
53+
54+
// Custom handler for Query range API
55+
func (c *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
56+
start, err := util.ParseTime(r.FormValue("start"))
57+
if err != nil {
58+
return invalidParamError(err, "start")
59+
}
60+
end, err := util.ParseTime(r.FormValue("end"))
61+
if err != nil {
62+
return invalidParamError(err, "end")
63+
}
64+
if end < start {
65+
return invalidParamError(queryrange.ErrEndBeforeStart, "end")
66+
}
67+
68+
step, err := util.ParseDurationMs(r.FormValue("step"))
69+
if err != nil {
70+
return invalidParamError(err, "step")
71+
}
72+
73+
if step <= 0 {
74+
return invalidParamError(queryrange.ErrNegativeStep, "step")
75+
}
76+
77+
// For safety, limit the number of returned points per timeseries.
78+
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
79+
if (end-start)/step > 11000 {
80+
return apiFuncResult{nil, &apiError{errorBadData, queryrange.ErrStepTooSmall}, nil, nil}
81+
}
82+
83+
ctx := r.Context()
84+
if to := r.FormValue("timeout"); to != "" {
85+
var cancel context.CancelFunc
86+
timeout, err := util.ParseDurationMs(to)
87+
if err != nil {
88+
return invalidParamError(err, "timeout")
89+
}
90+
91+
ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout))
92+
defer cancel()
93+
}
94+
95+
opts, err := extractQueryOpts(r)
96+
if err != nil {
97+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
98+
}
99+
qry, err := c.queryEngine.NewRangeQuery(ctx, c.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
100+
if err != nil {
101+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
102+
}
103+
// From now on, we must only return with a finalizer in the result (to
104+
// be called by the caller) or call qry.Close ourselves (which is
105+
// required in the case of a panic).
106+
defer func() {
107+
if result.finalizer == nil {
108+
qry.Close()
109+
}
110+
}()
111+
112+
ctx = httputil.ContextFromRequest(ctx, r)
113+
114+
res := qry.Exec(ctx)
115+
if res.Err != nil {
116+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
117+
}
118+
119+
warnings := res.Warnings
120+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
121+
122+
return apiFuncResult{&v1.QueryData{
123+
ResultType: res.Value.Type(),
124+
Result: res.Value,
125+
Stats: qs,
126+
}, nil, warnings, qry.Close}
127+
}
128+
129+
// Custom handler for Query API
130+
func (c *QueryAPI) InstantHandler(r *http.Request) (result apiFuncResult) {
131+
ts, err := util.ParseTimeParam(r, "time", c.now().Unix())
132+
if err != nil {
133+
return invalidParamError(err, "time")
134+
}
135+
136+
ctx := r.Context()
137+
if to := r.FormValue("timeout"); to != "" {
138+
var cancel context.CancelFunc
139+
timeout, err := util.ParseDurationMs(to)
140+
if err != nil {
141+
return invalidParamError(err, "timeout")
142+
}
143+
144+
ctx, cancel = context.WithDeadline(ctx, c.now().Add(convertMsToDuration(timeout)))
145+
defer cancel()
146+
}
147+
148+
opts, err := extractQueryOpts(r)
149+
if err != nil {
150+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
151+
}
152+
qry, err := c.queryEngine.NewInstantQuery(ctx, c.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
153+
if err != nil {
154+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
155+
}
156+
157+
// From now on, we must only return with a finalizer in the result (to
158+
// be called by the caller) or call qry.Close ourselves (which is
159+
// required in the case of a panic).
160+
defer func() {
161+
if result.finalizer == nil {
162+
qry.Close()
163+
}
164+
}()
165+
166+
ctx = httputil.ContextFromRequest(ctx, r)
167+
168+
res := qry.Exec(ctx)
169+
if res.Err != nil {
170+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
171+
}
172+
173+
warnings := res.Warnings
174+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
175+
176+
return apiFuncResult{&v1.QueryData{
177+
ResultType: res.Value.Type(),
178+
Result: res.Value,
179+
Stats: qs,
180+
}, nil, warnings, qry.Close}
181+
}
182+
183+
func (c *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
184+
return func(w http.ResponseWriter, r *http.Request) {
185+
httputil.SetCORS(w, c.CORSOrigin, r)
186+
187+
result := f(r)
188+
if result.finalizer != nil {
189+
defer result.finalizer()
190+
}
191+
192+
if result.err != nil {
193+
api.RespondFromGRPCError(c.logger, w, result.err.err)
194+
return
195+
}
196+
197+
if result.data != nil {
198+
c.respond(w, r, result.data, result.warnings, r.FormValue("query"))
199+
return
200+
}
201+
w.WriteHeader(http.StatusNoContent)
202+
}
203+
}
204+
205+
func (c *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
206+
warn, info := warnings.AsStrings(query, 10, 10)
207+
208+
resp := &v1.Response{
209+
Status: statusSuccess,
210+
Data: data,
211+
Warnings: warn,
212+
Infos: info,
213+
}
214+
215+
codec, err := c.negotiateCodec(req, resp)
216+
if err != nil {
217+
api.RespondFromGRPCError(c.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err}))
218+
return
219+
}
220+
221+
b, err := codec.Encode(resp)
222+
if err != nil {
223+
level.Error(c.logger).Log("error marshaling response", "url", req.URL, "err", err)
224+
http.Error(w, err.Error(), http.StatusInternalServerError)
225+
return
226+
}
227+
228+
w.Header().Set("Content-Type", codec.ContentType().String())
229+
w.WriteHeader(http.StatusOK)
230+
if n, err := w.Write(b); err != nil {
231+
level.Error(c.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
232+
}
233+
}
234+
235+
func (c *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
236+
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
237+
for _, codec := range c.codecs {
238+
if codec.ContentType().Satisfies(clause) && codec.CanEncode(resp) {
239+
return codec, nil
240+
}
241+
}
242+
}
243+
244+
defaultCodec := c.codecs[0]
245+
if !defaultCodec.CanEncode(resp) {
246+
return nil, fmt.Errorf("cannot encode response as %s", defaultCodec.ContentType())
247+
}
248+
249+
return defaultCodec, nil
250+
}

0 commit comments

Comments
 (0)