mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
CloudWatch Logs: Queries in an expression should run synchronously (#64443)
This commit is contained in:
parent
fc8a753892
commit
74436d31de
@ -21,6 +21,8 @@ var (
|
||||
logger = log.New("expr")
|
||||
)
|
||||
|
||||
const FromExpressionHeaderName = "FromExpression"
|
||||
|
||||
type QueryError struct {
|
||||
RefID string
|
||||
Err error
|
||||
@ -227,6 +229,7 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
||||
},
|
||||
Headers: dn.request.Headers,
|
||||
}
|
||||
req.Headers[FromExpressionHeaderName] = "true"
|
||||
|
||||
responseType := "unknown"
|
||||
defer func() {
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
@ -155,7 +156,7 @@ func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDa
|
||||
to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
|
||||
with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
|
||||
queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
|
||||
frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
|
||||
frontend, but because alerts and expressions are executed on the backend the logic needs to be reimplemented here.
|
||||
*/
|
||||
q := req.Queries[0]
|
||||
var model DataQueryJson
|
||||
@ -163,11 +164,12 @@ func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDa
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
|
||||
isLogAlertQuery := fromAlert && model.QueryMode == logsQueryMode
|
||||
|
||||
if isLogAlertQuery {
|
||||
return e.executeLogAlertQuery(ctx, req)
|
||||
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
|
||||
_, fromExpression := req.Headers[expr.FromExpressionHeaderName]
|
||||
isSyncLogQuery := (fromAlert || fromExpression) && model.QueryMode == logsQueryMode
|
||||
if isSyncLogQuery {
|
||||
return executeSyncLogQuery(ctx, e, req)
|
||||
}
|
||||
|
||||
var result *backend.QueryDataResponse
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
@ -192,7 +193,7 @@ func Test_CheckHealth(t *testing.T) {
|
||||
}, resp)
|
||||
})
|
||||
}
|
||||
func Test_executeLogAlertQuery(t *testing.T) {
|
||||
func Test_executeSyncLogQuery(t *testing.T) {
|
||||
origNewCWClient := NewCWClient
|
||||
t.Cleanup(func() {
|
||||
NewCWClient = origNewCWClient
|
||||
@ -254,6 +255,70 @@ func Test_executeLogAlertQuery(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []string{"instance manager's region"}, sess.calledRegions)
|
||||
})
|
||||
|
||||
t.Run("with header", func(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
headers map[string]string
|
||||
called bool
|
||||
}{
|
||||
{
|
||||
"alert header",
|
||||
map[string]string{ngalertmodels.FromAlertHeaderName: "some value"},
|
||||
true,
|
||||
},
|
||||
{
|
||||
"expression header",
|
||||
map[string]string{expr.FromExpressionHeaderName: "some value"},
|
||||
true,
|
||||
},
|
||||
{
|
||||
"no header",
|
||||
map[string]string{},
|
||||
false,
|
||||
},
|
||||
}
|
||||
origExecuteSyncLogQuery := executeSyncLogQuery
|
||||
var syncCalled bool
|
||||
executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
syncCalled = true
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
syncCalled = false
|
||||
cli = fakeCWLogsClient{queryResults: cloudwatchlogs.GetQueryResultsOutput{Status: aws.String("Complete")}}
|
||||
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||
return DataSource{Settings: models.CloudWatchSettings{AWSDatasourceSettings: awsds.AWSDatasourceSettings{Region: "instance manager's region"}}}, nil
|
||||
})
|
||||
sess := fakeSessionCache{}
|
||||
|
||||
executor := newExecutor(im, newTestConfig(), &sess, featuremgmt.WithFeatures())
|
||||
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
|
||||
Headers: tc.headers,
|
||||
PluginContext: backend.PluginContext{DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}},
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
|
||||
JSON: json.RawMessage(`{
|
||||
"queryMode": "Logs",
|
||||
"type": "logAction",
|
||||
"subtype": "StartQuery",
|
||||
"region": "default",
|
||||
"queryString": "fields @message"
|
||||
}`),
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tc.called, syncCalled)
|
||||
})
|
||||
}
|
||||
|
||||
executeSyncLogQuery = origExecuteSyncLogQuery
|
||||
})
|
||||
}
|
||||
|
||||
func TestQuery_ResourceRequest_DescribeLogGroups_with_CrossAccountQuerying(t *testing.T) {
|
||||
|
@ -18,7 +18,7 @@ const (
|
||||
alertPollPeriod = time.Second
|
||||
)
|
||||
|
||||
func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
resp := backend.NewQueryDataResponse()
|
||||
|
||||
for _, q := range req.Queries {
|
||||
@ -45,7 +45,7 @@ func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *back
|
||||
return nil, err
|
||||
}
|
||||
|
||||
getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, logsQuery)
|
||||
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -73,7 +73,7 @@ func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *back
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
|
||||
func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
|
||||
queryContext backend.DataQuery, logsQuery models.LogsQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
|
||||
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, logsQuery, queryContext.TimeRange)
|
||||
if err != nil {
|
Loading…
Reference in New Issue
Block a user