GCM: Time-range fix (#98455)

* Include timeRange on all query executors

* Ensure queries run against query specific time range

* Fix lint

* Improve safety of annotations queries
This commit is contained in:
Andreas Christou 2025-01-23 12:57:00 +00:00 committed by GitHub
parent e110338dce
commit 4e740d8410
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 32 additions and 22 deletions

View File

@ -3,6 +3,7 @@ package cloudmonitoring
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -38,6 +39,11 @@ func (s *Service) executeAnnotationQuery(ctx context.Context, req *backend.Query
} `json:"timeSeriesList"` } `json:"timeSeriesList"`
}{} }{}
if len(req.Queries) != 1 {
return nil, errors.New("multiple queries received in annotation-request")
}
// It's okay to use the first query for annotations as there should only be one
firstQuery := req.Queries[0] firstQuery := req.Queries[0]
err = json.Unmarshal(firstQuery.JSON, &tslq) err = json.Unmarshal(firstQuery.JSON, &tslq)
if err != nil { if err != nil {

View File

@ -387,11 +387,11 @@ func queryModel(query backend.DataQuery) (grafanaQuery, error) {
func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataRequest) ([]cloudMonitoringQueryExecutor, error) { func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataRequest) ([]cloudMonitoringQueryExecutor, error) {
cloudMonitoringQueryExecutors := make([]cloudMonitoringQueryExecutor, 0, len(req.Queries)) cloudMonitoringQueryExecutors := make([]cloudMonitoringQueryExecutor, 0, len(req.Queries))
startTime := req.Queries[0].TimeRange.From
endTime := req.Queries[0].TimeRange.To
durationSeconds := int(endTime.Sub(startTime).Seconds())
for _, query := range req.Queries { for index, query := range req.Queries {
startTime := req.Queries[index].TimeRange.From
endTime := req.Queries[index].TimeRange.To
durationSeconds := int(endTime.Sub(startTime).Seconds())
q, err := queryModel(query) q, err := queryModel(query)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not unmarshal CloudMonitoringQuery json: %w", err) return nil, fmt.Errorf("could not unmarshal CloudMonitoringQuery json: %w", err)
@ -401,8 +401,9 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
switch query.QueryType { switch query.QueryType {
case string(dataquery.QueryTypeTIMESERIESLIST), string(dataquery.QueryTypeANNOTATION): case string(dataquery.QueryTypeTIMESERIESLIST), string(dataquery.QueryTypeANNOTATION):
cmtsf := &cloudMonitoringTimeSeriesList{ cmtsf := &cloudMonitoringTimeSeriesList{
refID: query.RefID, refID: query.RefID,
aliasBy: q.AliasBy, aliasBy: q.AliasBy,
timeRange: req.Queries[index].TimeRange,
} }
if q.TimeSeriesList.View == nil || *q.TimeSeriesList.View == "" { if q.TimeSeriesList.View == nil || *q.TimeSeriesList.View == "" {
fullString := "FULL" fullString := "FULL"
@ -417,7 +418,7 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
aliasBy: q.AliasBy, aliasBy: q.AliasBy,
parameters: q.TimeSeriesQuery, parameters: q.TimeSeriesQuery,
IntervalMS: query.Interval.Milliseconds(), IntervalMS: query.Interval.Milliseconds(),
timeRange: req.Queries[0].TimeRange, timeRange: req.Queries[index].TimeRange,
logger: logger, logger: logger,
} }
case string(dataquery.QueryTypeSLO): case string(dataquery.QueryTypeSLO):
@ -425,6 +426,7 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
refID: query.RefID, refID: query.RefID,
aliasBy: q.AliasBy, aliasBy: q.AliasBy,
parameters: q.SloQuery, parameters: q.SloQuery,
timeRange: req.Queries[index].TimeRange,
} }
cmslo.setParams(startTime, endTime, durationSeconds, query.Interval.Milliseconds()) cmslo.setParams(startTime, endTime, durationSeconds, query.Interval.Milliseconds())
queryInterface = cmslo queryInterface = cmslo
@ -433,7 +435,7 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
refID: query.RefID, refID: query.RefID,
aliasBy: q.AliasBy, aliasBy: q.AliasBy,
parameters: q.PromQLQuery, parameters: q.PromQLQuery,
timeRange: req.Queries[0].TimeRange, timeRange: req.Queries[index].TimeRange,
logger: logger, logger: logger,
} }
queryInterface = cmp queryInterface = cmp

View File

@ -31,7 +31,7 @@ func (promQLQ *cloudMonitoringProm) run(ctx context.Context, req *backend.QueryD
return dr, backend.DataResponse{}, "", nil return dr, backend.DataResponse{}, "", nil
} }
span := traceReq(ctx, req, dsInfo, r, "") span := traceReq(ctx, req, dsInfo, r, "", promQLQ.timeRange)
defer span.End() defer span.End()
requestBody := map[string]any{ requestBody := map[string]any{

View File

@ -12,7 +12,7 @@ import (
func (sloQ *cloudMonitoringSLO) run(ctx context.Context, req *backend.QueryDataRequest, func (sloQ *cloudMonitoringSLO) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) { s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) {
return runTimeSeriesRequest(ctx, req, s, dsInfo, sloQ.parameters.ProjectName, sloQ.params, nil, logger) return runTimeSeriesRequest(ctx, req, s, dsInfo, sloQ.parameters.ProjectName, sloQ.params, nil, logger, sloQ.timeRange)
} }
func (sloQ *cloudMonitoringSLO) parseResponse(queryRes *backend.DataResponse, func (sloQ *cloudMonitoringSLO) parseResponse(queryRes *backend.DataResponse,

View File

@ -17,7 +17,7 @@ import (
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) run(ctx context.Context, req *backend.QueryDataRequest, func (timeSeriesFilter *cloudMonitoringTimeSeriesList) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) { s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) {
return runTimeSeriesRequest(ctx, req, s, dsInfo, timeSeriesFilter.parameters.ProjectName, timeSeriesFilter.params, nil, logger) return runTimeSeriesRequest(ctx, req, s, dsInfo, timeSeriesFilter.parameters.ProjectName, timeSeriesFilter.params, nil, logger, timeSeriesFilter.timeRange)
} }
func parseTimeSeriesResponse(queryRes *backend.DataResponse, func parseTimeSeriesResponse(queryRes *backend.DataResponse,

View File

@ -18,7 +18,7 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) appendGraphPeriod(req *ba
if timeSeriesQuery.parameters.GraphPeriod != "disabled" { if timeSeriesQuery.parameters.GraphPeriod != "disabled" {
if timeSeriesQuery.parameters.GraphPeriod == "auto" || timeSeriesQuery.parameters.GraphPeriod == "" { if timeSeriesQuery.parameters.GraphPeriod == "auto" || timeSeriesQuery.parameters.GraphPeriod == "" {
intervalCalculator := gcmTime.NewCalculator(gcmTime.CalculatorOptions{}) intervalCalculator := gcmTime.NewCalculator(gcmTime.CalculatorOptions{})
interval := intervalCalculator.Calculate(req.Queries[0].TimeRange, time.Duration(timeSeriesQuery.IntervalMS/1000)*time.Second, req.Queries[0].MaxDataPoints) interval := intervalCalculator.Calculate(timeSeriesQuery.timeRange, time.Duration(timeSeriesQuery.IntervalMS/1000)*time.Second, req.Queries[0].MaxDataPoints)
timeSeriesQuery.parameters.GraphPeriod = interval.Text timeSeriesQuery.parameters.GraphPeriod = interval.Text
} }
return fmt.Sprintf(" | graph_period %s", timeSeriesQuery.parameters.GraphPeriod) return fmt.Sprintf(" | graph_period %s", timeSeriesQuery.parameters.GraphPeriod)
@ -29,14 +29,14 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) appendGraphPeriod(req *ba
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) run(ctx context.Context, req *backend.QueryDataRequest, func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) { s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) {
timeSeriesQuery.parameters.Query += timeSeriesQuery.appendGraphPeriod(req) timeSeriesQuery.parameters.Query += timeSeriesQuery.appendGraphPeriod(req)
from := req.Queries[0].TimeRange.From from := timeSeriesQuery.timeRange.From
to := req.Queries[0].TimeRange.To to := timeSeriesQuery.timeRange.To
timeFormat := "2006/01/02-15:04:05" timeFormat := "2006/01/02-15:04:05"
timeSeriesQuery.parameters.Query += fmt.Sprintf(" | within d'%s', d'%s'", from.UTC().Format(timeFormat), to.UTC().Format(timeFormat)) timeSeriesQuery.parameters.Query += fmt.Sprintf(" | within d'%s', d'%s'", from.UTC().Format(timeFormat), to.UTC().Format(timeFormat))
requestBody := map[string]any{ requestBody := map[string]any{
"query": timeSeriesQuery.parameters.Query, "query": timeSeriesQuery.parameters.Query,
} }
return runTimeSeriesRequest(ctx, req, s, dsInfo, timeSeriesQuery.parameters.ProjectName, nil, requestBody, logger) return runTimeSeriesRequest(ctx, req, s, dsInfo, timeSeriesQuery.parameters.ProjectName, nil, requestBody, logger, timeSeriesQuery.timeRange)
} }
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *backend.DataResponse, func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *backend.DataResponse,

View File

@ -42,7 +42,8 @@ type (
aliasBy string aliasBy string
parameters *dataquery.TimeSeriesList parameters *dataquery.TimeSeriesList
// Processed properties // Processed properties
params url.Values timeRange backend.TimeRange
params url.Values
} }
// cloudMonitoringSLO is used to build time series with a filter but for the SLO case // cloudMonitoringSLO is used to build time series with a filter but for the SLO case
cloudMonitoringSLO struct { cloudMonitoringSLO struct {
@ -50,7 +51,8 @@ type (
aliasBy string aliasBy string
parameters *dataquery.SLOQuery parameters *dataquery.SLOQuery
// Processed properties // Processed properties
params url.Values timeRange backend.TimeRange
params url.Values
} }
// cloudMonitoringProm is used to build a promQL queries // cloudMonitoringProm is used to build a promQL queries

View File

@ -124,11 +124,11 @@ func doRequestWithPagination(ctx context.Context, r *http.Request, dsInfo dataso
return d, nil return d, nil
} }
func traceReq(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo, _ *http.Request, target string) trace.Span { func traceReq(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo, _ *http.Request, target string, timeRange backend.TimeRange) trace.Span {
_, span := tracing.DefaultTracer().Start(ctx, "cloudMonitoring query", trace.WithAttributes( _, span := tracing.DefaultTracer().Start(ctx, "cloudMonitoring query", trace.WithAttributes(
attribute.String("target", target), attribute.String("target", target),
attribute.String("from", req.Queries[0].TimeRange.From.String()), attribute.String("from", timeRange.From.String()),
attribute.String("until", req.Queries[0].TimeRange.To.String()), attribute.String("until", timeRange.To.String()),
attribute.Int64("datasource_id", dsInfo.id), attribute.Int64("datasource_id", dsInfo.id),
attribute.Int64("org_id", req.PluginContext.OrgID), attribute.Int64("org_id", req.PluginContext.OrgID),
)) ))
@ -137,7 +137,7 @@ func traceReq(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasou
} }
func runTimeSeriesRequest(ctx context.Context, req *backend.QueryDataRequest, func runTimeSeriesRequest(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, projectName string, params url.Values, body map[string]any, logger log.Logger) (*backend.DataResponse, cloudMonitoringResponse, string, error) { s *Service, dsInfo datasourceInfo, projectName string, params url.Values, body map[string]any, logger log.Logger, timeRange backend.TimeRange) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
dr := &backend.DataResponse{} dr := &backend.DataResponse{}
projectName, err := s.ensureProject(ctx, dsInfo, projectName) projectName, err := s.ensureProject(ctx, dsInfo, projectName)
if err != nil { if err != nil {
@ -154,7 +154,7 @@ func runTimeSeriesRequest(ctx context.Context, req *backend.QueryDataRequest,
return dr, cloudMonitoringResponse{}, "", nil return dr, cloudMonitoringResponse{}, "", nil
} }
span := traceReq(ctx, req, dsInfo, r, params.Encode()) span := traceReq(ctx, req, dsInfo, r, params.Encode(), timeRange)
defer span.End() defer span.End()
d, err := doRequestWithPagination(ctx, r, dsInfo, params, body, logger) d, err := doRequestWithPagination(ctx, r, dsInfo, params, body, logger)